天天看點

Seata TCC 分支事務

引言

前面,我們已經介紹了 Seata 的整體設計思想,接下來我們深入到其實作細節中,本文介紹 Seata 中 TCC 模式分支事務的實作,其他 Seata 相關文章均收錄于

<Seata系列文章>

中。

TCC 模式

先簡單介紹一個 Seata 中 TCC 的使用方式, 然後我們在順着它的使用方式, 一點點深入其實作方案。

在 Seata TCC 模式中, 每個 RM 都需要将 TCC 接口以 RPC 的形式暴露出去, 同時向 TC 中注冊, 告訴 TC 自己是某一 TCC 接口的提供方, 這樣如果發生送出或者復原時, TC 就知道該去找誰了。然後 TM 在進行 TCC 調用之前先去 TC 注冊分支事務,告訴 TC 這個分支事務用的是哪個 TCC 接口,然後才通過 RPC 調用 TCC 接口的 try 方法,當發生全局事務送出或者復原時, TC 會直接通知該 TCC 接口的提供方進行分支處理。

public interface TccActionTwo {

    /**
     * Prepare boolean.
     *
     * @param actionContext the action context
     * @param b             the b
     * @param list          the list
     * @return the boolean
     */
    @TwoPhaseBusinessAction(name = "DubboTccActionTwo" , commitMethod = "commit", rollbackMethod = "rollback")
    public boolean prepare(BusinessActionContext actionContext, @BusinessActionContextParameter(paramName = "b") String b,
                           @BusinessActionContextParameter(paramName = "c",index = 1) List list);

    /**
     * Commit boolean.
     *
     * @param actionContext the action context
     * @return the boolean
     */
    public boolean commit(BusinessActionContext actionContext);

    /**
     * Rollback boolean.
     *
     * @param actionContext the action context
     * @return the boolean
     */
    public boolean rollback(BusinessActionContext actionContext);

}           

上面的代碼就是一個 TCC 接口的樣例, 通過 TwoPhaseBusinessAction 注解,接口可以告訴 Seata 哪個是 prepare 函數, 該注解中辨別了該 TCC 的資源名, 復原和送出函數名, 在 prepare 函數的參數中, 我們可以看到 BusinessActionContextParameter 注解, 它辨別了哪些參數在復原或送出時也需要用到, 因為 TCC 模式中 RM 是不在本地存儲參數資訊的, 這些資料都存在 TC 中, 是以 TM 在進行 RPC 調用前, 會根據該注解将復原或送出時需要用到的參數存儲在 TC 中。如果發生復原或者送出, TC 要把調用 prepare 時使用到的的這些參數存在 Context 裡, 發送給 RM, 這樣 RM 就可以用到這些參數。

知道了 TCC 的工作流程後, 就要深入 TCC 模式的實作了, 我們就得回到最初 AT 模式 和 TCC 模式分道揚镳的地方 GlobalTransactionScanner, 在這裡進行了 TCC 的判斷:

if (TCCBeanParserUtils.isTccAutoProxy(bean, beanName, applicationContext)) {
    //TCC interceptor, proxy bean of sofa:reference/dubbo:reference, and LocalTCC
    interceptor = new TccActionInterceptor(TCCBeanParserUtils.getRemotingDesc(beanName));
}           

看來, 這個 TCCBeanParserUtils.isTccAutoProxy 是判斷 TCC 模式的關鍵, 我們不妨看一看它的内容:

public static boolean isTccAutoProxy(Object bean, String beanName, ApplicationContext applicationContext) {
    RemotingDesc remotingDesc = null;
    // 在這個比對過程中, 就完成了 RPC 提供方的注冊 parserRemotingServiceInfo
    boolean isRemotingBean = parserRemotingServiceInfo(bean, beanName);
    //is remoting bean
    if (isRemotingBean) {
        remotingDesc = DefaultRemotingParser.get().getRemotingBeanDesc(beanName);
        if (remotingDesc != null && remotingDesc.getProtocol() == Protocols.IN_JVM) {
            //LocalTCC
            return isTccProxyTargetBean(remotingDesc);
        } else {
            // sofa:reference / dubbo:reference, factory bean
            return false;
        }
    } else {
        //get RemotingBean description
        remotingDesc = DefaultRemotingParser.get().getRemotingBeanDesc(beanName);
        if (remotingDesc == null) {
            //check FactoryBean
            if (isRemotingFactoryBean(bean, beanName, applicationContext)) {
                remotingDesc = DefaultRemotingParser.get().getRemotingBeanDesc(beanName);
                return isTccProxyTargetBean(remotingDesc);
            } else {
                return false;
            }
        } else {
            return isTccProxyTargetBean(remotingDesc);
        }
    }
}
// 上述的函數隻是判斷是不是 RPC 接口, 然後下面的才是檢查注解的過程
protected static boolean isTccProxyTargetBean(RemotingDesc remotingDesc) {
    if (remotingDesc == null) {
        return false;
    }
    //check if it is TCC bean
    boolean isTccClazz = false;
    Class<?> tccInterfaceClazz = remotingDesc.getInterfaceClass();
    Method[] methods = tccInterfaceClazz.getMethods();
    TwoPhaseBusinessAction twoPhaseBusinessAction = null;
    for (Method method : methods) {
        twoPhaseBusinessAction = method.getAnnotation(TwoPhaseBusinessAction.class);
        if (twoPhaseBusinessAction != null) {
            isTccClazz = true;
            break;
        }
    }
    if (!isTccClazz) {
        return false;
    }
    short protocols = remotingDesc.getProtocol();
    //LocalTCC
    if (Protocols.IN_JVM == protocols) {
        //in jvm TCC bean , AOP
        return true;
    }
    // sofa:reference /  dubbo:reference, AOP
    return remotingDesc.isReference();
}           

我們可以看到, isTccAutoProxy 隻判斷它是不是 RPC 調用, 而 isTccProxyTargetBean 進行注解的檢查, 在 isTccProxyTargetBean 的最後幾行, 決定了是否要加入 TCC 的攔截器: 隻有當接口協定為 JVM 或者是 RPC 的消費者(isReference)。這時候,我們可以大膽的猜測,TccActionInterceptor 這個攔截器肯定就是 TM 注冊分支事務的攔截器, 因為隻有 TM 才負責 TCC 模式的分支注冊:

// TccActionInterceptor#invoke -> ActionInterceptorHandler#proceed
public Map<String, Object> proceed(Method method, Object[] arguments, String xid, TwoPhaseBusinessAction businessAction,
                                       Callback<Object> targetCallback) throws Throwable {
    Map<String, Object> ret = new HashMap<String, Object>(16);

    //TCC name
    String actionName = businessAction.name();
    BusinessActionContext actionContext = new BusinessActionContext();
    actionContext.setXid(xid);
    //set action anme
    actionContext.setActionName(actionName);
    //TODO services

    //Creating Branch Record
    String branchId = doTccActionLogStore(method, arguments, businessAction, actionContext);
    actionContext.setBranchId(branchId);

    //set the parameter whose type is BusinessActionContext
    Class<?>[] types = method.getParameterTypes();
    int argIndex = 0;
    for (Class<?> cls : types) {
        if (cls.getName().equals(BusinessActionContext.class.getName())) {
            arguments[argIndex] = actionContext;
            break;
        }
        argIndex++;
    }
    //the final parameters of the try method
    ret.put(Constants.TCC_METHOD_ARGUMENTS, arguments);
    //the final result
    ret.put(Constants.TCC_METHOD_RESULT, targetCallback.execute());
    return ret;
}
protected String doTccActionLogStore(Method method, Object[] arguments, TwoPhaseBusinessAction businessAction,
                                         BusinessActionContext actionContext) {
    String actionName = actionContext.getActionName();
    String xid = actionContext.getXid();
    // 找到所有需要儲存的參數, 存在 context 中一起發給 TC
    Map<String, Object> context = fetchActionRequestContext(method, arguments);
    context.put(Constants.ACTION_START_TIME, System.currentTimeMillis());

    //init business context
    initBusinessContext(context, method, businessAction);
    //Init running environment context
    initFrameworkContext(context);
    actionContext.setActionContext(context);

    //init applicationData
    Map<String, Object> applicationContext = new HashMap<String, Object>(4);
    applicationContext.put(Constants.TCC_ACTION_CONTEXT, context);
    String applicationContextStr = JSON.toJSONString(applicationContext);
    try {
        //registry branch record
        Long branchId = DefaultResourceManager.get().branchRegister(BranchType.TCC, actionName, null, xid,
            applicationContextStr, null);
        return String.valueOf(branchId);
    } catch (Throwable t) {
        String msg = "TCC branch Register error, xid:" + xid;
        LOGGER.error(msg, t);
        throw new FrameworkException(t, msg);
    }
}           

果不其然, 這個 TccActionInterceptor 下層調用的 ActionInterceptorHandler 進行了分支注冊, 而且我們可以看到, 在進行分支注冊時, 它找到所有需要儲存的參數, 存在 context 中一起發給 TC。

這裡還有兩個問題: Seata 怎麼分辨出 RPC 類型的 ? Seata 在哪向 TC 進行了 TCC 接口的注冊?

我們先看第一個問題, Seata 如何識别出 RPC 的呢? 換句話說, 目前 TCC 模式支援的 Dubbo, Sofa, JVM, 這三種模式又是如何識别出來的呢? 看過我

Dubbo系列文章

的同學, 可能就知道 RPC 的實作過程實際上就是代理本地接口, 加入 RPC 架構, 那麼我們在進行實際的調用時, 判斷一下調用的類是不是 Dubbo 的代理類名, 就知道啦。同樣的, Sofa 也是這樣實作的, 而 JVM 呢, 則是需要開發人員顯式地加注解

LocalTCC

, 然後 Seata 掃描該注解。

// DubboRemotingParser: dubbo 和 sofa 都類似如下, 判斷代理類的名字
@Override
public boolean isReference(Object bean, String beanName) throws FrameworkException {
    Class<?> c = bean.getClass();
    return "com.alibaba.dubbo.config.spring.ReferenceBean".equals(c.getName())
        || "org.apache.dubbo.config.spring.ReferenceBean".equals(c.getName());
}

@Override
public boolean isService(Object bean, String beanName) throws FrameworkException {
    Class<?> c = bean.getClass();
    return "com.alibaba.dubbo.config.spring.ServiceBean".equals(c.getName())
        || "org.apache.dubbo.config.spring.ServiceBean".equals(c.getName());
}
// LocalTCCRemotingParser: JVM 直接判斷注解
@Override
public boolean isReference(Object bean, String beanName) {
    Class<?> classType = bean.getClass();
    Set<Class<?>> interfaceClasses = ReflectionUtil.getInterfaces(classType);
    for (Class<?> interClass : interfaceClasses) {
        if (interClass.isAnnotationPresent(LocalTCC.class)) {
            return true;
        }
    }
    return false;
}
@Override
public boolean isService(Object bean, String beanName) {
    Class<?> classType = bean.getClass();
    Set<Class<?>> interfaceClasses = ReflectionUtil.getInterfaces(classType);
    for (Class<?> interClass : interfaceClasses) {
        if (interClass.isAnnotationPresent(LocalTCC.class)) {
            return true;
        }
    }
    return false;
}           

通過這個判斷過程, 我們拿到了許多資訊, 我們就知道了項目中包含的所有 TCC 接口, 也知道了本節點是該 RPC 的提供方(isService)還是消費方(isReference), 在進行比對的過程中, 如果自己是該 TCC 接口的提供方的話, 就會立即去 TC 注冊, 這樣 TC 就知道該 TCC 接口送出和復原找誰了。注冊的過程如下, 重點是第一行的判斷邏輯, isService 時才會注冊:

// DefaultRemotingParser
// 是 RPC 的提供方才會去 TC 注冊
if (isService(bean, beanName)) {
    try {
        //service bean, registry resource
        Object targetBean = remotingBeanDesc.getTargetBean();
        for (Method m : methods) {
            TwoPhaseBusinessAction twoPhaseBusinessAction = m.getAnnotation(TwoPhaseBusinessAction.class);
            if (twoPhaseBusinessAction != null) {
                //
                TCCResource tccResource = new TCCResource();
                tccResource.setActionName(twoPhaseBusinessAction.name());
                tccResource.setTargetBean(targetBean);
                tccResource.setPrepareMethod(m);
                tccResource.setCommitMethodName(twoPhaseBusinessAction.commitMethod());
                tccResource.setCommitMethod(ReflectionUtil
                    .getMethod(interfaceClass, twoPhaseBusinessAction.commitMethod(),
                        new Class[] {BusinessActionContext.class}));
                tccResource.setRollbackMethodName(twoPhaseBusinessAction.rollbackMethod());
                tccResource.setRollbackMethod(ReflectionUtil
                    .getMethod(interfaceClass, twoPhaseBusinessAction.rollbackMethod(),
                        new Class[] {BusinessActionContext.class}));
                //registry tcc resource
                DefaultResourceManager.get().registerResource(tccResource);
            }
        }
    } catch (Throwable t) {
        throw new FrameworkException(t, "parser remting service error");
    }
}           

到此為止, Seata 怎麼發現 TCC 接口, 什麼角色去 TC 注冊 TCC 接口, 什麼角色進行 TCC 分支的注冊, 想必大家全都明白了, 當全局事務發生送出或復原時, TC 可以根據前面的注冊内容, 找到所有提供該 TCC 接口服務的節點, 然後向它們發送送出或者復原請求。

// TCCResourceManager
@Override
public BranchStatus branchCommit(BranchType branchType, String xid, long branchId, String resourceId,
                                 String applicationData) throws TransactionException {
    // RM 從本地找到該 resourceId 對應的 TCC 接口資料
    TCCResource tccResource = (TCCResource)tccResourceCache.get(resourceId);
    if (tccResource == null) {
        throw new ShouldNeverHappenException("TCC resource is not exist, resourceId:" + resourceId);
    }
    // 找到送出函數
    Object targetTCCBean = tccResource.getTargetBean();
    Method commitMethod = tccResource.getCommitMethod();
    if (targetTCCBean == null || commitMethod == null) {
        throw new ShouldNeverHappenException("TCC resource is not available, resourceId:" + resourceId);
    }
    try {
        boolean result = false;
        //BusinessActionContext
        // 根據 TC 發來的資料, 建構事務 Context, 其中用到的 applicationData 就是調用 prepare 時使用的參數
        BusinessActionContext businessActionContext = getBusinessActionContext(xid, branchId, resourceId,
            applicationData);
        // 執行送出函數
        Object ret = commitMethod.invoke(targetTCCBean, businessActionContext);
        LOGGER.info(
            "TCC resource commit result :" + ret + ", xid:" + xid + ", branchId:" + branchId + ", resourceId:"
                + resourceId);
        if (ret != null) {
            if (ret instanceof TwoPhaseResult) {
                result = ((TwoPhaseResult)ret).isSuccess();
            } else {
                result = (boolean)ret;
            }
        }
        // 上報結果
        return result ? BranchStatus.PhaseTwo_Committed : BranchStatus.PhaseTwo_CommitFailed_Retryable;
    } catch (Throwable t) {
        String msg = String.format("commit TCC resource error, resourceId: %s, xid: %s.", resourceId, xid);
        LOGGER.error(msg, t);
        throw new FrameworkException(t, msg);
    }
}           

我們就簡單給他家看看分支送出時 RM 的處理過程, TCC 的 RM 收到送出指令後, 從記憶體中找到該資源對應的 commit 函數, 把事務編号, 資源編号, 還有 TCC try 過程中用到的 applicationData 都塞到這個 BusinessActionContext 中, 最後調用 TCC commit 函數,然後它還會将執行結果上報給 TC。

目前 TCC 模式的功能到這就都走完了, 在前面的理論環節我們說過, TCC 模式要做到幂等, 防倒挂, 這些雖然還沒有實裝在 Seata 中, 但是今後肯定會有的, 它的實作應該會很類似于 AT 模式, 在 DB 中存儲分支事務狀态, 随 TCC 接口的本地事務一同送出。

文章說明

更多有價值的文章均收錄于

貝貝貓的文章目錄
Seata TCC 分支事務

版權聲明: 本部落格所有文章除特别聲明外,均采用 BY-NC-SA 許可協定。轉載請注明出處!

創作聲明: 本文基于下列所有參考内容進行創作,其中可能涉及複制、修改或者轉換,圖檔均來自網絡,如有侵權請聯系我,我會第一時間進行删除。

參考内容

[1]

fescar鎖設計和隔離級别的了解

[2]

分布式事務中間件 Fescar - RM 子產品源碼解讀

[3]

Fescar分布式事務實作原了解析探秘

[4]

Seata TCC 分布式事務源碼分析

[5]

深度剖析一站式分布式事務方案 Seata-Server

[6]

分布式事務 Seata Saga 模式首秀以及三種模式詳解

[7]

螞蟻金服大規模分布式事務實踐和開源詳解

[8]

分布式事務 Seata TCC 模式深度解析

[9]

Fescar (Seata)0.4.0 中文文檔教程

[10]

Seata Github Wiki

[11]

深度剖析一站式分布式事務方案Seata(Fescar)-Server