天天看點

生産者消費者模型的正确姿勢

簡介:

生産者、消費者模型是多線程程式設計的常見問題,最簡單的一個生産者、一個消費者線程模型大多數人都能夠寫出來,但是一旦條件發生變化,我們就很容易掉進多線程的bug中。這篇文章主要講解了生産者和消費者的數量,商品緩存位置數量,商品數量等多個條件的不同組合下,寫出正确的生産者消費者模型的方法。

歡迎探讨,如有錯誤敬請指正

如需轉載,請注明出處 http://www.cnblogs.com/nullzx/

定義商品類

package demo;

/*定義商品*/
public class Goods {
	public final String name;
	public final int price;
	public final int id;
	
	public Goods(String name, int price, int id){
		this.name = name; /*類型*/
		this.price = price; /*價格*/
		this.id = id; /*商品序列号*/
	}
	
	@Override
	public String toString(){
		return "name: " + name + ",   price:"+ price + ",   id: " + id; 
	}
}
      

基本要求:

1)生産者不能重複生産一個商品,也就是說不能有兩個id相同的商品

2)生産者不能覆寫一個商品(目前商品還未被消費,就被下一個新商品覆寫)。也就是說消費商品時,商品的id屬性可以不連續,但不能出現缺号的情況

3)消費者不能重複消費一個商品

1. 生産者線程無限生産,消費者線程無限消費 的模式

1.1使用線程對象,一個生産者線程,一個消費者線程,一個商品存儲位置

package demo;

import java.util.Random;

/*使用線程對象,一個緩存位置,一個生産者,一個消費者,無限生産商品消費商品*/
public class ProducterComsumerDemo1 {
	
	/*定義一個商品緩存位置*/
	private volatile Goods goods;
	
	/*定義一個對象作為鎖,不使用goods作為鎖是因為生産者每次會産生一個新的對象*/
	private Object obj = new Object();
	
	
	/*isFull == true 生産者線程休息,消費者線程消費 
	 *isFull == false 消費者線程休息,生産者線程生産*/
	private volatile boolean isFull = false;
	
	/*商品的id編号,生産者制造的每個商品的id都不一樣,每生産一個id自增1*/
	private int id = 1;
	
	/*随機産生一個sleep時間*/
	private Random rnd = new Random();
	
	/*=================定義消費者線程==================*/
	public class ComsumeThread implements Runnable{
		@Override
		public void run(){
			
			try{
				while(true){
					
					/*擷取obj對象的鎖, id 和 isFull 的操作都在同步代碼塊中*/
					synchronized(obj){
						
						if(!isFull){
							/*wait方法使目前線程阻塞,并釋放鎖*/
							obj.wait();
						}
							
						/*随機延時一段時間*/
						Thread.sleep(rnd.nextInt(250));
						
						/*模拟消費商品*/
						System.out.println(goods);
						
						/*随機延時一段時間*/
						Thread.sleep(rnd.nextInt(250));
						
						isFull = false;
						
						/*喚醒阻塞obj上的生産者線程*/
						obj.notify();
						
					}
					
					/*随機延時一段時間*/
					Thread.sleep(rnd.nextInt(250));
					
				}
			}catch (InterruptedException e){
				/*什麼都不做*/
			}
		}
	}
	
/*=================定義生産者線程==================*/
	public class ProductThread implements Runnable{
		@Override
		public void run(){
			
			try {
				while(true){
					
					synchronized(obj){
						
						if(isFull){
							obj.wait();
						}
							
						Thread.sleep(rnd.nextInt(500));
						
						/*如果id為偶數,生産價格為2的産品A
						 *如果id為奇數,生産價格為1的産品B*/
						if(id % 2 == 0){
							goods = new Goods("A", 2, id);
						}else{
							goods = new Goods("B", 1, id);
						}
						
						Thread.sleep(rnd.nextInt(250));
						
						id++;
						isFull = true;
						
						/*喚醒阻塞的消費者線程*/
						obj.notify();
					}
				}
			} catch (InterruptedException e) {
				/*什麼都不做*/
			}
			
		}
	}
	
	public static void main(String[] args) throws InterruptedException{
		
		ProducterComsumerDemo1 pcd = new ProducterComsumerDemo1();
		
		Runnable c = pcd.new ComsumeThread();
		Runnable p = pcd.new ProductThread();
		
		new Thread(p).start();
		new Thread(c).start();
	}
}
      

運作結果

name: B,   price:1,   id: 1
name: A,   price:2,   id: 2
name: B,   price:1,   id: 3
name: A,   price:2,   id: 4
name: B,   price:1,   id: 5
name: A,   price:2,   id: 6
name: B,   price:1,   id: 7
name: A,   price:2,   id: 8
name: B,   price:1,   id: 9
name: A,   price:2,   id: 10
name: B,   price:1,   id: 11
name: A,   price:2,   id: 12
name: B,   price:1,   id: 13
……
      

從結果看出,商品類型交替生産,每個商品的id都不相同,且不會漏過任何一個id,生産者沒有重複生産,消費者沒有重複消費,結果完全正确。

1.2. 使用線程對象,多個生産者線程,多個消費者線程,1個緩存位置

1.2.1一個經典的bug

對于多生産者,多消費者這個問題,看起來我們似乎不用修改代碼,隻需在main方法中多添加幾個線程就好。假設我們需要三個消費者,一個生産者,那麼我們隻需要在main方法中再添加兩個消費者線程。

public static void main(String[] args) throws InterruptedException{
		
		ProducterComsumerDemo1 pcd = new ProducterComsumerDemo1();
		
		Runnable c = pcd.new ComsumeThread();
		Runnable p = pcd.new ProductThread();
		
		new Thread(c).start();

		new Thread(p).start();

		new Thread(c).start();
		new Thread(c).start();

	}

      
name: B,   price:1,   id: 1
name: A,   price:2,   id: 2
name: A,   price:2,   id: 2
name: B,   price:1,   id: 3
name: B,   price:1,   id: 3
name: A,   price:2,   id: 4
name: A,   price:2,   id: 4
name: B,   price:1,   id: 5
name: B,   price:1,   id: 5
name: A,   price:2,   id: 6
……
      

從結果中,我們發現消費者重複消費了商品,是以這樣做顯然是錯誤的。這裡我們定義多個消費者,一個生産者,是以遇到了重複消費的問題,如果定義成一個消費者,多個生産者就會遇到id覆寫的問題。如果我們定義多個消費者,多個生産者,那麼即會遇到重複消費,也會遇到id覆寫的問題。注意,上面的代碼使用的notifyAll喚醒方法,如果使用notify方法喚醒bug仍然可能發生。

現在我們來分析一下原因。當生産者生産好了商品,會喚醒因沒有商品而阻塞消費者線程,假設喚醒的消費者線程超過兩個,這兩個線程會競争擷取鎖,擷取到鎖的線程就會從obj.wait()方法中傳回,然後消費商品,并把isFull置為false,然後釋放鎖。當被喚醒的另一個線程競争擷取到鎖了以後也會從obj.wait()方法中傳回。會再次消費同一個商品。顯然,每一個被喚醒的線程應該再次檢查isFull這個條件。是以無論是消費者,還是生産者,isFull的判斷必須改成while循環,這樣才能得到正确的結果而不受生産者的線程數和消費者的線程數的影響。

而對于隻有一個生産者線程,一個消費者線程,用if判斷是沒有問題的,但是仍然強烈建議改成while語句進行判斷。

1.2.2正确的姿勢

package demo;

import java.util.Random;

/*使用線程對象,一個緩存位置,一個生産者,一個消費者,無限生産商品消費商品*/

public class ProducterComsumerDemo1 {
	
	/*定義一個商品緩存位置*/
	private volatile Goods goods;
	
	/*定義一個對象作為鎖,不使用goods作為鎖是因為生産者每次會産生一個新的對象*/
	private Object obj = new Object();
	
	
	/*isFull == true 生産者線程休息,消費者線程消費 
	 *isFull == false 消費者線程消費,生産者線程生産*/
	private volatile boolean isFull = false;
	
	/*商品的id編号,生産者制造的每個商品的id都不一樣,每生産一個id自增1*/
	private int id = 1;
	
	/*随機産生一個sleep時間*/
	private Random rnd = new Random();
	
	
	/*=================定義消費者線程==================*/
	public class ComsumeThread implements Runnable{
		@Override
		public void run(){
			
			try{
				while(true){
					
					/*擷取obj對象的鎖, id 和 isFull 的操作都在同步代碼塊中*/
					synchronized(obj){
						
						while(!isFull){
							/*wait方法使目前線程阻塞,并釋放鎖*/
							obj.wait();
						}
							
						/*随機延時一段時間*/
						Thread.sleep(rnd.nextInt(250));
						
						/*模拟消費商品*/
						System.out.println(goods);
						
						/*随機延時一段時間*/
						Thread.sleep(rnd.nextInt(250));
						
						isFull = false;
						
						/*喚醒阻塞obj上的生産者線程*/
						obj.notifyAll();
						
					}
					
					/*随機延時一段時間*/
					Thread.sleep(rnd.nextInt(250));
					
				}
			}catch (InterruptedException e){
				/*我就是任性,這裡什麼都不做*/
			}
		}
	}
	
	
	/*=================定義生産者線程==================*/
	public class ProductThread implements Runnable{
		@Override
		public void run(){
			
			try {
				while(true){
					
					synchronized(obj){
						
						while(isFull){
							obj.wait();
						}
							
						Thread.sleep(rnd.nextInt(500));
						
						/*如果id為偶數,生産價格為2的産品A
						     如果id為奇數,生産價格為1的産品B*/
						if(id % 2 == 0){
							goods = new Goods("A", 2, id);
						}else{
							goods = new Goods("B", 1, id);
						}
						
						Thread.sleep(rnd.nextInt(250));
						
						id++;
						isFull = true;
						
						/*喚醒阻塞的消費者線程*/
						obj.notifyAll();
					}
				}
			} catch (InterruptedException e) {
				/*我就是任性,這裡什麼都不做*/
			}
			
		}
	}
	
	
	public static void main(String[] args) throws InterruptedException{
		
		ProducterComsumerDemo1 pcd = new ProducterComsumerDemo1();
		
		Runnable c = pcd.new ComsumeThread();
		Runnable p = pcd.new ProductThread();
		
		
		new Thread(p).start();
		new Thread(p).start();
		new Thread(p).start();
		
		new Thread(c).start();
		new Thread(c).start();
		new Thread(c).start();
		
	}
}
      

1.3 使用線程對象,多個緩存位置(有界),多生産者,多消費者

1)當緩存位置滿時,我們應該阻塞生産者線程

2)當緩存位置空時,我們應該阻塞消費者線程

下面的代碼我沒有用java對象内置的鎖,而是用了ReentrantLock對象。是因為普通對象的鎖隻有一個阻塞隊列,如果使用notify方式,無法保證喚醒的就是特定類型的線程(消費者線程或生産者線程),而notifyAll方法會喚醒所有的線程,當剩餘的緩存商品的數量小于生産者線程數量或已緩存商品的數量小于消費者線程時效率就比較低。是以這裡我們通過ReentrantLock對象構造兩個阻塞隊列提高效率。

1.3.1 普通方式

package demo;

import java.util.LinkedList;
import java.util.Random;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/*使用線程對象,多個緩存位置(有界),多生産者,多消費者,無限循環模式*/

public class ProducterComsumerDemo2 {
	
	/*最大緩存商品數*/
	private final int MAX_SLOT = 2;
	
	/*定義緩存商品的容器*/
	private LinkedList<Goods> queue = new LinkedList<Goods>();
	
	/*定義線程鎖和鎖對應的阻塞隊列*/
	private Lock lock = new ReentrantLock();
	private Condition full = lock.newCondition();
	private Condition empty = lock.newCondition();
	
	/*商品的id編号,生産者制造的每個商品的id都不一樣,每生産一個id自增1*/
	private int id = 1;
	
	/*随機産生一個sleep時間*/
	private Random rnd = new Random();
	

	/*=================定義消費者線程==================*/
	public class ComsumeThread implements Runnable{
		@Override
		public void run(){

			while(true){
				
				/*加鎖,queue的出列操作都在同步代碼塊中*/
				lock.lock();
				try {
					while(queue.isEmpty()){
						System.out.println("queue is empty");
						empty.await();			
					}
					
					/*随機延時一段時間*/
					Thread.sleep(rnd.nextInt(200));
					
					/*模拟消費商品*/
					Goods goods = queue.remove();
					System.out.println(goods);
					
					/*随機延時一段時間*/
					Thread.sleep(rnd.nextInt(200));
					
					/*喚醒阻塞的生産者線程*/
					full.signal();
					
				} catch (InterruptedException e) {
					/*什麼都不做*/
				}finally{
					lock.unlock();
				}
				
				/*釋放鎖後随機延時一段時間*/
				try {
					Thread.sleep(rnd.nextInt(200));
				} catch (InterruptedException e) {
					/*什麼都不做*/
				}
			}
		}
	}
	
	/*=================定義生産者線程==================*/
	public class ProductThread implements Runnable{
		
		@Override
		public void run(){
				
			while(true){
				/*加鎖,queue的入列操作,id操作都在同步代碼塊中*/
				lock.lock();
				try{
					
					while(queue.size() == MAX_SLOT){
						System.out.println("queue is full");
						full.await();
					}
					
					Thread.sleep(rnd.nextInt(200));
					
					Goods goods = null;
					/*根據序号産生不同的商品*/
					switch(id%3){
						case 0 : goods = new Goods("A", 1, id);
								 break;
						
						case 1 : goods = new Goods("B", 2, id);
								 break;
								 
						case 2 : goods = new Goods("C", 3, id);
								 break;
					}
					
					Thread.sleep(rnd.nextInt(200));
					
					queue.add(goods);
					id++;

					/*喚醒阻塞的消費者線程*/
					empty.signal();
					
				}catch(InterruptedException e){
					/*什麼都不做*/
				}finally{
					lock.unlock();
				}
				
				/*釋放鎖後随機延時一段時間*/
				try {
					Thread.sleep(rnd.nextInt(100));
				} catch (InterruptedException e) {
					/*什麼都不做*/
				}
			}
		}
	}
	
	/*=================main==================*/
	public static void main(String[] args) throws InterruptedException{
		
		ProducterComsumerDemo2 pcd = new ProducterComsumerDemo2();
		
		Runnable c = pcd.new ComsumeThread();
		Runnable p = pcd.new ProductThread();
		
		/*兩個生産者線程,兩個消費者線程*/
		new Thread(p).start();
		new Thread(p).start();
		
		new Thread(c).start();
		new Thread(c).start();
		
	}
}
      
queue is empty
queue is empty
name: B,   price:2,   id: 1
name: C,   price:3,   id: 2
name: A,   price:1,   id: 3
queue is full
name: B,   price:2,   id: 4
name: C,   price:3,   id: 5
queue is full
name: A,   price:1,   id: 6
name: B,   price:2,   id: 7
name: C,   price:3,   id: 8
name: A,   price:1,   id: 9
name: B,   price:2,   id: 10
name: C,   price:3,   id: 11
name: A,   price:1,   id: 12
name: B,   price:2,   id: 13
name: C,   price:3,   id: 14
……
      

1.3.2 更優雅的實作方式

下面使用線程池(ThreadPool)和阻塞隊列(LinkedBlockingQueue)原子類(AtomicInteger)以更加優雅的方式實作上述功能。LinkedBlockingQueue阻塞隊列僅在take和put方法上鎖,是以id必須定義為原子類。

package demo;

import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

/*使用線程對象,多個緩存位置(有界),多生産者,多消費者,無限循環模式*/

public class ProducterComsumerDemo4 {
	
	/*最大緩存商品數*/
	private final int MAX_SLOT = 3;
	
	/*定義緩存商品的容器*/
	private LinkedBlockingQueue<Goods> queue = new LinkedBlockingQueue<Goods>(MAX_SLOT);
	
	/*商品的id編号,生産者制造的每個商品的id都不一樣,每生産一個id自增1*/
	private AtomicInteger id = new AtomicInteger(1);
	
	/*随機産生一個sleep時間*/
	private Random rnd = new Random();
	
	/*=================定義消費者線程==================*/
	public class ComsumeThread implements Runnable{
		@Override
		public void run(){

			while(true){
				
				try {
					
					/*随機延時一段時間*/
					Thread.sleep(rnd.nextInt(200));
					
					/*模拟消費商品*/
					Goods goods = queue.take();
					System.out.println(goods);
					
					/*随機延時一段時間*/
					Thread.sleep(rnd.nextInt(200));
					
				} catch (InterruptedException e) {
					/*什麼都不做*/
				}
			}
		}
	}
	
	/*=================定義生産者線程==================*/
	public class ProductThread implements Runnable{
		@Override
		public void run(){
				
			while(true){
				
				try{
					
					int x = id.getAndIncrement();
					Goods goods = null;
					
					Thread.sleep(rnd.nextInt(200));
					
					/*根據序号産生不同的商品*/
					switch(x%3){
						case 0 : goods = new Goods("A", 1, x);
								 break;
						
						case 1 : goods = new Goods("B", 2, x);
								 break;
								 
						case 2 : goods = new Goods("C", 3, x);
								 break;
					}
					
					Thread.sleep(rnd.nextInt(200));
					
					queue.put(goods);

					Thread.sleep(rnd.nextInt(100));
					
				}catch(InterruptedException e){
					/*什麼都不做*/
				}
			}
		}
	}
	
	/*=================main==================*/
	public static void main(String[] args) throws InterruptedException{
		
		ProducterComsumerDemo4 pcd = new ProducterComsumerDemo4();
		
		Runnable c = pcd.new ComsumeThread();
		Runnable p = pcd.new ProductThread();
		
		/*定義線程池*/
		ExecutorService es = Executors.newCachedThreadPool();
		
		/*三個生産者線程,兩個消費者線程*/
		es.execute(p);
		es.execute(p);
		es.execute(p);
		
		es.execute(c);
		es.execute(c);
		
		es.shutdown();
	}
}

      

2. 有限商品個數

這個問題顯然比上面的問題要複雜不少,原因在于要保證緩存區的商品要全部消費掉,沒有重複消費商品,沒有覆寫商品,同時還要保證所有線程能夠正常結束,防止存在一直阻塞的線程。

2.1 使用線程對象,多個緩存位置(有界),多生産者,多消費者

思路 定義一下三個變量

/*需要生産的總商品數*/
	private final int TOTAL_NUM = 30;
	
	/*已産生的數量*/
	private volatile int productNum = 0;
	
	/*已消耗的商品數*/
	private volatile int comsumedNum = 0;      

每生産一個商品 productNum 自增1,直到TOTAL_NUM為止,如果不滿足條件 productNum < TOTAL_NUM 則結束程序,自增操作必須在full.await()方法調用之前,防止生産者線程無法喚醒。

同理,每消費一個商品 comsumedNum 自增1,直到TOTAL_NUM為止,如果不滿足條件 comsumedNum < TOTAL_NUM 則結束程序,自增操作必須在empty.await()方法調用之前,防止消費者線程無法喚醒。

comsumedNum和productNum相當于計劃經濟時代的糧票一樣,有了它能夠保證生産者線程在喚醒後一定需要生産一個商品,消費者線程在喚醒以後一定能夠消費一個商品

package demo;

import java.util.LinkedList;
import java.util.Random;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/*使用線程對象,多個緩存位置(有界),多生産者,多消費者, 有限商品個數*/

public class ProducterComsumerDemo3 {
	
	/*需要生産的總商品數*/
	private final int TOTAL_NUM = 30;
	
	/*已産生的數量*/
	private volatile int productNum = 0;
	
	/*已消耗的商品數*/
	private volatile int comsumedNum = 0;
	
	/*最大緩存商品數*/
	private final int MAX_SLOT = 2;
	
	/*定義線程公用的鎖和條件*/
	private Lock lock = new ReentrantLock();
	private Condition full = lock.newCondition();
	private Condition empty = lock.newCondition();
	
	
	/*定義緩存商品的容器*/
	private LinkedList<Goods> queue = new LinkedList<Goods>();
	
	/*商品的id編号,生産者制造的每個商品的id都不一樣,每生産一個id自增1*/
	private int id = 1;
	
	/*随機産生一個sleep時間*/
	private Random rnd = new Random();
	
	
	/*=================定義消費者線程==================*/
	public class ComsumeThread implements Runnable{
		@Override
		public void run(){
				
			while(true){
				/*加鎖, id、comsumedNum 操作都在同步代碼塊中*/
				lock.lock();
				try {
					/*随機延時一段時間*/
					Thread.sleep(rnd.nextInt(250));
					
					if(comsumedNum < TOTAL_NUM){
						comsumedNum++;
					}else{
						/*這裡會自動執行finally的語句,釋放鎖*/
						break;
					}
					
					while(queue.isEmpty()){
						System.out.println("queue is empty");
						empty.await();
					}
						
					/*随機延時一段時間*/
					Thread.sleep(rnd.nextInt(250));
					
					/*模拟消費商品*/
					Goods goods = queue.remove();
					System.out.println(goods);
					
					/*随機延時一段時間*/
					Thread.sleep(rnd.nextInt(250));
					
					/*喚醒阻塞的生産者線程*/
					full.signal();
					
				} catch (InterruptedException e) {

				}finally{
					lock.unlock();
				}
				
				/*釋放鎖後,随機延時一段時間*/
				try {
					Thread.sleep(rnd.nextInt(250));
				} catch (InterruptedException e) {

				}
			}
			
			System.out.println(
					"customer " 
					+ Thread.currentThread().getName()
					+ " is over");
		}
	}
	
	
	/*=================定義生産者線程==================*/
	public class ProductThread implements Runnable{
		
		@Override
		public void run(){
			
			while(true){
				
				lock.lock();
				try{
					/*随機延時一段時間*/
					Thread.sleep(rnd.nextInt(250));
					
					if(productNum < TOTAL_NUM){
						productNum++;
					}else{
						/*這裡會自動執行finally的語句,釋放鎖*/
						break;
					}
					
					Thread.sleep(rnd.nextInt(250));
					
					while(queue.size() == MAX_SLOT){
						System.out.println("queue is full");
						full.await();
					}
						
					Thread.sleep(rnd.nextInt(250));
					
					Goods goods = null;
					/*根據序号産生不同的商品*/
					switch(id%3){
						case 0 : goods = new Goods("A", 1, id);
								 break;
						
						case 1 : goods = new Goods("B", 2, id);
								 break;
								 
						case 2 : goods = new Goods("C", 3, id);
								 break;
					}
					
					queue.add(goods);
					id++;
					
					/*喚醒阻塞的消費者線程*/
					empty.signal();
					
				}catch(InterruptedException e){

				}finally{
					lock.unlock();
				}
				
				/*釋放鎖後,随機延時一段時間*/
				try {
					Thread.sleep(rnd.nextInt(250));
				} catch (InterruptedException e) {
					/*什麼都不做*/
				}
			}
			
			System.out.println(
					"producter " 
					+ Thread.currentThread().getName()
					+ " is over");
		}
	}
	
	/*=================main==================*/
	public static void main(String[] args) throws InterruptedException{
		
		ProducterComsumerDemo3 pcd = new ProducterComsumerDemo3();
		
		ComsumeThread c = pcd.new ComsumeThread();
		ProductThread p = pcd.new ProductThread();
		
		new Thread(p).start();
		new Thread(p).start();
		new Thread(p).start();
		
		new Thread(c).start();
		new Thread(c).start();
		new Thread(c).start();
		
		System.out.println("main Thread is over");
	}
}
      

2.2利用線程池,原子類,阻塞隊列,以更優雅的方式實作

LinkedBlockingQueue阻塞隊列僅在take和put方法上鎖,是以productNum和comsumedNum必須定義為原子類。

package demo;

import java.util.Random;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

/*使用線程池,多個緩存位置(有界),多生産者,多消費者, 有限商品個數*/
public class LinkedBlockingQueueDemo {
	
	/*需要生産的總商品數*/
	private final int TOTAL_NUM = 20;
	
	/*已産生商品的數量*/
	volatile AtomicInteger productNum = new AtomicInteger(0);
	
	/*已消耗的商品數*/
	volatile AtomicInteger comsumedNum = new AtomicInteger(0);
	
	/*最大緩存商品數*/
	private final int MAX_SLOT = 5;
	
	/*同步阻塞隊列,隊列容量為MAX_SLOT*/
	private LinkedBlockingQueue<Goods> lbq = new LinkedBlockingQueue<Goods>(MAX_SLOT);
	
	/*随機數*/
	private Random rnd = new Random();
	
	/*pn表示産品的編号,産品編号從1開始*/
	private volatile AtomicInteger pn = new AtomicInteger(1);
	
	
	/*=================定義消費者線程==================*/
	public class CustomerThread implements Runnable{
		
		@Override
		public void run(){
			
			while(comsumedNum.getAndIncrement() < TOTAL_NUM){
				
				try{
					/*随機延時一段時間*/
					Thread.sleep(rnd.nextInt(500));
					
					/*從隊列中取出商品,隊列空時發生阻塞*/
					Goods goods = lbq.take();
					
					/*随機延時一段時間*/
					Thread.sleep(rnd.nextInt(500));
					
					/*模拟消耗商品*/
					System.out.println(goods);
					
					/*随機延時一段時間*/
					Thread.sleep(rnd.nextInt(500));
					
				}catch(InterruptedException e){

				}	
			}
			
			System.out.println(
					"customer " 
					+ Thread.currentThread().getName()
					+ " is over");
		}
	}
	
	/*=================定義生産者線程==================*/
	public class ProducerThread implements Runnable{
		
		@Override
		public void run(){
			
			while(productNum.getAndIncrement() < TOTAL_NUM){
				try {
					int x = pn.getAndIncrement();
					
					Goods goods = null;
					/*根據序号産生不同的商品*/
					switch(x%3){
						case 0 : goods = new Goods("A", 1, x);
								 break;
						
						case 1 : goods = new Goods("B", 2, x);
								 break;
								 
						case 2 : goods = new Goods("C", 3, x);
								 break;
					}
					
					/*随機延時一段時間*/
					Thread.sleep(rnd.nextInt(500));

					/*産生的新産品入列,隊列滿時發生阻塞*/
					lbq.put(goods);
					
					/*随機延時一段時間*/
					Thread.sleep(rnd.nextInt(500));
				
				} catch (InterruptedException e1) {
					/*什麼都不做*/
				}
			}
			
			System.out.println(
					"producter " 
					+ Thread.currentThread().getName()
					+ " is over ");
		}
	}
	
	/*=================main==================*/
	public static void main(String[] args){
		
		LinkedBlockingQueueDemo lbqd = new LinkedBlockingQueueDemo();
		
		Runnable c = lbqd.new CustomerThread();
		Runnable p = lbqd.new ProducerThread();
		
		ExecutorService es = Executors.newCachedThreadPool();

		es.execute(c);
		es.execute(c);
		es.execute(c);
		
		es.execute(p);
		es.execute(p);
		es.execute(p);
		
		es.shutdown();
		System.out.println("main Thread is over");
	}
}