開篇
這篇文章主要是梳理TC處理TM發送消息的過程,由于消息種類較多是以打算額外寫篇文章分析,這篇文章主要把進入網絡層以後的基本流程梳理下,友善大家閱讀源碼。
這篇文章的沒有針對TM的接收部分進行分析,針對收到封包以後的處理流程。
Transaction Coordinator (TC): 事務協調器,維護全局事務的運作狀态,負責協調并驅動全局事務的送出或復原。
TC處理時序圖
說明:
- TC處理流程核心子產品是DefaultCoordinator和DefaultCore。
- DefaultCoordinator和DefaultCore的調用方式是直接方法進行調用。
TC處理流程源碼分析
public class DefaultCoordinator extends AbstractTCInboundHandler
implements TransactionMessageHandler, ResourceManagerInbound {
private ServerMessageSender messageSender;
private Core core = CoreFactory.get();
public DefaultCoordinator(ServerMessageSender messageSender) {
this.messageSender = messageSender;
core.setResourceManagerInbound(this);
}
@Override
protected void doGlobalBegin(GlobalBeginRequest request, GlobalBeginResponse response, RpcContext rpcContext)
throws TransactionException {
response.setXid(core.begin(rpcContext.getApplicationId(), rpcContext.getTransactionServiceGroup(),
request.getTransactionName(), request.getTimeout()));
}
@Override
protected void doGlobalCommit(GlobalCommitRequest request, GlobalCommitResponse response, RpcContext rpcContext)
throws TransactionException {
response.setGlobalStatus(core.commit(XID.generateXID(request.getTransactionId())));
}
@Override
protected void doGlobalRollback(GlobalRollbackRequest request, GlobalRollbackResponse response, RpcContext rpcContext)
throws TransactionException {
response.setGlobalStatus(core.rollback(XID.generateXID(request.getTransactionId())));
}
@Override
protected void doGlobalStatus(GlobalStatusRequest request, GlobalStatusResponse response, RpcContext rpcContext)
throws TransactionException {
response.setGlobalStatus(core.getStatus(XID.generateXID(request.getTransactionId())));
}
}
- DefaultCoordinator提供doGlobalBegin實作事務的開啟。
- DefaultCoordinator提供doGlobalCommit實作事務的送出。
- DefaultCoordinator提供doGlobalRollback實作事務的復原。
- DefaultCoordinator的Core通過CoreFactory方法擷取。
public class CoreFactory {
private static class SingletonHolder {
private static Core INSTANCE = new DefaultCore();
}
public static final Core get() {
return CoreFactory.SingletonHolder.INSTANCE;
}
public static void set(Core core) {
CoreFactory.SingletonHolder.INSTANCE = core;
}
}
- CoreFactory提供擷取DefaultCore的方法。
public class DefaultCore implements Core {
private LockManager lockManager = LockManagerFactory.get();
private ResourceManagerInbound resourceManagerInbound;
public String begin(String applicationId, String transactionServiceGroup, String name, int timeout)
throws TransactionException {}
@Override
public GlobalStatus commit(String xid) throws TransactionException {}
@Override
public void doGlobalCommit(GlobalSession globalSession, boolean retrying) throws TransactionException {}
private void asyncCommit(GlobalSession globalSession) throws TransactionException {}
private void queueToRetryCommit(GlobalSession globalSession) throws TransactionException {}
private void queueToRetryRollback(GlobalSession globalSession) throws TransactionException {}
@Override
public GlobalStatus rollback(String xid) throws TransactionException {}
@Override
public void doGlobalRollback(GlobalSession globalSession, boolean retrying) throws TransactionException {}
}
- DefaultCore 是TC執行事務操作的核心。
- DefaultCore 提供begin/commit/rollback等操作。