天天看點

Seata 分支事務

引言

前面,我們已經介紹了 Seata 的整體設計思想,接下來我們深入到其實作細節中,本文先來介紹 Seata 中分支事務的整體實作思想,其他 Seata 相關文章均收錄于

<Seata系列文章>

中。

Branch Type

我們已經知道在 Seata 中, 分支事務分 AT 模式和 TCC 模式, 那麼, Seata 是怎麼區分出 AT 模式和 TCC 模式的呢? 這也借助了 Spring 的 AOP 特性, 我們在 TM 中介紹的 GlobalTransactionalInterceptor 實際上隻負責 AT 模式, TCC 模式是另一套攔截器實作, 而這兩種攔截器的注入, 全都是在 GlobalTransactionScanner 中進行的, 也就是說, 我們的 Spring 項目要将 GlobalTransactionScanner 注冊為 Bean, 因為其繼承自 AbstractAutoProxyCreator, Spring 在處理 AOP 過程時, 就會自動将 GlobalTransactionScanner 執行。接下來, 我們看看整個 Seata 的入口 GlobalTransactionScanner 的核心代碼:

public class GlobalTransactionScanner extends AbstractAutoProxyCreator
    implements InitializingBean, ApplicationContextAware, DisposableBean, BeanPostProcessor {
    // 隻保留核心代碼...

    // 判斷是否需要嵌入 Seata 的 AOP 代碼
    @Override
    protected Object wrapIfNecessary(Object bean, String beanName, Object cacheKey) {
        if (disableGlobalTransaction) {
            return bean;
        }
        try {
            synchronized (PROXYED_SET) {
                if (PROXYED_SET.contains(beanName)) {
                    return bean;
                }
                interceptor = null;
                //check TCC proxy, 實際上是根據 TCC 注解 TwoPhaseBusinessAction + RPC 協定類型進行判斷, TCC 目前隻支援一些特定的協定
                if (TCCBeanParserUtils.isTccAutoProxy(bean, beanName, applicationContext)) {
                    //TCC interceptor, proxy bean of sofa:reference/dubbo:reference, and LocalTCC
                    // 識别出 TCC 模式, 使用 TCC 攔截器
                    interceptor = new TccActionInterceptor(TCCBeanParserUtils.getRemotingDesc(beanName));
                } else {
                    Class<?> serviceInterface = SpringProxyUtils.findTargetClass(bean);
                    Class<?>[] interfacesIfJdk = SpringProxyUtils.findInterfaces(bean);
                    // 判斷是不是 AT 模式, 通過注解 GlobalTransactional GlobalLock
                    if (!existsAnnotation(new Class[]{serviceInterface})
                        && !existsAnnotation(interfacesIfJdk)) {
                        return bean;
                    }
                     // 識别出AT 模式, 使用前面提到的 GlobalTransactionalInterceptor
                    if (interceptor == null) {
                        interceptor = new GlobalTransactionalInterceptor(failureHandlerHook);
                    }
                }

                LOGGER.info(
                    "Bean[" + bean.getClass().getName() + "] with name [" + beanName + "] would use interceptor ["
                        + interceptor.getClass().getName() + "]");
                if (!AopUtils.isAopProxy(bean)) {
                    // 如果該類不由 Spring 管控, 則無能為力
                    bean = super.wrapIfNecessary(bean, beanName, cacheKey);
                } else {
                    // 如果是由 Spring 管控, 将 TCC 攔截器 或者 AT 攔截器注入
                    AdvisedSupport advised = SpringProxyUtils.getAdvisedSupport(bean);
                    Advisor[] advisor = buildAdvisors(beanName, new Object[]{interceptor});
                    for (Advisor avr : advisor) {
                        advised.addAdvisor(0, avr);
                    }
                }
                PROXYED_SET.add(beanName);
                return bean;
            }
        } catch (Exception exx) {
            throw new RuntimeException(exx);
        }
    }
    // 判斷是不是 AT 模式, 通過注解 GlobalTransactional GlobalLock
    private boolean existsAnnotation(Class<?>[] classes) {
        if (classes != null && classes.length > 0) {
            for (Class clazz : classes) {
                if (clazz == null) {
                    continue;
                }
                Method[] methods = clazz.getMethods();
                for (Method method : methods) {
                    GlobalTransactional trxAnno = method.getAnnotation(GlobalTransactional.class);
                    if (trxAnno != null) {
                        return true;
                    }

                    GlobalLock lockAnno = method.getAnnotation(GlobalLock.class);
                    if (lockAnno != null) {
                        return true;
                    }
                }
            }
        }
        return false;
    }
    // 替換預設的資料庫連接配接源, 改為 AT 模式的資料源代理
    @Override
    public Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException {
        if (bean instanceof DataSource && !(bean instanceof DataSourceProxy) && ConfigurationFactory.getInstance().getBoolean(DATASOURCE_AUTOPROXY, false)) {
            if (LOGGER.isInfoEnabled()) {
                LOGGER.info("Auto proxy of  [" + beanName + "]");
            }
            DataSourceProxy dataSourceProxy = DataSourceProxyHolder.get().putDataSource((DataSource) bean);
            return Enhancer.create(bean.getClass(), (org.springframework.cglib.proxy.MethodInterceptor) (o, method, args, methodProxy) -> {
                Method m = BeanUtils.findDeclaredMethod(DataSourceProxy.class, method.getName(), method.getParameterTypes());
                if (null != m) {
                    return m.invoke(dataSourceProxy, args);
                } else {
                    return method.invoke(bean, args);
                }
            });
        }
        return bean;
    }
}           

整個項目是從這個 GlobalTransactionScanner 作為起點接入 Seata 的, 一般來說 SpringBoot 會用 @Configuration 類将其注冊為 @Bean, 原生 Spring 項目需要在在 XML 中将其注冊為 Bean,這一點我們從官方的 Sample 中就能發現。

// SpringBoot
@Configuration
public class SeataAutoConfig {
    /**
     * init global transaction scanner
     *
     * @Return: GlobalTransactionScanner
     */
    @Bean
    public GlobalTransactionScanner globalTransactionScanner(){
        return new GlobalTransactionScanner("account-gts-seata-example", "my_test_tx_group");
    }
}           

Spring 配置如下所示:

<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:dubbo="http://dubbo.apache.org/schema/dubbo"
       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
                        http://dubbo.apache.org/schema/dubbo http://dubbo.apache.org/schema/dubbo/dubbo.xsd">
    <!--省略其他内容-->
    <bean class="io.seata.spring.annotation.GlobalTransactionScanner">
        <constructor-arg value="dubbo-demo-app"/>
        <constructor-arg value="my_test_tx_group"/>
    </bean>

</beans>           

好了, 想必大家應該已經清楚 Seata 中是如何發現 TCC 模式和 AT 模式的了, 我們總結一下:

  1. 掃描 Spring 代理的類
  2. 如果發現帶有 TCC 的注解, 并且 RPC 協定滿足條件, 那麼走 TCC 攔截器
  3. 如果發現帶有 AT 的注解, 那麼走 AT 攔截器
  4. 否則, 什麼都不幹

接下來我們, 分别看一下這兩種模式各自都是如何運作起來的。

文章說明

更多有價值的文章均收錄于

貝貝貓的文章目錄
Seata 分支事務

版權聲明: 本部落格所有文章除特别聲明外,均采用 BY-NC-SA 許可協定。轉載請注明出處!

創作聲明: 本文基于下列所有參考内容進行創作,其中可能涉及複制、修改或者轉換,圖檔均來自網絡,如有侵權請聯系我,我會第一時間進行删除。

參考内容

[1]

fescar鎖設計和隔離級别的了解

[2]

分布式事務中間件 Fescar - RM 子產品源碼解讀

[3]

Fescar分布式事務實作原了解析探秘

[4]

Seata TCC 分布式事務源碼分析

[5]

深度剖析一站式分布式事務方案 Seata-Server

[6]

分布式事務 Seata Saga 模式首秀以及三種模式詳解

[7]

螞蟻金服大規模分布式事務實踐和開源詳解

[8]

分布式事務 Seata TCC 模式深度解析

[9]

Fescar (Seata)0.4.0 中文文檔教程

[10]

Seata Github Wiki

[11]

深度剖析一站式分布式事務方案Seata(Fescar)-Server