
Fescar example analysis - the TC flow

Introduction

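 This article walks through how the TC handles messages sent by the TM. Because there are many message types, they will be analyzed in a separate article; this one focuses on the basic flow after a message has passed through the network layer, to make the source code easier to read.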

 It does not analyze how messages from the TM are received; it covers only the processing that happens once a packet has arrived.

 Transaction Coordinator (TC): maintains the running state of global transactions, and coordinates and drives their commit or rollback.

TC processing sequence diagram

Notes:

  • The core modules of the TC processing flow are DefaultCoordinator and DefaultCore.
  • DefaultCoordinator invokes DefaultCore through direct method calls.
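
To make the direct-call relationship concrete, here is a minimal toy model in Java. It is not Fescar code: ToyCore, InMemoryCore, ToyBeginResponse, ToyCoordinator, and the "toy:<n>" XID format are all invented for illustration, and only the begin path is modeled.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical stand-in for the Core interface; only begin() is modeled.
interface ToyCore {
    String begin(String applicationId, String txServiceGroup, String name, int timeoutMillis);
}

// Hypothetical in-memory core that hands out sequential XIDs.
class InMemoryCore implements ToyCore {
    private final AtomicLong nextId = new AtomicLong(1);

    @Override
    public String begin(String applicationId, String txServiceGroup, String name, int timeoutMillis) {
        // The "toy:<n>" XID format is invented for this sketch.
        return "toy:" + nextId.getAndIncrement();
    }
}

// Simplified response object; it carries only the XID.
class ToyBeginResponse {
    String xid;
}

// The coordinator holds a reference to the core and calls it directly:
// no queue, event bus, or extra RPC hop sits between the two.
class ToyCoordinator {
    private final ToyCore core;

    ToyCoordinator(ToyCore core) {
        this.core = core;
    }

    void doGlobalBegin(String applicationId, String txServiceGroup, String name,
                       int timeoutMillis, ToyBeginResponse response) {
        // Unpack the request fields, call the core, copy the result into the response.
        response.xid = core.begin(applicationId, txServiceGroup, name, timeoutMillis);
    }
}

class DirectCallDemo {
    public static void main(String[] args) {
        ToyCoordinator coordinator = new ToyCoordinator(new InMemoryCore());
        ToyBeginResponse response = new ToyBeginResponse();
        coordinator.doGlobalBegin("demo-app", "demo-group", "createOrder", 60000, response);
        System.out.println(response.xid); // prints "toy:1"
    }
}
```

The handler method does no transaction logic of its own; it only translates between the message objects and the core's method signature, which is the shape the real listing in the source analysis follows.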

TC processing flow: source code analysis

public class DefaultCoordinator extends AbstractTCInboundHandler
    implements TransactionMessageHandler, ResourceManagerInbound {

    private ServerMessageSender messageSender;
    private Core core = CoreFactory.get();

    public DefaultCoordinator(ServerMessageSender messageSender) {
        this.messageSender = messageSender;
        core.setResourceManagerInbound(this);
    }

    @Override
    protected void doGlobalBegin(GlobalBeginRequest request, GlobalBeginResponse response, RpcContext rpcContext) 
        throws TransactionException {
        response.setXid(core.begin(rpcContext.getApplicationId(), rpcContext.getTransactionServiceGroup(), 
                                   request.getTransactionName(), request.getTimeout()));
    }

    @Override
    protected void doGlobalCommit(GlobalCommitRequest request, GlobalCommitResponse response, RpcContext rpcContext) 
        throws TransactionException {
        response.setGlobalStatus(core.commit(XID.generateXID(request.getTransactionId())));

    }

    @Override
    protected void doGlobalRollback(GlobalRollbackRequest request, GlobalRollbackResponse response, RpcContext rpcContext) 
        throws TransactionException {
        response.setGlobalStatus(core.rollback(XID.generateXID(request.getTransactionId())));

    }

    @Override
    protected void doGlobalStatus(GlobalStatusRequest request, GlobalStatusResponse response, RpcContext rpcContext) 
        throws TransactionException {
        response.setGlobalStatus(core.getStatus(XID.generateXID(request.getTransactionId())));
    }
}           
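  • DefaultCoordinator provides doGlobalBegin, which begins a global transaction.
  • DefaultCoordinator provides doGlobalCommit, which commits a global transaction.
  • DefaultCoordinator provides doGlobalRollback, which rolls back a global transaction.
  • DefaultCoordinator obtains its Core from CoreFactory.get().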
public class CoreFactory {

    private static class SingletonHolder {
        private static Core INSTANCE = new DefaultCore();
    }

    public static final Core get() {
        return CoreFactory.SingletonHolder.INSTANCE;
    }

    public static void set(Core core) {
        CoreFactory.SingletonHolder.INSTANCE = core;
    }
}           
  • CoreFactory provides the method for obtaining the DefaultCore instance.
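
CoreFactory keeps its instance in a nested holder class, so the default object is created only when the holder is first touched. The sketch below reproduces that shape with hypothetical names (Greeter, DefaultGreeter, GreeterFactory, and the constructed counter are all invented here) so that the lazy creation and the effect of set can be observed.

```java
// Hypothetical service interface standing in for Core.
interface Greeter {
    String greet(String name);
}

class DefaultGreeter implements Greeter {
    static int constructed = 0; // counts constructions, for demonstration only

    DefaultGreeter() {
        constructed++;
    }

    @Override
    public String greet(String name) {
        return "hello " + name;
    }
}

class GreeterFactory {
    // The JVM initializes this nested class on first access to INSTANCE,
    // so the default object is built lazily, once, during class initialization.
    private static class Holder {
        private static Greeter INSTANCE = new DefaultGreeter();
    }

    static Greeter get() {
        return Holder.INSTANCE;
    }

    // Swaps the shared instance, for example to install a stub in a test.
    static void set(Greeter greeter) {
        Holder.INSTANCE = greeter;
    }
}

class HolderDemo {
    public static void main(String[] args) {
        System.out.println(DefaultGreeter.constructed);       // 0: Holder not initialized yet
        Greeter a = GreeterFactory.get();
        Greeter b = GreeterFactory.get();
        System.out.println(a == b);                           // true: same shared instance
        System.out.println(DefaultGreeter.constructed);       // 1: built exactly once
        GreeterFactory.set(name -> "stub " + name);
        System.out.println(GreeterFactory.get().greet("tc")); // stub tc
    }
}
```

One consequence of keeping the field non-final so that set can replace it: the field is also not volatile, so a replacement made on one thread is not guaranteed to become visible to other threads without additional synchronization. Calling set before any get still initializes the holder first, so the default instance is constructed and then discarded.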
// Outline only: method bodies are elided in this listing.
public class DefaultCore implements Core {

    private LockManager lockManager = LockManagerFactory.get();
    private ResourceManagerInbound resourceManagerInbound;

    public String begin(String applicationId, String transactionServiceGroup, String name, int timeout) 
    throws TransactionException {}

    @Override
    public GlobalStatus commit(String xid) throws TransactionException {}

    @Override
    public void doGlobalCommit(GlobalSession globalSession, boolean retrying) throws TransactionException {}

    private void asyncCommit(GlobalSession globalSession) throws TransactionException {}

    private void queueToRetryCommit(GlobalSession globalSession) throws TransactionException {}

    private void queueToRetryRollback(GlobalSession globalSession) throws TransactionException {}

    @Override
    public GlobalStatus rollback(String xid) throws TransactionException {}

    @Override
    public void doGlobalRollback(GlobalSession globalSession, boolean retrying) throws TransactionException {}
}           
  • DefaultCore is the core of the TC's transaction handling.
  • DefaultCore provides operations such as begin, commit, and rollback.
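
Since the listing above elides the method bodies, the following sketch shows one way a begin/commit/rollback core could track global transaction status in memory. It is an illustration under stated assumptions, not DefaultCore's logic: SketchCore, the reduced TxStatus enum, and the "sketch:<n>" XID format are invented, and branch sessions, locks, asynchronous commit, and the retry queues are all left out.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

// Reduced status set invented for this sketch.
enum TxStatus { BEGIN, COMMITTED, ROLLED_BACK, UNKNOWN }

// Hypothetical in-memory core: one status per global transaction ID.
class SketchCore {
    private final Map<String, TxStatus> sessions = new ConcurrentHashMap<>();
    private final AtomicLong nextId = new AtomicLong(1);

    String begin(String name) {
        String xid = "sketch:" + nextId.getAndIncrement();
        sessions.put(xid, TxStatus.BEGIN);
        return xid;
    }

    TxStatus commit(String xid) {
        return finish(xid, TxStatus.COMMITTED);
    }

    TxStatus rollback(String xid) {
        return finish(xid, TxStatus.ROLLED_BACK);
    }

    TxStatus getStatus(String xid) {
        return sessions.getOrDefault(xid, TxStatus.UNKNOWN);
    }

    private TxStatus finish(String xid, TxStatus target) {
        // Only a transaction still in BEGIN may move to a terminal status.
        // replace(key, expected, updated) is an atomic compare-and-set on ConcurrentHashMap.
        sessions.replace(xid, TxStatus.BEGIN, target);
        return getStatus(xid);
    }
}

class SketchCoreDemo {
    public static void main(String[] args) {
        SketchCore core = new SketchCore();
        String xid = core.begin("createOrder");
        System.out.println(core.getStatus(xid));    // BEGIN
        System.out.println(core.commit(xid));       // COMMITTED
        System.out.println(core.rollback(xid));     // COMMITTED: already finished, unchanged
        System.out.println(core.commit("missing")); // UNKNOWN
    }
}
```

Returning the resulting status from commit and rollback mirrors the signatures in the outline, where both return a GlobalStatus that the coordinator copies into its response.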
