开篇
这篇文章用来讲解TC(Transaction Coordinator:事务协调器)处理TM发送过来的begin操作(事务开启操作)的流程。
核心逻辑包括GlobalSession对象的生成、GlobalSession的持久化以及XID生成。
TC begin 流程说明
- 1.创建GlobalSession对象,GlobalSession.createGlobalSession()。
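- 2.添加生命周期监听器到GlobalSession当中,监听器对象为SessionHolder.getRootSessionManager()返回的SessionManager(例如DefaultSessionManager)。
- 3.调用session.begin()触发GlobalSession的生命周期监听器(onBegin),先通过transactionStoreManager.writeSession()持久化GlobalSession对象。
- 4.在同一次onBegin回调中,随后把GlobalSession对象添加到SessionManager内部的sessionMap对象。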
TC begin 源码分析
/**
 * {@link Core} implementation showing how the transaction coordinator handles
 * a "begin" request.
 */
public class DefaultCore implements Core {

    /**
     * Opens a new global transaction and returns its XID.
     *
     * <p>A fresh {@code GlobalSession} is created, the root session manager is
     * registered on it as a lifecycle listener, and the session is started so
     * that the listener's {@code onBegin} callback runs.
     *
     * @param applicationId           passed through to the new session
     * @param transactionServiceGroup passed through to the new session
     * @param name                    transaction name passed through to the new session
     * @param timeout                 timeout passed through to the new session
     * @return the XID generated from the new session's transaction id
     * @throws TransactionException if starting the session fails
     */
    @Override
    public String begin(String applicationId, String transactionServiceGroup,
                        String name, int timeout) throws TransactionException {
        GlobalSession globalSession = GlobalSession.createGlobalSession(
            applicationId, transactionServiceGroup, name, timeout);

        // The root session manager must be registered before begin() so that
        // it receives the onBegin callback for this session.
        globalSession.addSessionLifecycleListener(SessionHolder.getRootSessionManager());
        globalSession.begin();

        return XID.generateXID(globalSession.getTransactionId());
    }
}
说明:
- GlobalSession.createGlobalSession()创建全局GlobalSession对象。
- session.addSessionLifecycleListener()给GlobalSession对象添加生命周期监听器。
- session.begin()方法通过生命周期监听器保存全局GlobalSession对象。
- SessionHolder.getRootSessionManager()返回init()中创建的根SessionManager:sessionStorePath为null时是DefaultSessionManager,否则是FileBasedSessionManager。
- XID.generateXID()创建XID值。
/**
 * A global transaction session that notifies registered lifecycle listeners
 * when it begins.
 */
public class GlobalSession implements SessionLifecycle, SessionStorable {

    // Listeners are kept in registration order and notified in that order.
    private ArrayList<SessionLifecycleListener> lifecycleListeners
        = new ArrayList<>();

    /**
     * Creates a session in the {@code Begin} status with a newly generated
     * transaction id.
     *
     * @param applicationId           stored on the session
     * @param transactionServiceGroup stored on the session
     * @param transactionName         stored on the session
     * @param timeout                 stored on the session
     */
    public GlobalSession(String applicationId, String transactionServiceGroup,
                         String transactionName, int timeout) {
        // The transaction id comes from the UUIDGenerator counter.
        this.transactionId = UUIDGenerator.generateUUID();
        this.status = GlobalStatus.Begin;
        this.applicationId = applicationId;
        this.transactionServiceGroup = transactionServiceGroup;
        this.transactionName = transactionName;
        this.timeout = timeout;
    }

    /**
     * Static factory that simply delegates to the constructor.
     *
     * @return a new session built from the given arguments
     */
    public static GlobalSession createGlobalSession(String applicationId,
            String txServiceGroup, String txName, int timeout) {
        return new GlobalSession(applicationId, txServiceGroup, txName, timeout);
    }

    /**
     * Registers a listener to be called back on lifecycle events.
     *
     * @param sessionLifecycleListener the listener to append
     */
    public void addSessionLifecycleListener(
            SessionLifecycleListener sessionLifecycleListener) {
        lifecycleListeners.add(sessionLifecycleListener);
    }

    /**
     * Marks the session as begun and active, records the begin time, and
     * invokes {@code onBegin} on every registered listener.
     *
     * @throws TransactionException if a listener's {@code onBegin} throws it
     */
    public void begin() throws TransactionException {
        status = GlobalStatus.Begin;
        beginTime = System.currentTimeMillis();
        active = true;
        for (SessionLifecycleListener listener : lifecycleListeners) {
            listener.onBegin(this);
        }
    }
}
/**
 * Generates transaction ids from a process-wide counter.
 *
 * <p>Ids start at 1001 and increase by one per call. After
 * {@code 2000000000} has been handed out, the counter wraps and the next id
 * is 1.
 */
public class UUIDGenerator {

    /** Largest id that is ever returned; the counter wraps to 1 after it. */
    private static final long MAX_ID = 2000000000L;

    private static final AtomicLong UUID = new AtomicLong(1000);

    // NOTE(review): not referenced by any code in this class as shown, and its
    // value (200,000,000) differs from the wrap threshold - confirm intent.
    private static int UUID_INTERNAL = 200000000;

    /**
     * Returns the next id.
     *
     * <p>The increment and the wrap-around are applied in a single
     * compare-and-set, so concurrent callers never observe or return a value
     * above {@code MAX_ID}. The previous increment-then-reset approach let a
     * thread that had drawn an over-limit value return it unchanged when
     * another thread had already reset the counter.
     *
     * @return an id in the range {@code [1, 2000000000]}
     */
    public static long generateUUID() {
        while (true) {
            long current = UUID.get();
            long next = current >= MAX_ID ? 1L : current + 1;
            if (UUID.compareAndSet(current, next)) {
                return next;
            }
            // Lost the race to another caller; retry with the fresh value.
        }
    }
}
- GlobalSession构造器内部通过UUIDGenerator.generateUUID()生成transactionId。
- addSessionLifecycleListener()方法添加生命周期监听器DefaultSessionManager。
- begin()方法调用生命周期监听器的onBegin()方法(lifecycleListener.onBegin),实现GlobalSession的持久化。
/**
 * Holds the four static {@code SessionManager} instances and exposes the root
 * one.
 */
public class SessionHolder {

    private static final String ROOT_SESSION_MANAGER_NAME = "root.data";
    private static final String ASYNC_COMMITTING_SESSION_MANAGER_NAME = "async.commit.data";
    private static final String RETRY_COMMITTING_SESSION_MANAGER_NAME = "retry.commit.data";
    private static final String RETRY_ROLLBACKING_SESSION_MANAGER_NAME = "retry.rollback.data";

    private static SessionManager ROOT_SESSION_MANAGER;
    private static SessionManager ASYNC_COMMITTING_SESSION_MANAGER;
    private static SessionManager RETRY_COMMITTING_SESSION_MANAGER;
    private static SessionManager RETRY_ROLLBACKING_SESSION_MANAGER;

    /**
     * Creates all four session managers.
     *
     * <p>Only the root manager depends on the argument: without a store path
     * it is a {@code DefaultSessionManager}; with one it is a
     * {@code FileBasedSessionManager} given that path, with a trailing
     * {@code "/"} appended when missing. The other three are always
     * {@code DefaultSessionManager} instances.
     *
     * @param sessionStorePath store directory for the root manager, or
     *                         {@code null} for none
     * @throws IOException declared for the file-based manager's construction
     */
    public static void init(String sessionStorePath) throws IOException {
        // The root manager is created first, so a failure here leaves the
        // remaining managers unassigned.
        if (sessionStorePath == null) {
            ROOT_SESSION_MANAGER = new DefaultSessionManager(ROOT_SESSION_MANAGER_NAME);
        } else {
            String storeDirectory = sessionStorePath.endsWith("/")
                ? sessionStorePath
                : sessionStorePath + "/";
            ROOT_SESSION_MANAGER =
                new FileBasedSessionManager(ROOT_SESSION_MANAGER_NAME, storeDirectory);
        }

        ASYNC_COMMITTING_SESSION_MANAGER =
            new DefaultSessionManager(ASYNC_COMMITTING_SESSION_MANAGER_NAME);
        RETRY_COMMITTING_SESSION_MANAGER =
            new DefaultSessionManager(RETRY_COMMITTING_SESSION_MANAGER_NAME);
        RETRY_ROLLBACKING_SESSION_MANAGER =
            new DefaultSessionManager(RETRY_ROLLBACKING_SESSION_MANAGER_NAME);
    }

    /**
     * Returns the root session manager created by {@link #init(String)}.
     *
     * @return the root session manager, never {@code null}
     * @throws ShouldNeverHappenException if {@code init} has not assigned it yet
     */
    public static final SessionManager getRootSessionManager() {
        if (ROOT_SESSION_MANAGER != null) {
            return ROOT_SESSION_MANAGER;
        }
        throw new ShouldNeverHappenException("SessionManager is NOT init!");
    }
}
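- getRootSessionManager()返回init()中创建的ROOT_SESSION_MANAGER(sessionStorePath为null时是DefaultSessionManager,否则是FileBasedSessionManager);其中DefaultSessionManager通过父类AbstractSessionManager实现生命周期监听接口。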
/**
 * Session manager whose transaction store does nothing: every store method
 * is a stub.
 */
public class DefaultSessionManager extends AbstractSessionManager {

    /**
     * Creates the manager and installs a no-op {@code TransactionStoreManager}.
     *
     * @param name manager name, forwarded to the superclass
     */
    public DefaultSessionManager(String name) {
        super(name);
        transactionStoreManager = new TransactionStoreManager() {

            /** Writes nothing and always reports {@code false}. */
            @Override
            public boolean writeSession(LogOperation logOperation,
                                        SessionStorable session) {
                return false;
            }

            /** There is never anything left to read. */
            @Override
            public boolean hasRemaining(boolean isHistory) {
                return false;
            }

            /** Nothing is stored, so this returns {@code null}. */
            @Override
            public List<TransactionWriteStore> readWriteStoreFromFile(int readSize,
                                                                      boolean isHistory) {
                return null;
            }

            /** No resources to release. */
            @Override
            public void shutdown() {
            }
        };
    }
}
/**
 * Base session manager that also acts as a session lifecycle listener: when a
 * session begins, it is written to the store manager and recorded in
 * {@code sessionMap}.
 */
public abstract class AbstractSessionManager
        implements SessionManager, SessionLifecycleListener {

    protected static final Logger LOGGER = LoggerFactory.getLogger(AbstractSessionManager.class);

    /** Sessions held by this manager, keyed by transaction id. */
    protected Map<Long, GlobalSession> sessionMap = new ConcurrentHashMap<>();

    /** Store used by addGlobalSession; not assigned in this class. */
    protected TransactionStoreManager transactionStoreManager;

    /** Manager name, included in debug log output. */
    protected String name;

    /**
     * @param name the name of this manager
     */
    public AbstractSessionManager(String name) {
        this.name = name;
    }

    /**
     * Writes a {@code GLOBAL_ADD} record for the session through the store
     * manager, then puts the session into {@code sessionMap} under its
     * transaction id. The boolean returned by the store manager is not
     * inspected.
     *
     * @param session the session to add
     * @throws TransactionException declared by the interface contract
     */
    @Override
    public void addGlobalSession(GlobalSession session) throws TransactionException {
        if (LOGGER.isDebugEnabled()) {
            String message = "MANAGER[" + name + "] SESSION[" + session + "] "
                + LogOperation.GLOBAL_ADD;
            LOGGER.debug(message);
        }
        // Write to the store before the session becomes visible in the map.
        transactionStoreManager.writeSession(LogOperation.GLOBAL_ADD, session);
        sessionMap.put(session.getTransactionId(), session);
    }

    /**
     * Lifecycle callback for a session that has begun; delegates to
     * {@link #addGlobalSession(GlobalSession)}.
     *
     * @param globalSession the session that has begun
     * @throws TransactionException if adding the session fails
     */
    @Override
    public void onBegin(GlobalSession globalSession) throws TransactionException {
        addGlobalSession(globalSession);
    }
}
- DefaultSessionManager是GlobalSession的生命周期管理器。
- DefaultSessionManager的父类AbstractSessionManager实现SessionLifecycleListener接口。
- DefaultSessionManager没有重写onBegin(),回调时执行的是从父类AbstractSessionManager继承的onBegin()方法。
- onBegin()方法内部执行addGlobalSession()方法添加GlobalSession对象。
- addGlobalSession()方法调用transactionStoreManager.writeSession()执行持久化;DefaultSessionManager中自定义的TransactionStoreManager不做任何操作(writeSession()直接返回false)。
- transactionStoreManager是DefaultSessionManager构造器内生成的匿名TransactionStoreManager对象。
- addGlobalSession()方法执行sessionMap.put()保存GlobalSession对象。