天天看點

java多線程共享queue_Java多線程之傳統線程機制

傳統線程機制的回顧

建立線程的兩種方式

1 在Thread子類覆寫的run方法中編寫運作代碼

Thread t1 = new Thread()

{

@Override

public void run() {

System.out.println("run in thread run()");

}

}

2 在傳遞給Thread對象的Runnable對象的run方法中編寫代碼

Thread t2 = new Thread(new Runnable() {

@Override

public void run() {

System.out.println("run in runnable");

}

})

檢視Thread的run方法源碼:

@Override

public void run() {

if (target != null) {

target.run();

}

}

這個target就是構造函數中傳入的runnable對象,是以如下代碼的輸出應該是?

new Thread(new Runnable() {

@Override

public void run() {

System.out.println("run in runnable");

}

}) {

@Override

public void run() {

System.out.println("run in thread run()");

}

}.start();

答案應該是:run in thread run(),因為重寫了run方法,也就不會執行target的run方法了。

兩種建立線程的方法沒有本質差別,但第二種方式更符合面向對象的思想。

線程狀态轉換

java多線程共享queue_Java多線程之傳統線程機制

建立狀态(New):新建立了一個線程對象。

就緒狀态(Runnable):線程對象建立後,其他線程調用了該對象的start()方法。該狀态的線程位于可運作線程池中,變得可運作,等待擷取CPU的使用權。

運作狀态(Running):就緒狀态的線程擷取了CPU,執行程式代碼。

阻塞狀态(Blocked):阻塞狀态是線程因為某種原因放棄CPU使用權,暫時停止運作。直到線程進入就緒狀态,才有機會轉到運作狀态。阻塞的情況分三種:

等待阻塞:運作的線程執行wait()方法,JVM會把該線程放入等待池中。(wait會釋放持有的鎖)

同步阻塞:運作的線程在擷取對象的同步鎖時,若該同步鎖被别的線程占用,則JVM會把該線程放入鎖池中。

其他阻塞:運作的線程執行sleep()或join()方法,或者發出了I/O請求時,JVM會把該線程置為阻塞狀态。當sleep()狀态逾時、join()等待線程終止或者逾時、或者I/O處理完畢時,線程重新轉入就緒狀态。(注意,sleep是不會釋放持有的鎖)

死亡狀态(Dead):線程執行完了或者因異常退出了run()方法,該線程結束生命周期。

線程的同步互斥通信

同步與互斥

synchronized關鍵字:隐式鎖

可作用于方法和代碼塊:

作用于方法,鎖對象為this,目前執行個體;

作用于static方法,鎖對象為目前類的Class對象;

作用于代碼塊,需要指定鎖對象。

synchronized的作用:同一個鎖對象的代碼(方法和代碼塊),同一時間隻能被一個線程執行,保證可見性和原子性。

線程通信wait和notify

使用synchronized的地方不一定使用wait和notify,但使用notify和wait有兩個必要條件:

(1)wait和notify隻有在synchronized關鍵字作用範圍内才有效,否則無意義;

(2)wait和notify的調用對象必須是鎖對象,也就是synchronized目前使用的對象,否則無效,有可能跑出異常:illegalmonitorstateexception

以下為兩個例子,

例子1:使用兩個線程列印,子線程每次列印10個數字,主線程每次列印100個數字,如此往複50次:

public class TraditionalThreadCommunication {

public static void main(String[] args) {

final Business business = new Business();

new Thread(

new Runnable() {

@Override

public void run() {

for(int i=1;i<=50;i++){

business.sub(i);

}

}

}

).start();

for(int i=1;i<=50;i++){

business.main(i);

}

}

}

class Business {

private boolean bShouldSub = true;

public synchronized void sub(int i){

while(!bShouldSub){

try {

this.wait();

} catch (InterruptedException e) {

// TODO Auto-generated catch block

e.printStackTrace();

}

}

for(int j=1;j<=10;j++){

System.out.println("sub thread sequence of " + j + ",loop of " + i);

}

bShouldSub = false;

this.notify();

}

public synchronized void main(int i){

while(bShouldSub){

try {

this.wait();

} catch (InterruptedException e) {

// TODO Auto-generated catch block

e.printStackTrace();

}

}

for(int j=1;j<=100;j++){

System.out.println("main thread sequence of " + j + ",loop of " + i);

}

bShouldSub = true;

this.notify();

}

}

例子2 使用wait和notify實作生産者消費者阻塞隊列,隊列長度為5:

public static void main(String[] args) {

List taskQueue = new ArrayList();

int MAX_CAPACITY = 5;

Thread tProducer = new Thread(new Producer(taskQueue, MAX_CAPACITY), "Producer");

Thread tConsumer = new Thread(new Consumer(taskQueue), "Consumer");

tProducer.start();

tConsumer.start();

}

//生産者

public class Producer implements Runnable {

private final List taskQueue;

private final int MAX_CAPACITY;

public Producer(List sharedQueue, int size) {

this.taskQueue = sharedQueue;

this.MAX_CAPACITY = size;

}

@Override

public void run() {

int counter = 0;

while (true) {

try {

produce(counter++);

} catch (InterruptedException ex) {

ex.printStackTrace();

}

}

}

private void produce(int i) throws InterruptedException {

synchronized (taskQueue) {

while (taskQueue.size() == MAX_CAPACITY) {

System.out.println("Queue is full " + Thread.currentThread().getName() + " is waiting , size: " + taskQueue.size());

taskQueue.wait();

}

Thread.sleep(1000);

taskQueue.add(i);

System.out.println("Produced: " + i);

taskQueue.notifyAll();

}

}

//消費者

public class Consumer implements Runnable {

private final List taskQueue;

public Consumer(List sharedQueue) {

this.taskQueue = sharedQueue;

}

@Override

public void run() {

while (true) {

try {

consume();

} catch (InterruptedException ex) {

ex.printStackTrace();

}

}

}

private void consume() throws InterruptedException {

synchronized (taskQueue) {

while (taskQueue.isEmpty()) {

System.out.println("Queue is empty " + Thread.currentThread().getName() + " is waiting , size: " + taskQueue.size());

taskQueue.wait();

}

Thread.sleep(500);

int i = (Integer) taskQueue.remove(0);

System.out.println("Consumed: " + i);

taskQueue.notify();

}

}

例子1,兩個線程會有規律的交叉列印,而例子2則在不同機器上有不同表現,依賴于cpu的排程,最關鍵的地方在于:例子1中wait的進入條件會随着每一輪的執行而改變一次,而例子2中,隻有隊列為空或滿時才會進入wait,是以沒有明顯規律。

ThreadLocal實作線程範圍的共享變量

ThreadLocal的作用和目的:用于實作線程内的資料共享,即對于相同的程式代碼,多個子產品在同一個線程中運作時要共享一份資料,而在另外線程中運作時又共享另外一份資料。

(1) 每個線程調用全局ThreadLocal對象的set方法,就相當于往其内部的map中增加一條記錄,key分别是各自的線程,value是各自的set方法傳進去的值。線上程結束時可以調用ThreadLocal.clear()方法,這樣會更快釋放記憶體,不調用也可以,因為線程結束後也可以自動釋放相關的ThreadLocal變量.

(2) 一個ThreadLocal隻能存儲一個對象,如果需要存儲多個對象,則可以用多個ThreadLocal對象,如果很多,則把這些對象封裝到一個實體類中。

(3) ThreadLocal的應用場景:如android中的Looper就存儲在ThreadLocal變量中,一個線程隻能有一個looper。

Looper.prepare()方法

private static void prepare(boolean quitAllowed) {

if (sThreadLocal.get() != null) {

throw new RuntimeException("Only one Looper may be created per thread");

}

sThreadLocal.set(new Looper(quitAllowed));

}

(4) 測試代碼

public class ThreadLocalTest {

private static ThreadLocal x = new ThreadLocal();

private static ThreadLocal myThreadScopeData = new ThreadLocal();

public static void main(String[] args) {

for(int i=0;i<2;i++){

new Thread(new Runnable(){

@Override

public void run() {

int data = new Random().nextInt();

System.out.println(Thread.currentThread().getName()

+ " has put data :" + data);

x.set(data);

MyThreadScopeData.getThreadInstance().setName("name" + data);

MyThreadScopeData.getThreadInstance().setAge(data);

new A().get();

new B().get();

}

}).start();

}

}

static class A{

public void get(){

int data = x.get();

System.out.println("A from " + Thread.currentThread().getName()

+ " get data :" + data);

MyThreadScopeData myData = MyThreadScopeData.getThreadInstance();

System.out.println("A from " + Thread.currentThread().getName()

+ " getMyData: " + myData.getName() + "," +

myData.getAge());

}

}

static class B{

public void get(){

int data = x.get();

System.out.println("B from " + Thread.currentThread().getName()

+ " get data :" + data);

MyThreadScopeData myData = MyThreadScopeData.getThreadInstance();

System.out.println("B from " + Thread.currentThread().getName()

+ " getMyData: " + myData.getName() + "," +

myData.getAge());

}

} }

class MyThreadScopeData{

private MyThreadScopeData(){}

public static MyThreadScopeData getThreadInstance(){

MyThreadScopeData instance = map.get();

if(instance == null){

instance = new MyThreadScopeData();

map.set(instance);

}

return instance;

}

//private static MyThreadScopeData instance = null;//new MyThreadScopeData();

private static ThreadLocal map = new ThreadLocal();

private String name;

private int age;

public String getName() {

return name;

}

public void setName(String name) {

this.name = name;

}

public int getAge() {

return age;

}

public void setAge(int age) {

this.age = age;

}

}