天天看点

【Kafka源码】SocketServer启动过程一、入口二、构造方法三、启动SocketServer

SocketServer主要用于接收外部的网络请求,并把请求添加到请求队列中。

在KafkaServer.scala中的start方法中,有这样的入口:

这块就是启动了一个SocketServer,我们具体看一下。

我们看下SocketServer里面包含的参数:

这里面涉及几个配置内容:

listeners:默认是PLAINTEXT://:port,前面部分是协议,可配置为PLAINTEXT,SSL,SASL_PLAINTEXT,SASL_SSL

num.network.threads:处理网络请求的线程个数配置,默认是3

queued.max.requests:请求队列的最大个数,默认500

max.connections.per.ip:单机IP的最大连接个数的配置,默认不限制

max.connections.per.ip.overrides:针对某个特别的IP的连接个数限制的重新设置值.多个IP配置间使用逗号分开,如:host1:500,host2:600

启动的代码如下:

这块涉及到几个配置项,主要用于生成socket中的SO_SNDBUF和SO_RCVBUF。

socket.send.buffer.bytes:默认值100kb,这个用于SOCKET发送数据的缓冲区大小

socket.receive.buffer.bytes:默认值100kb,这个用于SOCKET的接收数据的缓冲区大小

broker.id

我们先看下这个简单的赋值。

其实就是Processor的实例生成,主要涉及几个配置项:

socket.request.max.bytes:设置每次请求的数据大小.默认值,100MB

connections.max.idle.ms:默认为10分钟,用于设置每个连接最大的空闲回收时间

每个endPoint对应一个Acceptor,也就是每个listener对应一个Acceptor。Acceptor主要用于接收网络请求,将请求分发到processor处理。我们来看下Acceptor的run方法:

下面我们看下accept方法:

上面accept方法中,调用到了processor的accept方法,我们看下这个accept方法:

其实就是向队列中新增了一个socket通道,等待processor线程处理。下面我们看下processor是怎么处理的。

这块其实是个门面模式,里面调用的内容比较多,我们一一看一下。

这块是从队列中取一个连接,并注册到selector上。

这块主要看一下pollSelectionKeys方法:

这里开始处理socket通道中的请求,根据如下几个流程进行处理:

如果请求中包含有一个isConnectable操作,把这个连接缓存起来.

如果请求中包含有isReadable操作.表示这个client的管道中包含有数据,需要读取,接收数据.

如果包含有isWriteable的操作,表示需要向client端进行写操作.

最后检查是否有connect被关闭的请求或connect连接空闲过期

得到对应的请求的Request的实例,并把这个Request通过SocketServer中的RequestChannel的sendRequest的函数,把请求添加到请求的队列中.等待KafkaApis来进行处理.

这里的send完成表示有向client端进行响应的写操作处理完成

如果socket server中包含有已经关闭的连接,减少这个quotas中对此ip的连接数的值.

这个情况包含connect处理超时或者说有connect的消息处理错误被发起了close的请求后的处理成功的消息.

继续阅读