天天看點

【133】Spring Boot 1 + MyBatis 多資料源分布式事務(一)

本文源代码位置:https://gitee.com/zhangchao19890805/csdnBlog.git 仓库中的 blog133 文件夹就是项目文件夹。

使用 Spring Boot 和 Spring Cloud 做分布式微服务系统,难免会碰到跨数据库的事务。众所周知的CAP原则,即一致性(C)、可用性(A)和分区容错性(P)只能做到其中两个比较强,剩下一个较弱。Spring Cloud 分布式微服务系统天生可用性(A)和分区容错性(P)较强。如何保证一致性就是个重要的问题。我准备就这个问题写多篇文章,结合实例,向读者一步一步展示如何实现分布式事务,保证数据的一致性。我的文章会从最简单的方法开始,一步一步的不断完善系统,逐渐增强系统的一致性。这也恰好符合我在实践中经历。

首先,我先模拟一个场景:我们在添加用户的时候,既要添加用户的基本信息,又要添加用户的身份证信息。这两个信息储存在两台物理机的数据库中。在添加用户的时候,要保证身份证也一起添加,这构成了一个事务。启用用户基本信息存在 t_user 表中,身份证信息存在 t_card 表中。t_user 和 t_card 一一对应。

下面是数据结构:

储存t_user的物理机

CREATE DATABASE `db_test`

CREATE TABLE `t_user` (
  `c_id` varchar(70) CHARACTER SET utf8 NOT NULL,
  `c_user_name` varchar(45) CHARACTER SET utf8 NOT NULL,
  `c_password` varchar(45) CHARACTER SET utf8 NOT NULL,
  `c_create_time` datetime NOT NULL,
  `c_balance` decimal(9,2) NOT NULL DEFAULT '0.00',
  PRIMARY KEY (`c_id`),
  UNIQUE KEY `c_user_name_UNIQUE` (`c_user_name`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

CREATE TABLE `t_log` (
  `c_id` varchar(80) NOT NULL,
  `c_content` text,
  `c_datetime` datetime NOT NULL,
  PRIMARY KEY (`c_id`)
) ENGINE=MyISAM DEFAULT CHARSET=utf8mb4;
           

储存 t_card 的物理机

CREATE DATABASE `db_home_2` /*!40100 DEFAULT CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci */;
CREATE TABLE `t_card` (
  `c_id` varchar(80) NOT NULL,
  `c_user_id` varchar(80) NOT NULL,
  `c_no` varchar(30) NOT NULL,
  `c_create_time` datetime NOT NULL,
  PRIMARY KEY (`c_id`),
  UNIQUE KEY `c_user_id_UNIQUE` (`c_user_id`),
  UNIQUE KEY `c_no_UNIQUE` (`c_no`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;
           

接下来是关键代码。

pom.xml

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
	<modelVersion>4.0.0</modelVersion>
	<groupId>zhangchao</groupId>
	<artifactId>blog133</artifactId>
	<version>0.0.1</version>
	
	<parent>
		<groupId>org.springframework.boot</groupId>
		<artifactId>spring-boot-starter-parent</artifactId>
		<version>1.5.17.RELEASE</version>
		<relativePath/>
	</parent>
	<properties>
		<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
		<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
		<java.version>1.8</java.version>
		<maven.compiler.source>1.8</maven.compiler.source>
		<maven.compiler.target>1.8</maven.compiler.target>
	</properties>
	<dependencies>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-thymeleaf</artifactId>
		</dependency>
		<dependency>
		    <groupId>mysql</groupId>
		    <artifactId>mysql-connector-java</artifactId>
		</dependency>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-web</artifactId>
		</dependency>		
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-test</artifactId>
			<scope>test</scope>
		</dependency>
		<dependency>
			<groupId>io.springfox</groupId>
			<artifactId>springfox-swagger-ui</artifactId>
			<version>2.6.1</version>
		</dependency>
		<dependency>
			<groupId>io.springfox</groupId>
			<artifactId>springfox-swagger2</artifactId>
			<version>2.6.1</version>
		</dependency>
		<!-- 热部署模块 -->
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-devtools</artifactId>
			<optional>true</optional> <!-- 这个需要为 true 热部署才有效 -->
		</dependency>


		<dependency>
			<groupId>org.mybatis</groupId>
			<artifactId>mybatis</artifactId>
			<version>3.4.6</version>
		</dependency>
		
	</dependencies>
	
	<build>
		<plugins>
			<plugin>
				<groupId>org.springframework.boot</groupId>
				<artifactId>spring-boot-maven-plugin</artifactId>
			</plugin>
		</plugins>
	</build>
</project>
           

必须要屏蔽掉 Spring Boot 1 默认的数据源配置,否则你只要没在 application.properties 或 application.yml 里配置数据源,启动时就会报错。需要在 main 方法中屏蔽。

Blog133Application.java

package zhangchao;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.autoconfigure.jdbc.DataSourceAutoConfiguration;

@SpringBootApplication(exclude = {
		DataSourceAutoConfiguration.class
})
public class Blog133Application {
	public static void main(String[] args) {
        SpringApplication.run(Blog133Application.class, args);
    }
}
           

两个数据源的信息分别写到 mybatis-config.xml 和 mybatis-config-2.xml 两个文件中。

mybatis-config.xml

<?xml version="1.0" encoding="UTF-8" ?>
<!DOCTYPE configuration
  PUBLIC "-//mybatis.org//DTD Config 3.0//EN"
  "http://mybatis.org/dtd/mybatis-3-config.dtd">
<configuration>
  <environments default="development">
    <environment id="development">
      <transactionManager type="JDBC"/>
      <dataSource type="POOLED">
        <property name="driver" value="com.mysql.jdbc.Driver"/>
        <property name="url" value="jdbc:mysql://127.0.0.1:3306/db_test"/>
        <property name="username" value="root"/>
        <property name="password" value="123456"/>
      </dataSource>
    </environment>
  </environments>
  <mappers>
    <mapper resource="mapper/UserMapper.xml"/>
    <mapper resource="mapper/LogMapper.xml"/>
  </mappers>
</configuration>
           

mybatis-config-2.xml

<?xml version="1.0" encoding="UTF-8" ?>
<!DOCTYPE configuration
  PUBLIC "-//mybatis.org//DTD Config 3.0//EN"
  "http://mybatis.org/dtd/mybatis-3-config.dtd">
<configuration>
  <environments default="development">
    <environment id="development">
      <transactionManager type="JDBC"/>
      <dataSource type="POOLED">
        <property name="driver" value="com.mysql.jdbc.Driver"/>
        <property name="url" value="jdbc:mysql://192.168.1.230:3306/db_home_2"/>
        <property name="username" value="root"/>
        <property name="password" value="123456"/>
      </dataSource>
    </environment>
  </environments>
  <mappers>
    <mapper resource="mapper/CardMapper.xml"/>
  </mappers>
</configuration>
           

为两个数据源配置两个不同的 SqlSessionFactory。

FirstDBFactory.java

package zhangchao.common.db;

import java.io.IOException;
import java.io.InputStream;

import org.apache.ibatis.io.Resources;
import org.apache.ibatis.session.SqlSessionFactory;
import org.apache.ibatis.session.SqlSessionFactoryBuilder;

/**
 * 保存第一个数据库的SqlSessionFactory
 * @author 张超
 */
public class FirstDBFactory {
	private static SqlSessionFactory sqlSessionFactory = null;
	
	static {
		String resource = "mybatis-config.xml";
		InputStream inputStream = null;
		try {
			inputStream = Resources.getResourceAsStream(resource);
		} catch (IOException e) {
			e.printStackTrace();
		}
		sqlSessionFactory = new SqlSessionFactoryBuilder().build(inputStream);
	}
	
	public static SqlSessionFactory getInstance(){
		return sqlSessionFactory;
	}
}

           

SecondDBFactory.java

package zhangchao.common.db;

import java.io.IOException;
import java.io.InputStream;

import org.apache.ibatis.io.Resources;
import org.apache.ibatis.session.SqlSessionFactory;
import org.apache.ibatis.session.SqlSessionFactoryBuilder;

/**
 * 保存第二个数据库的SqlSessionFactory
 * @author 张超
 *
 */
public class SecondDBFactory {
	private static SqlSessionFactory sqlSessionFactory = null;
	
	static {
		String resource = "mybatis-config-2.xml";
		InputStream inputStream = null;
		try {
			inputStream = Resources.getResourceAsStream(resource);
		} catch (IOException e) {
			e.printStackTrace();
		}
		sqlSessionFactory = new SqlSessionFactoryBuilder().build(inputStream);
	}
	
	public static SqlSessionFactory getInstance(){
		return sqlSessionFactory;
	}
}
           

我把事务放到 Service 层中,基本思路是先打开两个数据库的SqlSession,然后执行 SQL 语句,等执行完后再提交两个数据库的事务。如果有异常,先判断事务有没有提交:没提交就是常规的回滚;已经提交了的就执行补偿操作。

与此同时,对异常的处理还做了如下加强:如果恰巧在前一个数据库回滚或者补偿操作的时候出现数据库宕机,那么回滚或者补偿操作就会抛出异常,造成程序没有执行下一个数据库的回滚或者补偿操作。为了避免上述情况,还特意为回滚和补偿操作加了 try … catch ,并增加了合并异常的代码。下面是最关键的部分:

UserService.java

package zhangchao.service;

import java.math.BigDecimal;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;

import org.apache.ibatis.session.SqlSession;
import org.springframework.beans.BeanUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import zhangchao.common.db.FirstDBFactory;
import zhangchao.common.db.SecondDBFactory;
import zhangchao.common.exception.ExceptionUtils;
import zhangchao.dao.CardDao;
import zhangchao.dao.UserDao;
import zhangchao.domain.*;


/**
 * 用户的服务类
 * @author 张超
 *
 */
@Service
public class UserService {
	
	@Autowired
	private UserDao userDao;
	@Autowired
	private CardDao cardDao;


	public List<User> selectList(){
		SqlSession sqlSession = FirstDBFactory.getInstance().openSession(true);
		List<User> r = null;
		try {
			r = this.userDao.selectList(sqlSession);
		} finally{
			sqlSession.close();
		}
		return r;
	}
	
	/**
	 * 删除用户
	 * @param id
	 */
	public void delete(String id) {
		SqlSession sqlSession = FirstDBFactory.getInstance().openSession(true);
		try {
			this.userDao.delete(sqlSession, id);
		} finally{
			sqlSession.close();
		}
	}
	
	public void save(User user, Card card) {
		// 第一个数据库,放User表
		SqlSession sqlSession_1 = FirstDBFactory.getInstance().openSession();
		// 第二个数据库,放Card表
		SqlSession sqlSession_2 = SecondDBFactory.getInstance().openSession();
		boolean firstSessionCommit = false;
		try {
			this.userDao.save(sqlSession_1, user);
			this.cardDao.save(sqlSession_2, card);
			new BigDecimal("11").divide(user.getBalance(),2);
			
			// 提交
			sqlSession_1.commit();
			firstSessionCommit = true;
			new BigDecimal("11").divide(new BigDecimal(card.getNo()), 2);
			sqlSession_2.commit();
		} catch (Exception e) {
			throw ExceptionUtils.catchException(e, 
					sqlSession_1, firstSessionCommit, ()->{
						this.delete(user.getId());
						System.out.println("调用UserService.delete方法");
					}, 
					sqlSession_2);
		} finally {
			sqlSession_1.close();
			sqlSession_2.close();
		}
	}

}
           

ExceptionUtils.java

package zhangchao.common.exception;

import org.apache.ibatis.session.SqlSession;

/**
 * 统一的异常处理
 * @author 张超
 *
 */
public class ExceptionUtils {
	
	/**
	 * 异常转成字符串
	 * @param t 异常
	 * @return 异常的详细信息的字符串
	 */
	private static String throwable2Str (Throwable t) {
		StackTraceElement[] steArr = t.getStackTrace();
		String[] details = new String[steArr.length];
		for (int i = 0; i < details.length; i++) {
			details[i] = steArr[i].toString();
		}
		String message = t.getMessage();
		StringBuilder content = new StringBuilder();
		for (String str : details) {
			content.append(str).append("\n");
		}
		content.append("\n").append(message).append("\n");
		return content.toString();
	}
	
	/**
	 * 把多个异常合并成一个异常。
	 * @param arr
	 */
	public static RuntimeException join(Throwable[] arr){
		StringBuilder sb = new StringBuilder();
		if (null == arr || arr.length == 0) {
			return null;
		}
		for (int i = 0; i < arr.length; i++) {
			Throwable t = arr[i];
			if (null != t) {
				sb.append(throwable2Str(t)).append("\n");
			}
		}
		return new RuntimeException(sb.toString());
	}
	
	/**
	 * 统一处理异常
	 * @param e
	 */
	public static RuntimeException catchException(Exception e, 
			SqlSession sqlSession_1, boolean sessionCommit_1, Compensate c1,
			SqlSession sqlSession_2, boolean sessionCommit_2, Compensate c2){
		Exception rbEx_1 = null;
		Exception rbEx_2 = null;
		try{
			// 如果User表已经提交,做补偿操作
			if (sessionCommit_1){
				if (null != c1) {
					c1.compensate();
				}
			} else {
				// 如果User表还没提交,回滚
				sqlSession_1.rollback();
				System.out.println("sqlSession.rollback()");
			}
		} catch (Exception ex1) {
			// 这里加入 try ... catch 的原因是,如果 user 表的
			// 数据库停止,上面代码 抛出异常,如果不处理,
			// 后面的代码就无法执行,也就无法回退 sqlSession_2 和包装异常。
			rbEx_1 = ex1;
		}
		try {
			if (sessionCommit_2) {
				if (null != c2) {
					c2.compensate();
				}
			} else {
				sqlSession_2.rollback();
				System.out.println("sqlSession_2.rollback()");
			}
		} catch(Exception ex2) {
			rbEx_2 = ex2;
		}
		RuntimeException r = ExceptionUtils.join(new Exception[]{e, rbEx_1, rbEx_2});
		return r;
	}
	
	public static RuntimeException catchException(Exception e, 
			SqlSession sqlSession_1, boolean sessionCommit_1, Compensate c1,
			SqlSession sqlSession_2){
		RuntimeException r = catchException(e, sqlSession_1, sessionCommit_1, c1,
				sqlSession_2, false, null);
		return r;
	}
}

           

为了方便补偿操作而加的接口 Compensate

package zhangchao.common.exception;
/**
 * 异常处理中的补偿操作接口
 * @author 张超
 *
 */
public interface Compensate {
	void compensate();
}

           

ExceptionHandleAdvice.java

package zhangchao.common.exception;

import java.sql.Timestamp;
import java.util.UUID;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.ExceptionHandler;
import org.springframework.web.bind.annotation.RestControllerAdvice;

import zhangchao.common.core.R;
import zhangchao.domain.Log;
import zhangchao.service.LogService;

/**
 * Spring Boot 1 统一封装异常
 * @author 张超
 *
 */
@RestControllerAdvice
public class ExceptionHandleAdvice {
	@Autowired
	private LogService logService;
	
	
	@ExceptionHandler(value=Throwable.class)
	public R exception(Throwable t){
		StackTraceElement[] steArr = t.getStackTrace();
		String[] details = new String[steArr.length];
		for (int i = 0; i < details.length; i++) {
			details[i] = steArr[i].toString();
		}
		String message = t.getMessage();
		StringBuilder content = new StringBuilder();
		for (String str : details) {
			content.append(str).append("\n");
		}
		content.append("\n").append(message).append("\n");
		
		Log log = new Log();
		log.setId(UUID.randomUUID().toString());
		log.setContent(content.toString());
		log.setDatetime(new Timestamp(System.currentTimeMillis()));
		// 这里加上try catch,就算数据库停止了,也能处理异常。
		try {
			logService.save(log);
		} catch (Exception e) {
			// 此处也可以改成存入日志文件中
			e.printStackTrace();
		}
		return R.error().put("msg", message);
	}
}

           

好了,最关键的部分就是这些。想查看更多细节建议克隆下代码再仔细查看。运行Blog133Application 后,浏览器中输入

http://localhost/test.html

就可以执行添加用户的操作。浏览器中输入

http://localhost/api/user

就能看的已经添加的用户。

反思我们的代码,虽然我们为了保证事务的一致性加入了回滚和补偿操作,但是系统的一致性仍然有瑕疵。假如系统执行完 t_user 表的数据库提交操作后,Tomcat 突然宕机,会造成 t_card 数据库没提交,导致两台机器的数据不一致。也就是 t_user 表已经有记录了,而 t_card 表却没有对应的记录。这个问题该怎么解决呢?我会在下一篇文章中讲到。

【134】Spring Boot 1 + MyBatis 多数据源分布式事务(二)

繼續閱讀