開篇
這篇文章的目的主要是理清楚Fescar的TM發送部分的邏輯,從時序圖和源碼兩個層面進行分析。
文章中間會解答兩個自己閱讀代碼中遇到的困惑(估計大部分人看代碼的時候也會遇到這個困惑),包括TmRpcClient的初始化過程和配置加載過程。
文章的最後會附上GlobalAction相關Request的類關系圖,便于了解依賴關系。
Fescar TM發送流程
說明:
- 1.DefaultGlobalTransaction執行begin/commit/rollback等調用DefaultTransactionManager。
- 2.DefaultTransactionManager内部調用syncCall()方法,進而調用TmRpcClient的sendMsgWithResponse()方法。
- 3.TmRpcClient調用父類AbstractRpcRemoting的sendAsyncRequest()方法建構發送隊列。
- 4.AbstractRpcRemotingClient的MergedSendRunnable線程消費發送隊列建構MergedWarpMessage調用sendRequest發送。
- 5.sendRequest()方法内部調用writeAndFlush完成消息發送。
- TmRpcClient的類依賴關系圖如上。
- TmRpcClient繼承自AbstractRpcRemotingClient類。
Fescar TM發送源碼分析
public class DefaultTransactionManager implements TransactionManager {
private static class SingletonHolder {
private static final TransactionManager INSTANCE = new DefaultTransactionManager();
}
/**
* Get transaction manager.
*
* @return the transaction manager
*/
public static TransactionManager get() {
return SingletonHolder.INSTANCE;
}
private DefaultTransactionManager() {
}
@Override
public String begin(String applicationId, String transactionServiceGroup, String name, int timeout)
throws TransactionException {
GlobalBeginRequest request = new GlobalBeginRequest();
request.setTransactionName(name);
request.setTimeout(timeout);
GlobalBeginResponse response = (GlobalBeginResponse) syncCall(request);
return response.getXid();
}
@Override
public GlobalStatus commit(String xid) throws TransactionException {
long txId = XID.getTransactionId(xid);
GlobalCommitRequest globalCommit = new GlobalCommitRequest();
globalCommit.setTransactionId(txId);
GlobalCommitResponse response = (GlobalCommitResponse) syncCall(globalCommit);
return response.getGlobalStatus();
}
@Override
public GlobalStatus rollback(String xid) throws TransactionException {
long txId = XID.getTransactionId(xid);
GlobalRollbackRequest globalRollback = new GlobalRollbackRequest();
globalRollback.setTransactionId(txId);
GlobalRollbackResponse response = (GlobalRollbackResponse) syncCall(globalRollback);
return response.getGlobalStatus();
}
@Override
public GlobalStatus getStatus(String xid) throws TransactionException {
long txId = XID.getTransactionId(xid);
GlobalStatusRequest queryGlobalStatus = new GlobalStatusRequest();
queryGlobalStatus.setTransactionId(txId);
GlobalStatusResponse response = (GlobalStatusResponse) syncCall(queryGlobalStatus);
return response.getGlobalStatus();
}
private AbstractTransactionResponse syncCall(AbstractTransactionRequest request) throws TransactionException {
try {
return (AbstractTransactionResponse) TmRpcClient.getInstance().sendMsgWithResponse(request);
} catch (TimeoutException toe) {
throw new TransactionException(TransactionExceptionCode.IO, toe);
}
}
}
- DefaultTransactionManager的beigin/commit/rollback方法内部最終調用syncCall()方法。
- syncCall方法内部執行TmRpcClient.getInstance().sendMsgWithResponse(request)調用TmRpcClient方法。
public final class TmRpcClient extends AbstractRpcRemotingClient {
@Override
public Object sendMsgWithResponse(Object msg) throws TimeoutException {
return sendMsgWithResponse(msg, NettyClientConfig.getRpcRequestTimeout());
}
@Override
public Object sendMsgWithResponse(String serverAddress, Object msg, long timeout)
throws TimeoutException {
return sendAsyncRequestWithResponse(serverAddress, connect(serverAddress), msg, timeout);
}
}
- TmRpcClient内部執行發送sendMsgWithResponse調用sendAsyncRequestWithResponse。
- sendAsyncRequestWithResponse的實作在父類AbstractRpcRemoting當中。
public abstract class AbstractRpcRemoting extends ChannelDuplexHandler {
protected Object sendAsyncRequestWithResponse(String address, Channel channel, Object msg, long timeout) throws
TimeoutException {
if (timeout <= 0) {
throw new FrameworkException("timeout should more than 0ms");
}
return sendAsyncRequest(address, channel, msg, timeout);
}
private Object sendAsyncRequest(String address, Channel channel, Object msg, long timeout)
throws TimeoutException {
if (channel == null) {
LOGGER.warn("sendAsyncRequestWithResponse nothing, caused by null channel.");
return null;
}
// 建構RpcMessage對象
final RpcMessage rpcMessage = new RpcMessage();
rpcMessage.setId(RpcMessage.getNextMessageId());
rpcMessage.setAsync(false);
rpcMessage.setHeartbeat(false);
rpcMessage.setRequest(true);
rpcMessage.setBody(msg);
// 通過MessageFuture包裝實作逾時
final MessageFuture messageFuture = new MessageFuture();
messageFuture.setRequestMessage(rpcMessage);
messageFuture.setTimeout(timeout);
futures.put(rpcMessage.getId(), messageFuture);
// 測試代碼走的是這個分支
if (address != null) {
// 根據address進行hash放置到不同的Map當中
ConcurrentHashMap<String, BlockingQueue<RpcMessage>> map = basketMap;
BlockingQueue<RpcMessage> basket = map.get(address);
if (basket == null) {
map.putIfAbsent(address, new LinkedBlockingQueue<RpcMessage>());
basket = map.get(address);
}
basket.offer(rpcMessage);
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("offer message: " + rpcMessage.getBody());
}
// 發送其實是另外一個線程單獨執行發送操作的
if (!isSending) {
synchronized (mergeLock) {
mergeLock.notifyAll();
}
}
} else {
ChannelFuture future;
channelWriteableCheck(channel, msg);
future = channel.writeAndFlush(rpcMessage);
future.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) {
if (!future.isSuccess()) {
MessageFuture messageFuture = futures.remove(rpcMessage.getId());
if (messageFuture != null) {
messageFuture.setResultMessage(future.cause());
}
destroyChannel(future.channel());
}
}
});
}
// 通過Future實作限時逾時機制
if (timeout > 0) {
try {
return messageFuture.get(timeout, TimeUnit.MILLISECONDS);
} catch (Exception exx) {
LOGGER.error("wait response error:" + exx.getMessage() + ",ip:" + address + ",request:" + msg);
if (exx instanceof TimeoutException) {
throw (TimeoutException)exx;
} else {
throw new RuntimeException(exx);
}
}
} else {
return null;
}
}
}
- 建構RpcMessage對象,包裝Request。
- 建構MessageFuture對象,包裝RpcMessage,實作逾時等待功能。
- 通過basket進行分桶操作,真正執行發送的代碼在AbstractRpcRemotingClient類的MergedSendRunnable。
- Request的發送類似生成消費者模型,上述代碼隻是生産者部分。
public abstract class AbstractRpcRemotingClient extends AbstractRpcRemoting
implements RemotingService, RegisterMsgListener, ClientMessageSender {
public class MergedSendRunnable implements Runnable {
@Override
public void run() {
while (true) {
synchronized (mergeLock) {
try {
mergeLock.wait(MAX_MERGE_SEND_MILLS);
} catch (InterruptedException e) {}
}
isSending = true;
for (String address : basketMap.keySet()) {
BlockingQueue<RpcMessage> basket = basketMap.get(address);
if (basket.isEmpty()) { continue; }
MergedWarpMessage mergeMessage = new MergedWarpMessage();
while (!basket.isEmpty()) {
RpcMessage msg = basket.poll();
mergeMessage.msgs.add((AbstractMessage)msg.getBody());
mergeMessage.msgIds.add(msg.getId());
}
if (mergeMessage.msgIds.size() > 1) {
printMergeMessageLog(mergeMessage);
}
Channel sendChannel = connect(address);
try {
sendRequest(sendChannel, mergeMessage);
} catch (FrameworkException e) {
if (e.getErrcode() == FrameworkErrorCode.ChannelIsNotWritable
&& address != null) {
destroyChannel(address, sendChannel);
}
LOGGER.error("", "client merge call failed", e);
}
}
isSending = false;
}
}
}
- MergedSendRunnable 負責消費待發送消息體并組裝成MergedWarpMessage對象。
- sendRequest()方法内部将MergedWarpMessage再次包裝成RpcMessage進行發送。
public abstract class AbstractRpcRemoting extends ChannelDuplexHandler {
protected void sendRequest(Channel channel, Object msg) {
RpcMessage rpcMessage = new RpcMessage();
rpcMessage.setAsync(true);
rpcMessage.setHeartbeat(msg instanceof HeartbeatMessage);
rpcMessage.setRequest(true);
rpcMessage.setBody(msg);
rpcMessage.setId(RpcMessage.getNextMessageId());
if (msg instanceof MergeMessage) {
mergeMsgMap.put(rpcMessage.getId(), (MergeMessage)msg);
}
channelWriteableCheck(channel, msg);
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("write message:" + rpcMessage.getBody() + ", channel:" + channel + ",active?"
+ channel.isActive() + ",writable?" + channel.isWritable() + ",isopen?" + channel.isOpen());
}
channel.writeAndFlush(rpcMessage);
}
}
- RpcMessage再次包裝MergeMessage進行發送。
TmRpcClient初始化
public class GlobalTransactionScanner extends AbstractAutoProxyCreator implements InitializingBean {
public GlobalTransactionScanner(String applicationId, String txServiceGroup, int mode,
FailureHandler failureHandlerHook) {
setOrder(ORDER_NUM);
setProxyTargetClass(true);
this.applicationId = applicationId;
this.txServiceGroup = txServiceGroup;
this.mode = mode;
this.failureHandlerHook = failureHandlerHook;
}
private void initClient() {
TMClient.init(applicationId, txServiceGroup);
if ((AT_MODE & mode) > 0) {
RMClientAT.init(applicationId, txServiceGroup);
}
}
public void afterPropertiesSet() {
initClient();
}
}
- GlobalTransactionScanner的構造函數執行後執行afterPropertiesSet并執行initClient()操作。
- initClient()内部執行TMClient.init(applicationId, txServiceGroup)進行TMClient的初始化。
public class TMClient {
public static void init(String applicationId, String transactionServiceGroup) {
TmRpcClient tmRpcClient = TmRpcClient.getInstance(
applicationId, transactionServiceGroup);
tmRpcClient.init();
}
}
public final class TmRpcClient extends AbstractRpcRemotingClient {
public void init() {
if (initialized.compareAndSet(false, true)) {
init(SCHEDULE_INTERVAL_MILLS, SCHEDULE_INTERVAL_MILLS);
}
}
public void init(long healthCheckDelay, long healthCheckPeriod) {
// 注意initVars()方法
initVars();
ExecutorService mergeSendExecutorService = new ThreadPoolExecutor(
MAX_MERGE_SEND_THREAD, MAX_MERGE_SEND_THREAD,
KEEP_ALIVE_TIME, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
new NamedThreadFactory(getThreadPrefix(MERGE_THREAD_PREFIX),
MAX_MERGE_SEND_THREAD));
mergeSendExecutorService.submit(new MergedSendRunnable());
timerExecutor.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
reconnect();
} catch (Exception ignore) {
LOGGER.error(ignore.getMessage());
}
}
}, healthCheckDelay, healthCheckPeriod, TimeUnit.SECONDS);
}
private void initVars() {
enableDegrade = CONFIG.getBoolean(
ConfigurationKeys.SERVICE_PREFIX + ConfigurationKeys.ENABLE_DEGRADE_POSTFIX);
super.init();
}
}
- 核心在于關注initVars()方法。
public abstract class AbstractRpcRemotingClient extends AbstractRpcRemoting
implements RemotingService, RegisterMsgListener, ClientMessageSender {
public void init() {
NettyPoolableFactory keyPoolableFactory = new NettyPoolableFactory(this);
// 核心建構發送的對象的連接配接池
nettyClientKeyPool = new GenericKeyedObjectPool(keyPoolableFactory);
nettyClientKeyPool.setConfig(getNettyPoolConfig());
serviceManager = new ServiceManagerStaticConfigImpl();
super.init();
}
}
public abstract class AbstractRpcRemoting extends ChannelDuplexHandler {
public void init() {
timerExecutor.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
List<MessageFuture> timeoutMessageFutures = new ArrayList<MessageFuture>(futures.size());
for (MessageFuture future : futures.values()) {
if (future.isTimeout()) {
timeoutMessageFutures.add(future);
}
}
for (MessageFuture messageFuture : timeoutMessageFutures) {
futures.remove(messageFuture.getRequestMessage().getId());
messageFuture.setResultMessage(null);
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("timeout clear future : " + messageFuture.getRequestMessage().getBody());
}
}
nowMills = System.currentTimeMillis();
}
}, TIMEOUT_CHECK_INTERNAL, TIMEOUT_CHECK_INTERNAL, TimeUnit.MILLISECONDS);
}
}
- AbstractRpcRemotingClient的init()方法核心建構nettyClientKeyPool工廠。
- nettyClientKeyPool用于擷取連接配接TC的對象的工廠池。
配置加載分析
public class FileConfiguration implements Configuration {
private static final Logger LOGGER = LoggerFactory.getLogger(FileConfiguration.class);
private static final Config CONFIG = ConfigFactory.load();
}
package com.typesafe.config;
public final class ConfigFactory {
private ConfigFactory() {
}
public static Config load() {
return load(ConfigParseOptions.defaults());
}
}
- 配置加載使用了 JAVA 配置管理庫 typesafe.config
- 預設加載classpath下的application.conf,application.json和application.properties檔案。通過ConfigFactory.load()加載。