天天看点

非阻塞同步算法实战(一)

上个月,我被安排独自负责一个聊天系统的服务端,因为一些原因,我没使用现成的开源框架,网络那块直接使用aio,收数据时,因为只会从channel里过来,所以不需要考虑同步问题;但是发送数据时,因为有聊天消息的转发,所以必需处理这个同步问题。aio中,是处理完一个注册的操作后,再执行我们定义的方法,此时,如果还有数据需要写,则继续注册写操作,如果没有则不注册;提交数据时,如果当前没有注册写操作,则注册一个,否则仅提交(此时再注册则会报异常)。这样,需要同步的点就是:如何知道当前还有没有数据需要发送(因为其它线程也可以提交数据到此处),和如何知道此次提交时,有没有注册写操作。总之,要完成:有数据要发送时,必需有写操作被注册,并且只能注册一次;没有数据时,不能有写操作被注册。

经过分析,上面的问题,可以抽象成:我需要知道当往队列里插入一条数据之前,该队列是否为空,如果为空则应该注册新的写操作。当从队列里取出一条数据后,该队列是否为非空,如果非空则应该继续注册写操作。(本文之后以“关注的操作”来表示这种场景下的插入或取出操作)

目前的问题是,我使用的队列是concurrentlinkedqueue,但是它的取出数据的方法,没有返回值告诉我们从队列里取出数据之后队列是否为空,如果是使用size或peek方法来执行判断,那就必需加锁了,否则在拿到队列大小时,可能队列大小已经变化了。所以我首先想到的是,如何对该队列进行改造,让它提供该信息。

注意:这里指的不是当次而是之后,所以如果我们使用队列的peek()方法返回null,就知道队列是否为空,但是不知道之后是否为空 ,并且,当关注的操作发生时,在插入或取出操作的返回值里告知此信息,来指导是否继续注册写操作。

如果使用锁的话很容易处理,用锁同步插入和取出方法,下面是锁实现的参考:

<code>01</code>

<code>public</code> <code>e poll() {</code>

<code>02</code>

<code>    </code><code>synchronized</code> <code>(</code><code>this</code><code>) {</code>

<code>03</code>

<code>        </code><code>e re = q.poll();</code>

<code>04</code>

<code>        </code><code>// 获取元素后,队列是空,表示是我关注的操作</code>

<code>05</code>

<code>        </code><code>if</code> <code>(q.peek() ==</code><code>null</code><code>) {</code>

<code>06</code>

<code>07</code>

<code>        </code><code>}</code>

<code>08</code>

<code>        </code><code>return</code> <code>re;</code>

<code>09</code>

<code>    </code><code>}</code>

<code>10</code>

<code>}</code>

<code>11</code>

<code>12</code>

<code>public</code> <code>void</code> <code>offer(e e) {</code>

<code>13</code>

<code>14</code>

<code>        </code><code>// 插入元素前,队列是空,表示是我关注的操作</code>

<code>15</code>

<code>16</code>

<code>17</code>

<code>18</code>

<code>        </code><code>q.offer(e);</code>

<code>19</code>

<code>20</code>

但因为是服务端,我想用非阻塞同步算法来实现。

我第一次想到的改造办法是,将head占位节点改成固定的,头节点移动时,只将head节点的next指向新的节点,在插入数据时,如果是在head节点上成功执行的该操作,那么该插入就是关注的的操作;在取出时,如果将head节点的next置为了null,那么该取出就是关注的操作( 因为之前的占位节点是变化的,所以没法比较,必需用同步,现在是固定的了,所以可以直接与head节点进行比较 )。如此一来,问题好像被解决了。改造完之后,出于严谨,我仔细读了一遍代码,发现引入了新的问题,我的取出操作是这样写的

<code>/**</code>

<code> </code><code>* @author [email protected]</code>

<code> </code><code>*/</code>

<code>public</code> <code>e poll(){</code>

<code>    </code><code>for</code><code>(;;){</code>

<code>        </code><code>node n = head.nextref.get();</code><code>//head指向固定的head节点,为final</code>

<code>        </code><code>if</code><code>(n ==</code><code>null</code><code>)</code>

<code>            </code><code>return</code> <code>null</code><code>;</code>

<code>        </code><code>node m = n.nextref.get();</code>

<code>        </code><code>if</code><code>(head.next.compareandset(n,m){</code>

<code>            </code><code>if</code><code>(m==</code><code>null</code><code>)</code>

<code>                </code><code>;</code><code>//此时为关注的操作(为了简化代码显示,不将该信息当作返回值返回了,仅注释)</code>

<code>            </code><code>return</code> <code>n.itemref.get();</code>

这里有一个致命的问题:如果m为null,在cas期间,插入了新节点,n的next由null变成了非null,紧接着又把head的next更新为了null,那么链就断了,该方法还存在一些其它的问题,如当队列为空的时候,尾节点指向了错误的位置,本应该是head的。我认为最根本的原因在于,head不能设为固定的,否则会引发一堆问题。第一次尝试宣告失败。

这次我尝试将head跟tail两个引用包装成一个对象,然后对这个对象进行cas更新(这仅为一处理论上的尝试,因为从性能上面来讲,已经大打折扣了,创建了很多额外的对象),如果head跟tail指向了同一个节点,则认为队列是空的,根据此信息来判断一个操作是不是关注的操作。但该尝试仅停留在了理论分析阶段,期间发现了一些小问题,没法解决,后来我发现,我把concurrentlinkedqueue原本可以分成两步执行的插入和取出操作(更新节点的next或item引用,然后再更新head或tail引用),变成了必需一步完成,concurrentlinkedqueue尚不能一步完成,我何德何能,可将它们一步完成?所以直接放弃了。

经过两次的失败尝试,我几乎绝望了,我怀疑这是不是不能判断出是否为关注的操作。

因为是在做项目,周末已经过去了,不能再搞这些“研究”了,所以我退而求其次,想了个不太漂亮的办法,在队列外部维护一个变量,记录队列有多大,在插入或取出后,更新该变量,使用的是atomicinteger,如果更新时,将变量从1变成0,或从0变成了1,就认为该插入或取出为关注的操作。

<code>private</code> <code>atomicinteger size =</code><code>new</code> <code>atomicinteger(</code><code>0</code><code>);</code>

<code>    </code><code>e re = q.poll();</code>

<code>    </code><code>if</code> <code>(re ==</code><code>null</code><code>)</code>

<code>        </code><code>return</code> <code>null</code><code>;</code>

<code>    </code><code>for</code><code>(</code><code>int</code> <code>old;;){</code>

<code>        </code><code>old = size.get();</code>

<code>        </code><code>if</code><code>(size.compareandset(old,old-</code><code>1</code><code>)){</code>

<code>            </code><code>// 获取元素后,队列是空,表示是我关注的操作</code>

<code>            </code><code>if</code><code>(old ==</code><code>1</code><code>){</code>

<code>            </code><code>}</code>

<code>            </code><code>break</code><code>;</code>

<code>    </code><code>return</code> <code>re;</code>

<code>    </code><code>q.offer(e);</code>

<code>21</code>

<code>22</code>

<code>23</code>

<code>        </code><code>if</code><code>(size.compareandset(old,old+</code><code>1</code><code>)){</code>

<code>24</code>

<code>            </code><code>// 插入元素前,队列是空,表示是我关注的操作</code>

<code>25</code>

<code>            </code><code>if</code><code>(old ==</code><code>0</code><code>){</code>

<code>26</code>

<code>27</code>

<code>28</code>

<code>29</code>

<code>30</code>

<code>31</code>

此时,也许细心的朋友会问,因为没有使用锁,这个变量并不能真实反映队列的大小,也就不能确定它是不是关注的操作。没错,是不能真实反映,但是,我获取关注的操作的目的是用来指导我:该不该注册新的写操作,该变量的值变化就能提供正确的指导,所以,同样是正确的,只不过途径不同而已。理论上的分析和后来的项目正确运行都印证了该方法的正确性。

因为上面的方法额外加了一次lock-free级别的cas操作,我心里总不太舒服,空余时间总在琢磨,真的就没有办法,在不增加额外lock-free级别cas开支的情况下,知晓一个操作是不是关注的操作?

后来经分析,如果要知晓是不是关注的操作,跟两个数据有关,真实的头节点跟尾节点(不同于head跟tail,因为它们是滞后的,之前将它们包装成一个对象就是犯了该错误),concurrentlinkedqueue的实现中,这两个节点是没有竞争的,一个是更新item,一个是更新next,必需得让他们竞争同一个东西,才能解决该问题,于是我想到了一个办法,取出完成后,如果该节点的next为null,就将其用cas置为一个特殊的值,若成功则认为是关注的操作;插入成功后,如果next被替换掉的值不是null而是这个特殊值,那么该插入也为关注的操作。这仅增加了一次wait-free级别的cas操作(取出后的那次cas),perfect!

因为concurrentlinkedqueue的很多变量、内部类都是私有的,没法通过继承来改造,没办法,只得自行实现。对于队列里使用的node,实现的方式有很多种,可以使用atomicreference、atomicreferencefieldupdater来实现,如果你愿意的话,甚至是像concurrentlinkedqueue一样,用sun.misc.unsafe来实现(注意:一般来说,sun包下的类是不推荐使用的),各有优缺点吧,所以我就不提供该队列的具体实现了,下面给出在concurrentlinkedqueue(版本:1.7.0_10)基础上进行的改造,供参考。注意,如果需要用到size等方法,因为特殊值的引入,影响了之前的判断逻辑,应重新编写。

<code> </code><code>private</code> <code>static</code> <code>final</code> <code>node mark_node =</code><code>new</code> <code>node(</code><code>null</code><code>);</code>

<code> </code><code>public</code> <code>boolean</code> <code>offer(e e) {</code>

<code>     </code><code>checknotnull(e);</code>

<code>     </code><code>final</code> <code>node newnode =</code><code>new</code> <code>node(e);</code>

<code>     </code><code>for</code> <code>(node t = tail, p = t;;) {</code>

<code>         </code><code>node q = p.next;</code>

<code>         </code><code>if</code> <code>(q ==</code><code>null</code> <code>|| q == mark_node) {</code><code>//修改1:加入条件:或q == mark_node</code>

<code>             </code><code>// p is last node</code>

<code>             </code><code>if</code> <code>(p.casnext(q, newnode)) {</code><code>//修改2:将null改为q</code>

<code>                 </code><code>// successful cas is the linearization point</code>

<code>                 </code><code>// for e to become an element of this queue,</code>

<code>                 </code><code>// and for newnode to become "live".</code>

<code>                 </code><code>if</code> <code>(q == mark_node)</code><code>//修改3:</code>

<code>                     </code><code>;</code><code>//此时为关注的操作(为了简化代码显示,仅注释)</code>

<code>                 </code><code>if</code> <code>(p != t)</code><code>// hop two nodes at a time</code>

<code>                     </code><code>castail(t, newnode);</code><code>// failure is ok.</code>

<code>                 </code><code>return</code> <code>true</code><code>;</code>

<code>             </code><code>}</code>

<code>             </code><code>// lost cas race to another thread; re-read next</code>

<code>         </code><code>}</code>

<code>         </code><code>else</code> <code>if</code> <code>(p == q)</code>

<code>             </code><code>// we have fallen off list. if tail is unchanged, it</code>

<code>             </code><code>// will also be off-list, in which case we need to</code>

<code>             </code><code>// jump to head, from which all live nodes are always</code>

<code>             </code><code>// reachable. else the new tail is a better bet.</code>

<code>             </code><code>p = (t != (t = tail)) ? t : head;</code>

<code>32</code>

<code>         </code><code>else</code>

<code>33</code>

<code>             </code><code>// check for tail updates after two hops.</code>

<code>34</code>

<code>             </code><code>p = (p != t &amp;&amp; t != (t = tail)) ? t : q;</code>

<code>35</code>

<code>     </code><code>}</code>

<code>36</code>

<code> </code><code>}</code>

<code>37</code>

<code>38</code>

<code> </code><code>public</code> <code>e poll() {</code>

<code>39</code>

<code>     </code><code>restartfromhead:</code>

<code>40</code>

<code>     </code><code>for</code> <code>(;;) {</code>

<code>41</code>

<code>         </code><code>for</code> <code>(node h = head, p = h, q;;) {</code>

<code>42</code>

<code>             </code><code>e item = p.item;</code>

<code>43</code>

<code>44</code>

<code>             </code><code>if</code> <code>(item !=</code><code>null</code> <code>&amp;&amp; p.casitem(item,</code><code>null</code><code>)) {</code>

<code>45</code>

<code>46</code>

<code>                 </code><code>// for item to be removed from this queue.</code>

<code>47</code>

<code>                 </code><code>if</code> <code>(p != h)</code><code>// hop two nodes at a time</code>

<code>48</code>

<code>                     </code><code>updatehead(h, ((q = p.next) !=</code><code>null</code><code>) ? q : p);</code>

<code>49</code>

<code>                 </code><code>if</code> <code>(p.casnext(</code><code>null</code><code>,mark_node))</code><code>//修改1:</code>

<code>50</code>

<code>                     </code><code>;</code><code>//此时为关注的操作</code>

<code>51</code>

<code>                 </code><code>return</code> <code>item;</code>

<code>52</code>

<code>53</code>

<code>             </code><code>else</code> <code>if</code> <code>((q = p.next) ==</code><code>null</code><code>) {</code>

<code>54</code>

<code>                 </code><code>updatehead(h, p);</code>

<code>55</code>

<code>                 </code><code>return</code> <code>null</code><code>;</code>

<code>56</code>

<code>57</code>

<code>             </code><code>else</code> <code>if</code> <code>(p == q)</code>

<code>58</code>

<code>                 </code><code>continue</code> <code>restartfromhead;</code>

<code>59</code>

<code>             </code><code>else</code>

<code>60</code>

<code>                 </code><code>p = q;</code>

<code>61</code>

<code>62</code>

<code>63</code>

设计非阻塞算法的关键在于,找出竞争点,如果获取的某个信息跟两个操作有关,那么应该让这两个操作竞争同一个东西,这样才能反应出它们的关系。 

继续阅读