天天看點

mybatis 多資料源_SpringBoot+Mybatis配置多資料源及事務方案前言一、多資料源1、資料庫連接配接2、配置DataSource3、設定路由鍵4、測試二、事務模式,為啥不能切換資料源1、建立事務2、開啟事務3、執行Mapper接口三、事務模式,怎麼支援切換資料源四、XA協定分布式事務總結

mybatis 多資料源_SpringBoot+Mybatis配置多資料源及事務方案前言一、多資料源1、資料庫連接配接2、配置DataSource3、設定路由鍵4、測試二、事務模式,為啥不能切換資料源1、建立事務2、開啟事務3、執行Mapper接口三、事務模式,怎麼支援切換資料源四、XA協定分布式事務總結

前言

可能由于業務上的某些需求,我們的系統中有時往往要連接配接多個資料庫,這就産生了多資料源問題。

多資料源的情況下,一般我們要做到可以自動切換,此時會涉及到事務注解 Transactional 不生效問題和分布式事務問題。

關于多資料源方案,筆者在網上看過一些例子,然而大部分都是錯誤示例,根本跑不通,或者沒辦法相容事務。

今天,我們就一點點來分析這些問題産生的根源和相應的解決方法。

一、多資料源

為了劇情的順利開展,我們模拟的業務是建立訂單和扣減庫存。

是以,我們先建立訂單表和庫存表。注意,把他們分别放到兩個資料庫中。

CREATE TABLE `t_storage` (  `id` int(11) NOT NULL AUTO_INCREMENT,  `commodity_code` varchar(255) DEFAULT NULL,  `count` int(11) DEFAULT '0',  PRIMARY KEY (`id`),  UNIQUE KEY `commodity_code` (`commodity_code`)) ENGINE=InnoDB AUTO_INCREMENT=2 DEFAULT CHARSET=utf8;CREATE TABLE `t_order` (  `id` bigint(16) NOT NULL,  `commodity_code` varchar(255) DEFAULT NULL,  `count` int(11) DEFAULT '0',  `amount` double(14,2) DEFAULT '0.00',  PRIMARY KEY (`id`)) ENGINE=InnoDB DEFAULT CHARSET=utf8;
           

1、資料庫連接配接

通過YML檔案先把兩個資料庫都配置一下。

spring:  datasource:    ds1:      jdbc_url: jdbc:mysql://127.0.0.1:3306/db1      username: root      password: root    ds2:      jdbc_url: jdbc:mysql://127.0.0.1:3306/db2      username: root      password: root
           

2、配置DataSource

我們知道,Mybatis執行一條SQL語句的時候,需要先擷取一個Connection。這時候,就交由Spring管理器到DataSource中擷取連接配接。

Spring中有個具有路由功能的DataSource,它可以通過查找鍵調用不同的資料源,這就是AbstractRoutingDataSource。

public abstract class AbstractRoutingDataSource{    //資料源的集合    @Nullable    private Map targetDataSources;    //預設的資料源    @Nullable    private Object defaultTargetDataSource;    //傳回目前的路由鍵,根據該值傳回不同的資料源    @Nullable    protected abstract Object determineCurrentLookupKey();        //确定一個資料源    protected DataSource determineTargetDataSource() {        //抽象方法 傳回一個路由鍵        Object lookupKey = determineCurrentLookupKey();        DataSource dataSource = this.targetDataSources.get(lookupKey);        return dataSource;    }}
           

可以看到,該抽象類的核心就是先設定多個資料源到Map集合中,然後根據Key可以擷取不同的資料源。

那麼,我們就可以重寫這個determineCurrentLookupKey方法,它傳回的是一個資料源的名稱。

public class DynamicDataSource extends AbstractRoutingDataSource {    @Override    protected Object determineCurrentLookupKey() {        DataSourceType.DataBaseType dataBaseType = DataSourceType.getDataBaseType();        return dataBaseType;    }}
           

然後還需要一個工具類,來儲存目前線程的資料源類型。

public class DataSourceType {    public enum DataBaseType {        ds1, ds2    }    // 使用ThreadLocal保證線程安全    private static final ThreadLocal TYPE = new ThreadLocal();    // 往目前線程裡設定資料源類型    public static void setDataBaseType(DataBaseType dataBaseType) {        if (dataBaseType == null) {            throw new NullPointerException();        }        TYPE.set(dataBaseType);    }    // 擷取資料源類型    public static DataBaseType getDataBaseType() {        DataBaseType dataBaseType = TYPE.get() == null ? DataBaseType.ds1 : TYPE.get();        return dataBaseType;    }}
           

這些都搞定之後,我們還需要把這個DataSource配置到Spring容器中去。下面這個配置類的作用如下:

  • 建立多個資料源DataSource,ds1 和 ds2;
  • 将ds1 和 ds2 資料源放入動态資料源DynamicDataSource;
  • 将DynamicDataSource注入到SqlSessionFactory。
@Configurationpublic class DataSourceConfig {    /**     * 建立多個資料源 ds1 和 ds2     * 此處的Primary,是設定一個Bean的優先級     * @return     */    @Primary    @Bean(name = "ds1")    @ConfigurationProperties(prefix = "spring.datasource.ds1")    public DataSource getDateSource1() {        return DataSourceBuilder.create().build();    }    @Bean(name = "ds2")    @ConfigurationProperties(prefix = "spring.datasource.ds2")    public DataSource getDateSource2() {        return DataSourceBuilder.create().build();    }    /**     * 将多個資料源注入到DynamicDataSource     * @param dataSource1     * @param dataSource2     * @return     */    @Bean(name = "dynamicDataSource")    public DynamicDataSource DataSource(@Qualifier("ds1") DataSource dataSource1,                                        @Qualifier("ds2") DataSource dataSource2) {        Map targetDataSource = new HashMap<>();        targetDataSource.put(DataSourceType.DataBaseType.ds1, dataSource1);        targetDataSource.put(DataSourceType.DataBaseType.ds2, dataSource2);        DynamicDataSource dataSource = new DynamicDataSource();        dataSource.setTargetDataSources(targetDataSource);        dataSource.setDefaultTargetDataSource(dataSource1);        return dataSource;    }            /**     * 将動态資料源注入到SqlSessionFactory     * @param dynamicDataSource     * @return     * @throws Exception     */    @Bean(name = "SqlSessionFactory")    public SqlSessionFactory getSqlSessionFactory(@Qualifier("dynamicDataSource") DataSource dynamicDataSource)            throws Exception {        SqlSessionFactoryBean bean = new SqlSessionFactoryBean();        bean.setDataSource(dynamicDataSource);        bean.setMapperLocations(                new PathMatchingResourcePatternResolver().getResources("classpath*:mapping/*.xml"));        bean.setTypeAliasesPackage("cn.youyouxunyin.multipledb2.entity");        return bean.getObject();    }}
           

3、設定路由鍵

上面的配置都完成之後,我們還需要想辦法動态的改變資料源的鍵值,這個就跟系統的業務相關了。

比如在這裡,我們有兩個Mapper接口,建立訂單和扣減庫存。

public interface OrderMapper {    void createOrder(Order order);}public interface StorageMapper {    void decreaseStorage(Order order);
           

那麼,我們就可以搞一個切面,在執行訂單的操作時,切到資料源ds1,執行庫存操作時,切到資料源ds2。

@[email protected] class DataSourceAop {    @Before("execution(* cn.youyouxunyin.multipledb2.mapper.OrderMapper.*(..))")    public void setDataSource1() {        DataSourceType.setDataBaseType(DataSourceType.DataBaseType.ds1);    }    @Before("execution(* cn.youyouxunyin.multipledb2.mapper.StorageMapper.*(..))")    public void setDataSource2() {        DataSourceType.setDataBaseType(DataSourceType.DataBaseType.ds2);    }}
           

4、測試

現在就可以寫一個Service方法,通過REST接口測試一下啦。

public class OrderServiceImpl implements OrderService {    @Override    public void createOrder(Order order) {        storageMapper.decreaseStorage(order);        logger.info("庫存已扣減,商品代碼:{},購買數量:{}。建立訂單中...",order.getCommodityCode(),order.getCount());        orderMapper.createOrder(order);    }}
           

不出意外的話,業務執行完成後,兩個資料庫的表都已經有了變化。

但此時,我們會想到,這兩個操作是需要保證原子性的。是以,我們需要依賴事務,在Service方法上标注Transactional。

如果我們在createOrder方法上添加了Transactional注解,然後在運作代碼,就會抛出異常。

### Cause: java.sql.SQLSyntaxErrorException: Table 'db2.t_order' doesn't exist; bad SQL grammar []; nested exception is java.sql.SQLSyntaxErrorException:     Table 'db2.t_order' doesn't exist] with root cause
           

這就說明,如果加上了 Spring 的事務,我們的資料源切換不過去了。這又是咋回事呢?

二、事務模式,為啥不能切換資料源

要想搞清楚原因,我們就得來分析分析如果加上了Spring事務,它又幹了哪些事情呢 ?

我們知道,Spring的自動事務是基于AOP實作的。在調用包含事務的方法時,會進入一個攔截器。

public class TransactionInterceptor{    public Object invoke(MethodInvocation invocation) throws Throwable {        //擷取目标類        Class> targetClass = AopUtils.getTargetClass(invocation.getThis());        //事務調用        return invokeWithinTransaction(invocation.getMethod(), targetClass, invocation::proceed);    }}
           

1、建立事務

在這裡面呢,首先就是開始建立一個事務。

protected Object doGetTransaction() {    //DataSource的事務對象    DataSourceTransactionObject txObject = new DataSourceTransactionObject();    //設定事務自動儲存    txObject.setSavepointAllowed(isNestedTransactionAllowed());    //給事務對象設定ConnectionHolder    ConnectionHolder conHolder = TransactionSynchronizationManager.getResource(obtainDataSource());    txObject.setConnectionHolder(conHolder, false);    return txObject;}
           

在這一步,重點是給事務對象設定了ConnectionHolder屬性,不過此時還是為空。

2、開啟事務

接下來,就是開啟一個事務,這裡主要是通過ThreadLocal将資源和目前的事務對象綁定,然後設定一些事務狀态。

protected void doBegin(Object txObject, TransactionDefinition definition) {        Connection con = null;    //從資料源中擷取一個連接配接    Connection newCon = obtainDataSource().getConnection();    //重新設定事務對象中的connectionHolder,此時已經引用了一個連接配接    txObject.setConnectionHolder(new ConnectionHolder(newCon), true);    //将這個connectionHolder标記為與事務同步    txObject.getConnectionHolder().setSynchronizedWithTransaction(true);    con = txObject.getConnectionHolder().getConnection();    con.setAutoCommit(false);    //激活事務活動狀态    txObject.getConnectionHolder().setTransactionActive(true);    //将connection holder綁定到目前線程,通過threadlocal    if (txObject.isNewConnectionHolder()) {        TransactionSynchronizationManager.bindResource(obtainDataSource(), txObject.getConnectionHolder());    }    //事務管理器,激活事務同步狀态    TransactionSynchronizationManager.initSynchronization();}
           

3、執行Mapper接口

開啟事務之後,就開始執行目标類真實方法。在這裡,就會開始進入Mybatis的代理對象。。哈哈,架構嘛,就各種代理。

我們知道,Mybatis在執行SQL的之前,需要先擷取到SqlSession對象。

public static SqlSession getSqlSession(SqlSessionFactory sessionFactory, ExecutorType executorType,                PersistenceExceptionTranslator exceptionTranslator) {    //從ThreadLocal中擷取SqlSessionHolder,第一次擷取不到為空    SqlSessionHolder holder = TransactionSynchronizationManager.getResource(sessionFactory);        //如果SqlSessionHolder為空,那也肯定擷取不到SqlSession;    //如果SqlSessionHolder不為空,直接通過它來拿到SqlSession    SqlSession session = sessionHolder(executorType, holder);    if (session != null) {        return session;    }    //建立一個新的SqlSession    session = sessionFactory.openSession(executorType);    //如果目前線程的事務處于激活狀态,就将SqlSessionHolder綁定到ThreadLocal    registerSessionHolder(sessionFactory, executorType, exceptionTranslator, session);    return session;}
           

拿到SqlSession之後,就開始調用Mybatis的執行器,準備執行SQL語句。在執行SQL之前呢,當然需要先拿到Connection連接配接。

public Connection getConnection() throws SQLException {    //通過資料源擷取連接配接    //比如我們配置了多資料源,此時還會正常切換    if (this.connection == null) {        openConnection();    }    return this.connection;}
           

我們看openConnection方法,它的作用就是從資料源中擷取一個Connection連接配接。如果我們配置了多資料源,此時是可以正常切換的。如果加了事務,之是以沒有切換資料源,是因為第二次調用時,this.connection != null,傳回的還是上一次的連接配接。

這是因為,在第二次擷取SqlSession的時候,目前線程是從ThreadLocal中拿到的,是以不會重複擷取Connection連接配接。

至此,在多資料源情況下,如果加了Spring事務,不能動态切換資料源的原因,我們應該都明白了。

在這裡,筆者插播一道面試題:

  • Spring是如何保證事務的?

那就是将多個業務操作,放到同一個資料庫連接配接中,一起送出或復原。

  • 怎麼做到,都在一個連接配接中呢?

這裡就是各種ThreadlLocal的運用,想辦法将資料庫資源和目前事務綁定到一起。

三、事務模式,怎麼支援切換資料源

上面我們已經把原因搞清楚了,接下來就看怎麼支援它動态切換資料源。

其他配置都不變的情況下,我們需要建立兩個不同的sqlSessionFactory。

@Bean(name = "sqlSessionFactory1")public SqlSessionFactory sqlSessionFactory1(@Qualifier("ds1") DataSource dataSource){    return createSqlSessionFactory(dataSource);}@Bean(name = "sqlSessionFactory2")public SqlSessionFactory sqlSessionFactory2(@Qualifier("ds2") DataSource dataSource){    return createSqlSessionFactory(dataSource);}
           

然後自定義一個CustomSqlSessionTemplate,來代替Mybatis中原有的sqlSessionTemplate,把上面定義的兩個SqlSessionFactory注入進去。

@Bean(name = "sqlSessionTemplate")public CustomSqlSessionTemplate sqlSessionTemplate(){    Map sqlSessionFactoryMap = new HashMap<>();    sqlSessionFactoryMap.put("ds1",factory1);    sqlSessionFactoryMap.put("ds2",factory2);    CustomSqlSessionTemplate customSqlSessionTemplate = new CustomSqlSessionTemplate(factory1);    customSqlSessionTemplate.setTargetSqlSessionFactorys(sqlSessionFactoryMap);    customSqlSessionTemplate.setDefaultTargetSqlSessionFactory(factory1);    return customSqlSessionTemplate;
           

在定義的CustomSqlSessionTemplate中,其他都一樣,主要看擷取SqlSessionFactory的方法。

public class CustomSqlSessionTemplate extends SqlSessionTemplate {    @Override    public SqlSessionFactory getSqlSessionFactory() {        //目前資料源的名稱        String currentDsName = DataSourceType.getDataBaseType().name();        SqlSessionFactory targetSqlSessionFactory = targetSqlSessionFactorys.get(currentDsName);        if (targetSqlSessionFactory != null) {            return targetSqlSessionFactory;        } else if (defaultTargetSqlSessionFactory != null) {            return defaultTargetSqlSessionFactory;        }        return this.sqlSessionFactory;    }}
           

在這裡,重點就是我們可以根據不同的資料源擷取不同的SqlSessionFactory。如果SqlSessionFactory不一樣,那麼在擷取SqlSession的時候,就不會在ThreadLocal中拿到,進而每次都是新的SqlSession對象。

既然SqlSession也不一樣,那麼在擷取Connection連接配接的時候,每次都會去動态資料源中去擷取。

原理就是這麼個原理,我們來走一把。

修改完配置之後,我們把Service方法加上事務的注解,此時資料也是可以正常更新的。

@[email protected] void createOrder(Order order) {    storageMapper.decreaseStorage(order);    orderMapper.createOrder(order);}
           

可以切換資料源隻是第一步,我們需要的保證可以保證事務操作。假如在上面的代碼中,庫存扣減完成,但是建立訂單失敗,庫存是不會復原的。因為它們分别屬于不同的資料源,根本不是同一個連接配接。

四、XA協定分布式事務

要解決上面那個問題,我們隻能考慮XA協定。

關于XA協定是啥,筆者不再過多的描述。我們隻需知道,MySQL InnoDB存儲引擎是支援XA事務的。

那麼XA協定的實作,在Java中叫做Java Transaction Manager,簡稱JTA。

如何實作JTA呢?我們借助Atomikos架構,先引入它的依賴。

org.springframework.boot    spring-boot-starter-jta-atomikos    2.2.7.RELEASE
           

然後,隻需把DataSource對象改成AtomikosDataSourceBean。

public DataSource getDataSource(Environment env, String prefix, String dataSourceName){    Properties prop = build(env,prefix);    AtomikosDataSourceBean ds = new AtomikosDataSourceBean();    ds.setXaDataSourceClassName(MysqlXADataSource.class.getName());    ds.setUniqueResourceName(dataSourceName);    ds.setXaProperties(prop);    return ds;}
           

這樣配完之後,擷取Connection連接配接的時候,拿到的其實是MysqlXAConnection對象。在送出或者復原的時候,走的就是MySQL的XA協定了。

public void commit(Xid xid, boolean onePhase) throws XAException {    //封裝 XA COMMIT 請求    StringBuilder commandBuf = new StringBuilder(300);    commandBuf.append("XA COMMIT ");    appendXid(commandBuf, xid);    try {        //交給MySQL執行XA事務操作        dispatchCommand(commandBuf.toString());    } finally {        this.underlyingConnection.setInGlobalTx(false);    }}
           

通過引入Atomikos和修改DataSource,在多資料源情況下,即便業務操作中間發生錯誤,多個資料庫也是可以正常復原的。

另外一個問題,是否應該使用XA協定?

XA協定看起來看起來比較簡單,但它也有一些缺點。比如:

  • 性能問題,所有參與者在事務送出階段處于同步阻塞狀态,占用系統資源,容易導緻性能瓶頸,無法滿足高并發場景;
  • 如果協調者存在單點故障問題,如果協調者出現故障,參與者将一直處于鎖定狀态;
  • 主從複制可能産生事務狀态不一緻。

在MySQL官方文檔中也列舉了一些XA協定的限制項:

https://dev.mysql.com/doc/refman/8.0/en/xa-restrictions.html

另外,筆者在實際的項目裡,其實也沒有用過,通過這樣的方式來解決分布式事務問題,此例僅做可行性方案探讨。

總結

本文通過引入SpringBoot+Mybatis的多資料源場景,分析了如下問題:

  • 多資料源的配置和實作;
  • Spring事務模式,多資料源不生效的原因和解決方法;
  • 多資料源,基于XA協定的分布式事務實作。

作者:清幽之地

連結:https://juejin.im/post/5eba38aa6fb9a043777c9b3a

繼續閱讀