Fescar TC begin flow

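Introduction

 This article explains how the TC (Transaction Coordinator) handles the begin (transaction start) request sent by the TM (Transaction Manager).

 The core logic covers creating the GlobalSession object, persisting the GlobalSession, and generating the XID.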

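TC begin flow overview

  • 1. Create the GlobalSession object with GlobalSession.createGlobalSession().
  • 2. Register a lifecycle listener on the GlobalSession. The listener is the root session manager: DefaultSessionManager when no session store path is configured, FileBasedSessionManager otherwise.
  • 3. session.begin() notifies the lifecycle listener, which persists the GlobalSession through transactionStoreManager.writeSession().
  • 4. In the same callback, the listener adds the GlobalSession to the global sessionMap.
  • 5. Generate the XID from the session's transactionId and return it to the TM.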
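
The listener-driven flow described above can be sketched in a few lines of self-contained Java. This is a simplified illustration, not the Fescar implementation: the `SketchSession`, `SketchListener`, and `SketchManager` types are hypothetical stand-ins for GlobalSession, SessionLifecycleListener, and the session manager, and persistence is left out entirely.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical stand-ins for the Fescar types; only the callback wiring is modeled.
class BeginFlowSketch {

    interface SketchListener {
        void onBegin(SketchSession session);
    }

    static class SketchSession {
        private static final AtomicLong IDS = new AtomicLong(1000);
        final long transactionId = IDS.incrementAndGet();
        private final List<SketchListener> listeners = new ArrayList<>();
        boolean active;

        void addListener(SketchListener listener) {
            listeners.add(listener);
        }

        // Marks the session active, then notifies every registered listener.
        void begin() {
            active = true;
            for (SketchListener listener : listeners) {
                listener.onBegin(this);
            }
        }
    }

    static class SketchManager implements SketchListener {
        final Map<Long, SketchSession> sessionMap = new ConcurrentHashMap<>();

        @Override
        public void onBegin(SketchSession session) {
            sessionMap.put(session.transactionId, session);
        }
    }

    // Mirrors the order of the steps: create, register listener, begin, return the id.
    static long begin(SketchManager manager) {
        SketchSession session = new SketchSession();
        session.addListener(manager);
        session.begin();
        return session.transactionId;
    }

    public static void main(String[] args) {
        SketchManager manager = new SketchManager();
        long id = begin(manager);
        System.out.println("registered: " + manager.sessionMap.containsKey(id)); // registered: true
    }
}
```

The session never references the manager directly. It only knows about the listener interface, which is what lets the same begin() call work with different session managers.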

TC begin source code analysis

public class DefaultCore implements Core {

    @Override
    public String begin(String applicationId, String transactionServiceGroup, 
                        String name, int timeout) 
    throws TransactionException {

        // Create the GlobalSession object
        GlobalSession session = GlobalSession.createGlobalSession(
                applicationId, transactionServiceGroup, name, timeout);

        // Register the root session manager (SessionHolder.getRootSessionManager()) as a lifecycle listener
        session.addSessionLifecycleListener(SessionHolder.getRootSessionManager());

        // Begin the GlobalSession, which notifies the registered listeners
        session.begin();

        // Generate and return the XID for the new transaction
        return XID.generateXID(session.getTransactionId());
    }
}           

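Notes:

  • GlobalSession.createGlobalSession() creates the GlobalSession object.
  • session.addSessionLifecycleListener() registers a lifecycle listener on the GlobalSession.
  • session.begin() notifies the registered lifecycle listeners, which save the GlobalSession.
  • SessionHolder.getRootSessionManager() returns the root session manager: a DefaultSessionManager when SessionHolder.init() receives a null sessionStorePath, otherwise a FileBasedSessionManager.
  • XID.generateXID() builds the XID from the session's transactionId.
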
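
The XID class itself is not quoted in this article, so the following is only a sketch of how a numeric transactionId could be wrapped into a globally unique string. The "address:port:transactionId" layout, the `XidSketch` class, and its method names are assumptions made for illustration. They should not be read as the actual Fescar API.

```java
// Hypothetical sketch: the "address:port:transactionId" layout is an assumption,
// not taken from the code quoted in this article.
class XidSketch {
    private final String address;
    private final int port;

    XidSketch(String address, int port) {
        this.address = address;
        this.port = port;
    }

    // Prefixes the coordinator's address so ids from different servers cannot collide.
    String generateXid(long transactionId) {
        return address + ":" + port + ":" + transactionId;
    }

    // Recovers the numeric id from the last colon-separated field.
    static long transactionIdOf(String xid) {
        return Long.parseLong(xid.substring(xid.lastIndexOf(':') + 1));
    }

    public static void main(String[] args) {
        XidSketch xid = new XidSketch("127.0.0.1", 8091);
        String value = xid.generateXid(1001L);
        System.out.println(value);                  // 127.0.0.1:8091:1001
        System.out.println(transactionIdOf(value)); // 1001
    }
}
```
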
public class GlobalSession implements SessionLifecycle, SessionStorable {
    // Container for the lifecycle listeners
    private ArrayList<SessionLifecycleListener> lifecycleListeners 
                                              =  new ArrayList<>();

    public static GlobalSession createGlobalSession(String applicationId, 
              String txServiceGroup, String txName, int timeout) {
        GlobalSession session = 
         new GlobalSession(applicationId, txServiceGroup, txName, timeout);
        return session;
    }

    public GlobalSession(String applicationId, String transactionServiceGroup, 
                         String transactionName, int timeout) {
        // Generate the transactionId.
        this.transactionId = UUIDGenerator.generateUUID();
        this.status = GlobalStatus.Begin;

        this.applicationId = applicationId;
        this.transactionServiceGroup = transactionServiceGroup;
        this.transactionName = transactionName;
        this.timeout = timeout;
    }

    // Register a lifecycle listener
    public void addSessionLifecycleListener(
         SessionLifecycleListener sessionLifecycleListener) {
         lifecycleListeners.add(sessionLifecycleListener);
    }

    public void begin() throws TransactionException {
        this.status = GlobalStatus.Begin;
        this.beginTime = System.currentTimeMillis();
        this.active = true;
        for (SessionLifecycleListener lifecycleListener : lifecycleListeners) {
            lifecycleListener.onBegin(this);
        }
    }
}


// Class and method that generate the transactionId
public class UUIDGenerator {

    private static AtomicLong UUID = new AtomicLong(1000);
    private static int UUID_INTERNAL = 200000000;

    public static long generateUUID() {
        long id = UUID.incrementAndGet();
        if (id > 2000000000) {
            synchronized (UUID) {
                if (UUID.get() >= id) {
                    id -= 2000000000;
                    UUID.set(id);
                }
            }
        }
        return id;
    }
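}

Notes:

  • The GlobalSession constructor generates the transactionId through UUIDGenerator.generateUUID().
  • addSessionLifecycleListener() registers the lifecycle listener, which here is the root session manager.
  • begin() calls onBegin() on each registered lifecycle listener (lifecycleListener.onBegin), and that callback is where the GlobalSession gets persisted.
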
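
The wrap-around behavior of generateUUID() is easier to see with a counter whose starting value can be chosen. The sketch below copies the logic of the quoted method into a hypothetical `WrappingIdSketch` class with an instance counter. The configurable start value is an addition for demonstration only.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical re-statement of the quoted generator with a configurable start value.
class WrappingIdSketch {
    static final long LIMIT = 2_000_000_000L;
    private final AtomicLong counter;

    WrappingIdSketch(long start) {
        this.counter = new AtomicLong(start);
    }

    long next() {
        long id = counter.incrementAndGet();
        if (id > LIMIT) {
            synchronized (counter) {
                // Only rewind if no other thread has already reset the counter.
                if (counter.get() >= id) {
                    id -= LIMIT;
                    counter.set(id);
                }
            }
        }
        return id;
    }

    public static void main(String[] args) {
        System.out.println(new WrappingIdSketch(1000).next()); // 1001

        WrappingIdSketch nearLimit = new WrappingIdSketch(LIMIT - 1);
        System.out.println(nearLimit.next()); // 2000000000
        System.out.println(nearLimit.next()); // 1
        System.out.println(nearLimit.next()); // 2
    }
}
```

Because the counter rewinds to small values after the limit, ids are only unique while earlier transactions with the same number have already finished.
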
public class SessionHolder {

    private static final String ROOT_SESSION_MANAGER_NAME = "root.data";
    private static final String ASYNC_COMMITTING_SESSION_MANAGER_NAME = "async.commit.data";
    private static final String RETRY_COMMITTING_SESSION_MANAGER_NAME = "retry.commit.data";
    private static final String RETRY_ROLLBACKING_SESSION_MANAGER_NAME = "retry.rollback.data";
    private static SessionManager ROOT_SESSION_MANAGER;
    private static SessionManager ASYNC_COMMITTING_SESSION_MANAGER;
    private static SessionManager RETRY_COMMITTING_SESSION_MANAGER;
    private static SessionManager RETRY_ROLLBACKING_SESSION_MANAGER;

    public static void init(String sessionStorePath) throws IOException {
        if (sessionStorePath == null) {
            ROOT_SESSION_MANAGER = new DefaultSessionManager(ROOT_SESSION_MANAGER_NAME);
            ASYNC_COMMITTING_SESSION_MANAGER = new DefaultSessionManager(ASYNC_COMMITTING_SESSION_MANAGER_NAME);
            RETRY_COMMITTING_SESSION_MANAGER = new DefaultSessionManager(RETRY_COMMITTING_SESSION_MANAGER_NAME);
            RETRY_ROLLBACKING_SESSION_MANAGER = new DefaultSessionManager(RETRY_ROLLBACKING_SESSION_MANAGER_NAME);
        } else {
            if (!sessionStorePath.endsWith("/")) {
                sessionStorePath = sessionStorePath + "/";
            }
            ROOT_SESSION_MANAGER = new FileBasedSessionManager(ROOT_SESSION_MANAGER_NAME, sessionStorePath);
            ASYNC_COMMITTING_SESSION_MANAGER = new DefaultSessionManager(ASYNC_COMMITTING_SESSION_MANAGER_NAME);
            RETRY_COMMITTING_SESSION_MANAGER = new DefaultSessionManager(RETRY_COMMITTING_SESSION_MANAGER_NAME);
            RETRY_ROLLBACKING_SESSION_MANAGER = new DefaultSessionManager(RETRY_ROLLBACKING_SESSION_MANAGER_NAME);
        }
    }

    public static final SessionManager getRootSessionManager() {
        if (ROOT_SESSION_MANAGER == null) {
            throw new ShouldNeverHappenException("SessionManager is NOT init!");
        }
        return ROOT_SESSION_MANAGER;
    }
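}

Notes:

  • getRootSessionManager() returns the root session manager created by init(): a DefaultSessionManager when sessionStorePath is null, otherwise a FileBasedSessionManager. It throws ShouldNeverHappenException if init() has not been called. The DefaultSessionManager implements the lifecycle listener interface through its parent class.
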
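
The selection logic in init() can be reduced to a small, self-contained sketch. The `Manager`, `InMemoryManager`, and `FileManager` types below are hypothetical stand-ins for SessionManager, DefaultSessionManager, and FileBasedSessionManager. Only the branching and the trailing-slash normalization are modeled.

```java
// Hypothetical stand-ins; only the choice between the two manager kinds is modeled.
class HolderSketch {

    interface Manager {
        String kind();
    }

    static class InMemoryManager implements Manager {
        @Override
        public String kind() {
            return "in-memory";
        }
    }

    static class FileManager implements Manager {
        final String storeDir;

        FileManager(String storeDir) {
            this.storeDir = storeDir;
        }

        @Override
        public String kind() {
            return "file-based";
        }
    }

    // A null path selects the in-memory manager; otherwise the path is
    // normalized to end with "/" and handed to the file-based manager.
    static Manager select(String storePath) {
        if (storePath == null) {
            return new InMemoryManager();
        }
        String dir = storePath.endsWith("/") ? storePath : storePath + "/";
        return new FileManager(dir);
    }

    public static void main(String[] args) {
        System.out.println(select(null).kind());        // in-memory
        System.out.println(select("/tmp/data").kind()); // file-based
    }
}
```
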
public class DefaultSessionManager extends AbstractSessionManager {

    public DefaultSessionManager(String name) {
        super(name);
        transactionStoreManager = new TransactionStoreManager() {
            @Override
            public boolean writeSession(LogOperation logOperation, 
                                        SessionStorable session) {
                return false;
            }

            @Override
            public void shutdown() {

            }

            @Override
            public List<TransactionWriteStore> readWriteStoreFromFile(int readSize, 
                boolean isHistory) {
                return null;
            }

            @Override
            public boolean hasRemaining(boolean isHistory) {
                return false;
            }
        };
    }
}

public abstract class AbstractSessionManager 
                      implements SessionManager, SessionLifecycleListener {

    protected static final Logger LOGGER = LoggerFactory.getLogger(AbstractSessionManager.class);

    protected Map<Long, GlobalSession> sessionMap = new ConcurrentHashMap<>();

    protected TransactionStoreManager transactionStoreManager;

    protected String name;

    public AbstractSessionManager(String name) {
        this.name = name;
    }

    @Override
    public void addGlobalSession(GlobalSession session) throws TransactionException {
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("MANAGER[" + name + "] SESSION[" + session + "] " + LogOperation.GLOBAL_ADD);
        }
        transactionStoreManager.writeSession(LogOperation.GLOBAL_ADD, session);
        sessionMap.put(session.getTransactionId(), session);

    }

    @Override
    public void onBegin(GlobalSession globalSession) throws TransactionException {
        addGlobalSession(globalSession);
    }
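}

Notes:

  • DefaultSessionManager manages the lifecycle of GlobalSession objects.
  • Its parent class AbstractSessionManager implements the SessionLifecycleListener interface.
  • DefaultSessionManager does not override onBegin(), so the call goes to AbstractSessionManager.onBegin().
  • onBegin() calls addGlobalSession() to add the GlobalSession object.
  • addGlobalSession() first calls transactionStoreManager.writeSession() to persist the session. In DefaultSessionManager the TransactionStoreManager is a no-op (writeSession() simply returns false), so nothing is written.
  • That transactionStoreManager is the anonymous TransactionStoreManager created in the DefaultSessionManager constructor.
  • addGlobalSession() then calls sessionMap.put() to keep the GlobalSession in memory, keyed by transactionId.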
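
The effect of plugging a no-op store into the manager can be shown with a small sketch. The `Store`, `NoOpStore`, `RecordingStore`, and `Manager` types are hypothetical simplifications of TransactionStoreManager and AbstractSessionManager. The recording store is included only as a contrast and is not part of the quoted code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical simplification: the manager always updates its map,
// while the pluggable store decides whether anything is persisted.
class StoreSketch {

    interface Store {
        boolean write(String operation, long transactionId);
    }

    // Behaves like the anonymous store in DefaultSessionManager: writes nothing.
    static class NoOpStore implements Store {
        @Override
        public boolean write(String operation, long transactionId) {
            return false;
        }
    }

    // Contrast case: keeps a record of every write in a list.
    static class RecordingStore implements Store {
        final List<String> records = new ArrayList<>();

        @Override
        public boolean write(String operation, long transactionId) {
            return records.add(operation + ":" + transactionId);
        }
    }

    static class Manager {
        final Map<Long, String> sessionMap = new ConcurrentHashMap<>();
        private final Store store;

        Manager(Store store) {
            this.store = store;
        }

        // Same order as addGlobalSession(): write to the store, then update the map.
        boolean add(long transactionId, String name) {
            boolean persisted = store.write("GLOBAL_ADD", transactionId);
            sessionMap.put(transactionId, name);
            return persisted;
        }
    }

    public static void main(String[] args) {
        Manager inMemory = new Manager(new NoOpStore());
        System.out.println(inMemory.add(1001L, "tx-a"));   // false
        System.out.println(inMemory.sessionMap.size());    // 1
    }
}
```

With the no-op store the session exists only in the map, so it does not survive a restart of the process.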
