SocketServer主要用于接收外部的网络请求,并把请求添加到请求队列中。
在KafkaServer.scala中的start方法中,有这样的入口:
这块就是启动了一个SocketServer,我们具体看一下。
我们看下SocketServer里面包含的参数:
这里面涉及几个配置内容:
listeners:默认是PLAINTEXT://:port,前面部分是协议,可配置为PLAINTEXT,SSL,SASL_PLAINTEXT,SASL_SSL
num.network.threads:处理网络请求的线程个数配置,默认是3
queued.max.requests:请求队列的最大个数,默认500
max.connections.per.ip:单机IP的最大连接个数的配置,默认不限制
max.connections.per.ip.overrides:针对某个特别的IP的连接个数限制的重新设置值.多个IP配置间使用逗号分开,如:host1:500,host2:600
启动的代码如下:
这块涉及到几个配置项,主要用于生成socket中的SO_SNDBUF和SO_RCVBUF。
socket.send.buffer.bytes:默认值100kb,这个用于SOCKET发送数据的缓冲区大小
socket.receive.buffer.bytes:默认值100kb,这个用于SOCKET的接收数据的缓冲区大小
broker.id
我们先看下这个简单的赋值。
其实就是Processor的实例生成,主要涉及几个配置项:
socket.request.max.bytes:设置每次请求的数据大小.默认值,100MB
connections.max.idle.ms:默认为10分钟,用于设置每个连接最大的空闲回收时间
每个endPoint对应一个Acceptor,也就是每个listener对应一个Acceptor。Acceptor主要用于接收网络请求,将请求分发到processor处理。我们来看下Acceptor的run方法:
下面我们看下accept方法:
上面accept方法中,调用到了processor的accept方法,我们看下这个accept方法:
其实就是向队列中新增了一个socket通道,等待processor线程处理。下面我们看下processor是怎么处理的。
这块其实是个门面模式,里面调用的内容比较多,我们一一看一下。
这块是从队列中取一个连接,并注册到selector上。
这块主要看一下pollSelectionKeys方法:
这里开始处理socket通道中的请求,根据如下几个流程进行处理:
如果请求中包含有一个isConnectable操作,把这个连接缓存起来.
如果请求中包含有isReadable操作.表示这个client的管道中包含有数据,需要读取,接收数据.
如果包含有isWriteable的操作,表示需要向client端进行写操作.
最后检查是否有connect被关闭的请求或connect连接空闲过期
得到对应的请求的Request的实例,并把这个Request通过SocketServer中的RequestChannel的sendRequest的函数,把请求添加到请求的队列中.等待KafkaApis来进行处理.
这里的send完成表示有向client端进行响应的写操作处理完成
如果socket server中包含有已经关闭的连接,减少这个quotas中对此ip的连接数的值.
这个情况包含connect处理超时或者说有connect的消息处理错误被发起了close的请求后的处理成功的消息.