在上篇文章,阿堂和大家分享了《分布式消息隊列中間件系列研究之阿堂教程(基礎篇-Local模式)》,後面由于時間關系,就一直沒有接着寫了。忙裡偷閑,昨天晚上在家寫了大部分,今天抽點時間阿堂将繼續為大家奉獻完成《分布式消息隊列中間件系列研究之阿堂教程(進階篇)》。這裡阿堂結合發送郵件的一個相對具體的案例,應該說是有一定的代表性的,相對比較深入的剖析開源架構metq分布式消息隊列的使用。相信通過阿堂的分享後,大家就基本明白分布式消息隊列是怎麼回事和大緻知道如何使用了。當然,網友們如果想非常深入的學習和使用metq,建議網友們可以直接到metq的官網去學習和了解。
metq使用的大緻流程如下所示
public class Productor {
private static Log log = LogFactory.getLog(Productor.class);
public static void main(String[] args) throws Exception {
//由消息工廠産生消息生産者
MessageProducer producer = MessageSessionFactoryManager.getSessionFactory(true).createProducer();
//設定topic,必須要在server.ini檔案中進行配置
final String topic = "email";
//釋出topic
producer.publish(topic);
//釋出内容(根據實際業務來組裝内容)
String line = "網絡時空(阿堂)恭喜大家2015年心想事成!";
try{
//模拟發郵件(根據實際業務定義,可以是發郵件,發短信,httpPost送出等均可)
//序列化發送内容
String jsonString = JSON.toJSONString(request, SerializerFeature.WriteClassName);
log.info("------------------------------------------------------------------------");
log.info("釋出者發送的EmailAddress = "+request.getEmailAddress());
log.info("釋出者發送的 Topic = "+request.getEmailTopic());
log.info("釋出者發送的郵件的内容= "+request.getContent());
log.info("釋出者發送的郵件發送的時間點= "+request.getCreateTime());
//釋出訂消息,這裡老何定義了重寫回調SendMessageCallbackImpl實作方法
producer.sendMessage(new Message(topic, jsonString.getBytes()),new SendMessageCallbackImpl(producer,topic,jsonString));
}catch(Exception ex){
log.error(ex);
}
------------------------------------
public class SendMessageCallbackImpl implements SendMessageCallback {
private static Log log = LogFactory.getLog(SendMessageCallbackImpl.class);
private MessageProducer messageProducer;
private String topic;
private String content;
public SendMessageCallbackImpl( MessageProducer messageProducer, String topic, String content) {
super();
this.messageProducer = messageProducer;
this.topic = topic;
this.content = content;
public void onException(Throwable e) {
log.fatal("metaq server exception , error message:" + e.getMessage());
log.info("--------------------------------------------------------------");
//出現異常時寫入日志檔案,進行補償機制
logToFile();
public void onMessageSent(SendResult result) {
log.info("result = "+result);
if (!result.isSuccess()) {
log.warn("Send " + topic + " message failed,error message:" + result.getErrorMessage());
//沒有收到broker伺服器的正常應答時寫入日志檔案,進行補償機制
else {
log.info("Send "+topic+" successfully,sent to " + result.getPartition()+" "+result.getOffset());
private void logToFile() {
//定義日志檔案
String fileName = MessageSessionFactoryManager.getMessagedir()+UUID.randomUUID().toString();
File file = new File(fileName);
FileWriter fw = null;
try {
fw = new FileWriter(file);
fw.write(topic+"\r\n");
fw.write(content);
fw.flush();
} catch (IOException e) {
log.error(e);
} finally {
if(fw != null) {
fw.close();
public class Consumer {
private static Log log = LogFactory.getLog(Consumer.class);
final String group = "meta-example";
//複用SessionFacotory(單例模式) 釋出者和訂閱者共用sessionFactory
MessageConsumer consumer = MessageSessionFactoryManager.getSessionFactory(false).createConsumer(new ConsumerConfig(group));
//每次訂閱300k的位元組流内容,将訂閱資訊儲存到本地
consumer.subscribe(topic, 1024 * 300, new EmailMessageListener());
//completeSubscribe一次性将所有的訂閱生效,并處理zk和metaq伺服器的所有互動過程
consumer.completeSubscribe();
public class EmailMessageListener implements MessageListener {
private static Log log = LogFactory.getLog(EmailMessageListener.class);
public void recieveMessages(Message message) throws InterruptedException {
log.info("Receive Email message, BrokerId-Partition:"
+ message.getPartition().getBrokerId() + ","
+ message.getPartition().getPartition()+", "+message.getTopic()+" ,"+message.getId());
//反序列化接收内容
EmailRequest emailRequest = JSON.parseObject(new String(message.getData()), EmailRequest.class);
log.info("訂閱者接收到的EmailAddress = "+emailRequest.getEmailAddress());
log.info("訂閱者接收到的 Topic = "+emailRequest.getEmailTopic());
log.info("訂閱者接收到的郵件的内容= "+emailRequest.getContent());
log.info("訂閱者接收到的郵件發送的時間點= "+emailRequest.getCreateTime());
//這裡目前網友可以根據目前系統時間 - emailRequest.getCreateTime() 比較,超過多長時間,可以丢棄此次訂閱資訊,不消費,直接return
log.info("訂閱者開始訂閱消息啦!");
log.info("開始發送郵件啦!");
//發郵件的代碼邏輯(過程略),與本文介紹的内容沒有太大聯系!
//執行發送郵件的代碼邏輯
log.info("發送郵件成功啦!");
log.info("訂閱者結束訂閱消息啦!");
log.info("結束發送郵件啦!");
log.info("End Send Email:" + emailRequest.getEmailAddress());
} catch(Exception ex) {
log.error("EmailMessageListener exception", ex);
public Executor getExecutor() {
// TODO Auto-generated method stub
return null;
----------------------------------
public class MessageSessionFactoryManager {
private static Log log = LogFactory.getLog(MessageSessionFactoryManager.class);
private static MessageSessionFactory sessionFactory = null;
private static String messagedir;
public static String getMessagedir() {
return messagedir;
private MessageSessionFactoryManager() {
public synchronized static MessageSessionFactory getSessionFactory() {
if(sessionFactory == null) {
init(true);
return sessionFactory;
public synchronized static MessageSessionFactory getSessionFactory(boolean isProducer) {
init(isProducer);
private static void init(boolean isProducer) {
String confFile = "/metaq.ini";
InputStream in = EmailMessageProducerManager.class
.getResourceAsStream(confFile);
Properties conf = new Properties();
conf.load(in);
String zookeeper = conf.getProperty("zookeeper");
messagedir = conf.getProperty("messagedir");
final MetaClientConfig metaClientConfig = new MetaClientConfig();
final ZKConfig zkConfig = new ZKConfig();
zkConfig.zkConnect = zookeeper;
metaClientConfig.setZkConfig(zkConfig);
sessionFactory = new MetaMessageSessionFactory(metaClientConfig);
//如果是生産者,則傳入值為真;如果是消費者,則傳入值為假。可以共用同一個sessionFactory
if(isProducer) {
//補償機制,這裡主要是針對如下兩種情況
//釋出者在onException中産生了異常,或者 result.isSuccess()傳回值不成功,寫會入到meaq.ini檔案中對應的messagedir目錄下
//然後由定時器每間隔30分鐘掃描一次,掃到檔案後,然後進行補償機制進行重新由producer重新釋出
Timer timer = new Timer();
//在2秒後執行此任務,每次間隔半小時掃描 D:\metaq\mmp\logs 目錄下的異常檔案,進行補償機制釋出消息 MesaageExceptionHandleTask()實作方法很簡單(由于字數限制不貼上去了)
timer.schedule(new MesaageExceptionHandleTask(), 2000, 1000*30*60);
} catch (Exception e) {
throw new RuntimeException(e.getCause());
public static MessageProducer getMessageProducer() {
return getSessionFactory(true).createProducer();
本文轉自 www19 51CTO部落格,原文連結:http://blog.51cto.com/doujh/1715275,如需轉載請自行聯系原作者