Today I tried RocketMQ, Alibaba's open-source message queue. For my first test I consumed with a brand-new consumerGroup and set:
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
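Yet consumption still started at offset 0. Plenty of people online have hit the same problem, but nobody had posted an answer, so I went through the source. In the client code (rocketmq-client/src/main/java/com/alibaba/rocketmq/client/impl/consumer/RebalancePushImpl.java):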
@Override
public long computePullFromWhere(MessageQueue mq) {
    long result = -1;
    final ConsumeFromWhere consumeFromWhere =
            this.defaultMQPushConsumerImpl.getDefaultMQPushConsumer().getConsumeFromWhere();
    final OffsetStore offsetStore = this.defaultMQPushConsumerImpl.getOffsetStore();
    switch (consumeFromWhere) {
    case CONSUME_FROM_LAST_OFFSET_AND_FROM_MIN_WHEN_BOOT_FIRST:
    case CONSUME_FROM_MIN_OFFSET:
    case CONSUME_FROM_MAX_OFFSET:
    case CONSUME_FROM_LAST_OFFSET: {
        long lastOffset = offsetStore.readOffset(mq, ReadOffsetType.READ_FROM_STORE);
        if (lastOffset >= 0) {
            result = lastOffset;
        }
        // First start,no offset
        else if (-1 == lastOffset) {
            if (mq.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
                result = 0L;
            }
            else {
                try {
                    result = this.mQClientFactory.getMQAdminImpl().maxOffset(mq);
                }
                catch (MQClientException e) {
                    result = -1;
                }
            }
        }
        else {
            result = -1;
        }
        break;
    }
Stepping through the code in a debugger showed that the lastOffset returned by this call was 0:
long lastOffset = offsetStore.readOffset(mq, ReadOffsetType.READ_FROM_STORE);
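With a brand-new consumerGroup, the result here should be -1. The value is fetched from the server side, so the unexpected 0 must be coming back from the Broker.
The code of readOffset is as follows: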
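To make the branch above easier to reason about, here is a minimal sketch of the CONSUME_FROM_LAST_OFFSET decision as a pure function. The class and parameter names (PullFromWhereSketch, storedOffset, retryTopic, maxOffset) are my own and are not part of RocketMQ. The sketch also leaves out the case where looking up the max offset itself fails.

```java
// Hypothetical, simplified stand-in for the CONSUME_FROM_LAST_OFFSET branch of
// computePullFromWhere shown above. It is not RocketMQ code.
final class PullFromWhereSketch {

    /**
     * @param storedOffset what readOffset(READ_FROM_STORE) returned:
     *                     >= 0 means the broker supplied an offset,
     *                     -1 means the broker had no offset for this group,
     *                     anything else means the lookup failed
     * @param retryTopic   whether the topic name starts with the retry prefix
     * @param maxOffset    the queue's current max offset (assumed to be available)
     * @return the offset to start pulling from, or -1 when it cannot be decided
     */
    static long computePullFromWhere(long storedOffset, boolean retryTopic, long maxOffset) {
        if (storedOffset >= 0) {
            // The broker supplied an offset, so the client resumes there.
            // A returned 0 is trusted just like any other offset.
            return storedOffset;
        }
        if (storedOffset == -1) {
            // First start of this group: retry topics begin at 0,
            // normal topics skip to the end of the queue.
            return retryTopic ? 0L : maxOffset;
        }
        // Lookup failed for some other reason.
        return -1;
    }
}
```

With these assumptions, computePullFromWhere(-1, false, 500) yields 500, which is what I expected for a new group, while computePullFromWhere(0, false, 500) yields 0, which is what I actually observed.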
case READ_FROM_STORE: {
    try {
        long brokerOffset = this.fetchConsumeOffsetFromBroker(mq);
        AtomicLong offset = new AtomicLong(brokerOffset);
        this.updateOffset(mq, offset.get(), false);
        return brokerOffset;
    }
    // No offset in broker
    catch (MQBrokerException e) {
        return -1;
    }
    //Other exceptions
    catch (Exception e) {
        log.warn("fetchConsumeOffsetFromBroker exception, " + mq, e);
        return -2;
    }
}
private long fetchConsumeOffsetFromBroker(MessageQueue mq) throws RemotingException, MQBrokerException,
        InterruptedException, MQClientException {
    FindBrokerResult findBrokerResult = this.mQClientFactory.findBrokerAddressInAdmin(mq.getBrokerName());
    if (null == findBrokerResult) {
        // TODO Here may be heavily overhead for Name Server,need tuning
        this.mQClientFactory.updateTopicRouteInfoFromNameServer(mq.getTopic());
        findBrokerResult = this.mQClientFactory.findBrokerAddressInAdmin(mq.getBrokerName());
    }
    if (findBrokerResult != null) {
        QueryConsumerOffsetRequestHeader requestHeader = new QueryConsumerOffsetRequestHeader();
        requestHeader.setTopic(mq.getTopic());
        requestHeader.setConsumerGroup(this.groupName);
        requestHeader.setQueueId(mq.getQueueId());
        return this.mQClientFactory.getMQClientAPIImpl().queryConsumerOffset(
                findBrokerResult.getBrokerAddr(), requestHeader, 1000 * 5);
    } else {
        throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist", null);
    }
}
public long queryConsumerOffset(//
        final String addr,//
        final QueryConsumerOffsetRequestHeader requestHeader,//
        final long timeoutMillis//
) throws RemotingException, MQBrokerException, InterruptedException {
    if (!UtilAll.isBlank(projectGroupPrefix)) {
        requestHeader.setConsumerGroup(VirtualEnvUtil.buildWithProjectGroup(
                requestHeader.getConsumerGroup(), projectGroupPrefix));
        requestHeader.setTopic(VirtualEnvUtil.buildWithProjectGroup(requestHeader.getTopic(),
                projectGroupPrefix));
    }
    RemotingCommand request =
            RemotingCommand.createRequestCommand(RequestCode.QUERY_CONSUMER_OFFSET, requestHeader);
    RemotingCommand response = this.remotingClient.invokeSync(addr, request, timeoutMillis);
    assert response != null;
    switch (response.getCode()) {
    case ResponseCode.SUCCESS: {
        QueryConsumerOffsetResponseHeader responseHeader =
                (QueryConsumerOffsetResponseHeader) response
                        .decodeCommandCustomHeader(QueryConsumerOffsetResponseHeader.class);
        return responseHeader.getOffset();
    }
    default:
        break;
    }
    throw new MQBrokerException(response.getCode(), response.getRemark());
}
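As the code shows, the client sends RequestCode.QUERY_CONSUMER_OFFSET to the server to fetch the offset. Any response other than SUCCESS becomes an MQBrokerException, which readOffset turns into -1.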
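The way a broker response turns into the sentinel values -1 and -2 can be sketched in isolation. Everything below is a hypothetical model: ReadOffsetSketch, BrokerException, OffsetFetcher and the numeric response codes are placeholders of my own, standing in for the client classes quoted above.

```java
// Hypothetical model of the READ_FROM_STORE path. It is not RocketMQ code.
final class ReadOffsetSketch {

    // Placeholder response codes for this sketch only.
    static final int SUCCESS = 0;
    static final int QUERY_NOT_FOUND = 1;

    /** Stand-in for MQBrokerException: the broker answered with a non-success code. */
    static final class BrokerException extends Exception {
        BrokerException(int code) {
            super("broker responded with code " + code);
        }
    }

    /** Stand-in for the remote call that fetches the offset from the broker. */
    interface OffsetFetcher {
        long fetch() throws Exception;
    }

    /** Mirrors queryConsumerOffset: only SUCCESS carries an offset back. */
    static long queryConsumerOffset(int responseCode, long offset) throws BrokerException {
        if (responseCode == SUCCESS) {
            return offset;
        }
        throw new BrokerException(responseCode);
    }

    /** Mirrors the READ_FROM_STORE case: exceptions collapse into sentinel values. */
    static long readFromStore(OffsetFetcher fetcher) {
        try {
            return fetcher.fetch();
        } catch (BrokerException e) {
            return -1; // broker has no offset for this group
        } catch (Exception e) {
            return -2; // any other failure, for example a timeout
        }
    }
}
```

The point of the sketch: the client can only see -1 ("first start") if the broker answers with a non-success code. If the broker replies SUCCESS with offset 0, the client has no way to tell that the group is new.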
Searching the Broker code for QUERY_CONSUMER_OFFSET turned up the following registration (rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/BrokerController.java):
this.remotingServer.registerProcessor(RequestCode.QUERY_CONSUMER_OFFSET, clientProcessor,
        this.clientManageExecutor);
The handling logic of clientProcessor is as follows (com.alibaba.rocketmq.broker.processor.ClientManageProcessor):
private RemotingCommand queryConsumerOffset(ChannelHandlerContext ctx, RemotingCommand request)
        throws RemotingCommandException {
    final RemotingCommand response =
            RemotingCommand.createResponseCommand(QueryConsumerOffsetResponseHeader.class);
    final QueryConsumerOffsetResponseHeader responseHeader =
            (QueryConsumerOffsetResponseHeader) response.readCustomHeader();
    final QueryConsumerOffsetRequestHeader requestHeader =
            (QueryConsumerOffsetRequestHeader) request
                    .decodeCommandCustomHeader(QueryConsumerOffsetRequestHeader.class);
    long offset =
            this.brokerController.getConsumerOffsetManager().queryOffset(
                    requestHeader.getConsumerGroup(), requestHeader.getTopic(), requestHeader.getQueueId());
    // The subscription group exists
    if (offset >= 0) {
        responseHeader.setOffset(offset);
        response.setCode(ResponseCode.SUCCESS);
        response.setRemark(null);
    }
    // The subscription group does not exist
    else {
        long minOffset =
                this.brokerController.getMessageStore().getMinOffsetInQuque(requestHeader.getTopic(),
                        requestHeader.getQueueId());
        // When the subscription group does not exist and the minimum offset of this queue is 0,
        // the topic has not been online for long and the server has not accumulated much data,
        // so the subscription group starts consuming from 0.
        // In particular, when the number of queues of a topic is expanded dynamically,
        // consumption must start from 0.
        if (minOffset <= 0
                && !this.brokerController.getMessageStore().checkInDiskByConsumeOffset(
                        requestHeader.getTopic(), requestHeader.getQueueId(), 0)) {
            responseHeader.setOffset(0L);
            response.setCode(ResponseCode.SUCCESS);
            response.setRemark(null);
        }
        // Newer server versions do not correct the consume progress
        else {
            response.setCode(ResponseCode.QUERY_NOT_FOUND);
            response.setRemark("Not found, V3_0_6_SNAPSHOT maybe this group consumer boot first");
        }
    }
    return response;
}
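The method contains this comment:
// When the subscription group does not exist and the minimum offset of this queue is 0, the topic has not been online for long and the server has not accumulated much data, so the subscription group starts consuming from 0.
// In particular, when the number of queues of a topic is expanded dynamically, consumption must start from 0.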
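That explains it. Offsets are recorded on the broker that owns the queue. When a topic is expanded dynamically, the consumerGroup has no record yet on the new Broker, so to avoid losing data in that case the Broker tells the consumer to start from 0.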
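The broker-side rule can be condensed into a small sketch. It assumes the three inputs are already known; BrokerOffsetSketch and its parameter names are mine, and an empty OptionalLong stands in for the QUERY_NOT_FOUND response. The flag offsetZeroInDisk represents whatever checkInDiskByConsumeOffset(topic, queueId, 0) returns; I have not verified its exact semantics beyond what the name suggests.

```java
import java.util.OptionalLong;

// Hypothetical condensation of the broker-side queryConsumerOffset logic shown above.
// It is not RocketMQ code.
final class BrokerOffsetSketch {

    /**
     * @param storedOffset     offset recorded for (group, topic, queue), negative if none
     * @param minOffset        minimum offset currently held in the queue
     * @param offsetZeroInDisk result of the checkInDiskByConsumeOffset(..., 0) call
     * @return the offset sent back with SUCCESS, or empty for QUERY_NOT_FOUND
     */
    static OptionalLong queryConsumerOffset(long storedOffset, long minOffset,
                                            boolean offsetZeroInDisk) {
        if (storedOffset >= 0) {
            // The group already has a recorded offset.
            return OptionalLong.of(storedOffset);
        }
        if (minOffset <= 0 && !offsetZeroInDisk) {
            // New group on a young, small queue: the broker answers 0,
            // which the client cannot distinguish from a real stored offset.
            return OptionalLong.of(0L);
        }
        // New group on an older or larger queue: the client receives
        // QUERY_NOT_FOUND and applies its own ConsumeFromWhere setting.
        return OptionalLong.empty();
    }
}
```

Under these assumptions, a new group on a queue whose minimum offset is still 0 gets queryConsumerOffset(-1, 0, false), which is OptionalLong.of(0). That is the case I ran into. Once old messages have been cleaned up, for example queryConsumerOffset(-1, 100, false), the result is empty and CONSUME_FROM_LAST_OFFSET takes effect.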
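The side effect of this safeguard is that the ConsumeFromWhere setting is silently ignored. On topics that hold a lot of data the issue is hard to notice, because the Broker then answers QUERY_NOT_FOUND and the client falls back to the max offset. A system with little data that frequently switches to new consumerGroups, however, will run into it.
Noting this pitfall here for the record.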