先說點别的,為什麼要逐漸學會讀英文書籍
解釋一個名詞:上下文切換、Context switch
多任務系統中,上下文切換是指CPU的控制權由運作任務轉移到另外一個就緒任務時所發生的事件。
When one thread’s execution is suspended and swapped off the processor, and another thread is swapped onto the processor and its execution is resumed, this is called a context switch.
為什麼讀英文版的計算機書籍,能了解的透徹和深刻。我們使用的語言,比如Java,是用英語開發的,其JDK中的類的命名、方法的命名,都是英文的,是以,當用英文解釋一個名詞、場景時,基于對這門程式設計語言的了解,我們馬上就了解了,是非常形象的,簡直就是圖解,而用中文解釋,雖然是我們的母語,但是對于計算機語言而言,卻是外語,卻是抽象的,反而不容易了解。
推薦書籍:Java Thread Programming ,但是要注意,這本書挺古老的,JDK1.1、1.2時代的産物,是以書中的思想OK,有些代碼例子,可能得不出想示範的結果。
前言
關于多線程的知識,有非常多的資料可以參考。這裡稍微總結一下,以求加深記憶。
關于多線程在日常工作中的使用:對于大多數的日常應用系統,比如各種管理系統,可能根本不需要深入了解,僅僅知道Thread/Runnable就夠了;如果是需要很多計算任務的系統,比如推薦系統中各種中間資料的計算,對多線程的使用就較為頻繁,也需要進行一下稍微深入的研究。
幾篇實戰分析線程問題的好文章:
怎樣分析 JAVA 的 Thread Dumps
各種 Java Thread State 第一分析法則
資料庫死鎖及解決死鎖問題
全面解決五大資料庫死鎖問題
關于線程池的幾篇文章:
http://blog.csdn.net/wangpeng047/article/details/7748457
http://www.oschina.net/question/12_11255
http://jamie-wang.iteye.com/blog/1554927
基本知識
JVM最多支援多少個線程:http://www.importnew.com/10780.html
在一次真實的案例中,8G記憶體(Java應用配置設定了4G堆記憶體),4核,虛拟機,JVM 1.7,在應用down掉之前,大約開了12000個線程:http://blog.csdn.net/puma_dong/article/details/46669499
線程安全
如果你的代碼所在的程序中有多個線程在同時運作,而這些線程可能會同時運作這段代碼。
如果每次運作結果和單線程運作的結果是一樣的,而且其他的變量的值也和預期的是一樣的,就是線程安全的。
或者這樣說:一個類或者程式所提供的接口對于線程來說是原子操作或者多個線程之間的切換不會導緻該接口的執行結果存在二義性,也就是說我們不用考慮同步的問題。
或者我們這樣來簡單了解,同一段程式塊,從某一個時間點同時操作某個資料,對于這個資料來說,分叉了,則這就不是線程安全;如果對這段資料保護起來,保證順序執行,則就是線程安全。
原子操作
原子操作(atomic operation):是指不會被線程排程機制打斷的操作;這種操作一旦開始,就一直運作到結束,中間不會有任何 context switch (切換到另一個線程)。
原子操作(atomic operation):如果一個操作所處的層(layer)的更高層不能發現其内部實作與結構,則這個操作就是原子的。
原子操作可以是一個步驟,也可以是多個操作步驟,但是其順序是不可以被打亂,或者切割掉隻執行部分。
在多程序(線程)通路資源時,原子操作能夠確定所有其他的程序(線程)都不在同一時間内通路相同的資源。
原子操作時不需要synchronized,這是Java多線程程式設計的老生常談,但是,這是真的嗎?我們通過測試發現(return i),當對象處于不穩定狀态時,仍舊很有可能使用原子操作來通路他們,是以,對于java中的多線程,要遵循兩個原則:
a、Brian Goetz的同步規則,如果你正在寫一個變量,它可能接下來将被另一個線程讀取,或者正在讀取一個上一次已經被另一個線程寫過的變量,那麼你必須使用同步,并且,讀寫線程都必須用相同的螢幕鎖同步;
b、Brain Goetz測試:如果你可以編寫用于現代微處理器的高性能JVM,那麼就有資格去考慮是否可以避免使用同步
通常所說的原子操作包括對非long和double型的primitive進行指派,以及傳回這兩者之外的primitive。之是以要把它們排除在外是因為它們都比較大,而JVM的設計規範又沒有要求讀操作和指派操作必須是原子操作(JVM可以試着去這麼作,但并不保證)。
錯誤了解
import java.util.Hashtable;
class Test
{
public static void main(String[] args) throws Exception {
final Hashtable<String,Integer> h = new Hashtable<String,Integer>();
long l1 = System.currentTimeMillis();
for(int i=0;i<10000;i++) {
new Thread(new Runnable(){
@Override
public void run() {
h.put("test",1);
Integer i1 = h.get("test");
h.put("test",2);
Integer i2 = h.get("test");
if(i1 == i2) {
System.out.println(i1 + ":" + i2);
}
}
}).start();
}
long l2 = System.currentTimeMillis();
System.out.println((l2-l1)/1000);
}
}
有人覺得:既然Hashtable是線程安全的,那麼以上代碼的run()方法中的代碼應該是線程安全的,這是錯誤了解。
線程安全的對象,指的是其内部操作(内部方法)是線程安全的,外部對其進行的操作,如果是一個序列,還是需要自己來保證其同步的,針對以上代碼,在run方法的内部使用synchronized就可以了。
互斥鎖和自旋鎖
自旋鎖:不睡覺,循環等待擷取鎖的方式成為自旋鎖,在ConcurrentHashMap的實作中使用了自旋鎖,一般自旋鎖實作會有一個參數限定最多持續嘗試次數,超出後,自旋鎖放棄目前time slice,等下一次機會,自旋鎖比較适用于鎖使用者保持鎖時間比較短的情況。
正是由于自旋鎖使用者一般保持鎖時間非常短,是以選擇自旋而不是睡眠是非常必要的,自旋鎖的效率遠高于互斥鎖。
CAS樂觀鎖适用的場景:http://www.tuicool.com/articles/zuui6z
ThreadLocal與synchronized
差別ThreadLocal 與 synchronized
ThreadLocal是一個線程隔離(或者說是線程安全)的變量存儲的管理實體(注意:不是存儲用的),它以Java類方式表現;
synchronized是Java的一個保留字,隻是一個代碼辨別符,它依靠JVM的鎖機制來實作臨界區的函數、變量在CPU運作通路中的原子性。
兩者的性質、表現及設計初衷不同,是以沒有可比較性。
synchronized對塊使用,用的是Object對象鎖,對于方法使用,用的是this鎖,對于靜态方法使用,用的是Class對象的鎖,隻有使用同一個鎖的代碼,才是同步的。
了解ThreadLocal中提到的變量副本
事實上,我們向ThreadLocal中set的變量不是由ThreadLocal來存儲的,而是Thread線程對象自身儲存。
當使用者調用ThreadLocal對象的set(Object o)時,該方法則通過Thread.currentThread()擷取目前線程,将變量存入Thread中的一個Map内,而Map的Key就是目前的ThreadLocal執行個體。
Runnable與Thread
實作多線程,Runnable接口和Thread類是最常用的了,實作Runnable接口比繼承Thread類會更有優勢:
- 适合多個相同的程式代碼的線程去處理同一個資源
- 可以避免java中的單繼承的限制
- 增加程式的健壯性,代碼可以被多個線程共享,代碼和資料獨立。
Java線程互斥和協作
阻塞指的是暫停一個線程的執行以等待某個條件發生(如某資源就緒)。Java 提供了大量方法來支援阻塞,下面讓對它們逐一分析。
1、sleep()方法:sleep()允許指定以毫秒為機關的一段時間作為參數,它使得線程在指定的時間内進入阻塞狀态,不能得到CPU 時間,指定的時間一過,線程重新進入可執行狀态。
典型地,sleep() 被用在等待某個資源就緒的情形:測試發現條件不滿足後,讓線程阻塞一段時間後重新測試,直到條件滿足為止。
2、(Java 5已經不推薦使用,易造成死鎖!!) suspend()和resume()方法:兩個方法配套使用,suspend()使得線程進入阻塞狀态,并且不會自動恢複,必須其對應的 resume() 被調用,才能使得線程重新進入可執行狀态。典型地,suspend() 和 resume() 被用在等待另一個線程産生的結果的情形:測試發現結果還沒有産生後,讓線程阻塞,另一個線程産生了結果後,調用resume()使其恢複。
stop()方法,原用于停止線程,也已經不推薦使用,因為stop時會解鎖,可能造成不可預料的後果;推薦設定一個flag标記變量,結合interrupt()方法來讓線程終止。
3.、yield() 方法:yield() 使得線程放棄目前分得的 CPU 時間,但是不使線程阻塞,即線程仍處于可執行狀态,随時可能再次分得 CPU 時間。調用 yield() 的效果等價于排程程式認為該線程已執行了足夠的時間進而轉到另一個線程。
4.、wait() 和 notify() 方法:兩個方法配套使用,wait() 使得線程進入阻塞狀态,它有兩種形式,一種允許指定以毫秒為機關的一段時間作為參數,另一種沒有參數,前者當對應的 notify() 被調用或者超出指定時間時線程重新進入可執行狀态,後者則必須對應的 notify() 被調用。
2和4差別的核心在于,前面叙述的所有方法,阻塞時都不會釋放占用的鎖(如果占用了的話),而這一對方法則相反。上述的核心差別導緻了一系列的細節上的差別。
首先,前面叙述的所有方法都隸屬于Thread 類,但是這一對卻直接隸屬于 Object 類,也就是說,所有對象都擁有這一對方法。因為這一對方法阻塞時要釋放占用的鎖,而鎖是任何對象都具有的,調用任意對象的 wait() 方法導緻線程阻塞,并且該對象上的鎖被釋放。而調用任意對象的notify()方法則導緻因調用該對象的 wait() 方法而阻塞的線程中随機選擇的一個解除阻塞(但要等到獲得鎖後才真正可執行)。
其次,前面叙述的所有方法都可在任何位置調用,但是這一對方法卻必須在 synchronized 方法或塊中調用,理由也很簡單,隻有在synchronized 方法或塊中目前線程才占有鎖,才有鎖可以釋放。同樣的道理,調用這一對方法的對象上的鎖必須為目前線程所擁有,這樣才有鎖可以釋放。是以,這一對方法調用必須放置在這樣的 synchronized 方法或塊中,該方法或塊的上鎖對象就是調用這一對方法的對象。若不滿足這一條件,則程式雖然仍能編譯,但在運作時會出現 IllegalMonitorStateException 異常。
wait() 和 notify() 方法的上述特性決定了它們經常和synchronized 方法或塊一起使用,将它們和作業系統的程序間通信機制作一個比較就會發現它們的相似性:synchronized方法或塊提供了類似于作業系統原語的功能,它們的結合用于解決各種複雜的線程間通信問題。
關于 wait() 和 notify() 方法最後再說明三點:
第一:調用 notify() 方法導緻解除阻塞的線程是從因調用該對象的 wait() 方法而阻塞的線程中随機選取的,我們無法預料哪一個線程将會被選擇,是以程式設計時要特别小心,避免因這種不确定性而産生問題。
第二:除了 notify(),還有一個方法 notifyAll() 也可起到類似作用,唯一的差別在于,調用 notifyAll() 方法将把因調用該對象的wait()方法而阻塞的所有線程一次性全部解除阻塞。當然,隻有獲得鎖的那一個線程才能進入可執行狀态。
第三:wait/notify,是在作為螢幕鎖的對象上執行的,如果鎖是a,執行b的wait,則會報java.lang.IllegalMonitorStateException。
關于interrupted,很容易了解錯誤,看兩篇文章,如下:
http://www.blogjava.net/fhtdy2004/archive/2009/06/08/280728.html
http://www.blogjava.net/fhtdy2004/archive/2009/08/22/292181.html
示範線程間協作機制,wait/notify/condition
代碼示例1(wait/notify):
必須說這個代碼是有缺陷的,會錯失信号,想一想問題出在哪裡,應該怎麼完善?
/*
* 線程之間協作問題:兩個線程,一個列印奇數,一個列印偶數
* 在調用wait方法時,都是用while判斷條件的,而不是if,
* 在wait方法說明中,也推薦使用while,因為在某些特定的情況下,線程有可能被假喚醒,使用while會循環檢測更穩妥
* */
public class OddAndEven {
static int[] num = new int[]{1,2,3,4,5,6,7,8,9,10};
static int index = 0;
public static void main(String[] args) throws Exception {
OddAndEven oae = new OddAndEven();
//這裡如果起超過2個線程,則可能出現所有的線程都處于wait狀态的情況(網上很多代碼都有這個Bug,要注意,這其實是一個錯失信号産生的死鎖問題,如果用notifyAll就不會有這個問題)
new Thread(new ThreadOdd(oae)).start();
new Thread(new ThreadEven(oae)).start();
}
static class ThreadOdd implements Runnable {
private OddAndEven oae;
public ThreadOdd(OddAndEven oae) {
this.oae = oae;
}
@Override
public void run() {
while(index < 10) {
oae.odd();
}
}
}
static class ThreadEven implements Runnable {
private OddAndEven oae;
public ThreadEven(OddAndEven oae) {
this.oae = oae;
}
@Override
public void run() {
while(index < 10) {
oae.even();
}
}
}
//奇數
public synchronized void odd() {
while(index<10 && num[index] % 2 == 0) {
try {
wait(); //阻塞偶數
} catch (InterruptedException e) {
e.printStackTrace();
}
}
if(index >= 10) return;
System.out.println(Thread.currentThread().getName() + " 列印奇數 : " + num[index]);
index++;
notify(); //喚醒偶數線程
}
//偶數
public synchronized void even() {
while(index<10 && num[index] % 2 == 1) {
try {
wait(); //阻塞奇數
} catch (InterruptedException e) {
e.printStackTrace();
}
}
if(index >= 10) return;
System.out.println(Thread.currentThread().getName() + " 列印偶數 : " + num[index]);
index++;
notify(); //喚醒奇數線程
}
}
代碼示例2:
public class Test implements Runnable {
private String name;
private Object prev;
private Object self;
private Test(String name, Object prev, Object self) {
this.name = name;
this.prev = prev;
this.self = self;
}
@Override
public void run() {
int count = 10;
while (count > 0) {
synchronized (prev) {
synchronized (self) {
System.out.print(name);
count--;
try{
Thread.sleep(1);
}
catch (InterruptedException e){
e.printStackTrace();
}
self.notify();
}
try {
prev.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
public static void main(String[] args) throws Exception {
Object a = new Object();
Object b = new Object();
Object c = new Object();
Test pa = new Test("A", c, a);
Test pb = new Test("B", a, b);
Test pc = new Test("C", b, c);
new Thread(pa).start();
Thread.sleep(10);
new Thread(pb).start();
Thread.sleep(10);
new Thread(pc).start();
Thread.sleep(10);
}
}
代碼示例3(Condition實作生産者消費者模式):
import java.util.PriorityQueue;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
public class ConditionTest {
private int queueSize = 10;
private PriorityQueue<Integer> queue = new PriorityQueue<Integer>(queueSize);
private Lock lock = new ReentrantLock();
private Condition notFull = lock.newCondition();
private Condition notEmpty = lock.newCondition();
public static void main(String[] args) {
ConditionTest test = new ConditionTest();
Producer producer = test.new Producer();
Consumer consumer = test.new Consumer();
producer.start();
consumer.start();
}
class Consumer extends Thread {
@Override
public void run() {
consume();
}
private void consume() {
while(true){
lock.lock();
try {
while(queue.size() == 0){
try {
System.out.println("隊列空,等待資料");
notEmpty.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
queue.poll(); //每次移走隊首元素
notFull.signal();
System.out.println("從隊列取走一個元素,隊列剩餘"+queue.size()+"個元素");
} finally{
lock.unlock();
}
}
}
}
class Producer extends Thread {
@Override
public void run() {
produce();
}
private void produce() {
while(true){
lock.lock();
try {
while(queue.size() == queueSize){
try {
System.out.println("隊列滿,等待有空餘空間");
notFull.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
queue.offer(1); //每次插入一個元素
notEmpty.signal();
System.out.println("向隊列取中插入一個元素,隊列剩餘空間:"+(queueSize-queue.size()));
} finally{
lock.unlock();
}
}
}
}
}
代碼示例4(兩把鎖的生産者消費者模式):
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
* 程式是Catch住InterruptedException,還是走Thread.interrupted(),實際是不确定的
*/
public class Restaurant {
public Meal meal;
public ExecutorService exec = Executors.newCachedThreadPool();
public WaitPerson waitPerson = new WaitPerson(this);
public Chef chef = new Chef(this);
public Restaurant() {
exec.execute(waitPerson);
exec.execute(chef);
}
public static void main(String[] args) {
new Restaurant();
}
}
class Meal {
private final int orderNum;
public Meal(int orderNum) {this.orderNum = orderNum;}
public String toString() {return "Meal " + orderNum;}
}
class WaitPerson implements Runnable {
private Restaurant restaurant;
public WaitPerson(Restaurant restaurant) {this.restaurant = restaurant;}
public void run() {
try {
while(!Thread.interrupted()) {
synchronized(this) {
while(restaurant.meal == null) {
wait();
}
}
System.out.println("WaitPerson got " + restaurant.meal);
synchronized(restaurant.chef) {
restaurant.meal = null;
restaurant.chef.notifyAll();
}
}
}
catch(InterruptedException e) {
System.out.println(Thread.currentThread().getName() + " " + e.toString());
}
}
}
class Chef implements Runnable {
private Restaurant restaurant;
private int count = 0;
public Chef(Restaurant restaurant) {this.restaurant = restaurant;}
public void run() {
try {
while(!Thread.interrupted()) {
synchronized(this) {
while(restaurant.meal != null) {
wait();
}
}
if(++count == 10) {
restaurant.exec.shutdownNow();
}
System.out.println("Order up!");
synchronized(restaurant.waitPerson) {
restaurant.meal = new Meal(count);
restaurant.waitPerson.notifyAll();
}
}
}
catch(InterruptedException e) {
System.out.println(Thread.currentThread().getName() + " " + e.toString());
}
}
}
反面教材代碼示範:
class SuspendAndResume {
private final static Object object = new Object();
static class ThreadA extends Thread {
public void run() {
synchronized(object) {
System.out.println("start...");
Thread.currentThread().suspend();
System.out.println("thread end");
}
}
}
public static void main(String[] args) throws InterruptedException {
ThreadA t1 = new ThreadA();
ThreadA t2 = new ThreadA();
t1.start();
t2.start();
Thread.sleep(100);
System.out.println(t1.getState());
System.out.println(t2.getState());
t1.resume();
t2.resume();
}
}
程式輸出結果如下:
localhost:test puma$ sudo java SuspendAndResume
start...
RUNNABLE
BLOCKED
thread end
start...
關于suspend()/resume()這兩個方法,類似于wait()/notify(),但是它們不是等待和喚醒線程。suspend()後的線程處于RUNNING狀态,而不是WAITING狀态,但是線程本身在這裡已經挂起了,線程本身餓狀态就開始對不上号了。
以上的例子解釋如下:
首先t1.start()/t2.start(),main睡sleep10秒,讓兩個子線程都進入運作的區域;
列印狀态,t1運作,t2被synchronized阻塞;
t1.resume(),此時t1列印thread end,馬上執行t2.resume(),此時由于t1的synchronized還沒來得及釋放鎖,是以這段代碼是在t2的synchronized外執行的,也就是在t2.suspend()之前執行的,是以是無效的;而當t2線程被挂起時,輸出start,但是由于t2.suspend()已經被執行完了,是以t2就會一直處于挂起狀态,一直持有鎖不釋放,這些資訊的不一緻就導緻了各種資源無法釋放的問題。
對于這個程式,如果在t1.resume()和t2.resume()之間增加一個Thread.sleep(),可以看到又正常執行了。
總得來說,問題應當出線上程狀态對外看到的是RUNNING狀态,外部程式并不知道這個對象挂起了需要去做resume()操作。另外,它并不是基于對象來完成這個動作的,是以suspend()和wait()相關的順序性很難保證。是以suspend()和resume()不推薦使用了。
反過來想,這也更加說明了wait()和notify()為什麼要基于對象(而不是線程本身)來做資料結構,因為要控制生産者和消費者之間的關系,它需要一個臨界區來控制它們之間的平衡。它不是随意地線上程上做操作來控制資源的,而是由資源反過來控制線程狀态的。當然wait()和notify()并非不會導緻死鎖,隻是它們的死鎖通常是程式設計不當導緻的,并且在通常情況下是可以通過優化來解決的。
同步隊列
wait和notify以一種非常低級的方式解決了任務互操作問題,即每次互動時都握手。在許多情況下,可以瞄向更高的抽象級别,使用同步隊列來解決任務協作問題,同步隊列在任何時刻都隻允許一個操作插入或移除元素。
如果消費者任務試圖從隊列中擷取對象,而該隊列此時為空,那麼這些隊列還可以挂起消費者任務,并且當有更多的元素可用時恢複消費者任務。阻塞隊列可以解決非常大量的問題,而其方式與wait和notify相比,則簡單并可靠的多。
代碼示例1(将多個LiftOff的執行串行化,消費者是LiftOffRunner,将每個LiftOff對象從BlockingQueue中推出并直接運作):
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.SynchronousQueue;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
public class TestBlockingQueues {
public static void main(String[] args) {
test("LinkedBlockingQueue",new LinkedBlockingQueue<LiftOff>()); //Unlimited size
test("ArrayBlockingQueue",new ArrayBlockingQueue<LiftOff>(3)); //Fixed size
test("SynchronousQueue",new SynchronousQueue<LiftOff>()); //Size of 1
}
private static void test(String msg,BlockingQueue<LiftOff> queue) {
System.out.println(msg);
LiftOffRunner runner = new LiftOffRunner(queue);
Thread t = new Thread(runner);
t.start();
for(int i = 0; i < 5; i++) {
runner.add(new LiftOff(i));
}
getKey("Press 'Enter' (" + msg + ")");
t.interrupt();
System.out.println("Finished " + msg + " test");
}
private static void getKey() {
try {
new BufferedReader(new InputStreamReader(System.in)).readLine();
} catch (IOException e) {
e.printStackTrace();
}
}
private static void getKey(String message) {
System.out.println(message);
getKey();
}
}
class LiftOffRunner implements Runnable {
private BlockingQueue<LiftOff> rockets;
public LiftOffRunner(BlockingQueue<LiftOff> queue) {rockets = queue;}
public void add(LiftOff lo) {
try {
rockets.put(lo);
} catch (InterruptedException e) {
System.out.println("Interrupted during put()");
}
}
@Override
public void run() {
try
{
while(!Thread.interrupted()) {
LiftOff rocket = rockets.take();
rocket.run();
}
} catch (InterruptedException e) {
System.out.println("Waking from take()");
}
System.out.print("Exiting LiftOffRunner");
}
}
class LiftOff {
private int num;
public LiftOff(int num) {this.num = num;}
public void run() {
System.out.println(Thread.currentThread().getName() + " " + num);
}
}
代碼示例2:
線程間通過BlockingQueue協作,3個任務,一個做面包,一個對面包抹黃油,一個對抹過黃油的面包抹果醬。整個代碼沒有顯示的使用同步,任務之間完成了很好地協作;因為同步由隊列(其内部是同步的)和系統的設計隐式的管理了。每片Toast在任何時刻都隻有一個任務在操作。因為隊列的阻塞,使得處理過程将被自動的挂起和恢複。
我們可以看到通過使用BlockingQueue帶來的簡化十分明顯,在使用顯式的wait/notify時存在的類和類之間的耦合被消除了,因為每個類都隻和他得BlockingQueue通信。
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
public class ToastOMatic {
public static void main(String[] args) throws Exception {
ToastQueue dryQueue = new ToastQueue(),
butteredQueue = new ToastQueue(),
finishedQueue = new ToastQueue();
ExecutorService exec = Executors.newCachedThreadPool();
exec.execute(new Toaster(dryQueue));
exec.execute(new Butterer(dryQueue,butteredQueue));
exec.execute(new Jammer(butteredQueue,finishedQueue));
exec.execute(new Eater(finishedQueue));
TimeUnit.SECONDS.sleep(5);
exec.shutdownNow();
}
}
class Toast {
public enum Status {DRY,BUTTERED,JAMMED};
private Status status = Status.DRY;
private final int id;
public Toast(int idn) {id = idn;}
public void butter() {status = Status.BUTTERED;}
public void jam() {status = Status.JAMMED;}
public Status getStatus() {return status;}
public int getId() {return id;}
public String toString() {return "Toast " + id + " : " + status;}
}
@SuppressWarnings("serial")
class ToastQueue extends LinkedBlockingQueue<Toast> {}
class Toaster implements Runnable {
private ToastQueue toastQueue;
private int count = 0;
private Random rand = new Random(47);
public Toaster(ToastQueue tq) {toastQueue = tq;}
public void run() {
try {
while(!Thread.interrupted()) {
TimeUnit.MICROSECONDS.sleep(100 + rand.nextInt(500));
//Make toast
Toast t = new Toast(count++);
System.out.println(t);
//Insert into queue
toastQueue.put(t);
}
} catch(InterruptedException e) {
System.out.println("Toaster interrupted");
}
System.out.println("Toaster off");
}
}
//Apply butter to toast
class Butterer implements Runnable {
private ToastQueue dryQueue,butteredQueue;
public Butterer(ToastQueue dry, ToastQueue buttered) {
dryQueue = dry;
butteredQueue = buttered;
}
public void run() {
try {
while(!Thread.interrupted()) {
//Blocks until next piece of toast is available
Toast t = dryQueue.take();
t.butter();
System.out.println(t);
butteredQueue.put(t);
}
} catch(InterruptedException e) {
System.out.println("Butterer interrupted");
}
System.out.println("Butterer off");
}
}
//Apply jam to buttered toast
class Jammer implements Runnable {
private ToastQueue butteredQueue,finishedQueue;
public Jammer(ToastQueue buttered,ToastQueue finished) {
butteredQueue = buttered;
finishedQueue = finished;
}
public void run() {
try {
while(!Thread.interrupted()) {
//Blocks until next piece of toast is available
Toast t = butteredQueue.take();
t.jam();
System.out.println(t);
finishedQueue.put(t);;
}
} catch(InterruptedException e) {
System.out.println("Jammer interrupted");
}
System.out.println("Jammer off");
}
}
//Consume the toast
class Eater implements Runnable {
private ToastQueue finishedQueue;
private int counter = 0;
public Eater(ToastQueue finished) {
finishedQueue = finished;
}
public void run() {
try {
while(!Thread.interrupted()) {
//Blocks until next piece of toast is available
Toast t = finishedQueue.take();
//Verify that the toast is coming in order, and that all pieces are getting jammed
if(t.getId() != counter++ || t.getStatus() != Toast.Status.JAMMED) {
System.out.println(">>>> Error : " + t);
System.exit(1);
} else {
System.out.println("Chomp! " + t);
}
}
} catch(InterruptedException e) {
System.out.println("Eater interrupted");
}
System.out.println("Eater off");
}
}
任務間使用管道進行輸入輸出
Java輸入輸出類庫中的PipedWriter(允許任務向管道寫),PipedReader(允許不同任務從同一個管道中讀取),這個模型可以看成“生産者-消費者”問題的變體,這裡的管道就是一個封裝好的解決方案。
管道基本上是一個阻塞隊列,存在于多個引入BlockingQueue之前的Java版本中。
代碼示例1:
當Receiver調用read時,如果沒有更多地資料,管道将自動阻塞。
Sender和Receiver是在main中啟動的,即對象構造徹底完成之後。如果啟動了一個沒有徹底構造完畢的對象,在不同的平台上,管道可能産生不一緻的行為。相比較而言,BlockingQueue使用起來更加健壯而容易。
shutdownNow被調用時,可以看到PipedReader和普通IO之間最重要的差異,PipedReader是可以中斷的。
import java.io.IOException;
import java.io.PipedReader;
import java.io.PipedWriter;
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
public class PipedIO {
public static void main(String[] args) throws Exception {
Sender sender = new Sender();
Receiver receiver = new Receiver(sender);
ExecutorService exec = Executors.newCachedThreadPool();
exec.execute(sender);
exec.execute(receiver);
TimeUnit.SECONDS.sleep(5);
exec.shutdownNow();
}
}
class Sender implements Runnable {
private Random rand = new Random(47);
private PipedWriter out = new PipedWriter();
public PipedWriter getPipedWriter() { return out; }
public void run() {
try {
while(true) {
for(char c = 'A'; c <= 'Z'; c++) {
out.write(c);
TimeUnit.MILLISECONDS.sleep(rand.nextInt(50));
}
}
} catch(IOException e) {
System.out.println(e + " Sender write exception");
} catch(InterruptedException e) {
System.out.println(e + " Sender sleep interrupted");
}
}
}
class Receiver implements Runnable {
private PipedReader in;
public Receiver(Sender sender) throws IOException {
in = new PipedReader(sender.getPipedWriter());
}
public void run() {
try {
while(true) {
//Blocks until characters are there
System.out.println("Read: " + (char)in.read() + ", ");
}
} catch(IOException e) {
System.out.println(e + " Receiver read exception");
}
}
}
線程是JVM級别的
我們知道靜态變量是ClassLoader級别的,如果Web應用程式停止,這些靜态變量也會從JVM中清除。
但是線程則是JVM級别的,如果使用者在Web應用中啟動一個線程,這個線程的生命周期并不會和Web應用程式保持同步。
也就是說,即使停止了Web應用,這個線程依舊是活躍的。
正是因為這個很隐晦的問題,是以很多有經驗的開發者不太贊成在Web應用中私自啟動線程。
擷取異步線程的傳回結果
通過java.util.concurrent包種的相關類,實作異步線程傳回結果的擷取。代碼示範例子如下:
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
public class FutureTest {
public static class TaskRunnable implements Runnable {
@Override
public void run() {
System.out.println("runnable");
}
}
public static class TaskCallable implements Callable<String> {
private String s;
public TaskCallable(String s) {
this.s = s;
}
@Override
public String call() throws Exception {
System.out.println("callable");
return s;
}
}
public static void main(String[] args) {
ExecutorService es = Executors.newCachedThreadPool();
for (int i = 0; i < 100; i++) {
es.submit(new TaskRunnable());
System.out.println(i);
}
List<Future<String>> futList = new LinkedList<Future<String>>();
for (int i = 0; i < 100; i++) {
futList.add(es.submit(new TaskCallable(String.valueOf(i))));
}
for (Future<String> fut : futList) {
try {
System.out.println(fut.get());
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} catch (ExecutionException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
}
ReentrantLock和synchronized兩種鎖定機制的對比
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
public class Test {
public static int I;
public static Object oLock = new Object();
public static Lock lock = new ReentrantLock();
public static void main(String[] args) throws Exception {
long b = System.currentTimeMillis();
List<Thread> t1= new ArrayList<Thread>();
for(int j=0;j<30;j++)
{
t1.add(new Thread(new R1()));
}
for(Thread t: t1) t.start();
for(Thread t:t1) {
t.join();
}
long e = System.currentTimeMillis();
System.out.println(Test.I + " | " + (e-b));
Test.I = 0;
b = System.currentTimeMillis();
List<Thread> t2= new ArrayList<Thread>();
for(int j=0;j<30;j++)
{
t2.add(new Thread(new R2()));
}
for(Thread t: t2) t.start();
for(Thread t:t2) {
t.join();
}
e = System.currentTimeMillis();
System.out.println(Test.I + " | " + (e-b));
}
}
class R1 implements Runnable {
@Override
public void run() {
for(int i=0;i<1000000;i++)
{
Test.lock.lock();
Test.I++;
Test.lock.unlock();
}
}
}
class R2 implements Runnable {
@Override
public void run() {
for(int i=0;i<1000000;i++)
{
synchronized("") {
Test.I++;
}
}
}
}
經過測試,輸出結果分别為:
Windows7(2核,2G記憶體),結果:3000000 | 2890,和3000000 | 8198,性能差距還是比較明顯;
Mac10.7(4核,4G記憶體),結果:3億次計算,ReentrantLock用8秒,synchronized反而隻用4秒,結果反過來了;
RHEL6.1(24核,64G記憶體),結果:3億次計算,二者相差不多,都是20-50之間,但是ReentrantLock表現更好一些。
ReentrantLock利用的是“非阻塞同步算法與CAS(Compare and Swap)無鎖算法”,是CPU級别的,參考網址:
http://www.cnblogs.com/Mainz/p/3556430.html
關于兩種鎖機制的更多比較,請參閱:http://www.ibm.com/developerworks/cn/java/j-jtp10264/index.html
關于線程安全的N種實作場景
(1)synchronized
(2)immutable對象是自動線程安全的
(3)volatile,被volatile修飾的屬性,對于讀操作是線程安全的
(4)ConcurrentHashMap之類,java.util.concurrent包中的一些并發操作類,是線程安全的,但是沒有使用synchronized關鍵字,實作巧妙,利用的基本特性是:volatile、Compare And Swap、分段,對于讀不加鎖(volatile保證線程安全),對于寫,對于相關的segment通過ReentrantLock加鎖
Java線程死鎖
代碼示例:
public class Test {
public static void main(String[] args) {
Runnable t1 = new DeadLock(true);
Runnable t2 = new DeadLock(false);
new Thread(t1).start();
new Thread(t2).start();
}
}
class DeadLock implements Runnable {
private static Object lock1 = new Object();
private static Object lock2 = new Object();
private boolean flag;
public DeadLock(boolean flag) {
this.flag = flag;
}
@Override
public void run() {
if(flag) {
synchronized(lock1) {
try {
Thread.sleep(1000); //保證晚于另一個線程鎖lock2,目的是産生死鎖
} catch (InterruptedException e) {
e.printStackTrace();
}
synchronized(lock2) {
System.out.println("flag=true:死鎖了,還能print的出來嗎?");
}
}
} else {
synchronized(lock2) {
try {
Thread.sleep(1000); //保證晚于另一個線程鎖lock1,目的是産生死鎖
} catch (InterruptedException e) {
e.printStackTrace();
}
synchronized(lock1) {
System.out.println("flag=false:死鎖了,還能print的出來嗎?");
}
}
}
}
}
死鎖可以這樣比喻:兩個人吃飯,需要刀子和叉子,其中一人拿了刀子,等待叉子;另一個人拿了叉子,等待刀子;就死鎖了。
Java線程死鎖是一個經典的多線程問題,因為不同的線程都在等待那些根本不可能被釋放的鎖,導緻所有的工作都無法完成;
導緻死鎖的根源在于不适當地運用“synchronized”關鍵詞來管理線程對特定對象的通路。
如何避免死鎖的設計規則:
(1)讓所有的線程按照同樣地順序獲得一組鎖。這種方法消除了X和Y的擁有者分别等待對方的資源的問題。這也是避免死鎖的一個通用的經驗法則是:當幾個線程都要通路共享資源A、B、C時,保證使每個線程都按照同樣的順序去通路它們,比如都先通路A,在通路B和C。
(2)将多個鎖組成一組并放到同一個鎖下。比如,把刀子和叉子,都放在一個新建立的(銀器對象)下面,要獲得子鎖,先獲得父鎖。
更多
微網誌設計架構:http://mars914.iteye.com/blog/1218492 http://timyang.net/
避免多線程時,開銷分布在排程上,可以采取的政策:減少線程到合适的程度、避免線程内IO、采用合适的優先級。
關于IO和多線程,有個案例,Redis是單線程的,支援10萬的QPS;MemberCache是多線程的,性能反而不如Redis;也可以佐證,對于IO非常多的操作,多線程未必能提高更好的性能,即使是記憶體IO。
另外,聽百分點公司的講座時,他們分享了一個案例,當計算的性能瓶頸在硬碟時,把硬碟換成SSD,可以性能翻倍,是以,應該把SSD當做是便宜的記憶體來使用,而不應該是當做昂貴的硬碟來使用。
在java.util.concurrent中,還提供了很多有用的線程寫作類,比如:
CountDownLatch:倒計時鎖、CyclicBarrier:循環栅欄、DelayQueue:延遲隊列、PriorityBlockingQueue:優先級隊列、ScheduledThreadPoolExecutor:定時任務、Semaphore:信号量、Exchanger:互動栅欄。