天天看點

Thread專題(5) - 任務執行

此文被筆者收錄在系列文章 ​​​架構師必備(系列)​​ 中

所謂任務就是抽象、離散的工作單元。把一個應用程式的工作分離到任務中,可以簡化程式的管理,這種分離還在不同僚務間劃分了自然的分界線,在程式出現錯誤時可以很友善地進行恢複,還有利于提高程式的并發性。

圍繞任務執行來管理應用程式時,第一步要指明一個清晰的任務邊界,理想情況下,任務是獨立活動的,它的工作并不依賴于其他任務的狀态、結果或邊界效應,獨立有利于并發性。應用程式應該在負荷過載時平緩地劣化,而不應該負載一高就簡單地以失敗告終。為了達到這些目的,要選擇一個清晰的任務邊界,并配合一個明确的任務執行政策。現在的很多伺服器都選擇了自然邊界,比如單個客戶請求。

一、JAVA中的線程排程

搶占式

搶占式排程指的是每條線程執行的時間、線程的切換都由系統控制,系統控制指的是在系統某種運作機制下,可能每條線程都分同樣的執行時間片,也可能是某些線程執行的時間片較長,甚至某些線程得不到執行的時間片。在這種機制下,一個線程的堵塞不會導緻整個程序堵塞。這是JAVA中的實作方案,相關算法如下:

  • 優先排程算法:先來先服務算法FCFS和短作業優先排程算法,公平機制
  • 高優先權優先算法:非公平,用于非實時應用中
  • 時間片輪詢算法:

協同式

協同式排程指某一線程執行完後主動通知系統切換到另一線程上執行,這種模式就像接力賽一樣,一個人跑完自己的路程就把接力棒交接給下一個人,下個人繼續往下跑。線程的執行時間由線程本身控制,線程切換可以預知,不存在多線程同步問題,但它有一個緻命弱點:如果一個線程編寫有問題,運作到一半就一直堵塞,那麼可能導緻整個系統崩潰。

顯式的為任務建立線程

下面這個例子稱為“每任務每線程”,這種方法提高了響應性和更大的吞吐量,但在請求的速度超出伺服器的請求處理能力時,就不适合了。在開發原型階段會表現良好。

public static void main(String[] args) throws IOException {
        ServerSocket socket = new ServerSocket(80);
        while (true) {
            //accept是一個阻塞操作,so隻有在有連接配接的時候才會往下執行。
            final Socket connection = socket.accept();
            Runnable task = new Runnable() {
                public void run() {
                    handleRequest(connection);
                }};
            new Thread(task).start();
        }}      

“每任務每線程”的程式缺點如下:

  • 線程生命周期的開銷,如果請求是頻繁且輕量級的,就會消耗大量的計算資源。
  • 資源消耗量,主要是針對記憶體。如果可運作的線程數多于可用的處理器數,線程将會空閑。大量空閑線程占用更多的記憶體,給GC帶來壓力,而且會存在大量線程在競争CPU資源,還會産生其他的性能開銷。如果有足夠多的線程保持所有CPU忙碌,那麼再建立更多的線程是百害無一利的。
  • 穩定性,應該限制可建立線程的數目,這個數目受OS、JVM啟動參數、Thread的構造函數中請求的棧大小等因素影響。如果打破了這些規則可能會收到OutOfMemoryError錯誤。(在32位的機器上,主要的限制因素是線程棧的位址空間,每個線程都維護着兩個執行棧,一個用于java代碼,另一個用于原生代碼,典型的JVM預設會産生一個組合的棧,大小在0.5M大小左右,可以通過-Xss JVM參數或者通過Thread的構造函數修改這個值。如果為每個線程配置設定了大小232位元組的棧,那麼你的線程數量将被限制在幾千到幾萬間不等)。

這種實作方式隻能當作練習,不能應用到正式的環境中去。

二、Executor架構

線程是使任務異步執行的機制,作為Executor架構的一部分,它是基于生産—消費模式設計,java.util.concurrent提供了一個靈活的線程池實作。

Executor很簡單但可用于異步任務執行,支援不同類型的任務執行政策,還為任務送出和任務執行之間的解耦提供了标準的方法,還提供了對生命周期的支援以及鈎子函數等。在生産—消費模式中,送出任務是執行者是生産者,執行任務的線程是消費者。

Executor接口的實作相當于一個模闆或抽象模闆實作,是把Runnable委托給Executor來執行。是以可以定義自己的Executor實作類。

//建立一定長的線程池,這個配置是一次性的 
  private static final Executor exec =  Executors.newFixedThreadPool(100);
  public void task(){
    while (true){
      Runnable task  = new Runnable(){
        @Override
        public void run() {
          // TODO Auto-generated method stub
        }
      };
      exec.execute(task);
    }//end while
  }      
//每任務每線程的Executor實作
class TaskPerThreadExecutor implements Executor{
  @Override
  public void execute(Runnable command) {
    new Thread(command).start();
  }
}      
//單線程的Executor實作,這時的 TaskSingleThreadExecutor相當于一個委托類
class TaskSingleThreadExecutor implements Executor{
  @Override
  public void execute(Runnable command) {
    command.run();
  }
}      

執行政策

将任務的送出與任務的執行進行解耦,價值在于讓你可以簡單地為一個類給定的任務制定執行政策,并且保證後續的修改不至于太困難,執行政策指明了:

  • 任務在什麼線程中執行;
  • 任務以什麼順序執行(FIFO、LIFO、優先級);
  • 可以有多少個任務并發執行;
  • 可以有多少個任務進入等待執行隊列;
  • 如果系統過載,需要放棄哪個任務并且如何通知Application知道這一切;
  • 在一個任務的執行前與後,應該插入什麼處理操作。

執行政策是資源管理工具,最佳政策取決于可用的計算資源和你對服務品質的需求,将任務的送出與執行進行分離,有助于在部署階段選擇一個與目前硬體最比對的執行政策。是以以後的程式中盡量少用或不用new Thread(runnable).start()這種方式而改用Executor委托執行。

線程池

線程池是與工作隊列(作用是持有所有等待執行的任務的容器)緊密綁定的,使用線程池可以避免每次建立銷毀線程的性能開銷,而重用線程池中已有的線程。

Executors中有很多靜态工廠方法用來建立一個線程池:

  • newFixedThreadPool:建立一個定長的線程池,直到達到最大長度,如果一個線程由于非預期的Exception而結束,線程池會補充一個新的線程。
  • newCacheThreadPool:建立一個可緩存的線程池,它不會對池的大小有任何限制,如果目前線程池的長度超過了處理的需要時,它可以靈活地回收空閑的線程,當需求增加時,它可以添加新的線程,前兩種線程池都有可能會造成記憶體耗盡。如果池裡有未被回收的線程,是可被重用的,回收時間是60秒;
  • newSingleThreadExecutor:建立一個單線程化的executor,它隻建立唯一的工作線程來執行任務,如果這個線程異常結束,會有另一個取代它,executor會保證任務依照任務隊列的規定執行。但和順序程式設計存在很大差别,它還是脫離主線程以單獨線程的方式執行,隻不過一次執行一個。
  • newScheduledThreadPool:建立一個定長的線程池,而且支援定時的以及周期性的任務執行,類似于Timer。

使用了Executor架構可以有各種機制去調優、管理、監視、記錄日志、錯誤報告以及其他可能的事情,如果不用Executor架構這些事情很難去做。

           如果線程到達的時間大于任務執行的速度,”每任務每線程”的政策還是會由于積壓導緻記憶體耗盡,使用一個有限的工作隊列,可以很好的解決這個問題。

Executor生命周期

Executor實作通常隻是為執行任務而建立線程,JVM會在所有線程全部終止後才通出,是以無法正确關閉Executor,進而會阻塞JVM的結束。

因為Executor是異步地執行任務,是以有很多不确定因素,為了解決執行服務的生命周期問題,提供了ExecutorService接口,它擴充了Executor接口同時添加了一些用于生命周期(運作、關閉、終止)管理的方法。

public interface ExecutorService extends Executor {
    //不在接收新的任務後平緩關閉
    void shutdown();
    //強制停止正在運作和隊列中等待的任務,并傳回清單以便序列化等操作,下次啟動時恢複
    List<Runnable> shutdownNow();
    boolean isShutdown();
    boolean isTerminated();
    //轉換ExecutorService的狀态,同時調用shutdown()方法 
    boolean awaitTermination(long timeout, TimeUnit unit)  throws InterruptedException;
}      
private final ExecutorService exec = Executors.newCachedThreadPool();
    public void start() throws IOException {
        ServerSocket socket = new ServerSocket(80);
        while (!exec.isShutdown()) {
            try {
                final Socket conn = socket.accept();
                exec.execute(new Runnable() {
                    public void run() {
                        handleRequest(conn);
                    }
                }); //使用時隻是簡單換個類就行了
            } catch (RejectedExecutionException e) {
                if (!exec.isShutdown())
                    log("task submission rejected", e);
            }
        }
    }
    public void stop() {
        exec.shutdown();
    }
    private void log(String msg, Exception e) {
        Logger.getAnonymousLogger().log(Level.WARNING, msg, e);
    }
    void handleRequest(Socket connection) {}      

一旦所有的任務全部完成後ExecutorService會轉入終止狀态,可以調用awaitTermination等待ExecutorService到達終止狀态,也可以輪詢isTerminated判斷ExecutorService是否已經終止。通常shutdown會緊随awaitTermination之後,這樣可以産生同步關閉ExecutorService的效果。

延遲的、并具周期性的任務

Timer工具管理任務的延遲以及周期執行,但是它有一些問題:1、它隻建立唯一的線程來執行所有的timer任務,如果一個timer任務執行很耗時,可能會導緻其它TimerTask的時效準确性出問題,不加處理可能會造成任務丢失的情況;2、如果TimerTask抛出未檢查的異常,Timer将會産生無法預料的行為。Timer線程并不捕獲異常,這時會造成Timer線程的終止,并且不會自動恢複,會錯誤的認為整個Timer都被取消了(線程洩露);3、另外Timer的排程是基于系統時間的。

是以現在應該用ScheduledThreadPoolExecutor(ScheduledThreadPoolExecutor隻支援相對時間)來取代Timer。并且不在使用Timer類。這得益于DelayQueue。如果需要建構自己的排程服務,可以使用DelayQueue,它是BlockingQueue的實作,為ScheduledThreadPoolExecutor提供了排程功能,DelayQueue管理着一個包含Delayed對象的容器。每個Delayed對象都與一個延遲時間相關聯,隻有在元素過期後,DelayQueue才讓你執行take操作擷取元素。從DelayQueue中傳回的對象将依據它們所延遲的時間進行排序。以下是有問題的一個例子:Timer的混亂行為:

public class OutOfTime {
    public static void main(String[] args) throws Exception {
        Timer timer = new Timer();
        timer.schedule(new ThrowTask(), 1);
        SECONDS.sleep(1);
        timer.schedule(new ThrowTask(), 1);
        SECONDS.sleep(5);
    }

    static class ThrowTask extends TimerTask {
        public void run() {
            throw new RuntimeException();
        }
    }
}      

三、尋找可強化的并行性

​對于并行性的任務,要考慮的一個很重要的因素是任務邊界問題,盡量做到任務之間的松耦合或解耦。這個問題很難下一個指導性的結論,需要在設計時權衡考慮,下面的幾個例子從需求角度設計了幾個程式的不同并發性,IE渲染HTML的例子:

順序的渲染頁面元素

​這種方式基本是行處理的方式,頁面等待時間可能會很長。效率比較低下。

public abstract class SingleThreadRenderer {
    void renderPage(CharSequence source) {
        renderText(source);
        List<ImageData> imageData = new ArrayList<ImageData>();
        for (ImageInfo imageInfo : scanForImageInfo(source))
            imageData.add(imageInfo.downloadImage());
        for (ImageData data : imageData)
            renderImage(data);
    }
}      

可攜帶結果的任務:Callable和Future

​Executor架構使用Runnable做為其任務的基本表達形式,但Runnable的功能很有限,它不能傳回一個值或抛出受檢查的異常。

  • Callable主要用于計算時間長的任務,比如下載下傳、複雜運算、資料庫操作。它會在主進入點call--等待傳回值,并為可能抛出的異常預告做好了準備,Executors包含一些工具方法,可以把其他類型的任務封裝成一個Callable。Callable<void>===Runnable。

在Executor架構中,如果任務執行要花費很長時間,任務可以手動取消,但對于已經開始的任務,隻有中斷後才可以取消,取消一個已完成的任務不會有任何影響。

  • Future可以描述任務的生命周期,并提供了相關的方法來獲得任務的結果、取消任務以及檢驗任務是否完成。Future和ExecutorService這些線程相關的對象的生命周期是單向的,無法回退。Future的get方法是個阻塞方法, 注意get處理異常的能力,它會把異常重新包裝後再抛出。

ExecutorService的所有submit()方法都傳回一個Future。可以将一個Runnable或Callable送出給executor,然後得到一個Future。也可以顯示的為Runnable或Callable建立一個FutureTask。FutureTask實作了Runnable接口。是以可以直接交給ExecutorService來執行,也可以直接調用run方法。

在java6中,ExecutorService的所有實作都可以覆寫AbstractExecutorService中的newTaskFor方法,以此控制Future的執行個體化,以及對應的已送出的Runnable和Callable。

private final ExecutorService executor = Executors.newCachedThreadPool();

    void renderPage(CharSequence source) {
        final List<ImageInfo> imageInfos = scanForImageInfo(source);
        Callable<List<ImageData>> task =
                new Callable<List<ImageData>>() {
                    public List<ImageData> call() {
                        List<ImageData> result = new 
                                   ArrayList<ImageData>();
                        for (ImageInfo imageInfo : imageInfos)
                            result.add(imageInfo.downloadImage());
                        return result;
                    }
                };

        Future<List<ImageData>> future = executor.submit(task);
//    FutureTask future = new FutureTask(task);
//    executor.submit(future);//這兩行代碼和上面一行代碼是等價的。

        try {
            List<ImageData> imageData = future.get();
            for (ImageData data : imageData)
                renderImage(data); //渲染圖檔
        } catch (InterruptedException e) {
            // 重新聲明線程的中斷狀态
            Thread.currentThread().interrupt();
            // 不需要結果 ,故取消任務
            future.cancel(true);//異常處理
        } catch (ExecutionException e) {
            throw launderThrowable(e.getCause());
        }
    }      

并行運作類任務的局限性

上例的程式通過兩個異步的任務期望獲得性能上的提升。如果沒有在相似的任務之間發現更好的并行性,那麼并行運作方法應有的好處會逐漸減少。更為嚴重的是,在給多個工作者劃分相異任務時,各個任務的大小可能完全不同,比如A任務執行時間是B的10倍,那麼整個過程僅僅加速了9%左右。

在多個工作者之間劃分任務,總會涉及到一些任務協調上的開銷,為了使劃分任務是值得的,這一開銷不能多于通過并行性帶來的生産力的提高。上例中如果下載下傳圖像的速度遠遠小于渲染文本的速度,不如用順序執行程式,這樣還能減小代碼的複雜度。是以合理的并行任務需要:

大量互相獨立且同類的任務進行并發處理,會将程式的任務量配置設定到不同的任務中,這樣才能真正獲得性能的提升。

CompletionService:當Executor遇見BlockingQueue

如果向Executor送出了一個批處理任務,并且希望在它們完成後獲得結果,為此可以儲存與每個任務相關聯的Future,然後不斷地調用timeout為0的get,來檢驗Future是否完成。這樣做行的通,但可以用CompletionService(完成服務)代替。

CompletionService整合了Executor和BlockQueue的功能,可以将Callable任務送出給它去執行,然後使用類似于隊列中的take和poll方法,在結果完整可用時獲得這個結果,像一個打包的Future。ExecutorCompletionService是實作CompletionService接口的一個類,并将計算任務委托給一個Executor。

ExecutorCompletionService在構造函數中建立一個BlockingQueue,用它去儲存完成的結果。計算完成時會調用FutureTask中的done方法,當送出一個任務後,首先把這個任務包裝為一個QueueFuture,它是FutureTask的一個子類,然後覆寫done方法,将結果加入到BlockingQueue中,take和poll方法委托給了BlockingQueue,它會在結果不可用時阻塞。

下面這個例子的性能會比前一個例子的性能高,因為算法不同,這裡的實作是每需要下載下傳一個圖,就建立一個獨立的任務,并線上程池中執行它們,将上面例子中的順序下載下傳過程轉化為并行的,這能減少下載下傳所有圖像的總時間。從CompletionService中擷取結果,隻要任何一個圖像下載下傳完成,就立刻呈現。

public abstract class Renderer {
    private final ExecutorService executor;

    Renderer(ExecutorService executor) {
        this.executor = executor;
    }

    void renderPage(CharSequence source) {
        final List<ImageInfo> info = scanForImageInfo(source);
        CompletionService<ImageData> completionService =
                new ExecutorCompletionService<ImageData>(executor);
        for (final ImageInfo imageInfo : info)
            completionService.submit(new Callable<ImageData>() {
                public ImageData call() {
                    return imageInfo.downloadImage();
                }
            });

        renderText(source);

        try {
            for (int t = 0, n = info.size(); t < n; t++) {
                Future<ImageData> f = completionService.take();
                ImageData imageData = f.get();
                renderImage(imageData);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } catch (ExecutionException e) {
            throw launderThrowable(e.getCause());
        }
    }      

為任務設定時限

在預定時間内執行任務的主要挑戰:

  • 要確定在得到答案或者發現無法從任務中獲得結果的這一過程所花費的時間,不會比預定的時間更長,Future.get的限時版本符合這個條件,它在結果準備好後立即傳回,如果在時限内沒有準備好,就會抛出TimeoutException。
  • 當它們逾時後應該可以停止它們,為了達到這個目的,可以讓任務自己管理它的預定時間,逾時後就中止執行或取消任務。Future可以完成這樣的功能,在TimeoutException中處理。
Page renderPageWithAd() throws InterruptedException {
    long endNanos = System.nanoTime() + TIME_BUDGET;
    Future<Ad> f = exec.submit(new FetchAdTask());
    // 等待廣告呈現頁面
    Page page = renderPageBody();
    Ad ad;
    try {
        // 在預定時間内等待
        long timeLeft = endNanos - System.nanoTime();
        ad = f.get(timeLeft, NANOSECONDS);
    } catch (ExecutionException e) {
        ad = DEFAULT_AD;
    } catch (TimeoutException e) {
        ad = DEFAULT_AD;
        f.cancel(true);
    }
    page.setAd(ad);
    return page;
}      
import java.util.concurrent.*;

public class TimeBudget {
    private static ExecutorService exec = Executors.newCachedThreadPool();

    public List<TravelQuote> getRankedTravelQuotes(TravelInfo travelInfo,  
                                      Set<TravelCompany> companies,
                                      Comparator<TravelQuote> ranking, 
                                     long time, TimeUnit unit){
        List<QuoteTask> tasks = new ArrayList<QuoteTask>();
        for (TravelCompany company : companies)
            tasks.add(new QuoteTask(company, travelInfo));

        List<Future<TravelQuote>> futures = exec.invokeAll(tasks, time, unit);

        List<TravelQuote> quotes = new ArrayList<TravelQuote>(tasks.size());
        Iterator<QuoteTask> taskIter = tasks.iterator();
        for (Future<TravelQuote> f : futures) {
            QuoteTask task = taskIter.next();
            try {
                quotes.add(f.get());
            } catch (ExecutionException e) {
                quotes.add(task.getFailureQuote(e.getCause()));
            } catch (CancellationException e) {
                quotes.add(task.getTimeoutQuote(e));
            }
        }

        Collections.sort(quotes, ranking);
        return quotes;
    }

}

class QuoteTask implements Callable<TravelQuote> {
    private final TravelCompany company;
    private final TravelInfo travelInfo;

    public QuoteTask(TravelCompany company, TravelInfo travelInfo) {
        this.company = company;
        this.travelInfo = travelInfo;
    }

    TravelQuote getFailureQuote(Throwable t) {
        return null;
    }

    TravelQuote getTimeoutQuote(CancellationException e) {
        return null;
    }

    public TravelQuote call() throws Exception {
        return company.solicitQuote(travelInfo);
    }
}

interface TravelCompany {
    TravelQuote solicitQuote(TravelInfo travelInfo) throws Exception;
}

interface TravelQuote {
}

interface TravelInfo {
}