學習中的一些心得記錄
p2p方式:
概念:queue,QueueSender,QueueReceiver,隊列,消息發送者,消息接收者三個角色。發送者向隊列中加入消息,接收者從隊列取出消息,隊列中的消息接收一條少一條,如果有多個接收者連接配接到隊列,一條消息隻能被一個接收者取走。
一、 消息發送方式預設不設定的時候是 DeliveryMode.PERSISTENT 方式,也就是持久化方式。該常量值為2。這種方式消息不會丢失,會儲存在伺服器上。就是說消息發送後,即使你關閉jms伺服器(比如用activeMQ, 關閉伺服器程式),然後服務重新啟動後,這條消息仍然是存在的,而其他使用NON_PERSISTENT方式的消息就丢失了。我想大多數情況下啊我們需要 的是PERSISTENT方式吧!
DeliveryMode.NON_PERSISTENT 常量值為1
DeliveryMode.PERSISTENT 常量值為2
int a = msgp0.getDeliveryMode();
//msgp0.setDeliveryMode(DeliveryMode.PERSISTENT);
二、消息的消費者接收消息可以采用兩種方式:
1、consumer.receive() 或 consumer.receive(int timeout);
2、注冊一個MessageListener。
采用第一種方式,即同步方式,遇到receive方法就會阻塞,消息的接收者會一直等待下去,直到有消息到達,或者逾時。該方式調用一次方法隻接收一條消息。
采用第二種方式,即異步方式,程式會一直監聽,隻要有消息就會響應onMessage方法,當執行 conn.close()方法時會結束監聽器。
MessageConsumer comsumer = session.createConsumer(queue);
comsumer.setMessageListener( new MessageListener(){
@Override
public void onMessage(Message m) {
TextMessage textMsg = (TextMessage) m;
try {
System.out.println(textMsg.getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
});
測試發現 conn.close() 方法對這兩種方式有些不同,對于監聽器方式,隻要close方法執行,監聽就會終止,估計監聽是以背景線程方式執行的。而對于recive接收方式,隻要 receive方法隻要調用一次,就必須等到有消息到達才會執行下面的語句,是阻塞的方式,即使後面的conn.close()不會執行到。
參考:http://www.cnblogs.com/iloveu/archive/2009/06/10/1500714.html
pub/sub方式 :
概念:topic,publisher,subscriber,主題,釋出者,訂閱者三個角色。主題和訂閱者是一對多關系,一個主題可以被多個訂閱者訂閱。當釋出者向某個主題發送一條消息時,所有的訂閱者都會收到。
如何了解訂閱的概念呢,個人了解,分兩種情況:
一、 建立一個訂閱者時使用session.createDurableSubscriber(topic, clientId )方法建立為持久型訂閱者時,該topic和clientId的訂閱關系将被儲存在伺服器上,即産生了訂閱關系。這樣clientId這個id的訂閱者将 在離線的時候,也不會丢失消息。這個訂閱關系也可以了解為被持久化在jms伺服器上了,使用jmx的監視控制台(我使用的activeMq),可以看到有 一個Subscription的節點,下面有你訂閱主題時給定的用戶端名字。可以使用unsubscribe 方法 取消訂閱關系。
二、 建立一個非持久化方式的訂閱者時,隻有在用戶端訂閱者連接配接到jms伺服器時,訂閱關系成立,訂閱者離線時,訂閱關系即取消,不會儲存在伺服器上,這也是非 持久化方式訂閱者不能離線接收消息的原因吧。預設為廣播方式,在沒有訂閱者連接配接到伺服器時,發送的消息将丢失,不會儲存在伺服器。
對jms規範的了解:
jms 規範是sun提出的java的消息服務統一的api接口規範,規範、标準的好處就不用說了,java能走到今天,規範标準的制定可以說有很大的功勞吧。所 有提供java消息服務的軟體提供者都必須針對這個接口實作。我們編寫jms程式時,基本可以做到隻針對接口程式設計,不用考慮是那個廠家實作,這是我們的代 碼有了很大的靈活行,不綁定某個具體廠商。隻要你的代碼寫的夠好,完全可以不修改代碼就直接部署到不同廠商的jms伺服器上。
舉個例子:編寫一個基于ActiveMQ的jms程式時有這樣的代碼:
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("vm://localhost" );
...
Queue queue = new ActiveMQQueue("testQueue" );
等,這樣就在代碼裡綁死了ActiveMQ的api。要做到廠商無關性,應該使用jndi來取得工廠對象以及隊列資源。
比如:
ConnectionFactory cf = (ConnectionFactory) ctx.lookup("connectionFactory");
...
Destination dest = (Queue) ctx.lookup("MyQueue");
注意接口也盡量使用jms的大的接口。當然這裡需要提供jndi服務支援,ActiveMQ本身提供jndi服務,隻要在classpath中有一個jndi.properties檔案,在配置檔案中對資源做一些配置就可以了
比如:
java.naming.factory.initial = org.apache.activemq.jndi.ActiveMQInitialContextFactory
connectionFactoryNames = connectionFactory, queueConnectionFactory, topicConnectionFactry
queue.MyQueue = zmQueue
topic.MyTopic = zmTopic
等。
對jms規範1.1的一些了解:
JMS 1.0.2 定義了兩種類型的消息傳遞域(它們是互相獨立的),即點對點和釋出/訂閱。JMS 的最新版本,即版本 1.1,JMS 統一了這兩個域。如下圖:有了 JMS 1.1,客戶機就不再必須專門針對這個或那個域進行實作了。
JMS 公共 | PTP 域 | Pub/Sub 域 |
ConnectionFactory | QueueConnectionFactory | TopicConnectionFactory |
Connection | QueueConnection | TopicConnection |
Destination | Queue | Topic |
Session | QueueSession | TopicSession |
MessageProducer | QueueSender | TopicPublisher |
MessageConsumer | QueueReceiver | TopicSubscriber |
參考:http://www.ibm.com/developerworks/cn/java/j-jms11/index.html
就是說,隻使用JMS公共域的接口就可以完成jms程式編寫。是以推薦盡量使用大的接口,增加代碼的靈活性。
比如,寫pub/sub方式的消息程式也可以這樣:
Context ctx = new InitialContext();
ConnectionFactory factory = (ConnectionFactory) ctx
.lookup("connectionFactory");
Connection connection = factory.createConnection();
connection.start();
// 建立一個Topic
Destination topic = (Destination) ctx.lookup("MyTopic");
Session session = connection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
MessageConsumer comsumer3 = session.createConsumer(topic);
Message msg = comsumer3.receive();