天天看点

分布式事务JTA实现Atomikos与Spring集成实践

  Atomikos官网无法访问,不过Maven中央库中具atomikos包。Atomikos集成Spring,Hibernate,Mybatis网上文章比较多,本文是通过JavaSE的方式借用Spring配置来测试Atomikos对JTA的实现。

 下面做一件事,就是两(+)个数据库,在一个事务里对其分别对数据库操作验证操作的原子性,即要么两个数据库的操作都成功,要么都失败。

 1.准备工作

  1.1 Maven pom.xml中添加依赖包

  atomikos:(目前最新版)

1

2

3

4

5

<code>&lt;</code><code>dependency</code><code>&gt;</code>

<code>            </code><code>&lt;</code><code>groupId</code><code>&gt;com.atomikos&lt;/</code><code>groupId</code><code>&gt;</code>

<code>            </code><code>&lt;</code><code>artifactId</code><code>&gt;transactions-jdbc&lt;/</code><code>artifactId</code><code>&gt;</code>

<code>            </code><code>&lt;</code><code>version</code><code>&gt;3.9.3&lt;/</code><code>version</code><code>&gt;</code>

<code>        </code><code>&lt;/</code><code>dependency</code><code>&gt;</code>

 jar依赖图:

 Postgresql数据库驱动:

<code>            </code><code>&lt;</code><code>groupId</code><code>&gt;org.postgresql&lt;/</code><code>groupId</code><code>&gt;</code>

<code>            </code><code>&lt;</code><code>artifactId</code><code>&gt;postgresql&lt;/</code><code>artifactId</code><code>&gt;</code>

<code>            </code><code>&lt;</code><code>version</code><code>&gt;9.2-1004-jdbc4&lt;/</code><code>version</code><code>&gt;</code>

 注:文中使用的是Postgresql V9.2

 javax.transaction.transaction-api.1.1:

6

7

8

<code>&lt;!-- Include in javaee-api --&gt;</code>

<code>        </code><code>&lt;!-- </code>

<code>        </code><code>&lt;dependency&gt;</code>

<code>            </code><code>&lt;groupId&gt;javax.transaction&lt;/groupId&gt;</code>

<code>            </code><code>&lt;artifactId&gt;transaction-api&lt;/artifactId&gt;</code>

<code>            </code><code>&lt;version&gt;1.1&lt;/version&gt;</code>

<code>        </code><code>&lt;/dependency&gt;</code>

<code>         </code><code>--&gt;</code>

<code>&lt;!-- JavaEE --&gt;</code>

<code>        </code><code>&lt;!-- javaee-api包含了JavaEE规范中的api,如servlet-api,persistence-api, transaction-api等 --&gt;</code>

<code>        </code><code>&lt;</code><code>dependency</code><code>&gt;</code>

<code>            </code><code>&lt;</code><code>groupId</code><code>&gt;javax&lt;/</code><code>groupId</code><code>&gt;</code>

<code>            </code><code>&lt;</code><code>artifactId</code><code>&gt;javaee-api&lt;/</code><code>artifactId</code><code>&gt;</code>

<code>            </code><code>&lt;</code><code>version</code><code>&gt;7.0&lt;/</code><code>version</code><code>&gt;</code>

 注:根据需要选择其一即可。

 Spring,Junit依赖这里省略。

 1.2 创建数据库以及表

   数据库分别是:javaee,tomdb

 2.在项目中添加配置文件

  spring-jta.xml 和transaction.properties(模版见文中附件)文件,spring-jta.xml在src/main/resources/integration下,transaction.properties在src/main/resources/下。

 2.1在spring-jta.xml中配置两个XADataSource:

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

<code>&lt;!-- 使用分布式事务时设置Postgresql的max_prepared_transactions为大于0的值,该值默认是0 --&gt;</code>

<code>    </code><code>&lt;!-- 数据库A --&gt;</code>

<code>    </code><code>&lt;</code><code>bean</code> <code>id</code><code>=</code><code>"a"</code> <code>class</code><code>=</code><code>"com.atomikos.jdbc.AtomikosDataSourceBean"</code>

<code>        </code><code>init-method</code><code>=</code><code>"init"</code> <code>destroy-method</code><code>=</code><code>"close"</code><code>&gt;</code>

<code>        </code><code>&lt;</code><code>property</code> <code>name</code><code>=</code><code>"uniqueResourceName"</code> <code>value</code><code>=</code><code>"pg/a"</code> <code>/&gt;</code>

<code>        </code><code>&lt;</code><code>property</code> <code>name</code><code>=</code><code>"xaDataSourceClassName"</code> <code>value</code><code>=</code><code>"org.postgresql.xa.PGXADataSource"</code> <code>/&gt;</code>

<code>        </code><code>&lt;</code><code>property</code> <code>name</code><code>=</code><code>"xaProperties"</code><code>&gt;</code>

<code>            </code><code>&lt;</code><code>props</code><code>&gt;</code>

<code>                </code><code>&lt;</code><code>prop</code> <code>key</code><code>=</code><code>"user"</code><code>&gt;postgres&lt;/</code><code>prop</code><code>&gt;</code>

<code>                </code><code>&lt;</code><code>prop</code> <code>key</code><code>=</code><code>"password"</code><code>&gt;postgres&lt;/</code><code>prop</code><code>&gt;</code>

<code>                </code><code>&lt;</code><code>prop</code> <code>key</code><code>=</code><code>"serverName"</code><code>&gt;localhost&lt;/</code><code>prop</code><code>&gt;</code>

<code>                </code><code>&lt;</code><code>prop</code> <code>key</code><code>=</code><code>"portNumber"</code><code>&gt;5432&lt;/</code><code>prop</code><code>&gt;</code>

<code>                </code><code>&lt;</code><code>prop</code> <code>key</code><code>=</code><code>"databaseName"</code><code>&gt;tomdb&lt;/</code><code>prop</code><code>&gt;</code>

<code>            </code><code>&lt;/</code><code>props</code><code>&gt;</code>

<code>        </code><code>&lt;/</code><code>property</code><code>&gt;</code>

<code>        </code><code>&lt;</code><code>property</code> <code>name</code><code>=</code><code>"poolSize"</code> <code>value</code><code>=</code><code>"10"</code> <code>/&gt;</code>

<code>        </code><code>&lt;</code><code>property</code> <code>name</code><code>=</code><code>"reapTimeout"</code> <code>value</code><code>=</code><code>"20000"</code> <code>/&gt;</code>

<code>    </code><code>&lt;/</code><code>bean</code><code>&gt;</code>

<code>    </code><code>&lt;</code><code>bean</code> <code>id</code><code>=</code><code>"b"</code> <code>class</code><code>=</code><code>"com.atomikos.jdbc.AtomikosDataSourceBean"</code>

<code>        </code><code>&lt;</code><code>property</code> <code>name</code><code>=</code><code>"uniqueResourceName"</code> <code>value</code><code>=</code><code>"pg/b"</code> <code>/&gt;</code>

<code>                </code><code>&lt;</code><code>prop</code> <code>key</code><code>=</code><code>"databaseName"</code><code>&gt;javaee&lt;/</code><code>prop</code><code>&gt;</code>

 说明:

 Postgresql的max_prepared_transactions参数值默认是0,要开启XA需要设置该值至少和max_connections参数值一样大,该参数在PostgreSQL\9.3\data\postgresql.conf文件中。

 PGXADataSource的父类BaseDataSource没有url属性,可需要分别设置serverName,portNumber,databaseName等属性。不同的数据库驱动有不同的实现方法。

 2.2 配置事务管理对象和UserTransaction接口实现

<code>&lt;!-- atomikos事务管理 --&gt;</code>

<code>    </code><code>&lt;</code><code>bean</code> <code>id</code><code>=</code><code>"atomikosUserTransactionManager"</code> <code>class</code><code>=</code><code>"com.atomikos.icatch.jta.UserTransactionManager"</code>  <code>init-method</code><code>=</code><code>"init"</code> <code>destroy-method</code><code>=</code><code>"close"</code><code>&gt;</code>

<code>        </code><code>&lt;</code><code>description</code><code>&gt;UserTransactionManager&lt;/</code><code>description</code><code>&gt;</code>

<code>        </code><code>&lt;</code><code>property</code> <code>name</code><code>=</code><code>"forceShutdown"</code> <code>value</code><code>=</code><code>"true"</code> <code>/&gt;</code>

<code>    </code><code>&lt;</code><code>bean</code> <code>id</code><code>=</code><code>"atomikosUserTransaction"</code> <code>class</code><code>=</code><code>"com.atomikos.icatch.jta.UserTransactionImp"</code><code>&gt;</code>

<code>        </code><code>&lt;</code><code>property</code> <code>name</code><code>=</code><code>"transactionTimeout"</code> <code>value</code><code>=</code><code>"300"</code> <code>/&gt;</code>

<code>    </code><code>&lt;</code><code>bean</code> <code>id</code><code>=</code><code>"transactionManager"</code>

<code>        </code><code>class</code><code>=</code><code>"org.springframework.transaction.jta.JtaTransactionManager"</code><code>&gt;</code>

<code>        </code><code>&lt;</code><code>property</code> <code>name</code><code>=</code><code>"transactionManager"</code> <code>ref</code><code>=</code><code>"atomikosUserTransactionManager"</code><code>&gt;&lt;/</code><code>property</code><code>&gt;</code>

 上面三个Bean可以独立使用来进行事务控制,具体看下面3。

3. 编写测试

 3.1 使用atomikosUserTransactionManager对象测试(TestAtomikos1.java)

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

64

65

66

67

68

69

70

71

72

73

74

75

<code>package</code> <code>secondriver.springsubway.example.jta;</code>

<code>import</code> <code>java.sql.Connection;</code>

<code>import</code> <code>java.sql.SQLException;</code>

<code>import</code> <code>javax.transaction.HeuristicMixedException;</code>

<code>import</code> <code>javax.transaction.HeuristicRollbackException;</code>

<code>import</code> <code>javax.transaction.NotSupportedException;</code>

<code>import</code> <code>javax.transaction.RollbackException;</code>

<code>import</code> <code>javax.transaction.SystemException;</code>

<code>import</code> <code>org.junit.BeforeClass;</code>

<code>import</code> <code>org.junit.Test;</code>

<code>import</code> <code>org.springframework.context.ApplicationContext;</code>

<code>import</code> <code>org.springframework.context.support.ClassPathXmlApplicationContext;</code>

<code>import</code> <code>com.atomikos.icatch.jta.UserTransactionManager;</code>

<code>import</code> <code>com.atomikos.jdbc.AtomikosDataSourceBean;</code>

<code>public</code> <code>class</code> <code>TestAtomikos1 {</code>

<code>    </code><code>public</code> <code>static</code> <code>ApplicationContext ctx;</code>

<code>    </code><code>@BeforeClass</code>

<code>    </code><code>public</code> <code>static</code> <code>void</code> <code>beforeClass() {</code>

<code>        </code><code>ctx = </code><code>new</code> <code>ClassPathXmlApplicationContext(</code>

<code>                </code><code>"classpath:integration/spring-jta.xml"</code><code>);</code>

<code>    </code><code>}</code>

<code>    </code><code>public</code> <code>static</code> <code>void</code> <code>afterClass() {</code>

<code>        </code><code>ctx = </code><code>null</code><code>;</code>

<code>    </code><code>@Test</code>

<code>    </code><code>public</code> <code>void</code> <code>test1() {</code>

<code>        </code><code>exe(</code><code>"abc"</code><code>, </code><code>"abc"</code><code>);</code>

<code>    </code><code>public</code> <code>void</code> <code>test2() {</code>

<code>        </code><code>exe(</code><code>"123="</code><code>, </code><code>"123"</code><code>);</code>

<code>    </code><code>public</code> <code>void</code> <code>exe(String av, String bv) {</code>

<code>        </code><code>AtomikosDataSourceBean adsA = (AtomikosDataSourceBean) ctx.getBean(</code><code>"a"</code><code>);</code>

<code>        </code><code>AtomikosDataSourceBean adsB = (AtomikosDataSourceBean) ctx.getBean(</code><code>"b"</code><code>);</code>

<code>        </code><code>Connection connA;</code>

<code>        </code><code>Connection connB;</code>

<code>        </code><code>UserTransactionManager utm = (UserTransactionManager) ctx</code>

<code>                </code><code>.getBean(</code><code>"atomikosUserTransactionManager"</code><code>);</code>

<code>        </code><code>try</code> <code>{</code>

<code>            </code><code>utm.begin();</code>

<code>            </code><code>connA = adsA.getConnection();</code>

<code>            </code><code>connB = adsB.getConnection();</code>

<code>            </code><code>connA.prepareStatement(</code>

<code>                    </code><code>"insert into jta_temp (value) values('"</code> <code>+ av + </code><code>"')"</code><code>)</code>

<code>                    </code><code>.execute();</code>

<code>            </code><code>connB.prepareStatement(</code>

<code>                    </code><code>"insert into jta_temp (value) values('"</code> <code>+ bv + </code><code>"')"</code><code>)</code>

<code>            </code><code>utm.commit();</code>

<code>        </code><code>} </code><code>catch</code> <code>(SQLException | NotSupportedException | SystemException</code>

<code>                </code><code>| SecurityException | IllegalStateException | RollbackException</code>

<code>                </code><code>| HeuristicMixedException | HeuristicRollbackException e) {</code>

<code>            </code><code>e.printStackTrace();</code>

<code>            </code><code>try</code> <code>{</code>

<code>                </code><code>utm.rollback();</code>

<code>            </code><code>} </code><code>catch</code> <code>(IllegalStateException | SecurityException</code>

<code>                    </code><code>| SystemException e1) {</code>

<code>                </code><code>e1.printStackTrace();</code>

<code>            </code><code>}</code>

<code>        </code><code>}</code>

<code>}</code>

3.2使用Spring的JtaUserTransactionManager对象测试(TestAtomikos2.java 修改TestAtomikos1.java中的exe方法即可)

<code>@Test</code>

<code>public</code> <code>void</code> <code>exe(String av, String bv) {</code>

<code>        </code><code>TransactionFactory txm = (TransactionFactory) ctx</code>

<code>                </code><code>.getBean(</code><code>"transactionManager"</code><code>);</code>

<code>        </code><code>JdbcTemplate a = (JdbcTemplate) ctx.getBean(</code><code>"jdbcTemplateA"</code><code>);</code>

<code>        </code><code>JdbcTemplate b = (JdbcTemplate) ctx.getBean(</code><code>"jdbcTemplateB"</code><code>);</code>

<code>        </code><code>Transaction tx =</code><code>null</code><code>;</code>

<code>         </code><code>tx = txm.createTransaction(</code><code>"tx-name-define"</code><code>, </code><code>10000</code><code>);</code>

<code>            </code><code>a.update(</code><code>"insert into jta_temp (value) values('"</code> <code>+ av + </code><code>"')"</code><code>);</code>

<code>            </code><code>b.update(</code><code>"insert into jta_temp (value) values('"</code> <code>+ bv + </code><code>"')"</code><code>);</code>

<code>            </code><code>tx.commit();</code>

<code>        </code><code>} </code><code>catch</code> <code>(NotSupportedException | SystemException | SecurityException</code>

<code>                </code><code>| RollbackException | HeuristicMixedException</code>

<code>                </code><code>| HeuristicRollbackException e) {</code>

<code>            </code><code>if</code><code>(tx!=</code><code>null</code><code>){</code>

<code>                </code><code>try</code> <code>{</code>

<code>                    </code><code>tx.rollback();</code>

<code>                </code><code>} </code><code>catch</code> <code>(IllegalStateException | SystemException e1) {</code>

<code>                    </code><code>e1.printStackTrace();</code>

<code>                </code><code>}</code>

3.3使用atomikosUserTransaction Bean对象进行测试(TestAtomikos3.java 修改TestAtomikos1.java中的exe方法即可)

<code>        </code><code>exe(</code><code>"123"</code><code>, </code><code>"123="</code><code>);</code>

<code>        </code><code>UserTransaction utx = (UserTransaction) ctx</code>

<code>                </code><code>.getBean(</code><code>"atomikosUserTransaction"</code><code>);</code>

<code>            </code><code>utx.begin();</code>

<code>            </code><code>utx.commit();</code>

<code>                </code><code>utx.rollback();</code>

 使用上述三种UserTransaction进行测试,其中test1方法是成功执行,test2方法是执行失败的(因为插入的值长度超过的字段长度限制)。通过分析之后,如果分布式事物控制正确,那么数据库中写入的值对于两张不同的表而言,是没有数字值被写入的。如图所示:

 在测试过程中,经过对比确实达到了分布式事务控制的效果。

 整个操作的架构图如下(摘自Atomikos Blog)

 关于JTA原理文章开始提到的那篇文章,写的很详细和清晰,可以细细阅读和理解。

本文转自 secondriver 51CTO博客,原文链接:http://blog.51cto.com/aiilive/1658102,如需转载请自行联系原作者