天天看點

分布式事務JTA實作Atomikos與Spring內建實踐

  Atomikos官網無法通路,不過Maven中央庫中具atomikos包。Atomikos內建Spring,Hibernate,Mybatis網上文章比較多,本文是通過JavaSE的方式借用Spring配置來測試Atomikos對JTA的實作。

 下面做一件事,就是兩(+)個資料庫,在一個事務裡對其分别對資料庫操作驗證操作的原子性,即要麼兩個資料庫的操作都成功,要麼都失敗。

 1.準備工作

  1.1 Maven pom.xml中添加依賴包

  atomikos:(目前最新版)

1

2

3

4

5

<code>&lt;</code><code>dependency</code><code>&gt;</code>

<code>            </code><code>&lt;</code><code>groupId</code><code>&gt;com.atomikos&lt;/</code><code>groupId</code><code>&gt;</code>

<code>            </code><code>&lt;</code><code>artifactId</code><code>&gt;transactions-jdbc&lt;/</code><code>artifactId</code><code>&gt;</code>

<code>            </code><code>&lt;</code><code>version</code><code>&gt;3.9.3&lt;/</code><code>version</code><code>&gt;</code>

<code>        </code><code>&lt;/</code><code>dependency</code><code>&gt;</code>

 jar依賴圖:

 Postgresql資料庫驅動:

<code>            </code><code>&lt;</code><code>groupId</code><code>&gt;org.postgresql&lt;/</code><code>groupId</code><code>&gt;</code>

<code>            </code><code>&lt;</code><code>artifactId</code><code>&gt;postgresql&lt;/</code><code>artifactId</code><code>&gt;</code>

<code>            </code><code>&lt;</code><code>version</code><code>&gt;9.2-1004-jdbc4&lt;/</code><code>version</code><code>&gt;</code>

 注:文中使用的是Postgresql V9.2

 javax.transaction.transaction-api.1.1:

6

7

8

<code>&lt;!-- Include in javaee-api --&gt;</code>

<code>        </code><code>&lt;!-- </code>

<code>        </code><code>&lt;dependency&gt;</code>

<code>            </code><code>&lt;groupId&gt;javax.transaction&lt;/groupId&gt;</code>

<code>            </code><code>&lt;artifactId&gt;transaction-api&lt;/artifactId&gt;</code>

<code>            </code><code>&lt;version&gt;1.1&lt;/version&gt;</code>

<code>        </code><code>&lt;/dependency&gt;</code>

<code>         </code><code>--&gt;</code>

<code>&lt;!-- JavaEE --&gt;</code>

<code>        </code><code>&lt;!-- javaee-api包含了JavaEE規範中的api,如servlet-api,persistence-api, transaction-api等 --&gt;</code>

<code>        </code><code>&lt;</code><code>dependency</code><code>&gt;</code>

<code>            </code><code>&lt;</code><code>groupId</code><code>&gt;javax&lt;/</code><code>groupId</code><code>&gt;</code>

<code>            </code><code>&lt;</code><code>artifactId</code><code>&gt;javaee-api&lt;/</code><code>artifactId</code><code>&gt;</code>

<code>            </code><code>&lt;</code><code>version</code><code>&gt;7.0&lt;/</code><code>version</code><code>&gt;</code>

 注:根據需要選擇其一即可。

 Spring,Junit依賴這裡省略。

 1.2 建立資料庫以及表

   資料庫分别是:javaee,tomdb

 2.在項目中添加配置檔案

  spring-jta.xml 和transaction.properties(模版見文中附件)檔案,spring-jta.xml在src/main/resources/integration下,transaction.properties在src/main/resources/下。

 2.1在spring-jta.xml中配置兩個XADataSource:

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

<code>&lt;!-- 使用分布式事務時設定Postgresql的max_prepared_transactions為大于0的值,該值預設是0 --&gt;</code>

<code>    </code><code>&lt;!-- 資料庫A --&gt;</code>

<code>    </code><code>&lt;</code><code>bean</code> <code>id</code><code>=</code><code>"a"</code> <code>class</code><code>=</code><code>"com.atomikos.jdbc.AtomikosDataSourceBean"</code>

<code>        </code><code>init-method</code><code>=</code><code>"init"</code> <code>destroy-method</code><code>=</code><code>"close"</code><code>&gt;</code>

<code>        </code><code>&lt;</code><code>property</code> <code>name</code><code>=</code><code>"uniqueResourceName"</code> <code>value</code><code>=</code><code>"pg/a"</code> <code>/&gt;</code>

<code>        </code><code>&lt;</code><code>property</code> <code>name</code><code>=</code><code>"xaDataSourceClassName"</code> <code>value</code><code>=</code><code>"org.postgresql.xa.PGXADataSource"</code> <code>/&gt;</code>

<code>        </code><code>&lt;</code><code>property</code> <code>name</code><code>=</code><code>"xaProperties"</code><code>&gt;</code>

<code>            </code><code>&lt;</code><code>props</code><code>&gt;</code>

<code>                </code><code>&lt;</code><code>prop</code> <code>key</code><code>=</code><code>"user"</code><code>&gt;postgres&lt;/</code><code>prop</code><code>&gt;</code>

<code>                </code><code>&lt;</code><code>prop</code> <code>key</code><code>=</code><code>"password"</code><code>&gt;postgres&lt;/</code><code>prop</code><code>&gt;</code>

<code>                </code><code>&lt;</code><code>prop</code> <code>key</code><code>=</code><code>"serverName"</code><code>&gt;localhost&lt;/</code><code>prop</code><code>&gt;</code>

<code>                </code><code>&lt;</code><code>prop</code> <code>key</code><code>=</code><code>"portNumber"</code><code>&gt;5432&lt;/</code><code>prop</code><code>&gt;</code>

<code>                </code><code>&lt;</code><code>prop</code> <code>key</code><code>=</code><code>"databaseName"</code><code>&gt;tomdb&lt;/</code><code>prop</code><code>&gt;</code>

<code>            </code><code>&lt;/</code><code>props</code><code>&gt;</code>

<code>        </code><code>&lt;/</code><code>property</code><code>&gt;</code>

<code>        </code><code>&lt;</code><code>property</code> <code>name</code><code>=</code><code>"poolSize"</code> <code>value</code><code>=</code><code>"10"</code> <code>/&gt;</code>

<code>        </code><code>&lt;</code><code>property</code> <code>name</code><code>=</code><code>"reapTimeout"</code> <code>value</code><code>=</code><code>"20000"</code> <code>/&gt;</code>

<code>    </code><code>&lt;/</code><code>bean</code><code>&gt;</code>

<code>    </code><code>&lt;</code><code>bean</code> <code>id</code><code>=</code><code>"b"</code> <code>class</code><code>=</code><code>"com.atomikos.jdbc.AtomikosDataSourceBean"</code>

<code>        </code><code>&lt;</code><code>property</code> <code>name</code><code>=</code><code>"uniqueResourceName"</code> <code>value</code><code>=</code><code>"pg/b"</code> <code>/&gt;</code>

<code>                </code><code>&lt;</code><code>prop</code> <code>key</code><code>=</code><code>"databaseName"</code><code>&gt;javaee&lt;/</code><code>prop</code><code>&gt;</code>

 說明:

 Postgresql的max_prepared_transactions參數值預設是0,要開啟XA需要設定該值至少和max_connections參數值一樣大,該參數在PostgreSQL\9.3\data\postgresql.conf檔案中。

 PGXADataSource的父類BaseDataSource沒有url屬性,可需要分别設定serverName,portNumber,databaseName等屬性。不同的資料庫驅動有不同的實作方法。

 2.2 配置事務管理對象和UserTransaction接口實作

<code>&lt;!-- atomikos事務管理 --&gt;</code>

<code>    </code><code>&lt;</code><code>bean</code> <code>id</code><code>=</code><code>"atomikosUserTransactionManager"</code> <code>class</code><code>=</code><code>"com.atomikos.icatch.jta.UserTransactionManager"</code>  <code>init-method</code><code>=</code><code>"init"</code> <code>destroy-method</code><code>=</code><code>"close"</code><code>&gt;</code>

<code>        </code><code>&lt;</code><code>description</code><code>&gt;UserTransactionManager&lt;/</code><code>description</code><code>&gt;</code>

<code>        </code><code>&lt;</code><code>property</code> <code>name</code><code>=</code><code>"forceShutdown"</code> <code>value</code><code>=</code><code>"true"</code> <code>/&gt;</code>

<code>    </code><code>&lt;</code><code>bean</code> <code>id</code><code>=</code><code>"atomikosUserTransaction"</code> <code>class</code><code>=</code><code>"com.atomikos.icatch.jta.UserTransactionImp"</code><code>&gt;</code>

<code>        </code><code>&lt;</code><code>property</code> <code>name</code><code>=</code><code>"transactionTimeout"</code> <code>value</code><code>=</code><code>"300"</code> <code>/&gt;</code>

<code>    </code><code>&lt;</code><code>bean</code> <code>id</code><code>=</code><code>"transactionManager"</code>

<code>        </code><code>class</code><code>=</code><code>"org.springframework.transaction.jta.JtaTransactionManager"</code><code>&gt;</code>

<code>        </code><code>&lt;</code><code>property</code> <code>name</code><code>=</code><code>"transactionManager"</code> <code>ref</code><code>=</code><code>"atomikosUserTransactionManager"</code><code>&gt;&lt;/</code><code>property</code><code>&gt;</code>

 上面三個Bean可以獨立使用來進行事務控制,具體看下面3。

3. 編寫測試

 3.1 使用atomikosUserTransactionManager對象測試(TestAtomikos1.java)

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

64

65

66

67

68

69

70

71

72

73

74

75

<code>package</code> <code>secondriver.springsubway.example.jta;</code>

<code>import</code> <code>java.sql.Connection;</code>

<code>import</code> <code>java.sql.SQLException;</code>

<code>import</code> <code>javax.transaction.HeuristicMixedException;</code>

<code>import</code> <code>javax.transaction.HeuristicRollbackException;</code>

<code>import</code> <code>javax.transaction.NotSupportedException;</code>

<code>import</code> <code>javax.transaction.RollbackException;</code>

<code>import</code> <code>javax.transaction.SystemException;</code>

<code>import</code> <code>org.junit.BeforeClass;</code>

<code>import</code> <code>org.junit.Test;</code>

<code>import</code> <code>org.springframework.context.ApplicationContext;</code>

<code>import</code> <code>org.springframework.context.support.ClassPathXmlApplicationContext;</code>

<code>import</code> <code>com.atomikos.icatch.jta.UserTransactionManager;</code>

<code>import</code> <code>com.atomikos.jdbc.AtomikosDataSourceBean;</code>

<code>public</code> <code>class</code> <code>TestAtomikos1 {</code>

<code>    </code><code>public</code> <code>static</code> <code>ApplicationContext ctx;</code>

<code>    </code><code>@BeforeClass</code>

<code>    </code><code>public</code> <code>static</code> <code>void</code> <code>beforeClass() {</code>

<code>        </code><code>ctx = </code><code>new</code> <code>ClassPathXmlApplicationContext(</code>

<code>                </code><code>"classpath:integration/spring-jta.xml"</code><code>);</code>

<code>    </code><code>}</code>

<code>    </code><code>public</code> <code>static</code> <code>void</code> <code>afterClass() {</code>

<code>        </code><code>ctx = </code><code>null</code><code>;</code>

<code>    </code><code>@Test</code>

<code>    </code><code>public</code> <code>void</code> <code>test1() {</code>

<code>        </code><code>exe(</code><code>"abc"</code><code>, </code><code>"abc"</code><code>);</code>

<code>    </code><code>public</code> <code>void</code> <code>test2() {</code>

<code>        </code><code>exe(</code><code>"123="</code><code>, </code><code>"123"</code><code>);</code>

<code>    </code><code>public</code> <code>void</code> <code>exe(String av, String bv) {</code>

<code>        </code><code>AtomikosDataSourceBean adsA = (AtomikosDataSourceBean) ctx.getBean(</code><code>"a"</code><code>);</code>

<code>        </code><code>AtomikosDataSourceBean adsB = (AtomikosDataSourceBean) ctx.getBean(</code><code>"b"</code><code>);</code>

<code>        </code><code>Connection connA;</code>

<code>        </code><code>Connection connB;</code>

<code>        </code><code>UserTransactionManager utm = (UserTransactionManager) ctx</code>

<code>                </code><code>.getBean(</code><code>"atomikosUserTransactionManager"</code><code>);</code>

<code>        </code><code>try</code> <code>{</code>

<code>            </code><code>utm.begin();</code>

<code>            </code><code>connA = adsA.getConnection();</code>

<code>            </code><code>connB = adsB.getConnection();</code>

<code>            </code><code>connA.prepareStatement(</code>

<code>                    </code><code>"insert into jta_temp (value) values('"</code> <code>+ av + </code><code>"')"</code><code>)</code>

<code>                    </code><code>.execute();</code>

<code>            </code><code>connB.prepareStatement(</code>

<code>                    </code><code>"insert into jta_temp (value) values('"</code> <code>+ bv + </code><code>"')"</code><code>)</code>

<code>            </code><code>utm.commit();</code>

<code>        </code><code>} </code><code>catch</code> <code>(SQLException | NotSupportedException | SystemException</code>

<code>                </code><code>| SecurityException | IllegalStateException | RollbackException</code>

<code>                </code><code>| HeuristicMixedException | HeuristicRollbackException e) {</code>

<code>            </code><code>e.printStackTrace();</code>

<code>            </code><code>try</code> <code>{</code>

<code>                </code><code>utm.rollback();</code>

<code>            </code><code>} </code><code>catch</code> <code>(IllegalStateException | SecurityException</code>

<code>                    </code><code>| SystemException e1) {</code>

<code>                </code><code>e1.printStackTrace();</code>

<code>            </code><code>}</code>

<code>        </code><code>}</code>

<code>}</code>

3.2使用Spring的JtaUserTransactionManager對象測試(TestAtomikos2.java 修改TestAtomikos1.java中的exe方法即可)

<code>@Test</code>

<code>public</code> <code>void</code> <code>exe(String av, String bv) {</code>

<code>        </code><code>TransactionFactory txm = (TransactionFactory) ctx</code>

<code>                </code><code>.getBean(</code><code>"transactionManager"</code><code>);</code>

<code>        </code><code>JdbcTemplate a = (JdbcTemplate) ctx.getBean(</code><code>"jdbcTemplateA"</code><code>);</code>

<code>        </code><code>JdbcTemplate b = (JdbcTemplate) ctx.getBean(</code><code>"jdbcTemplateB"</code><code>);</code>

<code>        </code><code>Transaction tx =</code><code>null</code><code>;</code>

<code>         </code><code>tx = txm.createTransaction(</code><code>"tx-name-define"</code><code>, </code><code>10000</code><code>);</code>

<code>            </code><code>a.update(</code><code>"insert into jta_temp (value) values('"</code> <code>+ av + </code><code>"')"</code><code>);</code>

<code>            </code><code>b.update(</code><code>"insert into jta_temp (value) values('"</code> <code>+ bv + </code><code>"')"</code><code>);</code>

<code>            </code><code>tx.commit();</code>

<code>        </code><code>} </code><code>catch</code> <code>(NotSupportedException | SystemException | SecurityException</code>

<code>                </code><code>| RollbackException | HeuristicMixedException</code>

<code>                </code><code>| HeuristicRollbackException e) {</code>

<code>            </code><code>if</code><code>(tx!=</code><code>null</code><code>){</code>

<code>                </code><code>try</code> <code>{</code>

<code>                    </code><code>tx.rollback();</code>

<code>                </code><code>} </code><code>catch</code> <code>(IllegalStateException | SystemException e1) {</code>

<code>                    </code><code>e1.printStackTrace();</code>

<code>                </code><code>}</code>

3.3使用atomikosUserTransaction Bean對象進行測試(TestAtomikos3.java 修改TestAtomikos1.java中的exe方法即可)

<code>        </code><code>exe(</code><code>"123"</code><code>, </code><code>"123="</code><code>);</code>

<code>        </code><code>UserTransaction utx = (UserTransaction) ctx</code>

<code>                </code><code>.getBean(</code><code>"atomikosUserTransaction"</code><code>);</code>

<code>            </code><code>utx.begin();</code>

<code>            </code><code>utx.commit();</code>

<code>                </code><code>utx.rollback();</code>

 使用上述三種UserTransaction進行測試,其中test1方法是成功執行,test2方法是執行失敗的(因為插入的值長度超過的字段長度限制)。通過分析之後,如果分布式事物控制正确,那麼資料庫中寫入的值對于兩張不同的表而言,是沒有數字值被寫入的。如圖所示:

 在測試過程中,經過對比确實達到了分布式事務控制的效果。

 整個操作的架構圖如下(摘自Atomikos Blog)

 關于JTA原理文章開始提到的那篇文章,寫的很詳細和清晰,可以細細閱讀和了解。

本文轉自 secondriver 51CTO部落格,原文連結:http://blog.51cto.com/aiilive/1658102,如需轉載請自行聯系原作者