天天看點

alibaba rocket mq的串行及并行處理

項目中用到rocket mq的方式有多種,

第一種,嚴格按照時間消費的模式,這種模式需要用串行方式,生産者生産的時候,這時候生産者需要往特定的隊列裡有序push:

                     SendResult result = producer.send(msg, new MessageQueueSelector(){

@Override

public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {

Integer id = arg.hashCode();

int index = id % mqs.size();

return mqs.get(index);

}

}, dataAsyncEvent.getDataType());

                   消費者也要按照順序嚴格有序消費,用這個有序的監聽者:

                   //同一隊列的消息同一時刻隻能一個線程消費,可保證消息在同一隊列嚴格有序消費

 consumer.registerMessageListener(new MessageListenerOrderly() {

@Override

            public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs,

                    ConsumeOrderlyContext context) {

                    return ConsumeOrderlyStatus.CONSUME_SUCCESS;

             }

                   public interface MessageListenerOrderly extends MessageListener {

    public ConsumeOrderlyStatus consumeMessage(final List<MessageExt> msgs,

            final ConsumeOrderlyContext context);

第二種:不追求時間順序,隻要把生産出來的事件全部消費完就可以。這種可以用并行的方式處理,效率高很多:

               生産者:

               SendResult result = producer.send(msg);

                消費者:(用此接口處理)

               consumer.registerMessageListener(new MessageListenerConcurrently() {

@Override

            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,

                    ConsumeConcurrentlyContext context) {

                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;

             }

                public interface MessageListenerConcurrently extends MessageListener {

    public ConsumeConcurrentlyStatus consumeMessage(final List<MessageExt> msgs,

            final ConsumeConcurrentlyContext context);

}

繼續閱讀