項目中用到rocket mq的方式有多種,
第一種,嚴格按照時間消費的模式,這種模式需要用串行方式,生産者生産的時候,這時候生産者需要往特定的隊列裡有序push:
SendResult result = producer.send(msg, new MessageQueueSelector(){
@Override
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
Integer id = arg.hashCode();
int index = id % mqs.size();
return mqs.get(index);
}
}, dataAsyncEvent.getDataType());
消費者也要按照順序嚴格有序消費,用這個有序的監聽者:
//同一隊列的消息同一時刻隻能一個線程消費,可保證消息在同一隊列嚴格有序消費
consumer.registerMessageListener(new MessageListenerOrderly() {
@Override
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeOrderlyContext context) {
return ConsumeOrderlyStatus.CONSUME_SUCCESS;
}
public interface MessageListenerOrderly extends MessageListener {
public ConsumeOrderlyStatus consumeMessage(final List<MessageExt> msgs,
final ConsumeOrderlyContext context);
}
第二種:不追求時間順序,隻要把生産出來的事件全部消費完就可以。這種可以用并行的方式處理,效率高很多:
生産者:
SendResult result = producer.send(msg);
消費者:(用此接口處理)
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
public interface MessageListenerConcurrently extends MessageListener {
public ConsumeConcurrentlyStatus consumeMessage(final List<MessageExt> msgs,
final ConsumeConcurrentlyContext context);
}