通過《消費者實作應用内分布式事務》、《生産者實作應用内分布式事務管理》、《實作持久訂閱消費者》三個章節的實踐,其實我們已經可以通過消息隊列實作多應用的分布式事務,應用内的事務保證了消息不會被重複生産消費、持久化訂閱保證了消息一定會被消費(進入死信隊列特殊處理),但其對于業務來說耦合性還是太強,在進行業務處理的同時消息處理名,其采用的仍然是應用内的事務處理,并不适合在大型高性能高并發系統實踐,那麼本章将通過本地事務+消息隊列+外部事件定義表+定時任務實作解耦。 (目前主要實作微服務下最終一緻性事務的方案主要是:可靠事件;補償模式;TCC(Try-Confirm-Cancel);三種,本案例為可靠事件方式)
場景描述: 本場景就拿最常用的轉賬業務闡述: 在工行ICBC有賬号Card001,其中存于500元; 在中行BOC有賬号Card002,其中也存有500元; 此時從Card001賬号轉賬至Card002賬号300元。
系統設計: 工行系統ICBCPro,該工程主要實作兩個功能(實作轉出金額生成轉賬事件;定時任務發出轉賬事件至消息隊列),主要參考《生産者實作應用内分布式事務管理》實作; 中行系統BOCPro,該工程主要實作兩個功能(從消息隊列下載下傳轉賬事件;定時任務對轉賬事件處理并更新轉入賬号金額),主要參考《消費者實作應用内分布式事務》實作; 此場景僅需要通過P2P消息模式即可。
建構ICBCPro工程 A、實作轉出金額生成轉賬事件 1、建構資料庫相關表以及基礎資料: 轉出賬号資料
轉出事件記錄
消息隊列
消息控制台
2、執行單元測試代碼實作轉賬,此時賬戶扣除與轉賬事件記錄均在本地事務内:
//ICBC中賬戶card001轉出300元
@Test
public void tranfer(){
EventLog eventLog = new EventLog();
eventLog.setAmount(new BigDecimal(300));
eventLog.setFromcard("card001");
eventLog.setTocard("card002");
eventLog.setEventstate(EventState.NEW);
eventLog.setTransferDate(new Date());
eventLogService.transfer(eventLog,new BigDecimal(300));
}
賬戶資訊:
事件記錄
B、定時任務發出轉賬事件至消息隊列 對于事件記錄表,我們可以定義一個定時任務,将所有的NEW狀态事件全部發出,此時需要保證消息的可靠性,采用XA事務實作,但已經不影響我們業務的響應了,實作解耦、快速響應,下面貼出核心實作代碼: 1、首選實作資料排它鎖場景下的查詢與更新:
/**
* 在排它鎖場景下資料更新,保證資料的可靠性
*/
@Override
public void updateEventstateById(String id, EventState eventState) {
EventLog eventLog=findEventLog4Update(id);
eventLog.setEventstate(eventState);
emJ1.merge(eventLog);
}
/**
* 實作排它鎖查詢
*/
@Override
public EventLog findEventLog4Update(String id){
EventLog eventLog=emJ1.find(EventLog.class, id, LockModeType.PESSIMISTIC_WRITE);
return eventLog;
}
2、在service定義查詢所有NEW狀态的事件、并采用XA事務管理NEW狀态事件的發送與更新(為了驗證了事務生效,設定了一個fromcard為空的資料觸發異常),在異常情況下我們也需要保證countDownLatch執行,避免線程阻塞:
@Service
public class EventLogService {
@Autowired
private EventLogRepository eventLogRepository;
@Resource(name="jmsQueueMessagingTemplate")
private JmsMessagingTemplate jmsQueueMessagingTemplate;
@Autowired
@Qualifier("icbc2boc")
private Queue icbc2boc;
....
/**
* 根據eventstate擷取EventLog資料集
* @param eventstate
* @return
*/
@Transactional(transactionManager="transactionManager1",propagation=Propagation.SUPPORTS,readOnly=true)
public List<EventLog> findByEventState(EventState eventstate){
return eventLogRepository.findByEventstate(eventstate);
}
/**
* XA事務
* @param id
* @param eventstate
*/
@Transactional(transactionManager="transactionManagerJTA",propagation=Propagation.REQUIRES_NEW)
public void transferToMQ(EventLog eventLog,EventState eventstate,CountDownLatch countDownLatch){
try {
System.out.println(Thread.currentThread().getName()+"本次處理資料:"+eventLog.getFromcard()+"、"+eventLog.getEventstate());
//再次資料庫查詢判斷,此時用到排它鎖--在兩個定時任務連續執行,一旦出現程式送出事務指令至資料庫,
//但資料庫還未執行,此時我們全表查詢的結果中目前資料行仍為修改前資料,故會造成重複消費
eventLog=eventLogRepository.findEventLog4Update(eventLog.getId());
if(EventState.Publish.equals(eventLog.getEventstate())){
System.out.println(Thread.currentThread().getName()+"資料:"+eventLog.getFromcard()+"無需處理");
return;
}
//payload
jmsQueueMessagingTemplate.convertAndSend(icbc2boc,eventLog);
eventLogRepository.updateEventstateById(eventLog.getId(), eventstate);
//構造異常場景驗證XA事務
if(eventLog.getFromcard()==null){
System.out.println(Thread.currentThread().getName()+"資料異常,不處理");
System.out.println(1/0);
}else{
System.out.println(Thread.currentThread().getName()+":"+eventLog.getFromcard()+"資料處理成功");
}
} finally {
countDownLatch.countDown();
}
}
}
3、定義Job,實作轉出任務,并通過線程池異步處理待處理事件集合,通過并發提高處理性能,通過countDownLatch保證了每個任務所有線程處理完成後啟動下一次任務;
/**
* 轉出任務
* @author song
*/
@PersistJobDataAfterExecution
@DisallowConcurrentExecution //保證每次任務執行完畢,設定為串行執行
public class TransferJob extends QuartzJobBean {
private Logger logger=LoggerFactory.getLogger(TransferJob.class);
@Autowired
@Qualifier("quartzThreadPool")
private ThreadPoolTaskExecutor quartzThreadPool;
@Autowired
private EventLogService eventLogService;
@Override
protected void executeInternal(JobExecutionContext jobExecutionContext) throws JobExecutionException {
logger.info("本次批處理開始");
//擷取所有未發送狀态的Event
List<EventLog> list=eventLogService.findByEventState(EventState.NEW);
//
final CountDownLatch countDownLatch=new CountDownLatch(list.size());
//周遊發送
for(final EventLog eventLog:list){
//通過線程池送出任務執行,大大提高處理集合效率
quartzThreadPool.submit(new Runnable() {
@Override
public void run() {
eventLogService.transferToMQ(eventLog,EventState.Publish,countDownLatch);
}
});
}
//保證所有線程執行完成後退出
try {
countDownLatch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
logger.info("本次批處理完成");
}
}
4、定義轉出任務、觸發器、排程器以及處理線程池:
@Bean(name="tranferJob")
public JobDetailFactoryBean tranferJob(){
JobDetailFactoryBean factoryBean=new JobDetailFactoryBean();
//定義任務類
factoryBean.setJobClass(TransferJob.class);
//表示任務完成之後是否依然保留到資料庫,預設false
factoryBean.setDurability(true);
//為Ture時當Quartz服務被中止後,再次啟動或叢集中其他機器接手任務時會嘗試恢複執行之前未完成的所有任務,預設false
factoryBean.setRequestsRecovery(true);
return factoryBean;
}
/**
* 注冊job1的觸發器
* @return
*/
@Bean(name="transferJobTrigger")
public CronTriggerFactoryBean transferJobTrigger(){
//觸發器
CronTriggerFactoryBean cronTriggerFactoryBean = new CronTriggerFactoryBean();
cronTriggerFactoryBean.setCronExpression("*/5 * * * * ?");
cronTriggerFactoryBean.setJobDetail(tranferJob().getObject());
//排程工廠執行個體化後,經過5秒開始執行排程
cronTriggerFactoryBean.setStartDelay(30000);
cronTriggerFactoryBean.setGroup("tranfer");
cronTriggerFactoryBean.setName("tranfer");
return cronTriggerFactoryBean;
}
/**
* 排程工廠,加載觸發器,并設定自動啟動、啟動時延
* @return
*/
@Bean(name="transferSchedulerFactoryBean")
public SchedulerFactoryBean transferSchedulerFactoryBean(){
//排程工廠
SchedulerFactoryBean schedulerFactoryBean= new SchedulerFactoryBean();
schedulerFactoryBean.setConfigLocation(new ClassPathResource("quartz.properties"));
schedulerFactoryBean.setApplicationContextSchedulerContextKey("applicationContextKey");
//叢集Cluster下設定dataSource
// schedulerFactoryBean.setDataSource(dataSource);
//QuartzScheduler啟動時更新己存在的Job,不用每次修改targetObject後删除qrtz_job_details表對應記錄了
schedulerFactoryBean.setOverwriteExistingJobs(true);
//QuartzScheduler延時啟動20S,應用啟動完後 QuartzScheduler 再啟動
schedulerFactoryBean.setStartupDelay(20);
//自動啟動
schedulerFactoryBean.setAutoStartup(true);
schedulerFactoryBean.setTriggers(transferJobTrigger().getObject());
//自定義的JobFactory解決job中service的bean注入
schedulerFactoryBean.setJobFactory(jobFactory);
return schedulerFactoryBean;
}
/**
* 用于處理待轉賬資料發至消息隊列的線程池
* @return
*/
@Bean(name="quartzThreadPool")
public ThreadPoolTaskExecutor getThreadPoolTaskExecutor(){
ThreadPoolTaskExecutor pool=new ThreadPoolTaskExecutor();
pool.setCorePoolSize(10);
pool.setQueueCapacity(100);
pool.setMaxPoolSize(10);
pool.setKeepAliveSeconds(10);
//避免應用關閉,任務沒有執行完成,起到shutdownhook鈎子的作用
pool.setWaitForTasksToCompleteOnShutdown(true);
//空閑時核心線程也不退出
pool.setAllowCoreThreadTimeOut(false);
//設定拒絕政策,不可執行的任務将被抛棄
pool.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy());
return pool;
}
小結 特别注意: 1、周期時間剛好兩個定時任務連續執行,出現java程式送出事務緊接第二個任務啟動,但資料庫未完成指令,此時後續任務已經查詢資料,全表過濾能夠再次擷取未送出資料行原始資料,會造成二次消費,故需要對其采用排它鎖方式,二次查詢判斷後決定是否消費,進而規避二次消費問題; 2、注意在Service中不能随便catch異常,避免分布式事務未復原,造成重複消費; 3、通過CountDownLatch,實作任務線程等待所有的子任務線程執行完畢後方可退出本次任務,執行下一個任務,故其一定要在finally中實作countdown,避免造成任務線程阻塞; 4、需要設定OpenEntityManagerInViewInterceptor攔截器,避免提示session過早關閉問題; 5、資料庫DataSource必須定義好destroyMethod,避免程式關閉,事務還未送出的情況下出現連接配接池已經關閉; 6、設定好連接配接池需要等待已送出任務完成後方可shutdown;
優化空間: 1、根據資料特征進行任務分割,比如自增ID場景下,根據0、1、2等最後一位尾數分割不同的定時任務,配置任務叢集,進而實作分布式高可用叢集處理; 2、在資料查詢處理過程中,優化sql,提高單次查詢性能; 3、添加獨立的定時任務,将Publish已消費資料轉儲,減輕單表壓力; 4、目前已經加入線程池異步處理資料集合,提高單次任務執行效率; 5、一旦資料庫壓力比較大的情況下,也可以将Event分庫操作,減輕伺服器資料庫連接配接、IO壓力; 6、采用微服務模式,将兩個功能實作服務分離; 7、也可以在定時任務中添加比如50MS的sleep時長,保證資料庫伺服器端事務送出成功,取消排它鎖将進一步提高性能較小資料庫死鎖問題;
遺留問題: 1、在開發環境下,手動關閉程式MQ連接配接會過早關閉,修改資料後事務未送出,出現MySQL資料庫行已經被執行排他鎖;
建構BOCPro工程 A、從消息隊列下載下傳轉賬事件 1、建構資料庫BOC資料庫相關表以及基礎資料:
事件表暫時為空
消息隊列,有一條轉賬資料
2、配置隊列消息模闆:
@Configuration
public class JmsMessageConfiguration {
@Autowired
@Qualifier(value="jmsQueueTemplate")
private JmsTemplate jmsQueueTemplate;
/**
* 定義點對點隊列
* @return
*/
@Bean(name="icbc2boc")
public Queue queue() {
return new ActiveMQQueue("icbc2boc");
}
/**
* 建立處理隊列消息模闆
* @return
*/
@Bean(name="jmsQueueMessagingTemplate")
public JmsMessagingTemplate jmsQueueMessagingTemplate() {
JmsMessagingTemplate jmsMessagingTemplate =new JmsMessagingTemplate(jmsQueueTemplate);
//通過MappingJackson2MessageConverter實作Object轉換
jmsMessagingTemplate.setMessageConverter(new MappingJackson2MessageConverter());
return new JmsMessagingTemplate(jmsQueueTemplate);
}
}
3、配置監聽器,監聽轉賬事件消息:
@Component
public class TraferIn {
@Autowired
@Qualifier("icbc2boc")
private Queue queue;
@Autowired
@Qualifier("jmsQueueMessagingTemplate")
private JmsMessagingTemplate jmsQueueMessagingTemplate;
@Autowired
private EventLogService eventLogService;
/**
* 定義監聽轉賬事件監聽
* @param text
* @throws Exception
*/
@JmsListener(destination = "icbc2boc",containerFactory="jmsListenerContainerFactory4Queue")//ActiveMQ.DLQ
public void receiveQueue(EventLog eventLog) throws Exception {
System.out.println("接受到的事件資料:"+eventLog.toString());
eventLogService.mq2transfer(eventLog, new BigDecimal(300));
}
}
4、采用分布式事務管理下載下傳的消息隊列事件,模拟事務失效,驗證成功:
/**
* XA事務
* @param eventLog
* @param amount
*/
@Transactional(transactionManager="transactionManagerJTA",propagation=Propagation.REQUIRED)
public void mq2transfer(EventLog eventLog,BigDecimal amount){
//儲存事件日志
eventLogRepository.saveEvetLog(eventLog);
// System.out.println(1/0);
}
5、需要采用XA事務,故我們不能直接通過EventLogRepository儲存資料,定義自定義儲存方法:
/**
* 采用分布式事務資料源儲存事件
*/
@Override
public EventLog saveEvetLog(EventLog eventLog) {
return emJ1.merge(eventLog);
}
6、啟動程式監聽後,收到事件
資料庫添加了一條NEW狀态事件
消費後消息隊列被清空
B、定時任務對轉賬事件處理并更新轉入賬号金額 通過定時任務掃描下載下傳的所有事件,并啟動線程池異步快速處理所有的轉賬事件,可能一批次事件中會出現同個賬号多次記錄的場景,更新操作為非幂等操作,故我們需要采用排它鎖的方式對資料行更新。并且通過本地事務的方式管理事件和賬号表更新,進而大大提高了業務處理速度。通過此方式也實作了業務和事件的解耦。 1、本地為本地事務處理,故我們可以很友善在EventLogRepository通過接口定義即可解決查詢、更新,主要包含查詢所有的NEW狀态事件、查詢單個t_card表實作排它鎖(解決多線程下的幂等性)、更新事件狀态,特别關注在接口中如何實作原生SQL排它鎖查詢的注解定義:
/**
* 實作排它鎖
* @param id
* @return
*/
@Query(value="select t.id from t_card t where t.id=:id for update",nativeQuery=true)
void findCard4UpdateById(@Param("id")String id);
/**
* 更新EventLog狀态
* @param id
* @param eventstate
* @return
*/
@Modifying
@Query(value = "update EventLog e set e.eventstate=:eventstate where e.id = :id ")
int updateEventstateById(@Param("id")String id,@Param("eventstate")EventState eventstate);
/**
* 根據EventState查詢所有EventLog
* @param EventState
* @return
*/
List<EventLog> findByEventstate(EventState eventState);
2、采用ICBCCPro項目中一樣的原生SQL更新語句,主要處理賬号金額調整,其實也可以通過接口定義實作,多個方式玩玩:
/**
* 執行原生語句實作更新
*/
@Override
public int executeUpdateNativeSQL(String strSQL) {
return em1.createNativeQuery(strSQL, Integer.class).executeUpdate();
}
3、定義service中真正處理轉賬事件的邏輯,在其中我們在多線程的場景下,對事件表和Card表采用了不一樣的鎖機制,事件表通過樂觀鎖避免重複消費,保證事件處理幂等性:
/**
* 本地事務
* @param id
* @param eventstate
*/
@Transactional(transactionManager="transactionManager1",propagation=Propagation.REQUIRES_NEW)
public void transfer(EventLog eventLog,EventState eventstate,CountDownLatch countDownLatch){
try {
System.out.println(Thread.currentThread().getName()+"本次處理資料轉入賬号:"+eventLog.getTocard()+"、"+eventLog.getEventstate());
//通過樂觀鎖方式再次判斷,保證事件的可靠消息,僅在極端情況下會出現重複消費,故采用樂觀鎖
int updateCount=eventLogRepository.updateEventstateById(eventLog.getId(),eventstate);
//如果等于則表明已經處理過
if(updateCount==0){
System.out.println(Thread.currentThread().getName()+"資料收款卡号:"+eventLog.getTocard()+"無需處理");
return;
}
//沒有被處理過需要繼續更新賬戶金額
//更新查詢,采用排它鎖方式,避免在多線程任務下,出現多個線程修改同一個卡号,進而事務幂等性
eventLogRepository.findCard4UpdateById(eventLog.getTocard());
//更新賬戶資訊,轉入累加,屬于非幂等操作
eventLogRepository.executeUpdateNativeSQL("update t_card set amount=amount+"+eventLog.getAmount()+" where id='"+eventLog.getTocard()+"'");
// System.out.println(1/0);
System.out.println(Thread.currentThread().getName()+":"+eventLog.getFromcard()+"資料處理成功");
} finally {
countDownLatch.countDown();
}
}
4、定義定時任務,其主要注入了一個線程池協助我們快速異步處理所有事件,通過CountDownLatch 保證了同一次任務所有事件處理完成後方可退出任務線程,然後啟動下一次任務,保證任務串行執行:
@PersistJobDataAfterExecution
@DisallowConcurrentExecution //保證每次任務執行完畢,設定為串行執行
public class TransferJob extends QuartzJobBean {
private Logger logger=LoggerFactory.getLogger(TransferJob.class);
@Autowired
@Qualifier("quartzThreadPool")
private ThreadPoolTaskExecutor quartzThreadPool;
@Autowired
private EventLogService eventLogService;
@Override
protected void executeInternal(JobExecutionContext jobExecutionContext) throws JobExecutionException {
logger.info("本次批處理開始");
//擷取所有未發送狀态的Event
List<EventLog> list=eventLogService.findByEventState(EventState.NEW);
//
final CountDownLatch countDownLatch=new CountDownLatch(list.size());
//周遊發送
for(final EventLog eventLog:list){
//通過線程池送出任務執行,大大提高處理集合效率
quartzThreadPool.submit(new Runnable() {
@Override
public void run() {
eventLogService.transfer(eventLog, EventState.Publish, countDownLatch);
}
});
}
//保證所有線程執行完成後退出
try {
countDownLatch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
logger.info("本次批處理完成");
}
}
5、啟動定時任務後
資料庫,事件表狀态更新:
金額中card金額累計:
小結 特别注意: 1、對應不同的表和操作,我們需要采用不一樣的鎖機制,首先判斷操作是否具有幂等性; 2、注意在Service中不能随便catch異常,避免分布式事務未復原,造成重複消費; 3、通過CountDownLatch,實作任務線程等待所有的子任務線程執行完畢後方可退出本次任務,執行下一個任務,故其一定要在finally中實作countdown,避免造成任務線程阻塞; 4、需要設定OpenEntityManagerInViewInterceptor攔截器,避免提示session過早關閉問題; 5、資料庫DataSource必須定義好destroyMethod,避免程式關閉,事務還未送出的情況下出現連接配接池已經關閉; 6、設定好連接配接池需要等待已送出任務完成後方可shutdown;
優化空間: 1、采用微服務模式,将兩個功能實作服務分離; 2、A功能:根據隊列消息的特性,在有多個消費者的情況下,其也僅僅會被消費一次,故我們可以建構多個消費者伺服器,進而實作異步下載下傳壓力水準分攤; 3、設定合理的資料庫連接配接池大小,進而實作限流作用,避免資料庫伺服器壓力過大; 4、B功能:根據資料特征進行任務分割,比如自增ID場景下,根據0、1、2等最後一位尾數分割不同的定時任務,配置任務叢集,進而實作分布式高可用叢集處理; 5、在資料查詢處理過程中,優化sql,提高單次查詢性能; 6、添加獨立的定時任務,将Publish已消費資料轉儲,減輕單表壓力; 7、目前已經加入線程池異步處理資料集合,提高單次任務執行效率; 8、一旦資料庫壓力比較大的情況下,也可以根據賬号的分庫情況将Event分庫操作,減輕伺服器資料庫連接配接、IO壓力;
總結 1、通過微服務理念,實作服務分離,更加容易進行服務治理與水準擴充; 2、通過本地事務處理業務實作高性能; 3、通過P2P模式消息+Mysql+排它鎖+分布式事務+串行任務保證了事件不會重複發送、不會重複消費(可靠傳遞),并且實作了系統解耦、異步處理、流量銷峰; 4、通過對事件的生産+消費實作了最終一緻性事務;