天天看点

Netty源码解读(三)Channel与Pipeline

channel是理解和使用netty的核心。channel的涉及内容较多,这里我使用由浅入深的介绍方法。在这篇文章中,我们主要介绍channel部分中pipeline实现机制。为了避免枯燥,借用一下《盗梦空间》的“梦境”概念,希望大家喜欢。

在netty里,<code>channel</code>是通讯的载体,而<code>channelhandler</code>负责channel中的逻辑处理。

那么<code>channelpipeline</code>是什么呢?我觉得可以理解为channelhandler的容器:一个channel包含一个channelpipeline,所有channelhandler都会注册到channelpipeline中,并按顺序组织起来。

在netty中,<code>channelevent</code>是数据或者状态的载体,例如传输的数据对应<code>messageevent</code>,状态的改变对应<code>channelstateevent</code>。当对channel进行操作时,会产生一个channelevent,并发送到<code>channelpipeline</code>。channelpipeline会选择一个channelhandler进行处理。这个channelhandler处理之后,可能会产生新的channelevent,并流转到下一个channelhandler。

Netty源码解读(三)Channel与Pipeline

例如,一个数据最开始是一个<code>messageevent</code>,它附带了一个未解码的原始二进制消息<code>channelbuffer</code>,然后某个handler将其解码成了一个数据对象,并生成了一个新的<code>messageevent</code>,并传递给下一步进行处理。

到了这里,可以看到,其实channel的核心流程位于<code>channelpipeline</code>中。于是我们进入channelpipeline的深层梦境里,来看看它具体的实现。

netty的channelpipeline包含两条线路:upstream和downstream。upstream对应上行,接收到的消息、被动的状态改变,都属于upstream。downstream则对应下行,发送的消息、主动的状态改变,都属于downstream。<code>channelpipeline</code>接口包含了两个重要的方法:<code>sendupstream(channelevent e)</code>和<code>senddownstream(channelevent e)</code>,就分别对应了upstream和downstream。

对应的,channelpipeline里包含的channelhandler也包含两类:<code>channelupstreamhandler</code>和<code>channeldownstreamhandler</code>。每条线路的handler是互相独立的。它们都很简单的只包含一个方法:<code>channelupstreamhandler.handleupstream</code>和<code>channeldownstreamhandler.handledownstream</code>。

netty官方的javadoc里有一张图(<code>channelpipeline</code>接口里),非常形象的说明了这个机制(我对原图进行了一点修改,加上了<code>channelsink</code>,因为我觉得这部分对理解代码流程会有些帮助):

Netty源码解读(三)Channel与Pipeline

什么叫<code>channelsink</code>呢?channelsink包含一个重要方法<code>channelsink.eventsunk</code>,可以接受任意channelevent。“sink”的意思是”下沉”,那么”channelsink”好像可以理解为”channel下沉的地方”?实际上,它的作用确实是这样,也可以换个说法:“处于末尾的万能handler”。最初读到这里,也有些困惑,这么理解之后,就感觉简单许多。只有downstream包含<code>channelsink</code>,这里会做一些建立连接、绑定端口等重要操作。为什么uploadstream没有channelsink呢?我只能认为,一方面,不符合”sink”的意义,另一方面,也没有什么处理好做的吧!

这里有个值得注意的地方:在一条“流”里,一个<code>channelevent</code>并不会主动的”流”经所有的handler,而是由上一个handler显式的调用<code>channelpipeline.sendup(down)stream</code>产生,并交给下一个handler处理。也就是说,每个handler接收到一个channelevent,并处理结束后,如果需要继续处理,那么它需要调用<code>sendup(down)stream</code>新发起一个事件。如果它不再发起事件,那么处理就到此结束,即使它后面仍然有handler没有执行。这个机制可以保证最大的灵活性,当然对handler的先后顺序也有了更严格的要求。

下面我们从代码层面来对这里面发生的事情进行深入分析,这部分涉及到一些细节,需要打开项目源码,对照来看,会比较有收获。

<code>channelpipeline</code>的主要的实现代码在<code>defaultchannelpipeline</code>类里。列一下defaultchannelpipeline的主要字段:

<code>1</code>

<code>public</code> <code>class</code> <code>defaultchannelpipeline</code><code>implements</code> <code>channelpipeline {</code>

<code>2</code>

<code>3</code>

<code>    </code><code>private</code> <code>volatile</code> <code>channel channel;</code>

<code>4</code>

<code>    </code><code>private</code> <code>volatile</code> <code>channelsink sink;</code>

<code>5</code>

<code>    </code><code>private</code> <code>volatile</code> <code>defaultchannelhandlercontext head;</code>

<code>6</code>

<code>    </code><code>private</code> <code>volatile</code> <code>defaultchannelhandlercontext tail;</code>

<code>7</code>

<code>    </code><code>private</code> <code>final</code> <code>map&amp;amp;lt;string, defaultchannelhandlercontext&amp;amp;gt; name2ctx =</code>

<code>8</code>

<code>        </code><code>new</code> <code>hashmap&amp;amp;lt;string, defaultchannelhandlercontext&amp;amp;gt;(</code><code>4</code><code>);</code>

<code>9</code>

<code>}</code>

这里需要介绍一下<code>channelhandlercontext</code>这个接口。顾名思义,channelhandlercontext保存了netty与handler相关的的上下文信息。而咱们这里的<code>defaultchannelhandlercontext</code>,则是对<code>channelhandler</code>的一个包装。一个<code>defaultchannelhandlercontext</code>内部,除了包含一个<code>channelhandler</code>,还保存了”next”和”prev”两个指针,从而形成一个双向链表。

因此,在<code>defaultchannelpipeline</code>中,我们看到的是对<code>defaultchannelhandlercontext</code>的引用,而不是对<code>channelhandler</code>的直接引用。这里包含”head”和”tail”两个引用,分别指向链表的头和尾。而name2ctx则是一个按名字索引defaultchannelhandlercontext用户的一个map,主要在按照名称删除或者添加channelhandler时使用。

前面提到了,<code>channelpipeline</code>接口的两个重要的方法:<code>sendupstream(channelevent e)</code>和<code>senddownstream(channelevent e)</code>。所有事件的发起都是基于这两个方法进行的。<code>channels</code>类有一系列<code>firechannelbound</code>之类的<code>firexxxx</code>方法,其实都是对这两个方法的facade包装。

下面来看一下这两个方法的实现。先看sendupstream(对代码做了一些简化,保留主逻辑):

<code>01</code>

<code>public</code> <code>void</code> <code>sendupstream(channelevent e) {</code>

<code>02</code>

<code>    </code><code>defaultchannelhandlercontext head = getactualupstreamcontext(</code><code>this</code><code>.head);</code>

<code>03</code>

<code>    </code><code>head.gethandler().handleupstream(head, e);</code>

<code>04</code>

<code>05</code>

<code>06</code>

<code>private</code> <code>defaultchannelhandlercontext getactualupstreamcontext(defaultchannelhandlercontext ctx) {</code>

<code>07</code>

<code>    </code><code>defaultchannelhandlercontext realctx = ctx;</code>

<code>08</code>

<code>    </code><code>while</code> <code>(!realctx.canhandleupstream()) {</code>

<code>09</code>

<code>        </code><code>realctx = realctx.next;</code>

<code>10</code>

<code>        </code><code>if</code> <code>(realctx ==</code><code>null</code><code>) {</code>

<code>11</code>

<code>            </code><code>return</code> <code>null</code><code>;</code>

<code>12</code>

<code>        </code><code>}</code>

<code>13</code>

<code>    </code><code>}</code>

<code>14</code>

<code>    </code><code>return</code> <code>realctx;</code>

<code>15</code>

这里最终调用了<code>channelupstreamhandler.handleupstream</code>来处理这个channelevent。有意思的是,这里我们看不到任何”将handler向后移一位”的操作,但是我们总不能每次都用同一个handler来进行处理啊?实际上,我们更为常用的是<code>channelhandlercontext.handleupstream</code>方法(实现是<code>defaultchannelhandlercontext.sendupstream</code>方法):

<code>    </code><code>defaultchannelhandlercontext next = getactualupstreamcontext(</code><code>this</code><code>.next);</code>

<code>    </code><code>defaultchannelpipeline.</code><code>this</code><code>.sendupstream(next, e);</code>

可以看到,这里最终仍然调用了<code>channelpipeline.sendupstream</code>方法,但是它会将handler指针后移。

我们接下来看看<code>defaultchannelhandlercontext.senddownstream</code>:

<code>public</code> <code>void</code> <code>senddownstream(channelevent e) {</code>

<code>    </code><code>defaultchannelhandlercontext prev = getactualdownstreamcontext(</code><code>this</code><code>.prev);</code>

<code>    </code><code>if</code> <code>(prev ==</code><code>null</code><code>) {</code>

<code>        </code><code>try</code> <code>{</code>

<code>            </code><code>getsink().eventsunk(defaultchannelpipeline.</code><code>this</code><code>, e);</code>

<code>        </code><code>}</code><code>catch</code> <code>(throwable t) {</code>

<code>            </code><code>notifyhandlerexception(e, t);</code>

<code>    </code><code>}</code><code>else</code> <code>{</code>

<code>        </code><code>defaultchannelpipeline.</code><code>this</code><code>.senddownstream(prev, e);</code>

与sendupstream好像不大相同哦?这里有两点:一是到达末尾时,就如梦境二所说,会调用channelsink进行处理;二是这里指针是往前移的,所以我们知道了:

upstreamhandler是从前往后执行的,downstreamhandler是从后往前执行的。在channelpipeline里添加时需要注意顺序了!

defaultchannelpipeline里还有些机制,像添加/删除/替换handler,以及<code>channelpipelinefactory</code>等,比较好理解,就不细说了。

好了,深入分析完代码,有点头晕了,我们回到最开始的地方,来想一想,netty的pipeline机制解决了什么问题?

我认为至少有两点:

一是提供了channelhandler的编程模型,基于channelhandler开发业务逻辑,基本不需要关心网络通讯方面的事情,专注于编码/解码/逻辑处理就可以了。handler也是比较方便的开发模式,在很多框架中都有用到。

二是实现了所谓的”universal asynchronous api”。这也是netty官方标榜的一个功能。用过oio和nio的都知道,这两套api风格相差极大,要从一个迁移到另一个成本是很大的。即使是nio,异步和同步编程差距也很大。而netty屏蔽了oio和nio的api差异,通过channel提供对外接口,并通过channelpipeline将其连接起来,因此替换起来非常简单。

Netty源码解读(三)Channel与Pipeline

理清了channelpipeline的主流程,我们对channel部分的大致结构算是弄清楚了。可是到了这里,我们依然对一个连接具体怎么处理没有什么概念。在下篇文章,我们会分析一下,在netty中,究竟是如何处理连接的建立、数据的传输这些事情的。

继续阅读