天天看点

java多线程共享queue_Java多线程之传统线程机制

传统线程机制的回顾

创建线程的两种方式

1 在Thread子类覆盖的run方法中编写运行代码

Thread t1 = new Thread()

{

@Override

public void run() {

System.out.println("run in thread run()");

}

}

2 在传递给Thread对象的Runnable对象的run方法中编写代码

Thread t2 = new Thread(new Runnable() {

@Override

public void run() {

System.out.println("run in runnable");

}

})

查看Thread的run方法源码:

@Override

public void run() {

if (target != null) {

target.run();

}

}

这个target就是构造函数中传入的runnable对象,因此如下代码的输出应该是?

new Thread(new Runnable() {

@Override

public void run() {

System.out.println("run in runnable");

}

}) {

@Override

public void run() {

System.out.println("run in thread run()");

}

}.start();

答案应该是:run in thread run(),因为重写了run方法,也就不会执行target的run方法了。

两种创建线程的方法没有本质区别,但第二种方式更符合面向对象的思想。

线程状态转换

java多线程共享queue_Java多线程之传统线程机制

新建状态(New):新创建了一个线程对象。

就绪状态(Runnable):线程对象创建后,其他线程调用了该对象的start()方法。该状态的线程位于可运行线程池中,变得可运行,等待获取CPU的使用权。

运行状态(Running):就绪状态的线程获取了CPU,执行程序代码。

阻塞状态(Blocked):阻塞状态是线程因为某种原因放弃CPU使用权,暂时停止运行。直到线程进入就绪状态,才有机会转到运行状态。阻塞的情况分三种:

等待阻塞:运行的线程执行wait()方法,JVM会把该线程放入等待池中。(wait会释放持有的锁)

同步阻塞:运行的线程在获取对象的同步锁时,若该同步锁被别的线程占用,则JVM会把该线程放入锁池中。

其他阻塞:运行的线程执行sleep()或join()方法,或者发出了I/O请求时,JVM会把该线程置为阻塞状态。当sleep()状态超时、join()等待线程终止或者超时、或者I/O处理完毕时,线程重新转入就绪状态。(注意,sleep是不会释放持有的锁)

死亡状态(Dead):线程执行完了或者因异常退出了run()方法,该线程结束生命周期。

线程的同步互斥通信

同步与互斥

synchronized关键字:隐式锁

可作用于方法和代码块:

作用于方法,锁对象为this,当前实例;

作用于static方法,锁对象为当前类的Class对象;

作用于代码块,需要指定锁对象。

synchronized的作用:同一个锁对象的代码(方法和代码块),同一时间只能被一个线程执行,保证可见性和原子性。

线程通信wait和notify

使用synchronized的地方不一定使用wait和notify,但使用notify和wait有两个必要条件:

(1)wait和notify只有在synchronized关键字作用范围内才有效,否则无意义;

(2)wait和notify的调用对象必须是锁对象,也就是synchronized当前使用的对象,否则无效,有可能跑出异常:illegalmonitorstateexception

以下为两个例子,

例子1:使用两个线程打印,子线程每次打印10个数字,主线程每次打印100个数字,如此往复50次:

public class TraditionalThreadCommunication {

public static void main(String[] args) {

final Business business = new Business();

new Thread(

new Runnable() {

@Override

public void run() {

for(int i=1;i<=50;i++){

business.sub(i);

}

}

}

).start();

for(int i=1;i<=50;i++){

business.main(i);

}

}

}

class Business {

private boolean bShouldSub = true;

public synchronized void sub(int i){

while(!bShouldSub){

try {

this.wait();

} catch (InterruptedException e) {

// TODO Auto-generated catch block

e.printStackTrace();

}

}

for(int j=1;j<=10;j++){

System.out.println("sub thread sequence of " + j + ",loop of " + i);

}

bShouldSub = false;

this.notify();

}

public synchronized void main(int i){

while(bShouldSub){

try {

this.wait();

} catch (InterruptedException e) {

// TODO Auto-generated catch block

e.printStackTrace();

}

}

for(int j=1;j<=100;j++){

System.out.println("main thread sequence of " + j + ",loop of " + i);

}

bShouldSub = true;

this.notify();

}

}

例子2 使用wait和notify实现生产者消费者阻塞队列,队列长度为5:

public static void main(String[] args) {

List taskQueue = new ArrayList();

int MAX_CAPACITY = 5;

Thread tProducer = new Thread(new Producer(taskQueue, MAX_CAPACITY), "Producer");

Thread tConsumer = new Thread(new Consumer(taskQueue), "Consumer");

tProducer.start();

tConsumer.start();

}

//生产者

public class Producer implements Runnable {

private final List taskQueue;

private final int MAX_CAPACITY;

public Producer(List sharedQueue, int size) {

this.taskQueue = sharedQueue;

this.MAX_CAPACITY = size;

}

@Override

public void run() {

int counter = 0;

while (true) {

try {

produce(counter++);

} catch (InterruptedException ex) {

ex.printStackTrace();

}

}

}

private void produce(int i) throws InterruptedException {

synchronized (taskQueue) {

while (taskQueue.size() == MAX_CAPACITY) {

System.out.println("Queue is full " + Thread.currentThread().getName() + " is waiting , size: " + taskQueue.size());

taskQueue.wait();

}

Thread.sleep(1000);

taskQueue.add(i);

System.out.println("Produced: " + i);

taskQueue.notifyAll();

}

}

//消费者

public class Consumer implements Runnable {

private final List taskQueue;

public Consumer(List sharedQueue) {

this.taskQueue = sharedQueue;

}

@Override

public void run() {

while (true) {

try {

consume();

} catch (InterruptedException ex) {

ex.printStackTrace();

}

}

}

private void consume() throws InterruptedException {

synchronized (taskQueue) {

while (taskQueue.isEmpty()) {

System.out.println("Queue is empty " + Thread.currentThread().getName() + " is waiting , size: " + taskQueue.size());

taskQueue.wait();

}

Thread.sleep(500);

int i = (Integer) taskQueue.remove(0);

System.out.println("Consumed: " + i);

taskQueue.notify();

}

}

例子1,两个线程会有规律的交叉打印,而例子2则在不同机器上有不同表现,依赖于cpu的调度,最关键的地方在于:例子1中wait的进入条件会随着每一轮的执行而改变一次,而例子2中,只有队列为空或满时才会进入wait,因此没有明显规律。

ThreadLocal实现线程范围的共享变量

ThreadLocal的作用和目的:用于实现线程内的数据共享,即对于相同的程序代码,多个模块在同一个线程中运行时要共享一份数据,而在另外线程中运行时又共享另外一份数据。

(1) 每个线程调用全局ThreadLocal对象的set方法,就相当于往其内部的map中增加一条记录,key分别是各自的线程,value是各自的set方法传进去的值。在线程结束时可以调用ThreadLocal.clear()方法,这样会更快释放内存,不调用也可以,因为线程结束后也可以自动释放相关的ThreadLocal变量.

(2) 一个ThreadLocal只能存储一个对象,如果需要存储多个对象,则可以用多个ThreadLocal对象,如果很多,则把这些对象封装到一个实体类中。

(3) ThreadLocal的应用场景:如android中的Looper就存储在ThreadLocal变量中,一个线程只能有一个looper。

Looper.prepare()方法

private static void prepare(boolean quitAllowed) {

if (sThreadLocal.get() != null) {

throw new RuntimeException("Only one Looper may be created per thread");

}

sThreadLocal.set(new Looper(quitAllowed));

}

(4) 测试代码

public class ThreadLocalTest {

private static ThreadLocal x = new ThreadLocal();

private static ThreadLocal myThreadScopeData = new ThreadLocal();

public static void main(String[] args) {

for(int i=0;i<2;i++){

new Thread(new Runnable(){

@Override

public void run() {

int data = new Random().nextInt();

System.out.println(Thread.currentThread().getName()

+ " has put data :" + data);

x.set(data);

MyThreadScopeData.getThreadInstance().setName("name" + data);

MyThreadScopeData.getThreadInstance().setAge(data);

new A().get();

new B().get();

}

}).start();

}

}

static class A{

public void get(){

int data = x.get();

System.out.println("A from " + Thread.currentThread().getName()

+ " get data :" + data);

MyThreadScopeData myData = MyThreadScopeData.getThreadInstance();

System.out.println("A from " + Thread.currentThread().getName()

+ " getMyData: " + myData.getName() + "," +

myData.getAge());

}

}

static class B{

public void get(){

int data = x.get();

System.out.println("B from " + Thread.currentThread().getName()

+ " get data :" + data);

MyThreadScopeData myData = MyThreadScopeData.getThreadInstance();

System.out.println("B from " + Thread.currentThread().getName()

+ " getMyData: " + myData.getName() + "," +

myData.getAge());

}

} }

class MyThreadScopeData{

private MyThreadScopeData(){}

public static MyThreadScopeData getThreadInstance(){

MyThreadScopeData instance = map.get();

if(instance == null){

instance = new MyThreadScopeData();

map.set(instance);

}

return instance;

}

//private static MyThreadScopeData instance = null;//new MyThreadScopeData();

private static ThreadLocal map = new ThreadLocal();

private String name;

private int age;

public String getName() {

return name;

}

public void setName(String name) {

this.name = name;

}

public int getAge() {

return age;

}

public void setAge(int age) {

this.age = age;

}

}