天天看點

Java并發包源碼分析

并發是一種能并行運作多個程式或并行運作一個程式中多個部分的能力。如果程式中一個耗時的任務能以異步或并行的方式運作,那麼整個程式的吞吐量和可互動性将大大改善。現代的PC都有多個CPU或一個CPU中有多個核,是否能合理運用多核的能力将成為一個大規模應用程式的關鍵。

  Java基礎部分知識總結點選Java并發基礎總結。Java多線程相關類的實作都在Java的并發包concurrent,concurrent包主要包含3部分内容,第一個是atomic包,裡面主要是一些原子類,比如AtomicInteger、AtomicIntegerArray等;第二個是locks包,裡面主要是鎖相關的類,比如ReentrantLock、Condition等;第三個就是屬于concurrent包的内容,主要包括線程池相關類(Executors)、阻塞集合類(BlockingQueue)、并發Map類(ConcurrentHashMap)、線程相關類(Thread、Runnable、Callable)等。

atomic包源碼分析

  atomic包是專門為線程安全設計的Java包,包含多個原子操作類。其基本思想就是在多線程環境下,當有多個線程同時執行這些類的執行個體的方法時,具有排他性,一個線程進入方法執行指令時,不會被其他的線程打斷,而别的線程就像自旋鎖一樣,一直等待該方法執行完成。

  原子變量的底層使用了處理器提供的原子指令,但是不同的CPU架構可能提供的原子指令不一樣,也有可能需要某種形式的内部鎖,是以該方法不能絕對保證線程不被阻塞。

  atomic包一共有12個類,四種原子更新方式,分别是原子更新基本類型、原子更新數組、原子更新引用和原子更新字段。JDK1.5中引入了底層的支援,在int、long和對象的引用等類型上都公開了CAS的操作,并且JVM把它們編譯為底層硬體提供的最有效的方法,在運作CAS的平台上,運作時把它們編譯為相應的機器指令。在java.util.concurrent.atomic包下面的所有的原子變量類型中,比如AtomicInteger,都使用了這些底層的JVM支援為數字類型的引用類型提供一種高效的CAS操作。

  Unsafe中的操作一般都是基于CAS來實作的,CAS就是Compare and Swap的意思,比較并操作。很多的cpu直接支援CAS指令。CAS是一項樂觀鎖技術,當多個線程嘗試使用CAS同時更新同一個變量時,隻有其中一個線程能更新變量的值,而其它線程都失敗,失敗的線程并不會被挂起,而是被告知這次競争中失敗,并可以再次嘗試。CAS有3個操作數,記憶體值V,舊的預期值A,要修改的新值B。當且僅當預期值A和記憶體值V相同時,将記憶體值V修改為B,否則什麼都不做。

複制代碼

/**

AtomicMain

atomic class test

*/

public class AtomicMain {

public static void main(String[] args) throws InterruptedException {

ExecutorService executor = Executors.newCachedThreadPool();

AtomicInteger data = new AtomicInteger(0);

AtomicIntegerArray array = new AtomicIntegerArray(10);

AtomicReference reference = new AtomicReference();

}

AtomicInteger

static class AtomicIntegerTask implements Runnable {

private AtomicInteger data;

public AtomicIntegerTask(AtomicInteger data) {

this.data = data;

public void run() {

int cnt = 10;

傳進來的Array大小至少為10

AtomicIntegerArray是原子性的,保證對該array整個記憶體操作的原子性,

也就是說不可能同時有A線程對array[0]操作,而B線程對array[1]操作

static class AtomicIntegerArrayTask implements Runnable {

private AtomicIntegerArray array;

public AtomicIntegerArrayTask(AtomicIntegerArray array) {

this.array = array;

static class AtomicReferenceTask implements Runnable {

private AtomicReference reference;

static class User {

public String name;

public int age;

AtomicInteger. incrementAndGet流程

原子自增1

this表示AtomicInteger執行個體

valueOffset表示value資料域相對于this的記憶體位址的偏移位置

/

public final int incrementAndGet() {

return unsafe.getAndAddInt(this, valueOffset, 1) + 1;

public final int getAndAddInt(Object var1, long var2, int var4) {

int var5;

do {

/ 擷取value在記憶體中的值,然後進行CAS操作 */

var5 = this.getIntVolatile(var1, var2);

} while(!this.compareAndSwapInt(var1, var2, var5, var5 + var4));

return var5;

lock包源碼分析

  lock包裡面主要是鎖相關的類,比如ReentrantLock、Condition等。

  Lock接口主要有lock、lockInterruptibly、tryLock、unlock、newCondition等方法:

public interface Lock {

使用Lock示例:

public class LockMain {

Lock lock = new ReentrantLock();

AtomicInteger data = new AtomicInteger(0);

concurrent包源碼分析

BlockingQueue

public interface BlockingQueue<E> extends Queue<E> {

ArrayBlockingQueue

  ArrayBlockingQueue是一個基于數組的有界阻塞隊列,按照FIFO(先進先出)原則對元素進行排序,在構造方法中會new一個數組,并且new ReentrantLock,并且初始化notEmpty和notFull兩個Condition。

public ArrayBlockingQueue(int capacity, boolean fair) {

if (capacity <= 0)

throw new IllegalArgumentException();

this.items = new Object[capacity];

lock = new ReentrantLock(fair);

notEmpty = lock.newCondition();

notFull = lock.newCondition();

  執行put操作時,首先擷取lock,如果數組已經滿了,則調用notFull.await等待;否則調用enqueue插入元素,插入成功後把count計數值加1,調用notEmpty.signal。判斷數組是否滿了是根據count是否等于數組長度來确定的,因為往數組中插入元素時,首先從下标為0位置開始插入,插到下标為array.length-1時,如果count小于array.length,則下一次從下标為0位置插入。

public void put(E e) throws InterruptedException {

checkNotNull(e);

final ReentrantLock lock = this.lock;

lock.lockInterruptibly();

try {

while (count == items.length)

notFull.await();

enqueue(e);

} finally {

lock.unlock();

private void enqueue(E x) {

final Object[] items = this.items;

items[putIndex] = x;

if (++putIndex == items.length)

putIndex = 0;

count++;

notEmpty.signal();

  執行take操作時,首先擷取lock,如果數組為空,則調用notEmpty.await等待;否則調用dequeue取出元素,取出成功後把count計數值減1,調用notFull.signal。

public E take() throws InterruptedException {

while (count == 0)

notEmpty.await();

return dequeue();

private E dequeue() {

final Object[] items = this.items;br/>@SuppressWarnings("unchecked")

E x = (E) items[takeIndex];

items[takeIndex] = null;

if (++takeIndex == items.length)

takeIndex = 0;

count--;

if (itrs != null)

itrs.elementDequeued();

notFull.signal();

return x;

  lock的wait/signal更多知識:http://www.cnblogs.com/alphablox/archive/2013/01/20/2868479.html

LinkedBlockingQueue

  LinkedBlockingQueue是基于連結清單結構的阻塞隊列,按照FIFO(先進先出)原則對元組進行排序,新元素是尾部插入,吞吐量通常高于ArrayBlockingQueue。該類中包含一個takeLock和基于takeLock的Condition對象notEmpty,一個putLock鎖,和基于putLock的Condition對象notFull。在構造方法中會新new一個Node,last和head都指向該Node節點。

public LinkedBlockingQueue(int capacity) {

if (capacity <= 0) throw new IllegalArgumentException();

this.capacity = capacity;

last = head = new Node<E>(null);

  執行put操作時,首先擷取putLock,如果連結清單節點數已經達到上限,則調用notFull.await等待;否則調用enqueue插入元素,插入成功後把count值原子加1,如果連結清單節點數未達到上限,則調用notFull.signal。然後擷取takeLock,再調用notEmpty.signal通知。

if (e == null) throw new NullPointerException();

int c = -1;

Node<E> node = new Node<E>(e);

final ReentrantLock putLock = this.putLock;

final AtomicInteger count = this.count;

putLock.lockInterruptibly();

while (count.get() == capacity) {

enqueue(node);

c = count.getAndIncrement();

if (c + 1 < capacity)

putLock.unlock();

if (c == 0)

signalNotEmpty();

private void enqueue(Node<E> node) {

last = last.next = node;

private void signalNotEmpty() {

final ReentrantLock takeLock = this.takeLock;

takeLock.lock();

takeLock.unlock();

  執行take操作時,首先擷取takeLock,如果連結清單為空,則調用notEmpty.await等待;否則調用dequeue取出元素,然後把count值原子減1,如果此時連結清單非空,則調用notEmpty.signal。然後擷取putLock,再調用putLock.signal通知。

E x;

takeLock.lockInterruptibly();

while (count.get() == 0) {

x = dequeue();

c = count.getAndDecrement();

if (c > 1)

if (c == capacity)

signalNotFull();

Node<E> h = head;

Node<E> first = h.next;

h.next = h; // help GC

head = first;

E x = first.item;

first.item = null;

private void signalNotFull() {

putLock.lock();

ConcurrentHashMap

使用ConcurrentHashMap程式示例

HashMapMain test

public class HashMapMain {

ConcurrentHashMap<String, String> hashMap = new ConcurrentHashMap<String, String>();

ExecutorService executorService = Executors.newCachedThreadPool();

static class HashMapPutTask implements Runnable {

private ConcurrentHashMap<String, String> hashMap;

幾個核心的内部類:

Node

  Node是最核心的内部類,它包裝了key-value鍵值對,所有插入ConcurrentHashMap的資料都包裝在這裡面。它與HashMap中的定義很相似,但是但是有一些差别它對value和next屬性設定了volatile同步鎖,它不允許調用setValue方法直接改變Node的value域,它增加了find方法輔助map.get()方法。

static class Node<K,V> implements Map.Entry<K,V> {

final int hash;

final K key;

volatile V val;

volatile Node<K,V> next;

// ...

TreeNode

  樹節點類,另外一個核心的資料結構。當連結清單長度過長的時候,會轉換為TreeNode。但是與HashMap不相同的是,它并不是直接轉換為紅黑樹,而是把這些結點包裝成TreeNode放在TreeBin對象中,由TreeBin完成對紅黑樹的操作。而且TreeNode在ConcurrentHashMap繼承自Node類,而并非HashMap中的繼承自LinkedHashMap.Entry<K,V>類,也就是說TreeNode帶有next指針,這樣做的目的是友善基于TreeBin的通路。

TreeBin

put操作

  ConcurrentHashMap最常用的就是put和get兩個方法。現在來介紹put方法,這個put方法依然沿用HashMap的put方法的思想,根據hash值計算這個新插入的點在table中的位置i,如果i位置是空的,直接放進去,否則進行判斷,如果i位置是樹節點,按照樹的方式插入新的節點,否則把i插入到連結清單的末尾。ConcurrentHashMap中依然沿用這個思想,有一個最重要的不同點就是ConcurrentHashMap不允許key或value為null值。另外由于涉及到多線程,put方法就要複雜一點。在多線程中可能有以下兩個情況

如果一個或多個線程正在對ConcurrentHashMap進行擴容操作,目前線程也要進入擴容的操作中。這個擴容的操作之是以能被檢測到,是因為transfer方法中在空結點上插入forward節點,如果檢測到需要插入的位置被forward節點占有,就幫助進行擴容;

如果檢測到要插入的節點是非空且不是forward節點,就對這個節點加鎖,這樣就保證了線程安全。盡管這個有一些影響效率,但是還是會比hashTable的synchronized要好得多。

  整體流程就是首先定義不允許key或value為null的情況放入 對于每一個放入的值,首先利用spread方法對key的hashcode進行一次hash計算,由此來确定這個值在table中的位置。如果這個位置是空的,那麼直接放入,而且不需要加鎖操作。

  如果這個位置存在結點,說明發生了hash碰撞,首先進入sychnorized同步代碼塊,然後判斷這個節點的類型。如果是連結清單節點(fh>0),則得到的結點就是hash值相同的節點組成的連結清單的頭節點。需要依次向後周遊确定這個新加入的值所在位置。如果遇到hash值與key值都與新加入節點是一緻的情況,則隻需要更新value值即可。否則依次向後周遊,直到連結清單尾插入這個結點。 如果加入這個節點以後連結清單長度大于8,就把這個連結清單轉換成紅黑樹。如果這個節點的類型已經是樹節點的話,直接調用樹節點的插入方法進行插入新的值。

public V put(K key, V value) {

return putVal(key, value, false);

/* Implementation for put and putIfAbsent /

final V putVal(K key, V value, boolean onlyIfAbsent) {

//不允許 key或value為null

if (key == null || value == null) throw new NullPointerException();

//計算hash值

int hash = spread(key.hashCode());

int binCount = 0;

//死循環 何時插入成功 何時跳出

for (Node<K,V>[] tab = table;;) {

Node<K,V> f; int n, i, fh;

//如果table為空的話,初始化table

if (tab == null || (n = tab.length) == 0)

tab = initTable();

//根據hash值計算出在table裡面的位置

else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {

//如果這個位置沒有值 ,直接放進去,不需要加鎖

if (casTabAt(tab, i, null,

new Node<K,V>(hash, key, value, null)))

break; // no lock when adding to empty bin

//當遇到表連接配接點時,需要進行整合表的操作

else if ((fh = f.hash) == MOVED)

tab = helpTransfer(tab, f);

else {

V oldVal = null;

//結點上鎖 這裡的結點可以了解為hash值相同組成的連結清單的頭結點

synchronized (f) {

if (tabAt(tab, i) == f) {

//fh〉0 說明這個節點是一個連結清單的節點 不是樹的節點

if (fh >= 0) {

binCount = 1;

//在這裡周遊連結清單所有的結點

for (Node<K,V> e = f;; ++binCount) {

K ek;

//如果hash值和key值相同 則修改對應結點的value值

if (e.hash == hash &&

((ek = e.key) == key ||

(ek != null && key.equals(ek)))) {

oldVal = e.val;

if (!onlyIfAbsent)

e.val = value;

break;

Node<K,V> pred = e;

//如果周遊到了最後一個結點,那麼就證明新的節點需要插入 就把它插入在連結清單尾部

if ((e = e.next) == null) {

pred.next = new Node<K,V>(hash, key,

value, null);

//如果這個節點是樹節點,就按照樹的方式插入值

else if (f instanceof TreeBin) {

Node<K,V> p;

binCount = 2;

if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,

value)) != null) {

oldVal = p.val;

p.val = value;

if (binCount != 0) {

//如果連結清單長度已經達到臨界值8 就需要把連結清單轉換為樹結構

if (binCount >= TREEIFY_THRESHOLD)

treeifyBin(tab, i);

if (oldVal != null)

return oldVal;

//将目前ConcurrentHashMap的元素數量+1

addCount(1L, binCount);

return null;

get方法

  get方法比較簡單,給定一個key來确定value的時候,必須滿足兩個條件 key相同 hash值相同,對于節點可能在連結清單或樹上的情況,需要分别去查找。

public V get(Object key) {

Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;

int h = spread(key.hashCode());

//根據hash值确定節點位置

if ((tab = table) != null && (n = tab.length) > 0 &&

(e = tabAt(tab, (n - 1) & h)) != null) {

//如果搜尋到的節點key與傳入的key相同且不為null,直接傳回這個節點

if ((eh = e.hash) == h) {

if ((ek = e.key) == key || (ek != null && key.equals(ek)))

return e.val;

//如果eh<0 說明這個節點在樹上 直接尋找

else if (eh < 0)

return (p = e.find(h, key)) != null ? p.val : null;

//否則周遊連結清單 找到對應的值并傳回

while ((e = e.next) != null) {

if (e.hash == h &&

((ek = e.key) == key || (ek != null && key.equals(ek))))