前幾章都是直接發送MapMessage類型的資料,拿前面的例子來講,如果生産者發送的是TextMessage,消費者也是必須TextMessage;如果我們自己要發送的資料不是TextMessage類型,而消費者還是TextMessage的,那該怎麼辦?難道每次接受後都要增加一個轉換方法麼?其實spring早就考慮到這種情況了。轉化器在很多元件中都是必不缺少的東西Spring的MessageConverter接口提供了對消息轉換的支援。
1、轉換類的相關代碼POJO
建立一個類MsgPoJo,就是一個簡單的Pojo類。具體代碼如下:
package jms.mq.spring;
import java.io.Serializable;
publicclass MsgPoJo implements Serializable{
private String id;
private String text;
public String getId() {
return id;
}
publicvoid setId(String id) {
this.id = id;
public String getText() {
return text;
publicvoid setText(String text) {
this.text = text;
}
}
2.轉換類的實作
建立一個類MsgConverter.java,實作MessageConverter接口。生成的代碼如下
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.springframework.jms.support.converter.MessageConversionException;
import org.springframework.jms.support.converter.MessageConverter;
publicclass MsgConverter implements MessageConverter{
@Override
public Object fromMessage(Message message) throws JMSException,
MessageConversionException {
if (!(message instanceof TextMessage)) {
thrownew MessageConversionException("Message is not TextMessage");
}
System.out.println("--轉換接收的消息--");
TextMessage textMessage = (TextMessage) message;
MsgPoJo msgPojo = new MsgPoJo();
String[] texts=textMessage.getText().split(",");
msgPojo.setId(texts[0]);
msgPojo.setText(texts[1]);
return msgPojo;
public Message toMessage(Object object, Session session) throws JMSException,
if (!(object instanceof MsgPoJo)) {
thrownew MessageConversionException("obj is not MsgPojo");
System.out.println("--轉換發送的消息--");
MsgPoJo msgPojo = (MsgPoJo) object;
TextMessage textMessage = session.createTextMessage();
textMessage.setText(msgPojo.getId()+","+msgPojo.getText());
return textMessage;
代碼很簡單就是做些轉換,有fromMessage和toMessage兩個方法,真好對應發送轉換toMessage和接受轉換fromMessage。此時,發送和接收消息要換成template.convertAndSend(message);template.receiveAndConvert()。接下來我做一些配置,讓spring知道我們的轉換類。修改applicationContext.xml中jms模版配置的代碼,修改後的代碼如下:
<!-- 類轉換 -->
<beanid="msgConverter"class="jms.mq.spring.MsgConverter"></bean>
<!-- 配置Jms模闆 -->
<beanid="jmsQueueTemplate"class="org.springframework.jms.core.JmsTemplate">
<propertyname="connectionFactory"ref="connectionFactory"/>
<propertyname="defaultDestination"ref="queueDest"/>
<!--<property name="receiveTimeout" value="10000" /> -->
<propertyname="messageConverter"ref="msgConverter"></property>
</bean>
注意:如果你有隊列監聽容器配置,配置jmsQueueTemplate和jmsTopicTemplate可能與隊列容器配置沖突。
3、業務相關代碼和配置
在QueueProducerService.java增加convertAndSend()方法并在其實作類中實作,實作類的代碼如下:
import java.util.Date;
import javax.jms.Destination;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;
publicclass QueueProducerService{
JmsTemplate jmsTemplate;
Destination destination;
publicvoid send() {
MessageCreator messageCreator = new MessageCreator() {
public Message createMessage(Session session) throws JMSException {
TextMessage message = session.createTextMessage();
message.setText("QueueProducerService發送消息"+new Date());
return message;
}
};
jmsTemplate.send(this.destination,messageCreator);
publicvoid convertAndSend(){
msgPojo.setId("1");
msgPojo.setText("first msg");
System.out.println("--發送消息:msgPojo.id為"+msgPojo.getId()+";msgPojo.text為"+msgPojo.getText());
jmsTemplate.convertAndSend(this.destination, msgPojo);
publicvoid setJmsTemplate(JmsTemplate jmsTemplate) {
this.jmsTemplate = jmsTemplate;
publicvoid setDestination(Destination destination) {
this.destination = destination;
同樣在QueueConsumerService.java中增加receiveAndConvert()方法并在其實作類中實作,實作類的代碼如下:
publicclass QueueConsumerService{
publicvoid receive() {
TextMessage message = (TextMessage) jmsTemplate.receive();
try {
System.out.println("QueueConsumerService收到消息:"+message.getText());
} catch (JMSException e) {
e.printStackTrace();
publicvoid receiveAndConvert() {
MsgPoJo msgPojo = (MsgPoJo)jmsTemplate.receiveAndConvert();
if(msgPojo!=null){
System.out.println("--收到消息:msgPojo.id為"+msgPojo.getId()+";msgPojo.text為"+msgPojo.getText());
修改我們的兩個測試類,增加對轉換方法的調用,不再贅述,直接上代碼:
QueueConsumerTest.java測試類
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;
publicclass QueueConsumerTest {
privatestatic ApplicationContext appContext = new ClassPathXmlApplicationContext( "applicationContext.xml");
privatestaticvoid receive() {
QueueConsumerService consumerService = (QueueConsumerService) appContext.getBean("queueConsumerService");
consumerService.receive();
privatestaticvoid receiveAndConvert() {
consumerService.receiveAndConvert();
publicstaticvoid main(String[] args) {
//receive();
receiveAndConvert();
QueueProducerTest.java測試類
publicclass QueueProducerTest {
privatestaticvoid send() {
QueueProducerService producerService = (QueueProducerService) appContext.getBean("queueProducerService");
producerService.send();
privatestaticvoid convertAndSend() {
producerService.convertAndSend();
//send();
convertAndSend();
代碼編寫完畢,我們看一下我們的勞動成果。首先運作生産者類和消費者控制台資訊如下:
收到的内容與發的内容相同,說明轉換成功了。如果這一部分的程式使用的隊列跟上面的一樣,那你會發現發送的時候列印出的資訊不值上面的一個,還包括一個接收的資訊,這是為什麼呢?了解spring原理的人應該知道,spring是把所有類都加載到内容中,當然也包括我們上門寫的按個實作MessageListener的一個消費者類,他們也在運作,如果監聽的位址跟你送的位址正好相同的話,他也有可能收到這個資訊。是以在測試的時候要注意修改配置檔案。
<beanid="queueProducerService"class="jms.mq.spring.QueueProducerService">
<propertyname="jmsTemplate"ref="jmsQueueTemplate"/>
<propertyname="destination"ref="queueDest"/>
<beanid="queueConsumerService"class="jms.mq.spring.QueueConsumerService">
4、監聽器上的使用方式
我再來學習一下跟監聽器聯合使用的方式,隻在釋出訂閱者模式上示範一下。我們先來修改釋出者的實作方式,在釋出者中增加convertAndSend方法并在其實作類中實作,訂閱者監聽器沒有類轉換,不用修改,釋出者修改後的代碼如下:
import javax.jms.MapMessage;
import jms.spring.QueueProducerService;
publicclass TopicPublisherService{
publicvoid convertAndSend(Object obj) {
System.out.println("--發送PoJo對象...");
jmsTemplate.convertAndSend(destination, obj);
釋出訂閱者配置檔案如下
<!-- 配置TopicJms模闆 -->
<beanid="jmsTopicTemplate"class="org.springframework.jms.core.JmsTemplate">
<propertyname="defaultDestination"ref="topicDest"/>
<!-- 配置是否為釋出訂閱者模式,預設為false -->
<propertyname="pubSubDomain"value="true"/>
<beanid="topicPublisherService"class="jms.mq.spring.TopicPublisherService">
<propertyname="jmsTemplate"ref="jmsTopicTemplate"/>
<!-- <property name="destination" ref="topicDest" /> -->
<propertyname="destination"ref="topicSubscriberMessageListenerDest"/>
<beanid="topicSubscriberService"class="jms.mq.spring.TopicSubscriberService">
<propertyname="destination"ref="topicDest"/>
修改上面的釋出測試類,修改增加對新增方法的調用,修改後的内容如下:
publicclass TopicPublisherTest {
TopicPublisherService topicPublisherService = (TopicPublisherService) appContext.getBean("topicPublisherService");
topicPublisherService.send();
MsgPoJo msgPoJo = new MsgPoJo();
msgPoJo.setId("1");
msgPoJo.setText("測試内容");
topicPublisherService.convertAndSend(msgPoJo);
運作釋出測試類,運作結果如下:
寫在到這裡,ActiveMQ與spring整合就講完了,主要講了ActiveMQ與spring的簡單整合,監聽器和類轉換這些主要功能.
本文轉自yunlielai51CTO部落格,原文連結:http://blog.51cto.com/4925054/1288918,如需轉載請自行聯系原作者