文章目錄
-
-
-
- 為什麼要引入消息中間件
- 什麼是ActiveMQ
- 點對點模式(point-to-point)
-
- 消息生産者編碼(Queue)
- 消息消費者編碼(Queue,同步阻塞和監聽器兩種消費方式)
- 釋出訂閱模式(publish-and-subsrcibe)
-
- 釋出消息編碼(Topic)
- 消費消息編碼(Topic)
- 隊列Queue和主題Topic的比較
- 幾款流行的消息隊列的詳細比較
-
-
為什麼要引入消息中間件
- 要做到系統解耦,當新的子產品接進來時,可以做到代碼改動最小:能夠解耦
- 設定流量緩沖池,可以讓後端系統按照自身吞吐能力進行消費,不被沖垮:能夠削峰
- 強弱依賴梳理能将非關鍵調用鍊路的操作異步化并提升整體系統的吞吐能力:能夠異步
什麼是ActiveMQ
MQ是消息中間件,是一種在分布式系統中應用程式借以傳遞消息的媒介,常用的有ActiveMQ,RabbitMQ,kafka。ActiveMQ是Apache下的開源項目,完全支援JMS1.1和J2EE1.4規範的JMS Provider實作。
特點:
- 支援多種語言編寫用戶端
- 對spring的支援,很容易和spring整合
- 支援多種傳輸協定:TCP,SSL,NIO,UDP等
- 支援AJAX
消息形式:
- 點對點(queue)
ActiveMQ學習(一) - 一對多(topic)
ActiveMQ學習(一)
點對點模式(point-to-point)
- 每個消息隻能有一個消費者,類似1對1的關系。
- 消息的生産和消費者之間沒有時間上的相關性 ,無論消費者在生産者發送消息的時候是否處于運作狀态,消費者都可以提取消息。
- 消息被消費後隊列中不會再存儲,是以消費者不會消費到已經被消費掉的消息。
消息生産者編碼(Queue)
//1.建立連接配接工程
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(brokerUrl);
//2.通過連接配接工廠,獲得連接配接并啟動通路
Connection connection = connectionFactory.createConnection();
connection.start();
//3.建立會話session
//兩個參數,第一個叫事務,第二個叫簽收
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//4.建立目的地Destination(是隊列queue還是主題topic)
Queue queue = session.createQueue("activemq-queue");
//5.建立消息的生産者
MessageProducer producer = session.createProducer(queue);
//6.通過使用producer生産3條消息發送到mq的隊列中
for (int i=0;i<3;i++){
//7.建立消息
TextMessage textMessage = session.createTextMessage("這是test的第" + i + "條message-----");
//8.通過Producer發送給queue
producer.send(textMessage);
}
//9.關閉資源
producer.close();
session.close();
connection.close();
System.out.println("消息釋出到mq完成");
消息消費者編碼(Queue,同步阻塞和監聽器兩種消費方式)
//1.建立連接配接工程
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(brokerUrl);
//2.通過連接配接工廠,獲得連接配接并啟動通路
Connection connection = connectionFactory.createConnection();
connection.start();
//3.建立會話session
//兩個參數,第一個叫事務,第二個叫簽收
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//4.建立目的地Destination(是隊列queue還是主題topic)
Queue queue = session.createQueue("activemq-queue");
//5.建立消費者
MessageConsumer consumer = session.createConsumer(queue);
/*
//同步阻塞方式receive()
//訂閱者或接受者調用consumer的receive方法來接收消息,receive方法再能夠接收到消息(或逾時之前)之前将一直阻塞
while (true){
TextMessage textMessage = (TextMessage)consumer.receive();
if (textMessage!=null){
System.out.println(textMessage.getText());
}else {
break;
}
}
//關閉資源
consumer.close();
session.close();
connection.close();
*/
//通過監聽器的方式來消費消費
consumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
if (message!=null && message instanceof TextMessage){
try {
System.out.println(((TextMessage)message).getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
}
});
System.in.read();//讓資源不關閉,否則還沒等到監聽,連接配接就斷開了
//關閉資源
consumer.close();
session.close();
connection.close();
釋出訂閱模式(publish-and-subsrcibe)
- 生産者将消息釋出到topic中,每個消息可以有很多個消費者,屬于1:N的關系。
- 生産者和消費者之有時間上的相關性。訂閱某一個主題的消費者隻能消費自訂閱後之後釋出的消息。
- 生産者生産時,topic 不儲存消息,它是無狀态的,假如無人訂閱就去生産,那就是一條廢消息,是以,一般先啟動消費者再啟動生産者。
- JMS規範允許客戶建立持久訂閱,這在一定程度上放松了時間上的相關性要求。**持久訂閱允許消費者消費它在未處于激活狀态時發送的消息。**就好比我們的微信公衆号的訂閱。
釋出消息編碼(Topic)
//1.建立連接配接工程
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(brokerUrl);
//2.通過連接配接工廠,獲得連接配接并啟動通路
Connection connection = connectionFactory.createConnection();
connection.start();
//3.建立會話session
//兩個參數,第一個叫事務,第二個叫簽收
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//4.建立目的地Destination(是隊列queue還是主題topic)
Topic topic = session.createTopic("activemq-topic");
//5.建立生産者
MessageProducer producer = session.createProducer(topic);
for (int i=0;i<3;i++){
TextMessage textMessage = session.createTextMessage("這是第" + i + "條topic測試");
producer.send(textMessage);
}
//關閉資源
producer.close();
session.close();
connection.close();
System.out.println("消息釋出到mq完成");
消費消息編碼(Topic)
System.out.println("我是一号消費者");
//1.建立連接配接工程
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(brokerUrl);
//2.通過連接配接工廠,獲得連接配接并啟動通路
Connection connection = connectionFactory.createConnection();
connection.start();
//3.建立會話session
//兩個參數,第一個叫事務,第二個叫簽收
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//4.建立目的地Destination(是隊列queue還是主題topic)
Topic topic = session.createTopic("activemq-topic");
//5.建立消費者
MessageConsumer consumer = session.createConsumer(topic);
//通過監聽器的方式來消費消息
consumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
if (message!=null && message instanceof TextMessage){
try {
System.out.println(((TextMessage)message).getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
}
});
System.in.read();//讓資源不關閉,否則還沒等到監聽,連接配接就斷開了
//關閉資源
consumer.close();
session.close();
connection.close();