1. 前言
異步發送消息其實很多地方與同步發送一樣,不過有一點是你在程式設計的時候需要提供SendCallback 對象,用來發送響應來的時候進行回調使用,我們知道同步發送是等待broker響應到來,然後将響應往上傳回,這個異步調用就是響應來的時候,對你提供的回調對象進行調用,你這個回調對象可以寫一些自己的邏輯等等。
2. 簡單使用
在源碼解析之前我們要先看一下異步發送消息是怎樣程式設計的: org.apache.rocketmq.example.simple.AsyncProducer:
public static void main(
String[] args) throws MQClientException, InterruptedException, UnsupportedEncodingException {
DefaultMQProducer producer = new DefaultMQProducer("Jodie_Daily_test");
producer.start();
producer.setRetryTimesWhenSendAsyncFailed(0);
int messageCount = 100;
final CountDownLatch countDownLatch = new CountDownLatch(messageCount);
for (int i = 0; i < messageCount; i++) {
try {
final int index = i;
Message msg = new Message("Jodie_topic_1023",
"TagA",
"OrderID188",
"Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
producer.send(msg, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
countDownLatch.countDown();
System.out.printf("%-10d OK %s %n", index, sendResult.getMsgId());
}
@Override
public void onException(Throwable e) {
countDownLatch.countDown();
System.out.printf("%-10d Exception %s %n", index, e);
e.printStackTrace();
}
});
} catch (Exception e) {
e.printStackTrace();
}
}
countDownLatch.await(5, TimeUnit.SECONDS);
producer.shutdown();
}
複制代碼
這裡需要我們提供一個SendCallback 對象用來響應來的時候回調,其中異常的時候會調用這裡面的onException方法, 成功的時候調用onSuccess方法,執行相應的邏輯。
3. 源碼解析
好了,現在開始源碼解析:
public void send(Message msg,
SendCallback sendCallback) throws MQClientException, RemotingException, InterruptedException {
msg.setTopic(withNamespace(msg.getTopic()));
this.defaultMQProducerImpl.send(msg, sendCallback);
}
複制代碼
首先是調用了defaultMQProducerImpl 的send方法,并且将msg 與callback傳進去:
public void send(Message msg,
SendCallback sendCallback) throws MQClientException, RemotingException, InterruptedException {
send(msg, sendCallback, this.defaultMQProducer.getSendMsgTimeout());
}
複制代碼
這裡又調用了下它的重載方法,然後将發送逾時時間傳進去,預設的一個發送逾時時間是3s。
public void send(final Message msg, final SendCallback sendCallback, final long timeout)
throws MQClientException, RemotingException, InterruptedException {
final long beginStartTime = System.currentTimeMillis();
// 異步線程池
ExecutorService executor = this.getAsyncSenderExecutor();
try {
executor.submit(new Runnable() {
@Override
public void run() {
long costTime = System.currentTimeMillis() - beginStartTime;
if (timeout > costTime) {
try {
// todo
sendDefaultImpl(msg, CommunicationMode.ASYNC, sendCallback, timeout - costTime);
} catch (Exception e) {
sendCallback.onException(e);
}
} else {
sendCallback.onException(
new RemotingTooMuchRequestException("DEFAULT ASYNC send call timeout"));
}
}
});
}
...
}
複制代碼
這裡需要注意的是,它将這個發送消息的任務交給了一個異步發送線程池,然後在任務中是調用了sendDefaultImpl 方法,然後通信方式是異步CommunicationMode.ASYNC,這裡我們需要看下這個線程池的一些參數,因為關乎我們以後的調優:
public ExecutorService getAsyncSenderExecutor() {
return null == asyncSenderExecutor ? defaultAsyncSenderExecutor : asyncSenderExecutor;
}
複制代碼
這裡先是判斷asyncSenderExecutor 這個線程池是不是null,其實咱們這裡它是null的(因為我們沒有指定線程池,不過你程式設計的時候是可以指定的,使用setAsyncSenderExecutor()這個方法就可以設定了),就使用defaultAsyncSenderExecutor線程池,這個defaultAsyncSenderExecutor 線程池是在defaultMQProducerImpl 類構造方法建立的的,我們可以看下:
public DefaultMQProducerImpl(final DefaultMQProducer defaultMQProducer, RPCHook rpcHook) {
this.defaultMQProducer = defaultMQProducer;
this.rpcHook = rpcHook;
// 5w大小的連結清單隊列,異步發送線程池隊列
this.asyncSenderThreadPoolQueue = new LinkedBlockingQueue<Runnable>(50000);
// 預設的異步發送線程池
this.defaultAsyncSenderExecutor = new ThreadPoolExecutor(
Runtime.getRuntime().availableProcessors(),// 核心線程池數,cpu核數
Runtime.getRuntime().availableProcessors(), // 最大線程池數,cpu核數
1000 * 60,
TimeUnit.MILLISECONDS,
this.asyncSenderThreadPoolQueue,
new ThreadFactory() {
private AtomicInteger threadIndex = new AtomicInteger(0);
@Override
public Thread newThread(Runnable r) {
return new Thread(r, "AsyncSenderExecutor_" + this.threadIndex.incrementAndGet());
}
});
}
複制代碼
隊列是5w大小,然後核心線程數 是cpu的核心數,maxThreads也是cpu核心數。
好了,我們繼續往下看,這個sendDefaultImpl 方法其實就是選擇MessageQueue然後重試那一套東西,不過,異步發送雖然走這個方法,但是它的失敗重試不是這樣子玩的,我們接着往下看接着又調用了sendKernelImpl 方法:(DefaultMQProducerImpl#sendKernelImpl)
// 同步 異步 單向
switch (communicationMode) {
// 異步
case ASYNC:
Message tmpMessage = msg;
boolean messageCloned = false;
if (msgBodyCompressed) {
//If msg body was compressed, msgbody should be reset using prevBody.
//Clone new message using commpressed message body and recover origin massage.
//Fix bug:https://github.com/apache/rocketmq-externals/issues/66
tmpMessage = MessageAccessor.cloneMessage(msg);
messageCloned = true;
msg.setBody(prevBody);
}
if (topicWithNamespace) {
if (!messageCloned) {
tmpMessage = MessageAccessor.cloneMessage(msg);
messageCloned = true;
}
msg.setTopic(NamespaceUtil.withoutNamespace(msg.getTopic(), this.defaultMQProducer.getNamespace()));
}
// 判斷逾時時間
long costTimeAsync = System.currentTimeMillis() - beginStartTime;
if (timeout < costTimeAsync) {
throw new RemotingTooMuchRequestException("sendKernelImpl call timeout");
}
// todo
sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(
brokerAddr,
mq.getBrokerName(),
tmpMessage,
requestHeader,
timeout - costTimeAsync,
communicationMode,
sendCallback,
topicPublishInfo,
this.mQClientFactory,
this.defaultMQProducer.getRetryTimesWhenSendAsyncFailed(),
context,
this);
break;
...
複制代碼
這裡我們主要是看下異步處理這一塊,因為我們在介紹同步發送與單向發送都有介紹過這個方法,其實這個方法就是封裝請求頭啥的,異步這一塊我們可以看到,先是判斷了一下逾時沒有,然後調用MQClientAPI的sendMessage方法,注意下它這個倒數第三個參數,這個參數是擷取了一下預設的重試次數,預設是2。
接下來MQClientAPIImpl#sendMessage:
switch (communicationMode) {
...
case ASYNC:
final AtomicInteger times = new AtomicInteger();
long costTimeAsync = System.currentTimeMillis() - beginStartTime;
// 判斷逾時時間
if (timeoutMillis < costTimeAsync) {
throw new RemotingTooMuchRequestException("sendMessage call timeout");
}
// todo
this.sendMessageAsync(addr, brokerName, msg, timeoutMillis - costTimeAsync, request, sendCallback, topicPublishInfo, instance,
retryTimesWhenSendFailed, times, context, producer);
return null;
...
複制代碼
這個方法我們也是主要看下這個異步發送這塊,首先是定義了一個times,這個times記錄的發送次數,然後判斷了是否逾時,然後調用sendMessageAsync這個異步發送方法。這個方法又調用了RemotingClient的invokeAsync方法,其中在sendMessageAsync 方法中建立了一個InvokeCallback 對象,我們先不管這個InvokeCallback ,後面再解釋,先看下invokeAsync 方法:
public void invokeAsync(String addr, RemotingCommand request, long timeoutMillis, InvokeCallback invokeCallback)
throws InterruptedException, RemotingConnectException, RemotingTooMuchRequestException, RemotingTimeoutException,
RemotingSendRequestException {
long beginStartTime = System.currentTimeMillis();
// 擷取channel
final Channel channel = this.getAndCreateChannel(addr);
// channel不為null 并且channel是激活
if (channel != null && channel.isActive()) {
try {
// 調用之前的鈎子
doBeforeRpcHooks(addr, request);
// 判斷逾時時間
long costTime = System.currentTimeMillis() - beginStartTime;
if (timeoutMillis < costTime) {
throw new RemotingTooMuchRequestException("invokeAsync call timeout");
}
// todo 調用
this.invokeAsyncImpl(channel, request, timeoutMillis - costTime, invokeCallback);
} catch (RemotingSendRequestException e) {
log.warn("invokeAsync: send request exception, so close the channel[{}]", addr);
this.closeChannel(addr, channel);
throw e;
}
} else {
// 關閉channel
this.closeChannel(addr, channel);
throw new RemotingConnectException(addr);
}
}
複制代碼
其實套路都一樣,先是根據broker addr 擷取對應的channel,然後判斷一下channel狀态,然後執行調用前的鈎子,判斷有沒有逾時,調用invokeAsyncImpl方法進行發送。
這個方法重要的操作就這幾個,首先生成一個調用id ,也就是opaque , 接着擷取信号量許可,這個信号量使用來限流的,預設是65535,擷取之後判斷一下有沒有逾時,然後封裝ResponseFuture 對象,将ResponseFuture 對象緩存到response表中。
接着将消息寫到channel中,注意有個listener 是在發送出去的時候執行,成功的話将ResponseFuture 對象設定發送成功,失敗的走了requestFail(opaque)方法,失敗我們先不看。這個時候就送成功了,等到收到broker響應的時候,NettyClientHandler 就能收到消息了
3.2 正常接收響應
class NettyClientHandler extends SimpleChannelInboundHandler<RemotingCommand> {
/**
* 處理接收到的遠端
*/
@Override
protected void channelRead0(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception {
// todo
processMessageReceived(ctx, msg);
}
}
複制代碼
這個是netty的知識點,是将NettyClientHandler 對象注冊到netty的pipeline上面,在發送内容,接收内容,都會執行響應的實作方法。
public void processMessageReceived(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception {
final RemotingCommand cmd = msg;
if (cmd != null) {
switch (cmd.getType()) {
// 請求消息
case REQUEST_COMMAND:
// todo
processRequestCommand(ctx, cmd);
break;
// 響應消息
case RESPONSE_COMMAND:
// todo
processResponseCommand(ctx, cmd);
break;
default:
break;
}
}
}
複制代碼
我們這裡是收到的響應消息,然後調用processResponseCommand 處理:
public void processResponseCommand(ChannelHandlerContext ctx, RemotingCommand cmd) {
final int opaque = cmd.getOpaque();
// 擷取對應id 的responseFuture
final ResponseFuture responseFuture = responseTable.get(opaque);
if (responseFuture != null) {
// 設定
responseFuture.setResponseCommand(cmd);
// 從響應表中移除
responseTable.remove(opaque);
if (responseFuture.getInvokeCallback() != null) {
// todo 執行回調
executeInvokeCallback(responseFuture);
} else {
responseFuture.putResponse(cmd);
responseFuture.release();
}
} else {
log.warn("receive response, but not matched any request, " + RemotingHelper.parseChannelRemoteAddr(ctx.channel()));
log.warn(cmd.toString());
}
}
複制代碼
這裡就是根據opaque去responseTable這個緩存中找到對應的ResponseFuture 對象,然後設定響應内容,最最最重要的點就是看一下它的invokeCallBack有沒有,我們發送消息的時候是有設定進去的。它會調用executeInvokeCallback 方法執行:
private void executeInvokeCallback(final ResponseFuture responseFuture) {
boolean runInThisThread = false;
// 擷取回調線程池
ExecutorService executor = this.getCallbackExecutor();
if (executor != null) {
try {
executor.submit(new Runnable() {
@Override
public void run() {
try {
// todo 執行回調
responseFuture.executeInvokeCallback();
} catch (Throwable e) {
log.warn("execute callback in executor exception, and callback throw", e);
} finally {
// 釋放 信号量
responseFuture.release();
}
}
});
} catch (Exception e) {
runInThisThread = true;
log.warn("execute callback in executor exception, maybe executor busy", e);
}
} else {
runInThisThread = true;
}
// 如果需要在該線程池中執行
if (runInThisThread) {
try {
responseFuture.executeInvokeCallback();
} catch (Throwable e) {
log.warn("executeInvokeCallback Exception", e);
} finally {
responseFuture.release();
}
}
}
複制代碼
這裡就是擷取執行回調的線程池,如果線程池是null的話,就在目前線程執行。這個回調線程池參數我們也看下
public NettyRemotingClient(final NettyClientConfig nettyClientConfig,
final ChannelEventListener channelEventListener) {
// 設定 異步與單向請求的信号量
super(nettyClientConfig.getClientOnewaySemaphoreValue(), nettyClientConfig.getClientAsyncSemaphoreValue());
// netty配置檔案
this.nettyClientConfig = nettyClientConfig;
// listener
this.channelEventListener = channelEventListener;
// 這個是netty 用戶端回調線程數,預設是cpu核數
int publicThreadNums = nettyClientConfig.getClientCallbackExecutorThreads();
// 如果小于等于0 預設為4
if (publicThreadNums <= 0) {
publicThreadNums = 4;
}
// 設定線程池
this.publicExecutor = Executors.newFixedThreadPool(publicThreadNums, new ThreadFactory() {
private AtomicInteger threadIndex = new AtomicInteger(0);
@Override
public Thread newThread(Runnable r) {
return new Thread(r, "NettyClientPublicExecutor_" + this.threadIndex.incrementAndGet());
}
});
...
複制代碼
它是拿的這個線程是,預設核心線程數也是cpu核心數。
接着就是調用了ResponseFuture 的executeInvokeCallback 方法:
public void executeInvokeCallback() {
if (invokeCallback != null) {
// 設定回調狀态
if (this.executeCallbackOnlyOnce.compareAndSet(false, true)) {
// todo 執行回調
invokeCallback.operationComplete(this);
}
}
}
複制代碼
設定回調狀态,然後調用invokeCallback的operationComplete 方法,現在我們再回到MQClientAPI的sendMessageAsync 方法中,因為當時是在這個方法中建立的這個 invokeCallback 對象:
private void sendMessageAsync(
final String addr,
final String brokerName,
final Message msg,
final long timeoutMillis,
final RemotingCommand request,
final SendCallback sendCallback,
final TopicPublishInfo topicPublishInfo,
final MQClientInstance instance,
final int retryTimesWhenSendFailed,
final AtomicInteger times,
final SendMessageContext context,
final DefaultMQProducerImpl producer
) throws InterruptedException, RemotingException {
final long beginStartTime = System.currentTimeMillis();
// todo
this.remotingClient.invokeAsync(addr, request, timeoutMillis, new InvokeCallback() {
// 回調的方法
@Override
public void operationComplete(ResponseFuture responseFuture) {
long cost = System.currentTimeMillis() - beginStartTime;
RemotingCommand response = responseFuture.getResponseCommand();
if (null == sendCallback && response != null) {
try {
SendResult sendResult = MQClientAPIImpl.this.processSendResponse(brokerName, msg, response, addr);
if (context != null && sendResult != null) {
context.setSendResult(sendResult);
context.getProducer().executeSendMessageHookAfter(context);
}
} catch (Throwable e) {
}
// 更新容錯資訊
producer.updateFaultItem(brokerName, System.currentTimeMillis() - responseFuture.getBeginTimestamp(), false);
return;
}
if (response != null) {
try {
// todo 生成發送結果
SendResult sendResult = MQClientAPIImpl.this.processSendResponse(brokerName, msg, response, addr);
assert sendResult != null;
if (context != null) {
context.setSendResult(sendResult);
context.getProducer().executeSendMessageHookAfter(context);
}
try {
// todo 調用回調 的成功方法
sendCallback.onSuccess(sendResult);
} catch (Throwable e) {
}
// 更新容錯資訊
producer.updateFaultItem(brokerName, System.currentTimeMillis() - responseFuture.getBeginTimestamp(), false);
} catch (Exception e) {
// 跟新容錯資訊 isolation:true 隔離
producer.updateFaultItem(brokerName, System.currentTimeMillis() - responseFuture.getBeginTimestamp(), true);
// 異常
onExceptionImpl(brokerName, msg, timeoutMillis - cost, request, sendCallback, topicPublishInfo, instance,
retryTimesWhenSendFailed, times, e, context, false, producer);
}
} else {
// 跟新容錯資訊 isolation:true 隔離
producer.updateFaultItem(brokerName, System.currentTimeMillis() - responseFuture.getBeginTimestamp(), true);
// 發送未成功
if (!responseFuture.isSendRequestOK()) {
MQClientException ex = new MQClientException("send request failed", responseFuture.getCause());
// todo 異步模式 重試邏輯
onExceptionImpl(brokerName, msg, timeoutMillis - cost, request, sendCallback, topicPublishInfo, instance,
retryTimesWhenSendFailed, times, ex, context, true, producer);
// 逾時
} else if (responseFuture.isTimeout()) {
MQClientException ex = new MQClientException("wait response timeout " + responseFuture.getTimeoutMillis() + "ms",
responseFuture.getCause());
onExceptionImpl(brokerName, msg, timeoutMillis - cost, request, sendCallback, topicPublishInfo, instance,
retryTimesWhenSendFailed, times, ex, context, true, producer);
} else {
MQClientException ex = new MQClientException("unknow reseaon", responseFuture.getCause());
onExceptionImpl(brokerName, msg, timeoutMillis - cost, request, sendCallback, topicPublishInfo, instance,
retryTimesWhenSendFailed, times, ex, context, true, producer);
}
}
}
});
}
複制代碼
這個方法分為2部分吧,一是有響應,然後沒有sendCallback ,這個sendCallback 是你自己寫的那個回調對象,這個時候沒有的話說明你不準備回調了,然後解析了一下結果,執行了一下 調用後的鈎子,這部分就算完事了,二是有響應,然後也是有這個回調對象sendCallback的,先是解析了下響應,然後執行了你寫的那個sendCallback 對象,另外就是執行了updateFaultItem ,進行更新一個響應資訊,見
如果異常的話,執行了一個onExceptionImpl 方法來處理,我們來看下這個方法的實作:
private void onExceptionImpl(final String brokerName,
final Message msg,
final long timeoutMillis,
final RemotingCommand request,
final SendCallback sendCallback,
final TopicPublishInfo topicPublishInfo,
final MQClientInstance instance,
final int timesTotal,
final AtomicInteger curTimes,
final Exception e,
final SendMessageContext context,
final boolean needRetry,
final DefaultMQProducerImpl producer
) {
// 增加次數
int tmp = curTimes.incrementAndGet();
// 需要重試 && 重試的次數 小于 允許的重試次數
if (needRetry && tmp <= timesTotal) {
String retryBrokerName = brokerName;//by default, it will send to the same broker
if (topicPublishInfo != null) { //select one message queue accordingly, in order to determine which broker to send
// 重新選取一個MessageQueue
MessageQueue mqChosen = producer.selectOneMessageQueue(topicPublishInfo, brokerName);
retryBrokerName = mqChosen.getBrokerName();
}
// 重新擷取 broker位址
String addr = instance.findBrokerAddressInPublish(retryBrokerName);
log.info("async send msg by retry {} times. topic={}, brokerAddr={}, brokerName={}", tmp, msg.getTopic(), addr,
retryBrokerName);
try {
// 生成一個新的請求id
request.setOpaque(RemotingCommand.createNewRequestId());
// todo 重試
sendMessageAsync(addr, retryBrokerName, msg, timeoutMillis, request, sendCallback, topicPublishInfo, instance,
timesTotal, curTimes, context, producer);
} catch (InterruptedException e1) {
onExceptionImpl(retryBrokerName, msg, timeoutMillis, request, sendCallback, topicPublishInfo, instance, timesTotal, curTimes, e1,
context, false, producer);
} catch (RemotingConnectException e1) {
// 設定容錯隔離
producer.updateFaultItem(brokerName, 3000, true);
onExceptionImpl(retryBrokerName, msg, timeoutMillis, request, sendCallback, topicPublishInfo, instance, timesTotal, curTimes, e1,
context, true, producer);
} catch (RemotingTooMuchRequestException e1) {
onExceptionImpl(retryBrokerName, msg, timeoutMillis, request, sendCallback, topicPublishInfo, instance, timesTotal, curTimes, e1,
context, false, producer);
} catch (RemotingException e1) {
// 逾時 設定容錯隔離
producer.updateFaultItem(brokerName, 3000, true);
onExceptionImpl(retryBrokerName, msg, timeoutMillis, request, sendCallback, topicPublishInfo, instance, timesTotal, curTimes, e1,
context, true, producer);
}
} else {
// 不能重試
if (context != null) {
context.setException(e);
context.getProducer().executeSendMessageHookAfter(context);
}
try {
// 執行回調
sendCallback.onException(e);
} catch (Exception ignored) {
}
}
}
複制代碼
增加調用次數,然後判斷是否需要重試&& 重試次數在範圍内,然後就是重新選擇一個MessageQueue,重新設定請求id,也就是opaque這個,最後就是調用 sendMessageAsync 進行發送了,這就是異步調用的一個重試邏輯,并沒有使用for循環的形式。
3.3 逾時接收響應
在調用writeAndFlush(...)方法前,會先this.responseTable.put(opaque, responseFuture)方法,将responseFuture添加到responseTable中,這是個Map結構,rocketMq正是定時從responseTable中擷取responseFuture并判斷其狀态來決定調用SendCallback的哪個方法的。
讓我們回到NettyRemotingClient的啟動流程,方法為NettyRemotingClient#start:
public void start() {
...
// 掃描消息擷取結果,每秒執行1次
this.timer.scheduleAtFixedRate(new TimerTask() {
@Override
public void run() {
try {
NettyRemotingClient.this.scanResponseTable();
} catch (Throwable e) {
log.error("scanResponseTable exception", e);
}
}
}, 1000 * 3, 1000);
...
}
複制代碼
在這個方法中,啟動了一個定時任務,每秒執行1次,所做的工作就是掃描在responseTable中的responseFuture,我們再進入NettyRemotingAbstract#scanResponseTable方法:
public void scanResponseTable() {
//本次要處理的傳回
final List<ResponseFuture> rfList = new LinkedList<ResponseFuture>();
Iterator<Entry<Integer, ResponseFuture>> it = this.responseTable.entrySet().iterator();
while (it.hasNext()) {
Entry<Integer, ResponseFuture> next = it.next();
ResponseFuture rep = next.getValue();
// 判斷時間,時間到了才轉移到 rfList 中
if ((rep.getBeginTimestamp() + rep.getTimeoutMillis() + 1000)
<= System.currentTimeMillis()) {
rep.release();
it.remove();
rfList.add(rep);
log.warn("remove timeout request, " + rep);
}
}
// 處理傳回
for (ResponseFuture rf : rfList) {
try {
executeInvokeCallback(rf);
} catch (Throwable e) {
log.warn("scanResponseTable, operationComplete Exception", e);
}
}
}
複制代碼
在這個方法裡,先周遊所有的ResponseFuture,然後判斷每個ResponseFuture的時間,時間到了才會進行處理,從這裡可以看出,并不是一有結果就立即處理,而是在消息發送後過了4秒(rep.getTimeoutMillis()的值為3)才去處理結果,處理方法為NettyRemotingAbstract#executeInvokeCallback,這個方法處理 跟 3.2 小節 是一樣的,不在贅述
好了,到這我們的異步發送解析就已經ok了.