天天看點

Java中的讀/寫鎖

Java5在java.util.concurrent包中已經包含了讀寫鎖。盡管如此,我們還是應該了解其實作背後的原理。

以下是本文的主題

先讓我們對讀寫通路資源的條件做個概述:

<b>讀取</b> 沒有線程正在做寫操作,且沒有線程在請求寫操作。

<b>寫入</b> 沒有線程正在做讀寫操作。

如果某個線程想要讀取資源,隻要沒有線程正在對該資源進行寫操作且沒有線程請求對該資源的寫操作即可。我們假設對寫操作的請求比對讀操作的請求更重要,就要提升寫請求的優先級。此外,如果讀操作發生的比較頻繁,我們又沒有提升寫操作的優先級,那麼就會産生“饑餓”現象。請求寫操作的線程會一直阻塞,直到所有的讀線程都從ReadWriteLock上解鎖了。如果一直保證新線程的讀操作權限,那麼等待寫操作的線程就會一直阻塞下去,結果就是發生“饑餓”。是以,隻有當沒有線程正在鎖住ReadWriteLock進行寫操作,且沒有線程請求該鎖準備執行寫操作時,才能保證讀操作繼續。

當其它線程沒有對共享資源進行讀操作或者寫操作時,某個線程就有可能獲得該共享資源的寫鎖,進而對共享資源進行寫操作。有多少線程請求了寫鎖以及以何種順序請求寫鎖并不重要,除非你想保證寫鎖請求的公平性。

按照上面的叙述,簡單的實作出一個讀/寫鎖,代碼如下

<a href="http://ifeve.com/read-write-locks/#viewSource">檢視源代碼</a>

<code>01</code>

<code>public</code> <code>class</code> <code>ReadWriteLock{</code>

<code>02</code>

<code>    </code><code>private</code> <code>int</code> <code>readers =</code><code>0</code><code>;</code>

<code>03</code>

<code>    </code><code>private</code> <code>int</code> <code>writers =</code><code>0</code><code>;</code>

<code>04</code>

<code>    </code><code>private</code> <code>int</code> <code>writeRequests =</code><code>0</code><code>;</code>

<code>05</code>

<code>06</code>

<code>    </code><code>public</code> <code>synchronized</code> <code>void</code> <code>lockRead()</code>

<code>07</code>

<code>        </code><code>throws</code> <code>InterruptedException{</code>

<code>08</code>

<code>        </code><code>while</code><code>(writers &gt;</code><code>0</code> <code>|| writeRequests &gt;</code><code>0</code><code>){</code>

<code>09</code>

<code>            </code><code>wait();</code>

<code>10</code>

<code>        </code><code>}</code>

<code>11</code>

<code>        </code><code>readers++;</code>

<code>12</code>

<code>    </code><code>}</code>

<code>13</code>

<code>14</code>

<code>    </code><code>public</code> <code>synchronized</code> <code>void</code> <code>unlockRead(){</code>

<code>15</code>

<code>        </code><code>readers--;</code>

<code>16</code>

<code>        </code><code>notifyAll();</code>

<code>17</code>

<code>18</code>

<code>19</code>

<code>    </code><code>public</code> <code>synchronized</code> <code>void</code> <code>lockWrite()</code>

<code>20</code>

<code>21</code>

<code>        </code><code>writeRequests++;</code>

<code>22</code>

<code>23</code>

<code>        </code><code>while</code><code>(readers &gt;</code><code>0</code> <code>|| writers &gt;</code><code>0</code><code>){</code>

<code>24</code>

<code>25</code>

<code>26</code>

<code>        </code><code>writeRequests--;</code>

<code>27</code>

<code>        </code><code>writers++;</code>

<code>28</code>

<code>29</code>

<code>30</code>

<code>    </code><code>public</code> <code>synchronized</code> <code>void</code> <code>unlockWrite()</code>

<code>31</code>

<code>32</code>

<code>        </code><code>writers--;</code>

<code>33</code>

<code>34</code>

<code>35</code>

<code>}</code>

ReadWriteLock類中,讀鎖和寫鎖各有一個擷取鎖和釋放鎖的方法。

讀鎖的實作在lockRead()中,隻要沒有線程擁有寫鎖(writers==0),且沒有線程在請求寫鎖(writeRequests ==0),所有想獲得讀鎖的線程都能成功擷取。

寫鎖的實作在lockWrite()中,當一個線程想獲得寫鎖的時候,首先會把寫鎖請求數加1(writeRequests++),然後再去判斷是否能夠真能獲得寫鎖,當沒有線程持有讀鎖(readers==0 ),且沒有線程持有寫鎖(writers==0)時就能獲得寫鎖。有多少線程在請求寫鎖并無關系。

需要注意的是,在兩個釋放鎖的方法(unlockRead,unlockWrite)中,都調用了notifyAll方法,而不是notify。要解釋這個原因,我們可以想象下面一種情形:

如果有線程在等待擷取讀鎖,同時又有線程在等待擷取寫鎖。如果這時其中一個等待讀鎖的線程被notify方法喚醒,但因為此時仍有請求寫鎖的線程存在(writeRequests&gt;0),是以被喚醒的線程會再次進入阻塞狀态。然而,等待寫鎖的線程一個也沒被喚醒,就像什麼也沒發生過一樣(譯者注:信号丢失現象)。如果用的是notifyAll方法,所有的線程都會被喚醒,然後判斷能否獲得其請求的鎖。

用notifyAll還有一個好處。如果有多個讀線程在等待讀鎖且沒有線程在等待寫鎖時,調用unlockWrite()後,所有等待讀鎖的線程都能立馬成功擷取讀鎖 —— 而不是一次隻允許一個。

上面實作的讀/寫鎖(ReadWriteLock) 是不可重入的,當一個已經持有寫鎖的線程再次請求寫鎖時,就會被阻塞。原因是已經有一個寫線程了——就是它自己。此外,考慮下面的例子:

Thread 1 獲得了讀鎖

Thread 2 請求寫鎖,但因為Thread 1 持有了讀鎖,是以寫鎖請求被阻塞。

Thread 1 再想請求一次讀鎖,但因為Thread 2處于請求寫鎖的狀态,是以想再次擷取讀鎖也會被阻塞。

上面這種情形使用前面的ReadWriteLock就會被鎖定——一種類似于死鎖的情形。不會再有線程能夠成功擷取讀鎖或寫鎖了。

為了讓ReadWriteLock可重入,需要對它做一些改進。下面會分别處理讀鎖的重入和寫鎖的重入。

為了讓ReadWriteLock的讀鎖可重入,我們要先為讀鎖重入建立規則:

要保證某個線程中的讀鎖可重入,要麼滿足擷取讀鎖的條件(沒有寫或寫請求),要麼已經持有讀鎖(不管是否有寫請求)。

要确定一個線程是否已經持有讀鎖,可以用一個map來存儲已經持有讀鎖的線程以及對應線程擷取讀鎖的次數,當需要判斷某個線程能否獲得讀鎖時,就利用map中存儲的資料進行判斷。下面是方法lockRead和unlockRead修改後的的代碼:

<code>    </code><code>private</code> <code>Map&lt;Thread, Integer&gt; readingThreads =</code>

<code>        </code><code>new</code> <code>HashMap&lt;Thread, Integer&gt;();</code>

<code>        </code><code>Thread callingThread = Thread.currentThread();</code>

<code>        </code><code>while</code><code>(! canGrantReadAccess(callingThread)){</code>

<code>            </code><code>wait();                                                                  </code>

<code>        </code><code>readingThreads.put(callingThread,</code>

<code>            </code><code>(getAccessCount(callingThread) +</code><code>1</code><code>));</code>

<code>        </code><code>int</code> <code>accessCount = getAccessCount(callingThread);</code>

<code>        </code><code>if</code><code>(accessCount ==</code><code>1</code><code>) {</code>

<code>            </code><code>readingThreads.remove(callingThread);</code>

<code>        </code><code>}</code><code>else</code> <code>{</code>

<code>            </code><code>readingThreads.put(callingThread, (accessCount -</code><code>1</code><code>));</code>

<code>    </code><code>private</code> <code>boolean</code> <code>canGrantReadAccess(Thread callingThread){</code>

<code>        </code><code>if</code><code>(writers &gt;</code><code>0</code><code>)</code><code>return</code> <code>false</code><code>;</code>

<code>        </code><code>if</code><code>(isReader(callingThread)</code><code>return</code> <code>true</code><code>;</code>

<code>        </code><code>if</code><code>(writeRequests &gt;</code><code>0</code><code>)</code><code>return</code> <code>false</code><code>;</code>

<code>        </code><code>return</code> <code>true</code><code>;</code>

<code>36</code>

<code>37</code>

<code>    </code><code>private</code> <code>int</code> <code>getReadAccessCount(Thread callingThread){</code>

<code>38</code>

<code>        </code><code>Integer accessCount = readingThreads.get(callingThread);</code>

<code>39</code>

<code>        </code><code>if</code><code>(accessCount ==</code><code>null</code><code>)</code><code>return</code> <code>0</code><code>;</code>

<code>40</code>

<code>        </code><code>return</code> <code>accessCount.intValue();</code>

<code>41</code>

<code>42</code>

<code>43</code>

<code>    </code><code>private</code> <code>boolean</code> <code>isReader(Thread callingThread){</code>

<code>44</code>

<code>        </code><code>return</code> <code>readingThreads.get(callingThread) !=</code><code>null</code><code>;</code>

<code>45</code>

<code>46</code>

代碼中我們可以看到,隻有在沒有線程擁有寫鎖的情況下才允許讀鎖的重入。此外,重入的讀鎖比寫鎖優先級高。

僅當一個線程已經持有寫鎖,才允許寫鎖重入(再次獲得寫鎖)。下面是方法lockWrite和unlockWrite修改後的的代碼。

<code>    </code><code>private</code> <code>int</code> <code>writeAccesses    =</code><code>0</code><code>;</code>

<code>    </code><code>private</code> <code>int</code> <code>writeRequests    =</code><code>0</code><code>;</code>

<code>    </code><code>private</code> <code>Thread writingThread =</code><code>null</code><code>;</code>

<code>        </code><code>while</code><code>(!canGrantWriteAccess(callingThread)){</code>

<code>        </code><code>writeAccesses++;</code>

<code>        </code><code>writingThread = callingThread;</code>

<code>        </code><code>writeAccesses--;</code>

<code>        </code><code>if</code><code>(writeAccesses ==</code><code>0</code><code>){</code>

<code>            </code><code>writingThread =</code><code>null</code><code>;</code>

<code>    </code><code>private</code> <code>boolean</code> <code>canGrantWriteAccess(Thread callingThread){</code>

<code>        </code><code>if</code><code>(hasReaders())</code><code>return</code> <code>false</code><code>;</code>

<code>        </code><code>if</code><code>(writingThread ==</code><code>null</code><code>)   </code><code>return</code> <code>true</code><code>;</code>

<code>        </code><code>if</code><code>(!isWriter(callingThread))</code><code>return</code> <code>false</code><code>;</code>

<code>    </code><code>private</code> <code>boolean</code> <code>hasReaders(){</code>

<code>        </code><code>return</code> <code>readingThreads.size() &gt;</code><code>0</code><code>;</code>

<code>    </code><code>private</code> <code>boolean</code> <code>isWriter(Thread callingThread){</code>

<code>        </code><code>return</code> <code>writingThread == callingThread;</code>

注意在确定目前線程是否能夠擷取寫鎖的時候,是如何處理的。

有時,我們希望一個擁有讀鎖的線程,也能獲得寫鎖。想要允許這樣的操作,要求這個線程是唯一一個擁有讀鎖的線程。writeLock()需要做點改動來達到這個目的:

<code>    </code><code>public</code> <code>synchronized</code> <code>void</code> <code>unlockWrite()</code><code>throws</code> <code>InterruptedException{</code>

<code>        </code><code>if</code><code>(isOnlyReader(callingThread))</code><code>return</code> <code>true</code><code>;</code>

<code>        </code><code>if</code><code>(writingThread ==</code><code>null</code><code>)</code><code>return</code> <code>true</code><code>;</code>

<code>    </code><code>private</code> <code>boolean</code> <code>isOnlyReader(Thread thread){</code>

<code>        </code><code>return</code> <code>readers ==</code><code>1</code> <code>&amp;&amp; readingThreads.get(callingThread) !=</code><code>null</code><code>;</code>

<code>47</code>

<code>48</code>

現在ReadWriteLock類就可以從讀鎖更新到寫鎖了。

有時擁有寫鎖的線程也希望得到讀鎖。如果一個線程擁有了寫鎖,那麼自然其它線程是不可能擁有讀鎖或寫鎖了。是以對于一個擁有寫鎖的線程,再獲得讀鎖,是不會有什麼危險的。我們僅僅需要對上面canGrantReadAccess方法進行簡單地修改:

<code>1</code>

<code>2</code>

<code>3</code>

<code>        </code><code>if</code><code>(isWriter(callingThread))</code><code>return</code> <code>true</code><code>;</code>

<code>4</code>

<code>        </code><code>if</code><code>(writingThread !=</code><code>null</code><code>)</code><code>return</code> <code>false</code><code>;</code>

<code>5</code>

<code>6</code>

<code>7</code>

<code>8</code>

<code>9</code>

下面是完整的ReadWriteLock實作。為了便于代碼的閱讀與了解,簡單對上面的代碼做了重構。重構後的代碼如下。

<code>001</code>

<code>002</code>

<code>003</code>

<code>004</code>

<code>005</code>

<code>006</code>

<code>007</code>

<code>008</code>

<code>009</code>

<code>010</code>

<code>011</code>

<code>012</code>

<code>013</code>

<code>014</code>

<code>015</code>

<code>016</code>

<code>017</code>

<code>            </code><code>(getReadAccessCount(callingThread) +</code><code>1</code><code>));</code>

<code>018</code>

<code>019</code>

<code>020</code>

<code>021</code>

<code>022</code>

<code>        </code><code>if</code><code>(hasWriter())</code><code>return</code> <code>false</code><code>;</code>

<code>023</code>

<code>        </code><code>if</code><code>(isReader(callingThread))</code><code>return</code> <code>true</code><code>;</code>

<code>024</code>

<code>        </code><code>if</code><code>(hasWriteRequests())</code><code>return</code> <code>false</code><code>;</code>

<code>025</code>

<code>026</code>

<code>027</code>

<code>028</code>

<code>029</code>

<code>030</code>

<code>031</code>

<code>        </code><code>if</code><code>(!isReader(callingThread)){</code>

<code>032</code>

<code>            </code><code>throw</code> <code>new</code> <code>IllegalMonitorStateException(</code>

<code>033</code>

<code>                </code><code>"Calling Thread does not"</code> <code>+</code>

<code>034</code>

<code>                </code><code>" hold a read lock on this ReadWriteLock"</code><code>);</code>

<code>035</code>

<code>036</code>

<code>        </code><code>int</code> <code>accessCount = getReadAccessCount(callingThread);</code>

<code>037</code>

<code>        </code><code>if</code><code>(accessCount ==</code><code>1</code><code>){</code>

<code>038</code>

<code>039</code>

<code>040</code>

<code>041</code>

<code>042</code>

<code>043</code>

<code>044</code>

<code>045</code>

<code>046</code>

<code>047</code>

<code>048</code>

<code>049</code>

<code>050</code>

<code>051</code>

<code>052</code>

<code>053</code>

<code>054</code>

<code>055</code>

<code>056</code>

<code>057</code>

<code>058</code>

<code>059</code>

<code>        </code><code>if</code><code>(!isWriter(Thread.currentThread()){</code>

<code>060</code>

<code>        </code><code>throw</code> <code>new</code> <code>IllegalMonitorStateException(</code>

<code>061</code>

<code>            </code><code>"Calling Thread does not"</code> <code>+</code>

<code>062</code>

<code>            </code><code>" hold the write lock on this ReadWriteLock"</code><code>);</code>

<code>063</code>

<code>064</code>

<code>065</code>

<code>066</code>

<code>067</code>

<code>068</code>

<code>069</code>

<code>070</code>

<code>071</code>

<code>072</code>

<code>073</code>

<code>074</code>

<code>075</code>

<code>076</code>

<code>077</code>

<code>078</code>

<code>079</code>

<code>080</code>

<code>081</code>

<code>082</code>

<code>083</code>

<code>084</code>

<code>085</code>

<code>086</code>

<code>087</code>

<code>088</code>

<code>089</code>

<code>090</code>

<code>091</code>

<code>092</code>

<code>093</code>

<code>094</code>

<code>095</code>

<code>    </code><code>private</code> <code>boolean</code> <code>isOnlyReader(Thread callingThread){</code>

<code>096</code>

<code>        </code><code>return</code> <code>readingThreads.size() ==</code><code>1</code> <code>&amp;&amp;</code>

<code>097</code>

<code>            </code><code>readingThreads.get(callingThread) !=</code><code>null</code><code>;</code>

<code>098</code>

<code>099</code>

<code>100</code>

<code>    </code><code>private</code> <code>boolean</code> <code>hasWriter(){</code>

<code>101</code>

<code>        </code><code>return</code> <code>writingThread !=</code><code>null</code><code>;</code>

<code>102</code>

<code>103</code>

<code>104</code>

<code>105</code>

<code>106</code>

<code>107</code>

<code>108</code>

<code>    </code><code>private</code> <code>boolean</code> <code>hasWriteRequests(){</code>

<code>109</code>

<code>        </code><code>return</code> <code>this</code><code>.writeRequests &gt;</code><code>0</code><code>;</code>

<code>110</code>

<code>111</code>

在利用ReadWriteLock來保護臨界區時,如果臨界區可能抛出異常,在finally塊中調用readUnlock()和writeUnlock()就顯得很重要了。這樣做是為了保證ReadWriteLock能被成功解鎖,然後其它線程可以請求到該鎖。這裡有個例子:

<code>lock.lockWrite();</code>

<code>try</code><code>{</code>

<code>    </code><code>//do critical section code, which may throw exception</code>

<code>}</code><code>finally</code> <code>{</code>

<code>    </code><code>lock.unlockWrite();</code>

上面這樣的代碼結構能夠保證臨界區中抛出異常時ReadWriteLock也會被釋放。如果unlockWrite方法不是在finally塊中調用的,當臨界區抛出了異常時,ReadWriteLock 會一直保持在寫鎖定狀态,就會導緻所有調用lockRead()或lockWrite()的線程一直阻塞。唯一能夠重新解鎖ReadWriteLock的因素可能就是ReadWriteLock是可重入的,當抛出異常時,這個線程後續還可以成功擷取這把鎖,然後執行臨界區以及再次調用unlockWrite(),這就會再次釋放ReadWriteLock。但是如果該線程後續不再擷取這把鎖了呢?是以,在finally中調用unlockWrite對寫出健壯代碼是很重要的。