天天看點

分布式事務系列(3.2)jotm分布式事務源碼分析1 系列目錄2 了解xapool3 jotm的事務管理器4 jotm和xapool的互動5 結束語

<a href="http://my.oschina.net/pingpangkuangmo/blog/413518">分布式事務系列(開篇)提出疑問和研究過程</a>

<a href="http://my.oschina.net/pingpangkuangmo/blog/415162">分布式事務系列(1.1)spring事務管理器platformtransactionmanager源碼分析</a>

<a href="http://my.oschina.net/pingpangkuangmo/blog/416038">分布式事務系列(1.2)spring事務體系</a>

<a href="http://my.oschina.net/pingpangkuangmo/blog/417479">分布式事務系列(2.1)分布式事務模型與接口定義</a>

<a href="http://my.oschina.net/pingpangkuangmo/blog/419374">分布式事務系列(3.1)jotm的分布式案例</a>

<a href="http://my.oschina.net/pingpangkuangmo/blog/420831">分布式事務系列(3.2)jotm分布式事務源碼分析</a>

<a href="http://my.oschina.net/pingpangkuangmo/blog/423210">分布式事務系列(4.1)atomikos的分布式案例</a>

我們在前一篇文章中了解到jotm配合xapool共同完成了分布式事務。jotm主要提供了事務管理器transactionmanager的功能。而xapool則通過使用非xa資料庫驅動實作了xa資料庫驅動的效果。深入了解xapool之前,我們需要認識下xa資料庫驅動到底是什麼

先來看下javax.sql.datasource接口内容:

datasource就是提供connection的。再來看下connectionpooldatasource接口内容

connectionpooldatasource就是提供pooledconnection,它代表了一個與資料庫的實體連接配接,接口如下:

這個實體連接配接能夠産生connection。connection作為pooledconnection的一個handle,即pooledconnection可以産生多個connection來供使用。總結如下:

connectionpooldatasource-》pooledconnection-》connection

datasource-》connection

總之需要帶着懷疑的眼光去看待問題,以上看法不一定對。如果想深入探究,就需要各位去仔細去斟酌。

再來看下javax.sql.xadatasource的接口内容:

xadatasource就是提供xaconnection的,什麼是xaconnection呢?

有了這個通信代表,我們就可以與資料庫進行互動,實作兩階段送出協定。

即實作了xadatasource接口的資料庫驅動,它能夠為我們建立xaconnection,有了xaconnection我們既能擷取普通常見的connection,又能擷取xaresource,實作與資料庫的互動,進而可以實作兩階段送出協定。

是以xa資料庫驅動的最重要核心就是:有了xaresource的實作,能與資料庫進行雙向互動

目前大部分資料庫都是支援xa驅動的,如

對mysql來說,我們所用的mysql-connector-java jar包,就是支援的,它所提供的xadatasource實作是: com.mysql.jdbc.jdbc2.optional.mysqlxadatasource

對oracle來說,它所提供的xadatasource實作是: oracle.jdbc.xa.client.oraclexadatasource

xa資料庫驅動按道理上講,應該是資料庫來負責提供的。xapool就是在資料庫驅動不是xa驅動的前提下,模拟了xa驅動。這種模拟情況下,與資料庫的互動仍然是單向的。

generic pool:一個pool的功能,用來存儲object

standarddatasource和standardxadatasource,分别用于産生connection和xaconnection

standardpooldatasource和standardxapooldatasource 對上述那一對datasource都加上了generic pool的功能,使用pool來維護和管理connection(實際上是pooledconnection,後文會說到)和xaconnection

它實作了一個pool的功能,和其他的pool的大體功能都差不多,它可以存儲和管理任何對象。隻要你實作了相應的接口,這裡即poolhelper接口,看下genericpool的構造函數:

genericpool需要一個poolhelper和一些參數設定(pool中object個數的最大值、最小值等)。來看下poolhelper:

可以看到genericpool通過poolhelper對外暴漏出一些方法,通過poolhelper的create方法将建立的object存放至pool中,通過poolhelper的expire方法将pool中的object真正意義上的删除掉(而不僅僅是從pool中移除)。

standarddatasource需要實作datasource的功能,即能夠擷取connection。這個實作還是比較簡單的,就是利用jdbc原始方式drivermanager來擷取,如下:

standardxadatasource需要實作xadatasource的功能,即能夠擷取xaconnection,同時通過xaconnection能擷取xaresource。

這裡的xaresource留到和jotm的事務管理器一起來說。

standardpooldatasource:内部擁有一個genericpool,它自己實作了poolhelper,我們來看看它把什麼對象交給pool來管理了,即看看它怎麼實作poolhelper的create方法的:

分布式事務系列(3.2)jotm分布式事務源碼分析1 系列目錄2 了解xapool3 jotm的事務管理器4 jotm和xapool的互動5 結束語

它内部擁有一個connectionpooldatasource對象,即圖檔中cpds對象,它能夠産生pooledconnection(上文已說過,可以回去檢視)。

可以看到向pool中存放的對象,并不是connection,而是pooledconnection,上文說過,它是與資料庫的一個實體連接配接,它能夠産生connection。

總結一下就是:standardpooldatasource利用内部connectionpooldatasource對象來建立pooledconnection,然後把建立的pooledconnection交給内部的genericpool來維護和管理。

standardxapooldatasource:内部擁有一個genericpool,它自己實作了poolhelper,我們來看看它把什麼對象交給pool來管理了,即看看它怎麼實作poolhelper的create方法的:

分布式事務系列(3.2)jotm分布式事務源碼分析1 系列目錄2 了解xapool3 jotm的事務管理器4 jotm和xapool的互動5 結束語

它内部擁有一個xadatasource對象,即圖檔中的xads,它能夠産生xaconnection(上文已說過,可以回去檢視)。

可以看到向pool中存放的對象是xaconnection。

總結一下就是:standardxapooldatasource利用内部的xadatasource對象來建立xaconnection,然後把建立的xaconnection交給内部的genericpool來維護和管理。

jotm需要實作的jta定義的接口有:

javax.transaction.usertransaction:面向開發者的使用接口(jotm的實作是org.objectweb.jotm.current)

javax.transaction.transactionmanager:事務管理器接口(jotm的實作是org.objectweb.jotm.current)

javax.transaction.transaction:事務接口(jotm的實作是org.objectweb.jotm.transactionimpl)

javax.transaction.xa.xid:全局事務的唯一标示(jotm的實作是org.objectweb.jotm.xidimpl)

即org.objectweb.jotm.current實作begin方法的源碼内容:

第一步:建立一個全局的唯一标示xid

xidimpl otid = new xidimpl();

第二步:根據唯一标示和逾時設定建立一個事務

transactionimpl tx = new transactionimpl(otid, transactiontimeout)

第三步:對上述事務設定一個定時器

tx.settimer(timermgr.addtimer(tx, transactiontimeout, null, false));

第四步:把上述事務和xid關系綁定到目前線程

threadtx.set(tx); txxids.put(xid, tx);

org.objectweb.jotm.current有這三個線程綁定資料:

private static transient threadlocal&lt;transactionimpl&gt; threadtx = new threadlocal&lt;transactionimpl&gt;(); private static transient threadlocal&lt;integer&gt; threadtimeout = new threadlocal&lt;integer&gt;(); private transient static map&lt;xid, transactionimpl&gt; txxids = new hashmap&lt;xid, transactionimpl&gt;();

在我們建立完成事務後,開始使用業務邏輯操作時即上一篇的工程項目中使用jdbctemplate時,就會執行transactionimpl與xaresource的互動,即javax.transaction.transaction接口方法enlistresource(xaresource xares):即把與資料總管的通信代表xaresource加入到目前事務中來,在看transactionimpl是如何來實作的之前,先了解下transactionimpl中subcoordinator的内部結構

transactionimpl内部有一個subcoordinator,這就是一個兩階段送出中的協調者:

subcoordinator内部含有2個集合:

一個是存放xaresource的,另一個是存放對應xaresource的xid。這樣的做法有點難以接受,他們僅僅靠位置來對應xaresource和xid的關系。這裡說明下,每個事務都會有一個xid進行标示,每個xaresource都會有另一個xid來标示。

subcoordinator實作了一個接口org.objectweb.jotm.resource,接口定義如下:

可以清楚的看到這個org.objectweb.jotm.resource接口就是針對兩階段送出來定義的。先有prepare方法進行投票,根據傳回值結果(上述三個結果vote_commit、vote_rollback、vote_readonly)來選擇下一步是執行commit還是rollback還是forget(當傳回結果為vote_readonly隻讀時,執行forget,即忽略這個事務)。

來看下subcoordinator是如何來實作這個接口的:

接口方法int prepare():

第一步:先将所有的xaresource結束事務邊界,即調用xaresource的end方法,然後從事務transactionimpl的enlistedxares清單中移至delistedxares

事務transactionimpl也有如下三個資料:

private list&lt;xaresource&gt; enlistedxares = collections.synchronizedlist(new arraylist&lt;xaresource&gt;());

private list&lt;xaresource&gt; delistedxares = null;

private list&lt;javax.transaction.xa.xid&gt; enlistedjavaxxid = collections.synchronizedlist(new arraylist&lt;javax.transaction.xa.xid&gt;());

也就是說事務中儲存了一份xaresource,事務中的subcoordinator也儲存了一份

整個過程代碼如下:

//先将enlistedxares全部移至delistedxares delistedxares = new arraylist&lt;xaresource&gt;(enlistedxares); for (xaresource xar : delistedxares) {

}

public boolean delistresource(xaresource xares, int flag){

第二步:開始準備預送出,周遊subcoordinator中所有的xaresource,依次進行預送出即執行每個xaresource的prepare方法(簡化了部分内容)

ret=vote_readonly; int errors = 0; for (int i = 0; i &lt; resourcelist.size(); i++) {

} return ret;

這裡還是同步執行,效率肯定很慢。

接口方法void rollback():

第一步:同上第一步,先将所有的xaresource結束事務邊界,然後從事務transactionimpl的enlistedxares清單中移至delistedxares

第二步:周遊subcoordinator中所有的xaresource,依次執行每個xaresource的rollback方法。

接口方法void commit():也是類似,将xaresource依次執行commit操作。

即把與資料總管的通信代表xaresource加入到目前事務中來,enlistresource(xaresource xares)含有如下操作:

第一步:添加到transactionimpl中的subcoordinator中

subcoord.addresource(xares);

第二步:建立該xaresource對應的xid

xid resxid = new xidimpl( getxid(),subcoord.getxaresindex(xares) ); javax.transaction.xa.xid javaxxid = new javaxidimpl(resxid);

第三步:把該xid加入進subcoordinator中

subcoord.addjavaxxid(javaxxid);

第四步:開始xaresource的事務邊界

xares.start (javaxxid, flag);

第五步:把上述xaresource和xid也存放到事務transactionimpl中

enlistedxares.add(xares); enlistedjavaxxid.add(javaxxid);

transactionimpl的delistresource操作就不再說明了。詳情自己去看源碼。

這個就不再詳細說明了。

第一步:首先從目前線程中擷取綁定的事務即transactionimpl,依托該事務進行復原或者送出,transactionimpl會依托内部的協調者subcoordinator來進行送出或者復原

第二步:清空目前線程綁定的事務

在執行業務邏輯的時候即如下:

再對照着配置檔案來看:

這時候jdbctemplate會從datasource中擷取一個connection來執行sql,即從standardxapooldatasource中擷取connection。具體步驟如下:

第一步:standardxapooldatasource發現它内部的pool還沒有初始化,啟動初始化。初始化會建立指定數量的的對象,standardxapooldatasource在pool中存放和維護的對象是standardxaconnection。而standardxaconnection的建立委托給了由内部的standardxadatasource來進行建立。初始化過後,pool中就存在了指定數量的standardxaconnection對象

第二步:擷取connection,會先從pool中擷取一個standardxaconnection,該對象再進行包裝,封裝成一個standardxaconnectionhandle,傳回給使用者,是以使用者使用的connection是standardxaconnectionhandle類型的

第三步:使用standardxaconnectionhandle來執行sql操作前,會進行事務的判斷,如下:

分布式事務系列(3.2)jotm分布式事務源碼分析1 系列目錄2 了解xapool3 jotm的事務管理器4 jotm和xapool的互動5 結束語

先擷取和目前線程綁定的事務,它是通過上述配置檔案中配置的transactionmanager來擷取的,如下:

transaction ntx = transactionmanager.gettransaction(); 如果存在事務,則設定目前自動送出為false。因為standardxaconnection底層還是使用的是普通的connection來完成的,是以此操作就是設定一個普通的connection的自動送出為false。

将xaresource加入到目前事務中。這部分内容上文已經說了。

這個xaresource是什麼類型呢?就是standardxaconnection,它實作了xaresource:

也就是說事務transactionimpl的送出和復原和prepare操作依賴于transactionimpl内部的subcoordinator,而subcoordinator又會依次調用每個xaresource(這裡即standardxaconnection)的送出和復原和prepare操作。而standardxaconnection内部隻有一個普通的connection,是以standardxaconnection要利用普通的connection來模拟xaresource的操作。具體怎麼模拟的,這裡就不再介紹了,詳情去看源代碼。

針對上一篇文章介紹的工程項目,簡單總結下:

第一步:有了@transactional注解,會把建立出代理對象,加入事務攔截器,在執行save方法之前,會先使用usertransaction開啟一個事務即transactionimpl,并建立一個xid,進行标示,然後把該事務綁定到目前線程。

第二步:使用userdao執行業務邏輯時,即使用jdbctemplate操作時,會從standardxapooldatasource中擷取一個connection。

首選會初始化standardxapooldatasource中的pool,建立出指定數量的standardxaconnection

然後再從pool中擷取一個standardxaconnection,standardxaconnection又進行再次包裝成standardxaconnectionhandle,傳回給使用者作為connection。

第三步:使用connection(這裡即standardxaconnectionhandle)執行sql前,會根據事務管理器擷取目前線程綁定的事務,如果有,則設定standardxaconnectionhandle的自動送出為false,最終是設定到普通的connection上了。并且把xaresource加入到目前事務中,即把standardxaconnection加入到transactionimpl中,同時開啟xaresource的事務邊界,即調用start方法

第四步:一旦執行過程發生異常,spring的platformtransactionmanager這裡即jtatransactionmanager,會擷取usertransaction(這裡即org.objectweb.jotm.current)進行復原操作,它會擷取目前此線程綁定的事務transactionimpl進行復原,transactionimpl會委托到内部的協調者subcoordinator,subcoordinator會調用每個加入進來的xaresource(這裡即standardxaconnection)執行復原操作,standardxaconnection則會依托内部的普通connection進行復原操作。

可以看到上述過程,事務transactionimpl内部的協調者subcoordinator雖然實作了兩階段送出過程的代碼,但是在上述案例中并沒有展現出來,也就是沒有去調用過prepare過程。

上述過程subcoordinator僅僅起到了一個收集connection的作用,首選把所有的connection的自動送出設定為false,執行業務操作,一旦發現異常,則執行每個connection的rollback操作。

是以到底怎麼去使用兩階段送出模式呢?還需要去仔細去研究研究這兩方面的内容:

javax.resource.spi.xaterminator接口和org.objectweb.jotm.xaterminatorimpl實作,該接口也是javax針對兩階段送出協定定義的接口,和jotm中定義的org.objectweb.jotm.resource差不多。實作其實還是基于subcoordinator來執行的,這在什麼情況下使用呢?

subcoordinator的源代碼,它還繼承了一個遠端調用的portableremoteobject。有興趣的可以去研究下

/**

public class subcoordinator extends portableremoteobject implements resource

本篇文章主要介紹了使用jotm和xapool實作分布式事務的原理。下一篇就開始介紹atomikos對于分布式事務的支援。同樣先給出例子:

atomikos使用非xa資料庫驅動實作分布式事務的例子

atomikos使用xa資料庫驅動實作分布式事務的例子