前言
可能由于業務上的某些需求,我們的系統中有時往往要連接配接多個資料庫,這就産生了多資料源問題。
多資料源的情況下,一般我們要做到可以自動切換,此時會涉及到事務注解 Transactional 不生效問題和分布式事務問題。
關于多資料源方案,筆者在網上看過一些例子,然而大部分都是錯誤示例,根本跑不通,或者沒辦法相容事務。
今天,我們就一點點來分析這些問題産生的根源和相應的解決方法。
一、多資料源
為了劇情的順利開展,我們模拟的業務是建立訂單和扣減庫存。
是以,我們先建立訂單表和庫存表。注意,把他們分别放到兩個資料庫中。
CREATE TABLE `t_storage` ( `id` int(11) NOT NULL AUTO_INCREMENT, `commodity_code` varchar(255) DEFAULT NULL, `count` int(11) DEFAULT '0', PRIMARY KEY (`id`), UNIQUE KEY `commodity_code` (`commodity_code`)) ENGINE=InnoDB AUTO_INCREMENT=2 DEFAULT CHARSET=utf8;CREATE TABLE `t_order` ( `id` bigint(16) NOT NULL, `commodity_code` varchar(255) DEFAULT NULL, `count` int(11) DEFAULT '0', `amount` double(14,2) DEFAULT '0.00', PRIMARY KEY (`id`)) ENGINE=InnoDB DEFAULT CHARSET=utf8;
1、資料庫連接配接
通過YML檔案先把兩個資料庫都配置一下。
spring: datasource: ds1: jdbc_url: jdbc:mysql://127.0.0.1:3306/db1 username: root password: root ds2: jdbc_url: jdbc:mysql://127.0.0.1:3306/db2 username: root password: root
2、配置DataSource
我們知道,Mybatis執行一條SQL語句的時候,需要先擷取一個Connection。這時候,就交由Spring管理器到DataSource中擷取連接配接。
Spring中有個具有路由功能的DataSource,它可以通過查找鍵調用不同的資料源,這就是AbstractRoutingDataSource。
public abstract class AbstractRoutingDataSource{ //資料源的集合 @Nullable private Map targetDataSources; //預設的資料源 @Nullable private Object defaultTargetDataSource; //傳回目前的路由鍵,根據該值傳回不同的資料源 @Nullable protected abstract Object determineCurrentLookupKey(); //确定一個資料源 protected DataSource determineTargetDataSource() { //抽象方法 傳回一個路由鍵 Object lookupKey = determineCurrentLookupKey(); DataSource dataSource = this.targetDataSources.get(lookupKey); return dataSource; }}
可以看到,該抽象類的核心就是先設定多個資料源到Map集合中,然後根據Key可以擷取不同的資料源。
那麼,我們就可以重寫這個determineCurrentLookupKey方法,它傳回的是一個資料源的名稱。
public class DynamicDataSource extends AbstractRoutingDataSource { @Override protected Object determineCurrentLookupKey() { DataSourceType.DataBaseType dataBaseType = DataSourceType.getDataBaseType(); return dataBaseType; }}
然後還需要一個工具類,來儲存目前線程的資料源類型。
public class DataSourceType { public enum DataBaseType { ds1, ds2 } // 使用ThreadLocal保證線程安全 private static final ThreadLocal TYPE = new ThreadLocal(); // 往目前線程裡設定資料源類型 public static void setDataBaseType(DataBaseType dataBaseType) { if (dataBaseType == null) { throw new NullPointerException(); } TYPE.set(dataBaseType); } // 擷取資料源類型 public static DataBaseType getDataBaseType() { DataBaseType dataBaseType = TYPE.get() == null ? DataBaseType.ds1 : TYPE.get(); return dataBaseType; }}
這些都搞定之後,我們還需要把這個DataSource配置到Spring容器中去。下面這個配置類的作用如下:
- 建立多個資料源DataSource,ds1 和 ds2;
- 将ds1 和 ds2 資料源放入動态資料源DynamicDataSource;
- 将DynamicDataSource注入到SqlSessionFactory。
@Configurationpublic class DataSourceConfig { /** * 建立多個資料源 ds1 和 ds2 * 此處的Primary,是設定一個Bean的優先級 * @return */ @Primary @Bean(name = "ds1") @ConfigurationProperties(prefix = "spring.datasource.ds1") public DataSource getDateSource1() { return DataSourceBuilder.create().build(); } @Bean(name = "ds2") @ConfigurationProperties(prefix = "spring.datasource.ds2") public DataSource getDateSource2() { return DataSourceBuilder.create().build(); } /** * 将多個資料源注入到DynamicDataSource * @param dataSource1 * @param dataSource2 * @return */ @Bean(name = "dynamicDataSource") public DynamicDataSource DataSource(@Qualifier("ds1") DataSource dataSource1, @Qualifier("ds2") DataSource dataSource2) { Map targetDataSource = new HashMap<>(); targetDataSource.put(DataSourceType.DataBaseType.ds1, dataSource1); targetDataSource.put(DataSourceType.DataBaseType.ds2, dataSource2); DynamicDataSource dataSource = new DynamicDataSource(); dataSource.setTargetDataSources(targetDataSource); dataSource.setDefaultTargetDataSource(dataSource1); return dataSource; } /** * 将動态資料源注入到SqlSessionFactory * @param dynamicDataSource * @return * @throws Exception */ @Bean(name = "SqlSessionFactory") public SqlSessionFactory getSqlSessionFactory(@Qualifier("dynamicDataSource") DataSource dynamicDataSource) throws Exception { SqlSessionFactoryBean bean = new SqlSessionFactoryBean(); bean.setDataSource(dynamicDataSource); bean.setMapperLocations( new PathMatchingResourcePatternResolver().getResources("classpath*:mapping/*.xml")); bean.setTypeAliasesPackage("cn.youyouxunyin.multipledb2.entity"); return bean.getObject(); }}
3、設定路由鍵
上面的配置都完成之後,我們還需要想辦法動态的改變資料源的鍵值,這個就跟系統的業務相關了。
比如在這裡,我們有兩個Mapper接口,建立訂單和扣減庫存。
public interface OrderMapper { void createOrder(Order order);}public interface StorageMapper { void decreaseStorage(Order order);
那麼,我們就可以搞一個切面,在執行訂單的操作時,切到資料源ds1,執行庫存操作時,切到資料源ds2。
@[email protected] class DataSourceAop { @Before("execution(* cn.youyouxunyin.multipledb2.mapper.OrderMapper.*(..))") public void setDataSource1() { DataSourceType.setDataBaseType(DataSourceType.DataBaseType.ds1); } @Before("execution(* cn.youyouxunyin.multipledb2.mapper.StorageMapper.*(..))") public void setDataSource2() { DataSourceType.setDataBaseType(DataSourceType.DataBaseType.ds2); }}
4、測試
現在就可以寫一個Service方法,通過REST接口測試一下啦。
public class OrderServiceImpl implements OrderService { @Override public void createOrder(Order order) { storageMapper.decreaseStorage(order); logger.info("庫存已扣減,商品代碼:{},購買數量:{}。建立訂單中...",order.getCommodityCode(),order.getCount()); orderMapper.createOrder(order); }}
不出意外的話,業務執行完成後,兩個資料庫的表都已經有了變化。
但此時,我們會想到,這兩個操作是需要保證原子性的。是以,我們需要依賴事務,在Service方法上标注Transactional。
如果我們在createOrder方法上添加了Transactional注解,然後在運作代碼,就會抛出異常。
### Cause: java.sql.SQLSyntaxErrorException: Table 'db2.t_order' doesn't exist; bad SQL grammar []; nested exception is java.sql.SQLSyntaxErrorException: Table 'db2.t_order' doesn't exist] with root cause
這就說明,如果加上了 Spring 的事務,我們的資料源切換不過去了。這又是咋回事呢?
二、事務模式,為啥不能切換資料源
要想搞清楚原因,我們就得來分析分析如果加上了Spring事務,它又幹了哪些事情呢 ?
我們知道,Spring的自動事務是基于AOP實作的。在調用包含事務的方法時,會進入一個攔截器。
public class TransactionInterceptor{ public Object invoke(MethodInvocation invocation) throws Throwable { //擷取目标類 Class> targetClass = AopUtils.getTargetClass(invocation.getThis()); //事務調用 return invokeWithinTransaction(invocation.getMethod(), targetClass, invocation::proceed); }}
1、建立事務
在這裡面呢,首先就是開始建立一個事務。
protected Object doGetTransaction() { //DataSource的事務對象 DataSourceTransactionObject txObject = new DataSourceTransactionObject(); //設定事務自動儲存 txObject.setSavepointAllowed(isNestedTransactionAllowed()); //給事務對象設定ConnectionHolder ConnectionHolder conHolder = TransactionSynchronizationManager.getResource(obtainDataSource()); txObject.setConnectionHolder(conHolder, false); return txObject;}
在這一步,重點是給事務對象設定了ConnectionHolder屬性,不過此時還是為空。
2、開啟事務
接下來,就是開啟一個事務,這裡主要是通過ThreadLocal将資源和目前的事務對象綁定,然後設定一些事務狀态。
protected void doBegin(Object txObject, TransactionDefinition definition) { Connection con = null; //從資料源中擷取一個連接配接 Connection newCon = obtainDataSource().getConnection(); //重新設定事務對象中的connectionHolder,此時已經引用了一個連接配接 txObject.setConnectionHolder(new ConnectionHolder(newCon), true); //将這個connectionHolder标記為與事務同步 txObject.getConnectionHolder().setSynchronizedWithTransaction(true); con = txObject.getConnectionHolder().getConnection(); con.setAutoCommit(false); //激活事務活動狀态 txObject.getConnectionHolder().setTransactionActive(true); //将connection holder綁定到目前線程,通過threadlocal if (txObject.isNewConnectionHolder()) { TransactionSynchronizationManager.bindResource(obtainDataSource(), txObject.getConnectionHolder()); } //事務管理器,激活事務同步狀态 TransactionSynchronizationManager.initSynchronization();}
3、執行Mapper接口
開啟事務之後,就開始執行目标類真實方法。在這裡,就會開始進入Mybatis的代理對象。。哈哈,架構嘛,就各種代理。
我們知道,Mybatis在執行SQL的之前,需要先擷取到SqlSession對象。
public static SqlSession getSqlSession(SqlSessionFactory sessionFactory, ExecutorType executorType, PersistenceExceptionTranslator exceptionTranslator) { //從ThreadLocal中擷取SqlSessionHolder,第一次擷取不到為空 SqlSessionHolder holder = TransactionSynchronizationManager.getResource(sessionFactory); //如果SqlSessionHolder為空,那也肯定擷取不到SqlSession; //如果SqlSessionHolder不為空,直接通過它來拿到SqlSession SqlSession session = sessionHolder(executorType, holder); if (session != null) { return session; } //建立一個新的SqlSession session = sessionFactory.openSession(executorType); //如果目前線程的事務處于激活狀态,就将SqlSessionHolder綁定到ThreadLocal registerSessionHolder(sessionFactory, executorType, exceptionTranslator, session); return session;}
拿到SqlSession之後,就開始調用Mybatis的執行器,準備執行SQL語句。在執行SQL之前呢,當然需要先拿到Connection連接配接。
public Connection getConnection() throws SQLException { //通過資料源擷取連接配接 //比如我們配置了多資料源,此時還會正常切換 if (this.connection == null) { openConnection(); } return this.connection;}
我們看openConnection方法,它的作用就是從資料源中擷取一個Connection連接配接。如果我們配置了多資料源,此時是可以正常切換的。如果加了事務,之是以沒有切換資料源,是因為第二次調用時,this.connection != null,傳回的還是上一次的連接配接。
這是因為,在第二次擷取SqlSession的時候,目前線程是從ThreadLocal中拿到的,是以不會重複擷取Connection連接配接。
至此,在多資料源情況下,如果加了Spring事務,不能動态切換資料源的原因,我們應該都明白了。
在這裡,筆者插播一道面試題:
- Spring是如何保證事務的?
那就是将多個業務操作,放到同一個資料庫連接配接中,一起送出或復原。
- 怎麼做到,都在一個連接配接中呢?
這裡就是各種ThreadlLocal的運用,想辦法将資料庫資源和目前事務綁定到一起。
三、事務模式,怎麼支援切換資料源
上面我們已經把原因搞清楚了,接下來就看怎麼支援它動态切換資料源。
其他配置都不變的情況下,我們需要建立兩個不同的sqlSessionFactory。
@Bean(name = "sqlSessionFactory1")public SqlSessionFactory sqlSessionFactory1(@Qualifier("ds1") DataSource dataSource){ return createSqlSessionFactory(dataSource);}@Bean(name = "sqlSessionFactory2")public SqlSessionFactory sqlSessionFactory2(@Qualifier("ds2") DataSource dataSource){ return createSqlSessionFactory(dataSource);}
然後自定義一個CustomSqlSessionTemplate,來代替Mybatis中原有的sqlSessionTemplate,把上面定義的兩個SqlSessionFactory注入進去。
@Bean(name = "sqlSessionTemplate")public CustomSqlSessionTemplate sqlSessionTemplate(){ Map sqlSessionFactoryMap = new HashMap<>(); sqlSessionFactoryMap.put("ds1",factory1); sqlSessionFactoryMap.put("ds2",factory2); CustomSqlSessionTemplate customSqlSessionTemplate = new CustomSqlSessionTemplate(factory1); customSqlSessionTemplate.setTargetSqlSessionFactorys(sqlSessionFactoryMap); customSqlSessionTemplate.setDefaultTargetSqlSessionFactory(factory1); return customSqlSessionTemplate;
在定義的CustomSqlSessionTemplate中,其他都一樣,主要看擷取SqlSessionFactory的方法。
public class CustomSqlSessionTemplate extends SqlSessionTemplate { @Override public SqlSessionFactory getSqlSessionFactory() { //目前資料源的名稱 String currentDsName = DataSourceType.getDataBaseType().name(); SqlSessionFactory targetSqlSessionFactory = targetSqlSessionFactorys.get(currentDsName); if (targetSqlSessionFactory != null) { return targetSqlSessionFactory; } else if (defaultTargetSqlSessionFactory != null) { return defaultTargetSqlSessionFactory; } return this.sqlSessionFactory; }}
在這裡,重點就是我們可以根據不同的資料源擷取不同的SqlSessionFactory。如果SqlSessionFactory不一樣,那麼在擷取SqlSession的時候,就不會在ThreadLocal中拿到,進而每次都是新的SqlSession對象。
既然SqlSession也不一樣,那麼在擷取Connection連接配接的時候,每次都會去動态資料源中去擷取。
原理就是這麼個原理,我們來走一把。
修改完配置之後,我們把Service方法加上事務的注解,此時資料也是可以正常更新的。
@[email protected] void createOrder(Order order) { storageMapper.decreaseStorage(order); orderMapper.createOrder(order);}
可以切換資料源隻是第一步,我們需要的保證可以保證事務操作。假如在上面的代碼中,庫存扣減完成,但是建立訂單失敗,庫存是不會復原的。因為它們分别屬于不同的資料源,根本不是同一個連接配接。
四、XA協定分布式事務
要解決上面那個問題,我們隻能考慮XA協定。
關于XA協定是啥,筆者不再過多的描述。我們隻需知道,MySQL InnoDB存儲引擎是支援XA事務的。
那麼XA協定的實作,在Java中叫做Java Transaction Manager,簡稱JTA。
如何實作JTA呢?我們借助Atomikos架構,先引入它的依賴。
org.springframework.boot spring-boot-starter-jta-atomikos 2.2.7.RELEASE
然後,隻需把DataSource對象改成AtomikosDataSourceBean。
public DataSource getDataSource(Environment env, String prefix, String dataSourceName){ Properties prop = build(env,prefix); AtomikosDataSourceBean ds = new AtomikosDataSourceBean(); ds.setXaDataSourceClassName(MysqlXADataSource.class.getName()); ds.setUniqueResourceName(dataSourceName); ds.setXaProperties(prop); return ds;}
這樣配完之後,擷取Connection連接配接的時候,拿到的其實是MysqlXAConnection對象。在送出或者復原的時候,走的就是MySQL的XA協定了。
public void commit(Xid xid, boolean onePhase) throws XAException { //封裝 XA COMMIT 請求 StringBuilder commandBuf = new StringBuilder(300); commandBuf.append("XA COMMIT "); appendXid(commandBuf, xid); try { //交給MySQL執行XA事務操作 dispatchCommand(commandBuf.toString()); } finally { this.underlyingConnection.setInGlobalTx(false); }}
通過引入Atomikos和修改DataSource,在多資料源情況下,即便業務操作中間發生錯誤,多個資料庫也是可以正常復原的。
另外一個問題,是否應該使用XA協定?
XA協定看起來看起來比較簡單,但它也有一些缺點。比如:
- 性能問題,所有參與者在事務送出階段處于同步阻塞狀态,占用系統資源,容易導緻性能瓶頸,無法滿足高并發場景;
- 如果協調者存在單點故障問題,如果協調者出現故障,參與者将一直處于鎖定狀态;
- 主從複制可能産生事務狀态不一緻。
在MySQL官方文檔中也列舉了一些XA協定的限制項:
https://dev.mysql.com/doc/refman/8.0/en/xa-restrictions.html
另外,筆者在實際的項目裡,其實也沒有用過,通過這樣的方式來解決分布式事務問題,此例僅做可行性方案探讨。
總結
本文通過引入SpringBoot+Mybatis的多資料源場景,分析了如下問題:
- 多資料源的配置和實作;
- Spring事務模式,多資料源不生效的原因和解決方法;
- 多資料源,基于XA協定的分布式事務實作。
作者:清幽之地
連結:https://juejin.im/post/5eba38aa6fb9a043777c9b3a