在資料進行中,多線程用到的場景很多,在滿足計算機CPU處理能力的情況下,使用多線程可以明顯提高程式運作效率,縮短大資料處理的能力。作為java程式開發,離不開spring,那麼在spring中怎麼建立多線程并将注冊到spring的類在多線程中使用呢?我自己總結了一下,可以有兩種方式,使用線程池和spring自帶多線程注解使用。
使用線程池
我一般使用固定線程數量的線程池,假如資料量很大,我會将資料放到一個大集合中,然後按照一定的比例配置設定數目,同時我自己寫了一個分頁類,線程的數量可以根據分頁類來自動調整。看代碼:
分頁類
package com.yiche.tag.model;
import java.util.List;
public class DataPage {
private int page = 1; // 目前頁
public int totalPages = 0; // 總頁數
private int pageRecorders;// 每頁5條資料
private int totalRows = 0; // 總資料數
private int pageStartRow = 0;// 每頁的起始數
private int pageEndRow = 0; // 每頁顯示資料的終止數
private boolean hasNextPage = false; // 是否有下一頁
private boolean hasPreviousPage = false; // 是否有前一頁
private List list;
// private Iterator it;
public DataPage(List list, int pageRecorders) {
init(list, pageRecorders);// 通過對象集,記錄總數劃分
}
/** *//**
* 初始化list,并告之該list每頁的記錄數
* @param list
* @param pageRecorders
*/
public void init(List list, int pageRecorders) {
this.pageRecorders = pageRecorders;
this.list = list;
totalRows = list.size();
// it = list.iterator();
hasPreviousPage = false;
if ((totalRows % pageRecorders) == 0) {
totalPages = totalRows / pageRecorders;
} else {
totalPages = totalRows / pageRecorders + 1;
}
if (page >= totalPages) {
hasNextPage = false;
} else {
hasNextPage = true;
}
if (totalRows < pageRecorders) {
this.pageStartRow = 0;
this.pageEndRow = totalRows;
} else {
this.pageStartRow = 0;
this.pageEndRow = pageRecorders;
}
}
// 判斷要不要分頁
public boolean isNext() {
return list.size() > 5;
}
public void setHasPreviousPage(boolean hasPreviousPage) {
this.hasPreviousPage = hasPreviousPage;
}
public String toString(int temp) {
String str = Integer.toString(temp);
return str;
}
public void description() {
String description = "共有資料數:" + this.getTotalRows() +
"共有頁數: " + this.getTotalPages() +
"目前頁數為:" + this.getPage() +
" 是否有前一頁: " + this.isHasPreviousPage() +
" 是否有下一頁:" + this.isHasNextPage() +
" 開始行數:" + this.getPageStartRow() +
" 終止行數:" + this.getPageEndRow();
System.out.println(description);
}
public List getNextPage() {
page = page + 1;
disposePage();
System.out.println("使用者凋用的是第" + page + "頁");
this.description();
return getObjects(page);
}
/** *//**
* 處理分頁
*/
private void disposePage() {
if (page == 0) {
page = 1;
}
if ((page - 1) > 0) {
hasPreviousPage = true;
} else {
hasPreviousPage = false;
}
if (page >= totalPages) {
hasNextPage = false;
} else {
hasNextPage = true;
}
}
public List getPreviousPage() {
page = page - 1;
if ((page - 1) > 0) {
hasPreviousPage = true;
} else {
hasPreviousPage = false;
}
if (page >= totalPages) {
hasNextPage = false;
} else {
hasNextPage = true;
}
this.description();
return getObjects(page);
}
/** *//**
* 擷取第幾頁的内容
*
* @param page 從1開始
* @return
*/
public List getObjects(int page) {
if (page == 0)
this.setPage(1);
else
this.setPage(page);
this.disposePage();
if (page * pageRecorders < totalRows) {// 判斷是否為最後一頁
pageEndRow = page * pageRecorders;
pageStartRow = pageEndRow - pageRecorders;
} else {
pageEndRow = totalRows;
pageStartRow = pageRecorders * (totalPages - 1);
}
List objects = null;
if (!list.isEmpty()) {
objects = list.subList(pageStartRow, pageEndRow);
}
//this.description();
return objects;
}
public List getFistPage() {
if (this.isNext()) {
return list.subList(0, pageRecorders);
} else {
return list;
}
}
public boolean isHasNextPage() {
return hasNextPage;
}
public void setHasNextPage(boolean hasNextPage) {
this.hasNextPage = hasNextPage;
}
public List getList() {
return list;
}
public void setList(List list) {
this.list = list;
}
public int getPage() {
return page;
}
public void setPage(int page) {
this.page = page;
}
public int getPageEndRow() {
return pageEndRow;
}
public void setPageEndRow(int pageEndRow) {
this.pageEndRow = pageEndRow;
}
public int getPageRecorders() {
return pageRecorders;
}
public void setPageRecorders(int pageRecorders) {
this.pageRecorders = pageRecorders;
}
public int getPageStartRow() {
return pageStartRow;
}
public void setPageStartRow(int pageStartRow) {
this.pageStartRow = pageStartRow;
}
public int getTotalPages() {
return totalPages;
}
public void setTotalPages(int totalPages) {
this.totalPages = totalPages;
}
public int getTotalRows() {
return totalRows;
}
public void setTotalRows(int totalRows) {
this.totalRows = totalRows;
}
public boolean isHasPreviousPage() {
return hasPreviousPage;
}
}
複制
這個分頁類可以多個項目複用,既可以實作分頁,還可以用到線程池中給線程配置設定數量使用。以下代碼為如何使用該分頁類給線程池使用的。
List<Integer> carModelIds = carsTagService.getAllCarModelIds();
DataPage dataPage = new DataPage(carModelIds, 300);
ExecutorService executorService = Executors.newFixedThreadPool(dataPage.totalPages);
for (int i = 0; i < dataPage.totalPages; i++) {
final int index = i + 1;
executorService.execute(new Runnable() {
List<Integer> temp = dataPage.getObjects(index);
@Override
public void run() {
try {
if (temp != null && temp.size() > 0) {
temp=Collections.synchronizedList(temp);
int sum = 0;
for (Integer carId : temp) {
try {
carsTagService.insertCarTagByCarId(carId);
sum++;
if (sum % 100 == 0) {
System.out.println("線程" + index + "目前執行了" + sum + "條,目前進度是" + sum * 100/ temp.size() + "%");
}
} catch (Exception ex) {
ex.printStackTrace();
}
}
System.out.println("線程" + index + "執行CarTagInitTask任務成功:" + temp.size());
} else {
System.out.println("CarTagInitTask任務執行完成:未讀取到資料!");
}
} catch (Exception e) {
e.printStackTrace();
}
}
});
}
executorService.shutdown();
while (true) {
if (executorService.isTerminated()) {
System.out.println("所有的子線程都結束了!");
break;
}
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
複制
上述代碼中carModelIds就是要處理的資料集合,數目很大,如果直接for循環将會消耗很多的時間,利用線程池可以解決這個問題。但是如果直接建立多線程,線程中使用的對象需要final修飾,這對于spring管理的類不适用。使用線程池可以解決這個問題。
使用springboot自帶@Async注解建立異步線程
在springboot中,可以使用@Async注解來将一個方法設定為異步方法,調用該方法的時候,是新開一個線程去調用。代碼如下:
@Component
public class Task {
@Async
public void doTaskOne() throws Exception {
System.out.println("開始做任務一");
long start = System.currentTimeMillis();
Thread.sleep(random.nextInt(10000));
long end = System.currentTimeMillis();
System.out.println("完成任務一,耗時:" + (end - start) + "毫秒");
}
@Async
public void doTaskTwo() throws Exception {
System.out.println("開始做任務二");
long start = System.currentTimeMillis();
Thread.sleep(random.nextInt(10000));
long end = System.currentTimeMillis();
System.out.println("完成任務二,耗時:" + (end - start) + "毫秒");
}
@Async
public void doTaskThree() throws Exception {
System.out.println("開始做任務三");
long start = System.currentTimeMillis();
Thread.sleep(random.nextInt(10000));
long end = System.currentTimeMillis();
System.out.println("完成任務三,耗時:" + (end - start) + "毫秒");
}
}
複制
為了讓@Async注解能夠生效,還需要在Spring Boot的主程式中配置@EnableAsync,如下所示:
@SpringBootApplication
@EnableAsync
public class Application {
public static void main(String[] args) {
SpringApplication.run(Application.class, args);
}
}
複制
此時可以反複執行單元測試,您可能會遇到各種不同的結果,比如:
- 沒有任何任務相關的輸出
- 有部分任務相關的輸出
- 亂序的任務相關的輸出
原因是目前doTaskOne、doTaskTwo、doTaskThree三個函數的時候已經是異步執行了。主程式在異步調用之後,主程式并不會理會這三個函數是否執行完成了,由于沒有其他需要執行的内容,是以程式就自動結束了,導緻了不完整或是沒有輸出任務相關内容的情況。
注: @Async所修飾的函數不要定義為static類型,這樣異步調用不會生效
異步回調
為了讓doTaskOne、doTaskTwo、doTaskThree能正常結束,假設我們需要統計一下三個任務并發執行共耗時多少,這就需要等到上述三個函數都完成調動之後記錄時間,并計算結果。
那麼我們如何判斷上述三個異步調用是否已經執行完成呢?我們需要使用Future<T>來傳回異步調用的結果,就像如下方式改造doTaskOne函數:
@Async
public Future<String> doTaskOne() throws Exception {
System.out.println("開始做任務一");
long start = System.currentTimeMillis();
Thread.sleep(random.nextInt(10000));
long end = System.currentTimeMillis();
System.out.println("完成任務一,耗時:" + (end - start) + "毫秒");
return new AsyncResult<>("任務一完成");
}
複制
按照如上方式改造一下其他兩個異步函數之後,下面我們改造一下測試用例,讓測試在等待完成三個異步調用之後來做一些其他事情
@Test
public void test() throws Exception {
long start = System.currentTimeMillis();
Future<String> task1 = task.doTaskOne();
Future<String> task2 = task.doTaskTwo();
Future<String> task3 = task.doTaskThree();
while(true) {
if(task1.isDone() && task2.isDone() && task3.isDone()) {
// 三個任務都調用完成,退出循環等待
break;
}
Thread.sleep(1000);
}
long end = System.currentTimeMillis();
System.out.println("任務全部完成,總耗時:" + (end - start) + "毫秒");
}
複制
看看我們做了哪些改變:
在測試用例一開始記錄開始時間
在調用三個異步函數的時候,傳回Future<String>類型的結果對象
在調用完三個異步函數之後,開啟一個循環,根據傳回的Future<String>對象來判斷三個異步函數是否都結束了。若都結束,就結束循環;若沒有都結束,就等1秒後再判斷。
跳出循環之後,根據結束時間 - 開始時間,計算出三個任務并發執行的總耗時。
執行一下上述的單元測試,可以看到如下結果:
開始做任務一
開始做任務二
開始做任務三
完成任務三,耗時:37毫秒
完成任務二,耗時:3661毫秒
完成任務一,耗時:7149毫秒
任務全部完成,總耗時:8025毫秒
需要注意,spring管理的異步線程數量有限,如果是web項目的話,線程數量由tomcat的線程池配置有關系,是以如果可以,最好自己配置線程配置類。
/**
* springboot裡面建立異步線程配置類
* @author kouyy
*/
@Configuration
@EnableAsync
public class ThreadAsyncConfigurer implements AsyncConfigurer {
@Bean
public Executor getAsyncExecutor() {
ThreadPoolTaskExecutor threadPool = new ThreadPoolTaskExecutor();
//設定核心線程數
threadPool.setCorePoolSize(10);
//設定最大線程數
threadPool.setMaxPoolSize(100);
//線程池所使用的緩沖隊列
threadPool.setQueueCapacity(10);
//等待任務在關機時完成--表明等待所有線程執行完
threadPool.setWaitForTasksToCompleteOnShutdown(true);
// 等待時間 (預設為0,此時立即停止),并沒等待xx秒後強制停止
threadPool.setAwaitTerminationSeconds(60);
// 線程名稱字首
threadPool.setThreadNamePrefix("MyAsync-");
// 初始化線程
threadPool.initialize();
return threadPool;
}
@Override
public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
return null;
}
}
複制
配置類建立之後,以後再使用@Async建立異步線程就可以按照自己配置來使用了。不用擔心和spring線程池數量沖突了。