Fescar example analysis - TM send logic

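Introduction

 This article aims to clarify the logic of the TM send path in Fescar, analyzing it at two levels: the sequence diagram and the source code.

 Along the way it answers two questions that puzzled me while reading the code (most readers will probably run into them too): how TmRpcClient is initialized and how the configuration is loaded.

 The article ends with the class diagram of the GlobalAction-related Request classes, to make the dependencies easier to follow.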

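Fescar TM send flow

Notes:

  • 1. DefaultGlobalTransaction delegates begin/commit/rollback and similar calls to DefaultTransactionManager.
  • 2. DefaultTransactionManager calls its internal syncCall() method, which in turn calls TmRpcClient's sendMsgWithResponse() method.
  • 3. TmRpcClient calls sendAsyncRequestWithResponse(), inherited from AbstractRpcRemoting, which delegates to sendAsyncRequest() to put the message on the send queue.
  • 4. The MergedSendRunnable thread in AbstractRpcRemotingClient consumes the send queue, builds a MergedWarpMessage, and sends it by calling sendRequest().
  • 5. sendRequest() calls writeAndFlush to complete the send.
  • The class dependency diagram of TmRpcClient is shown above.
  • TmRpcClient extends AbstractRpcRemotingClient.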

Fescar TM send source code analysis

public class DefaultTransactionManager implements TransactionManager {

    private static class SingletonHolder {
        private static final TransactionManager INSTANCE = new DefaultTransactionManager();
    }

    /**
     * Get transaction manager.
     *
     * @return the transaction manager
     */
    public static TransactionManager get() {
        return SingletonHolder.INSTANCE;
    }

    private DefaultTransactionManager() {

    }

    @Override
    public String begin(String applicationId, String transactionServiceGroup, String name, int timeout) 
            throws TransactionException {
        GlobalBeginRequest request = new GlobalBeginRequest();
        request.setTransactionName(name);
        request.setTimeout(timeout);
        GlobalBeginResponse response = (GlobalBeginResponse) syncCall(request);
        return response.getXid();
    }

    @Override
    public GlobalStatus commit(String xid) throws TransactionException {
        long txId = XID.getTransactionId(xid);
        GlobalCommitRequest globalCommit = new GlobalCommitRequest();
        globalCommit.setTransactionId(txId);
        GlobalCommitResponse response = (GlobalCommitResponse) syncCall(globalCommit);
        return response.getGlobalStatus();
    }

    @Override
    public GlobalStatus rollback(String xid) throws TransactionException {
        long txId = XID.getTransactionId(xid);
        GlobalRollbackRequest globalRollback = new GlobalRollbackRequest();
        globalRollback.setTransactionId(txId);
        GlobalRollbackResponse response = (GlobalRollbackResponse) syncCall(globalRollback);
        return response.getGlobalStatus();
    }

    @Override
    public GlobalStatus getStatus(String xid) throws TransactionException {
        long txId = XID.getTransactionId(xid);
        GlobalStatusRequest queryGlobalStatus = new GlobalStatusRequest();
        queryGlobalStatus.setTransactionId(txId);
        GlobalStatusResponse response = (GlobalStatusResponse) syncCall(queryGlobalStatus);
        return response.getGlobalStatus();
    }

    private AbstractTransactionResponse syncCall(AbstractTransactionRequest request) throws TransactionException {
        try {
            return (AbstractTransactionResponse) TmRpcClient.getInstance().sendMsgWithResponse(request);
        } catch (TimeoutException toe) {
            throw new TransactionException(TransactionExceptionCode.IO, toe);
        }
    }
}           
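  • The begin/commit/rollback methods of DefaultTransactionManager all end up calling syncCall().
  • syncCall() invokes TmRpcClient.getInstance().sendMsgWithResponse(request) to hand the request to TmRpcClient, and converts a TimeoutException into a TransactionException with code IO.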
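DefaultTransactionManager exposes its single instance through a nested SingletonHolder class. The following is a minimal sketch of that idiom in isolation; LazyManager and its CREATED counter are hypothetical names used only for illustration and are not part of Fescar.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for a manager class; not part of Fescar.
class LazyManager {
    // Counts constructor calls so the demo can show that only one instance is created.
    static final AtomicInteger CREATED = new AtomicInteger();

    // The JVM initializes this nested class on first access to INSTANCE.
    // Class initialization is thread-safe, so no explicit locking is needed.
    private static class SingletonHolder {
        private static final LazyManager INSTANCE = new LazyManager();
    }

    private LazyManager() {
        CREATED.incrementAndGet();
    }

    static LazyManager get() {
        return SingletonHolder.INSTANCE;
    }

    public static void main(String[] args) {
        LazyManager first = LazyManager.get();
        LazyManager second = LazyManager.get();
        System.out.println(first == second); // prints "true"
        System.out.println(CREATED.get());   // prints "1"
    }
}
```

The instance is created lazily, on the first call to get(), and every later call returns the same object.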
public final class TmRpcClient extends AbstractRpcRemotingClient {
    @Override
    public Object sendMsgWithResponse(Object msg) throws TimeoutException {
        return sendMsgWithResponse(msg, NettyClientConfig.getRpcRequestTimeout());
    }

    @Override
    public Object sendMsgWithResponse(String serverAddress, Object msg, long timeout)
        throws TimeoutException {
        return sendAsyncRequestWithResponse(serverAddress, connect(serverAddress), msg, timeout);
    }
}           
  • TmRpcClient's sendMsgWithResponse calls sendAsyncRequestWithResponse to send the message.
  • sendAsyncRequestWithResponse is implemented in the ancestor class AbstractRpcRemoting.
public abstract class AbstractRpcRemoting extends ChannelDuplexHandler {

    protected Object sendAsyncRequestWithResponse(String address, Channel channel, Object msg, long timeout) throws
        TimeoutException {
        if (timeout <= 0) {
            throw new FrameworkException("timeout should more than 0ms");
        }
        return sendAsyncRequest(address, channel, msg, timeout);
    }

    private Object sendAsyncRequest(String address, Channel channel, Object msg, long timeout)
        throws TimeoutException {
        if (channel == null) {
            LOGGER.warn("sendAsyncRequestWithResponse nothing, caused by null channel.");
            return null;
        }

        // Build the RpcMessage object
        final RpcMessage rpcMessage = new RpcMessage();
        rpcMessage.setId(RpcMessage.getNextMessageId());
        rpcMessage.setAsync(false);
        rpcMessage.setHeartbeat(false);
        rpcMessage.setRequest(true);
        rpcMessage.setBody(msg);

        // Wrap it in a MessageFuture to implement the timeout
        final MessageFuture messageFuture = new MessageFuture();
        messageFuture.setRequestMessage(rpcMessage);
        messageFuture.setTimeout(timeout);
        futures.put(rpcMessage.getId(), messageFuture);

        // The example code takes this branch
        if (address != null) {
            // Put the message into the queue (basket) that basketMap holds for this address
            ConcurrentHashMap<String, BlockingQueue<RpcMessage>> map = basketMap;
            BlockingQueue<RpcMessage> basket = map.get(address);
            if (basket == null) {
                map.putIfAbsent(address, new LinkedBlockingQueue<RpcMessage>());
                basket = map.get(address);
            }
            basket.offer(rpcMessage);
            if (LOGGER.isDebugEnabled()) {
                LOGGER.debug("offer message: " + rpcMessage.getBody());
            }

            // The actual send is performed by a separate thread
            if (!isSending) {
                synchronized (mergeLock) {
                    mergeLock.notifyAll();
                }
            }
        } else {
            ChannelFuture future;
            channelWriteableCheck(channel, msg);
            future = channel.writeAndFlush(rpcMessage);
            future.addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture future) {
                    if (!future.isSuccess()) {
                        MessageFuture messageFuture = futures.remove(rpcMessage.getId());
                        if (messageFuture != null) {
                            messageFuture.setResultMessage(future.cause());
                        }
                        destroyChannel(future.channel());
                    }
                }
            });
        }

        // Use the future to wait for the response with a time limit
        if (timeout > 0) {
            try {
                return messageFuture.get(timeout, TimeUnit.MILLISECONDS);
            } catch (Exception exx) {
                LOGGER.error("wait response error:" + exx.getMessage() + ",ip:" + address + ",request:" + msg);
                if (exx instanceof TimeoutException) {
                    throw (TimeoutException)exx;
                } else {
                    throw new RuntimeException(exx);
                }
            }
        } else {
            return null;
        }
    }
}           
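  • Build an RpcMessage object that wraps the request.
  • Build a MessageFuture object that wraps the RpcMessage and provides the wait-with-timeout behavior.
  • Messages are bucketed by address into baskets; the code that actually sends them is MergedSendRunnable in AbstractRpcRemotingClient.
  • Sending a request follows a producer-consumer model, and the code above is only the producer side.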
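The MessageFuture step can be sketched with the JDK alone: register a future under the message id, let the receiving side complete it, and have the caller wait with a time limit. This is an illustration of the pattern under stated assumptions, not Fescar's MessageFuture; PendingRequests and all of its methods are hypothetical names.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical helper, not a Fescar class: tracks requests that await a response.
class PendingRequests {
    private final AtomicLong nextId = new AtomicLong();
    private final ConcurrentHashMap<Long, CompletableFuture<Object>> futures = new ConcurrentHashMap<>();

    // Register a new request and return its id; the id would travel with the message.
    long register() {
        long id = nextId.incrementAndGet();
        futures.put(id, new CompletableFuture<>());
        return id;
    }

    // Called by the receiving side when a response carrying this id arrives.
    boolean complete(long id, Object response) {
        CompletableFuture<Object> future = futures.get(id);
        return future != null && future.complete(response);
    }

    // Block the caller until the response arrives or the timeout elapses.
    Object await(long id, long timeoutMillis) throws TimeoutException {
        CompletableFuture<Object> future = futures.get(id);
        if (future == null) {
            throw new IllegalArgumentException("unknown request id " + id);
        }
        try {
            return future.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("request failed", e.getCause());
        } finally {
            futures.remove(id); // the entry is no longer needed, whatever the outcome
        }
    }

    int pendingCount() {
        return futures.size();
    }

    // Demo helper: optionally answer the request, then wait up to 50 ms for it.
    static String roundTrip(boolean answer) {
        PendingRequests pending = new PendingRequests();
        long id = pending.register();
        if (answer) {
            pending.complete(id, "ok");
        }
        try {
            return String.valueOf(pending.await(id, 50));
        } catch (TimeoutException e) {
            return "timeout";
        }
    }

    public static void main(String[] args) {
        System.out.println(roundTrip(true));  // prints "ok"
        System.out.println(roundTrip(false)); // prints "timeout"
    }
}
```

As in sendAsyncRequest(), the caller blocks only on its own future, so the thread that writes to the channel can be a different one.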
public abstract class AbstractRpcRemotingClient extends AbstractRpcRemoting
    implements RemotingService, RegisterMsgListener, ClientMessageSender {

    public class MergedSendRunnable implements Runnable {

        @Override
        public void run() {
            while (true) {
                synchronized (mergeLock) {
                    try {
                        mergeLock.wait(MAX_MERGE_SEND_MILLS);
                    } catch (InterruptedException e) {}
                }
                isSending = true;
                for (String address : basketMap.keySet()) {
                    BlockingQueue<RpcMessage> basket = basketMap.get(address);
                    if (basket.isEmpty()) { continue; }

                    MergedWarpMessage mergeMessage = new MergedWarpMessage();
                    while (!basket.isEmpty()) {
                        RpcMessage msg = basket.poll();
                        mergeMessage.msgs.add((AbstractMessage)msg.getBody());
                        mergeMessage.msgIds.add(msg.getId());
                    }
                    if (mergeMessage.msgIds.size() > 1) {
                        printMergeMessageLog(mergeMessage);
                    }
                    Channel sendChannel = connect(address);
                    try {
                        sendRequest(sendChannel, mergeMessage);
                    } catch (FrameworkException e) {
                        if (e.getErrcode() == FrameworkErrorCode.ChannelIsNotWritable
                            && address != null) {
                            destroyChannel(address, sendChannel);
                        }
                        LOGGER.error("", "client merge call failed", e);
                    }
                }
                isSending = false;
            }
        }
    }
}
  • MergedSendRunnable consumes the pending messages and assembles them into a MergedWarpMessage object.
  • sendRequest() wraps the MergedWarpMessage in an RpcMessage once more and sends it.
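The basket-and-merge idea can be shown without Netty or threads. The sketch below keeps one queue per address on the producer side and drains each queue into a single batch on the consumer side. MergeBaskets, its methods, and the addresses are hypothetical; in Fescar the drain loop runs inside MergedSendRunnable after waiting on mergeLock.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical sketch, not Fescar code: per-address baskets merged into batches.
class MergeBaskets {
    private final ConcurrentHashMap<String, BlockingQueue<String>> basketMap = new ConcurrentHashMap<>();

    // Producer side: enqueue a message into the basket for its target address.
    void offer(String address, String message) {
        basketMap.computeIfAbsent(address, key -> new LinkedBlockingQueue<>()).offer(message);
    }

    // Consumer side: drain every non-empty basket into one batch per address.
    Map<String, List<String>> drainOnce() {
        Map<String, List<String>> batches = new TreeMap<>(); // sorted for stable output
        for (Map.Entry<String, BlockingQueue<String>> entry : basketMap.entrySet()) {
            List<String> batch = new ArrayList<>();
            entry.getValue().drainTo(batch);
            if (!batch.isEmpty()) {
                batches.put(entry.getKey(), batch);
            }
        }
        return batches;
    }

    public static void main(String[] args) {
        MergeBaskets baskets = new MergeBaskets();
        baskets.offer("127.0.0.1:8091", "begin");   // addresses are made up for the demo
        baskets.offer("127.0.0.1:8091", "commit");
        baskets.offer("127.0.0.1:8092", "rollback");
        // prints {127.0.0.1:8091=[begin, commit], 127.0.0.1:8092=[rollback]}
        System.out.println(baskets.drainOnce());
        // prints {} because the baskets are now empty
        System.out.println(baskets.drainOnce());
    }
}
```

Merging trades a short delay (bounded by MAX_MERGE_SEND_MILLS in the original code) for fewer writes to the channel when several requests target the same address.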
public abstract class AbstractRpcRemoting extends ChannelDuplexHandler {

    protected void sendRequest(Channel channel, Object msg) {
        RpcMessage rpcMessage = new RpcMessage();
        rpcMessage.setAsync(true);
        rpcMessage.setHeartbeat(msg instanceof HeartbeatMessage);
        rpcMessage.setRequest(true);
        rpcMessage.setBody(msg);
        rpcMessage.setId(RpcMessage.getNextMessageId());
        if (msg instanceof MergeMessage) {
            mergeMsgMap.put(rpcMessage.getId(), (MergeMessage)msg);
        }
        channelWriteableCheck(channel, msg);
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("write message:" + rpcMessage.getBody() + ", channel:" + channel + ",active?"
                + channel.isActive() + ",writable?" + channel.isWritable() + ",isopen?" + channel.isOpen());
        }
        channel.writeAndFlush(rpcMessage);
    }
}           
  • The MergeMessage is wrapped in a new RpcMessage before it is sent, and it is recorded in mergeMsgMap under the new message id.

TmRpcClient initialization

public class GlobalTransactionScanner extends AbstractAutoProxyCreator implements InitializingBean {

    public GlobalTransactionScanner(String applicationId, String txServiceGroup, int mode,
                                    FailureHandler failureHandlerHook) {
        setOrder(ORDER_NUM);
        setProxyTargetClass(true);
        this.applicationId = applicationId;
        this.txServiceGroup = txServiceGroup;
        this.mode = mode;
        this.failureHandlerHook = failureHandlerHook;
    }

    private void initClient() {

        TMClient.init(applicationId, txServiceGroup);
        if ((AT_MODE & mode) > 0) {
            RMClientAT.init(applicationId, txServiceGroup);
        }
    }

    public void afterPropertiesSet() {
        initClient();
    }
}           
  • After the GlobalTransactionScanner constructor has run, Spring invokes afterPropertiesSet(), which calls initClient().
  • initClient() calls TMClient.init(applicationId, txServiceGroup) to initialize the TMClient.
public class TMClient {
    public static void init(String applicationId, String transactionServiceGroup) {
        TmRpcClient tmRpcClient = TmRpcClient.getInstance(
                 applicationId, transactionServiceGroup);
        tmRpcClient.init();
    }
}

public final class TmRpcClient extends AbstractRpcRemotingClient {
    public void init() {
        if (initialized.compareAndSet(false, true)) {
            init(SCHEDULE_INTERVAL_MILLS, SCHEDULE_INTERVAL_MILLS);
        }
    }


    public void init(long healthCheckDelay, long healthCheckPeriod) {
        // Note the initVars() method
        initVars();

        ExecutorService mergeSendExecutorService = new ThreadPoolExecutor(
           MAX_MERGE_SEND_THREAD, MAX_MERGE_SEND_THREAD,
           KEEP_ALIVE_TIME, TimeUnit.MILLISECONDS, 
           new LinkedBlockingQueue<Runnable>(),
           new NamedThreadFactory(getThreadPrefix(MERGE_THREAD_PREFIX), 
                        MAX_MERGE_SEND_THREAD));
        mergeSendExecutorService.submit(new MergedSendRunnable());
        timerExecutor.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                try {
                    reconnect();
                } catch (Exception ignore) {
                    LOGGER.error(ignore.getMessage());
                }
            }
        }, healthCheckDelay, healthCheckPeriod, TimeUnit.SECONDS);
    }


    private void initVars() {
        enableDegrade = CONFIG.getBoolean(
        ConfigurationKeys.SERVICE_PREFIX + ConfigurationKeys.ENABLE_DEGRADE_POSTFIX);
        super.init();
    }
}           
  • The key point is the initVars() method, which calls super.init().
public abstract class AbstractRpcRemotingClient extends AbstractRpcRemoting
    implements RemotingService, RegisterMsgListener, ClientMessageSender {

    public void init() {
        NettyPoolableFactory keyPoolableFactory = new NettyPoolableFactory(this);
        // Core step: build the connection pool for the objects used to send
        nettyClientKeyPool = new GenericKeyedObjectPool(keyPoolableFactory);
        nettyClientKeyPool.setConfig(getNettyPoolConfig());
        serviceManager = new ServiceManagerStaticConfigImpl();
        super.init();
    }
}


public abstract class AbstractRpcRemoting extends ChannelDuplexHandler {
    public void init() {
        timerExecutor.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                List<MessageFuture> timeoutMessageFutures = new ArrayList<MessageFuture>(futures.size());

                for (MessageFuture future : futures.values()) {
                    if (future.isTimeout()) {
                        timeoutMessageFutures.add(future);
                    }
                }

                for (MessageFuture messageFuture : timeoutMessageFutures) {
                    futures.remove(messageFuture.getRequestMessage().getId());
                    messageFuture.setResultMessage(null);
                    if (LOGGER.isDebugEnabled()) {
                        LOGGER.debug("timeout clear future : " + messageFuture.getRequestMessage().getBody());
                    }
                }
                nowMills = System.currentTimeMillis();
            }
        }, TIMEOUT_CHECK_INTERNAL, TIMEOUT_CHECK_INTERNAL, TimeUnit.MILLISECONDS);
    }
}           
  • The core of AbstractRpcRemotingClient's init() method is building nettyClientKeyPool.
  • nettyClientKeyPool is a keyed object pool from which connections to the TC are obtained.
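AbstractRpcRemoting.init() in the code above also schedules a periodic task that clears timed-out futures. Here is a minimal sketch of one such sweep pass, with the clock passed in explicitly so that the behavior is deterministic. TimeoutSweeper and its methods are hypothetical names, not Fescar classes.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch, not Fescar code: removes entries whose deadline has passed.
class TimeoutSweeper {
    // message id -> absolute deadline in milliseconds
    private final ConcurrentHashMap<Long, Long> deadlines = new ConcurrentHashMap<>();

    // Start tracking a request that must be answered within timeoutMillis.
    void track(long id, long nowMillis, long timeoutMillis) {
        deadlines.put(id, nowMillis + timeoutMillis);
    }

    // One sweep pass; returns the number of expired entries removed.
    int sweep(long nowMillis) {
        int removed = 0;
        for (Map.Entry<Long, Long> entry : deadlines.entrySet()) {
            // remove(key, value) only succeeds if the entry is still unchanged
            if (nowMillis > entry.getValue() && deadlines.remove(entry.getKey(), entry.getValue())) {
                removed++;
            }
        }
        return removed;
    }

    int size() {
        return deadlines.size();
    }

    public static void main(String[] args) {
        TimeoutSweeper sweeper = new TimeoutSweeper();
        sweeper.track(1L, 0L, 100L); // expires at t=100
        sweeper.track(2L, 0L, 500L); // expires at t=500
        System.out.println(sweeper.sweep(200L)); // prints "1"
        System.out.println(sweeper.size());      // prints "1"
    }
}
```

In a running client, a pass like this would be driven by a ScheduledExecutorService via scheduleAtFixedRate, with System.currentTimeMillis() as the clock.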

Configuration loading analysis

public class FileConfiguration implements Configuration {

    private static final Logger LOGGER = LoggerFactory.getLogger(FileConfiguration.class);

    private static final Config CONFIG = ConfigFactory.load();
}


package com.typesafe.config;
public final class ConfigFactory {
    private ConfigFactory() {
    }

    public static Config load() {
        return load(ConfigParseOptions.defaults());
    }
}           
  • Configuration loading uses typesafe.config, a configuration library for Java.
  • By default, ConfigFactory.load() loads the application.conf, application.json, and application.properties files from the classpath.

Request class diagram