天天看點

分布式消息隊列中間件系列研究之阿堂教程(進階篇)

 在上篇文章,阿堂和大家分享了《分布式消息隊列中間件系列研究之阿堂教程(基礎篇-Local模式)》,後面由于時間關系,就一直沒有接着寫了。忙裡偷閑,昨天晚上在家寫了大部分,今天抽點時間阿堂将繼續為大家奉獻完成《分布式消息隊列中間件系列研究之阿堂教程(進階篇)》。這裡阿堂結合發送郵件的一個相對具體的案例,應該說是有一定的代表性的,相對比較深入的剖析開源架構metq分布式消息隊列的使用。相信通過阿堂的分享後,大家就基本明白分布式消息隊列是怎麼回事和大緻知道如何使用了。當然,網友們如果想非常深入的學習和使用metq,建議網友們可以直接到metq的官網去學習和了解。

metq使用的大緻流程如下所示

分布式消息隊列中間件系列研究之阿堂教程(進階篇)
分布式消息隊列中間件系列研究之阿堂教程(進階篇)
分布式消息隊列中間件系列研究之阿堂教程(進階篇)

public class Productor {

private static Log log = LogFactory.getLog(Productor.class);

public static void main(String[] args) throws Exception {

//由消息工廠産生消息生産者

MessageProducer producer = MessageSessionFactoryManager.getSessionFactory(true).createProducer();

//設定topic,必須要在server.ini檔案中進行配置

final String topic = "email";

//釋出topic

producer.publish(topic);

//釋出内容(根據實際業務來組裝内容)

String line = "網絡時空(阿堂)恭喜大家2015年心想事成!";

try{

//模拟發郵件(根據實際業務定義,可以是發郵件,發短信,httpPost送出等均可)

//序列化發送内容

String jsonString = JSON.toJSONString(request, SerializerFeature.WriteClassName);

log.info("------------------------------------------------------------------------");

log.info("釋出者發送的EmailAddress = "+request.getEmailAddress());

log.info("釋出者發送的 Topic = "+request.getEmailTopic());

log.info("釋出者發送的郵件的内容= "+request.getContent());

log.info("釋出者發送的郵件發送的時間點= "+request.getCreateTime());

//釋出訂消息,這裡老何定義了重寫回調SendMessageCallbackImpl實作方法

producer.sendMessage(new Message(topic, jsonString.getBytes()),new SendMessageCallbackImpl(producer,topic,jsonString));

}catch(Exception ex){

log.error(ex);

}

------------------------------------

public class SendMessageCallbackImpl implements SendMessageCallback {

private static Log log = LogFactory.getLog(SendMessageCallbackImpl.class);

private MessageProducer messageProducer;

private String topic;

private String content;

public SendMessageCallbackImpl( MessageProducer messageProducer, String topic, String content) {

super();

this.messageProducer = messageProducer;

this.topic = topic;

this.content = content;

public void onException(Throwable e) {

log.fatal("metaq server exception , error message:" + e.getMessage());

log.info("--------------------------------------------------------------");

//出現異常時寫入日志檔案,進行補償機制

logToFile();

public void onMessageSent(SendResult result) {

log.info("result = "+result);

if (!result.isSuccess()) {

log.warn("Send " + topic + " message failed,error message:" + result.getErrorMessage());

//沒有收到broker伺服器的正常應答時寫入日志檔案,進行補償機制

else {

log.info("Send "+topic+" successfully,sent to " + result.getPartition()+" "+result.getOffset());

private void logToFile() {

//定義日志檔案

String fileName = MessageSessionFactoryManager.getMessagedir()+UUID.randomUUID().toString();

File file = new File(fileName);

FileWriter fw = null;

try {

fw = new FileWriter(file);

fw.write(topic+"\r\n");

fw.write(content);

fw.flush();

} catch (IOException e) {

log.error(e);

} finally {

if(fw != null) {

fw.close();

public class Consumer {

private static Log log = LogFactory.getLog(Consumer.class);

final String group = "meta-example";

//複用SessionFacotory(單例模式) 釋出者和訂閱者共用sessionFactory

MessageConsumer consumer = MessageSessionFactoryManager.getSessionFactory(false).createConsumer(new ConsumerConfig(group));

//每次訂閱300k的位元組流内容,将訂閱資訊儲存到本地

consumer.subscribe(topic, 1024 * 300, new EmailMessageListener());

//completeSubscribe一次性将所有的訂閱生效,并處理zk和metaq伺服器的所有互動過程

consumer.completeSubscribe();

public class EmailMessageListener implements MessageListener {

private static Log log = LogFactory.getLog(EmailMessageListener.class);

public void recieveMessages(Message message) throws InterruptedException {

log.info("Receive Email message, BrokerId-Partition:"

+ message.getPartition().getBrokerId() + ","

+ message.getPartition().getPartition()+", "+message.getTopic()+" ,"+message.getId());

//反序列化接收内容

EmailRequest emailRequest = JSON.parseObject(new String(message.getData()), EmailRequest.class);

log.info("訂閱者接收到的EmailAddress = "+emailRequest.getEmailAddress());

log.info("訂閱者接收到的 Topic = "+emailRequest.getEmailTopic());

log.info("訂閱者接收到的郵件的内容= "+emailRequest.getContent());

log.info("訂閱者接收到的郵件發送的時間點= "+emailRequest.getCreateTime());

//這裡目前網友可以根據目前系統時間 - emailRequest.getCreateTime() 比較,超過多長時間,可以丢棄此次訂閱資訊,不消費,直接return

log.info("訂閱者開始訂閱消息啦!");

log.info("開始發送郵件啦!");

//發郵件的代碼邏輯(過程略),與本文介紹的内容沒有太大聯系!

//執行發送郵件的代碼邏輯

log.info("發送郵件成功啦!");

log.info("訂閱者結束訂閱消息啦!");

log.info("結束發送郵件啦!");

log.info("End Send Email:" + emailRequest.getEmailAddress());

} catch(Exception ex) {

log.error("EmailMessageListener exception", ex);

public Executor getExecutor() {

// TODO Auto-generated method stub

return null;

----------------------------------

public class MessageSessionFactoryManager {

private static Log log = LogFactory.getLog(MessageSessionFactoryManager.class);

private static MessageSessionFactory sessionFactory = null;

private static String messagedir;

public static String getMessagedir() {

return messagedir;

private MessageSessionFactoryManager() {

public synchronized static MessageSessionFactory getSessionFactory() {

if(sessionFactory == null) {

init(true);

return sessionFactory;

public synchronized static MessageSessionFactory getSessionFactory(boolean isProducer) {

init(isProducer);

private static void init(boolean isProducer) {

String confFile = "/metaq.ini";

InputStream in = EmailMessageProducerManager.class

.getResourceAsStream(confFile);

Properties conf = new Properties();

conf.load(in);

String zookeeper = conf.getProperty("zookeeper");

messagedir = conf.getProperty("messagedir");

final MetaClientConfig metaClientConfig = new MetaClientConfig();

final ZKConfig zkConfig = new ZKConfig();

zkConfig.zkConnect = zookeeper;

metaClientConfig.setZkConfig(zkConfig);

sessionFactory = new MetaMessageSessionFactory(metaClientConfig);

//如果是生産者,則傳入值為真;如果是消費者,則傳入值為假。可以共用同一個sessionFactory

if(isProducer) {

//補償機制,這裡主要是針對如下兩種情況

//釋出者在onException中産生了異常,或者 result.isSuccess()傳回值不成功,寫會入到meaq.ini檔案中對應的messagedir目錄下

//然後由定時器每間隔30分鐘掃描一次,掃到檔案後,然後進行補償機制進行重新由producer重新釋出

Timer timer = new Timer();

//在2秒後執行此任務,每次間隔半小時掃描 D:\metaq\mmp\logs 目錄下的異常檔案,進行補償機制釋出消息 MesaageExceptionHandleTask()實作方法很簡單(由于字數限制不貼上去了)

timer.schedule(new MesaageExceptionHandleTask(), 2000, 1000*30*60);

} catch (Exception e) {

throw new RuntimeException(e.getCause());

public static MessageProducer getMessageProducer() {

return getSessionFactory(true).createProducer();

本文轉自 www19 51CTO部落格,原文連結:http://blog.51cto.com/doujh/1715275,如需轉載請自行聯系原作者