大家好,很高兴在这里跟大家分享下rocketmq源码实现,如有不对的地方欢迎指正。
接着上篇文章继续展开namesrv注册过程。
1 public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(
defaultEventExecutorGroup,
new NettyEncoder(),
new NettyDecoder(),
new IdleStateHandler(0, 0, nettyServerConfig
.getServerChannelMaxIdleTimeSeconds()),
new NettyConnetManageHandler(),
new NettyServerHandler());
}
当broker启动的时候指定namesrv地址后,由netty接收连接,分为以下几个步骤:
1.1 第一步channel注册 NettyConnetManageHandler
public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
final String remoteAddress = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
log.info("NETTY SERVER PIPELINE: channelRegistered {}", remoteAddress);
super.channelRegistered(ctx);
}
打印出远程注册地址,调用父类注册channel
1.2 第二步 channel激活
public void channelActive(ChannelHandlerContext ctx) throws Exception {
final String remoteAddress = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
log.info("NETTY SERVER PIPELINE: channelActive, the channel[{}]", remoteAddress);
super.channelActive(ctx);
if (NettyRemotingServer.this.channelEventListener != null) {
NettyRemotingServer.this.putNettyEvent(new NettyEvent(NettyEventType.CONNECT, remoteAddress
.toString(), ctx.channel()));
}
}
打印出channel激活地址,调用父类激活操作
判断NettyRemotingServer channelEventListener不为空的时候,放入Netty连接事件
主要是处理 CONNECT,CLOSE,IDLE,EXCEPTION 处理Channel被关闭,下线broker
2 public Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception {
ByteBuf frame = null;
try {
frame = (ByteBuf) super.decode(ctx, in);
if (null == frame) {
return null;
}
ByteBuffer byteBuffer = frame.nioBuffer();
return RemotingCommand.decode(byteBuffer);
} catch (Exception e) {
log.error("decode exception, " + RemotingHelper.parseChannelRemoteAddr(ctx.channel()), e);
RemotingUtil.closeChannel(ctx.channel());
} finally {
if (null != frame) {
frame.release();
}
}
return null;
}
NettyDecoder.decode解码ByteBuf转换成RemotingCommand,服务器与客户端通过传递 RemotingCommand来交互,包含Header,Body
private int code; code码
private LanguageCode language = LanguageCode.JAVA;交互语言
private int version = 0;版本
private int opaque = RequestId.getAndIncrement();操作
private int flag = 0;标示
private String remark;
private HashMap<String, String> extFields;扩展字段
private transient CommandCustomHeader customHeader;用户header
private transient byte[] body;消息体
3 NettyServerHandler.channelRead0 netty服务处理
public void processMessageReceived(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception {
final RemotingCommand cmd = msg;
if (cmd != null) {
switch (cmd.getType()) { 根据 command type判断
case REQUEST_COMMAND: 请求命令
proce***equestCommand(ctx, cmd);
break;
case RESPONSE_COMMAND: 响应命令
proce***esponseCommand(ctx, cmd);
break;
default:
break;
}
}
}
处理消息接受处理,分以下步骤:
3.1 final Pair<NettyRequestProcessor, ExecutorService> matched = this.processorTable.get(cmd.getCode());
final Pair<NettyRequestProcessor, ExecutorService> pair =
null == matched ? this.defaultRequestProcessor : matched;
获取默认的请求处理
3.2 if (pair != null) {
Runnable run = new Runnable() {
@Override
public void run() {
try {
RPCHook rpcHook = NettyRemotingAbstract.this.getRPCHook();
这是rpc 勾作用是是在处理之前或者处理之后扩展一些操作
if (rpcHook != null) {
rpcHook
.doBeforeRequest(RemotingHelper.parseChannelRemoteAddr(ctx.channel()), cmd);
}
final RemotingCommand response = pair.getObject1().proce***equest(ctx, cmd);
真正的处理在这里,稍后在做详细解释
if (rpcHook != null) {
rpcHook.doAfterResponse(RemotingHelper.parseChannelRemoteAddr(ctx.channel()),
cmd, response);
}
if (!cmd.isOnewayRPC()) {
if (response != null) {
response.setOpaque(cmd.getOpaque());
response.markResponseType();
try {
ctx.writeAndFlush(response);
响应response及刷新缓冲池
}
catch (Throwable e) {
plog.error("process request over, but response failed", e);
plog.error(cmd.toString());
plog.error(response.toString());
}
}
else {
// 收到请求,但是没有返回应答,可能是proce***equest中进行了应答,忽略这种情况
}
}
}
catch (Throwable e) {
plog.error("process request exception", e);
plog.error(cmd.toString());
if (!cmd.isOnewayRPC()) {
final RemotingCommand response =
RemotingCommand.createResponseCommand(
RemotingSysResponseCode.SYSTEM_ERROR,//
RemotingHelper.exceptionSimpleDesc(e));
response.setOpaque(cmd.getOpaque());
ctx.writeAndFlush(response);
}
}
}
};
try {
// 这里需要做流控,要求线程池对应的队列必须是有大小限制的
pair.getObject2().submit(run);
}
catch (RejectedExecutionException e) {
// 每个线程10s打印一次
if ((System.currentTimeMillis() % 10000) == 0) {
plog.warn(RemotingHelper.parseChannelRemoteAddr(ctx.channel()) //
+ ", too many requests and system thread pool busy, RejectedExecutionException " //
+ pair.getObject2().toString() //
+ " request code: " + cmd.getCode());
}
if (!cmd.isOnewayRPC()) {
final RemotingCommand response =
RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_BUSY,
"too many requests and system thread pool busy, please try another server");
response.setOpaque(cmd.getOpaque());
ctx.writeAndFlush(response);
}
如果线程池已满,发生拒绝exception的时候,返回线程池忙碌
}
}
这里需要特别注意:采用线程池对请求做限流的处理
3.3 String error = " request type " + cmd.getCode() + " not supported";
final RemotingCommand response =
RemotingCommand.createResponseCommand(RemotingSysResponseCode.REQUEST_CODE_NOT_SUPPORTED,
error);
response.setOpaque(cmd.getOpaque());
ctx.writeAndFlush(response);
plog.error(RemotingHelper.parseChannelRemoteAddr(ctx.channel()) + error);
如果没有默认注册的各个RPC处理器则返回不支持异常
3.4 final RemotingCommand response = pair.getObject1().proce***equest(ctx, cmd);
这里分析下真正处理的细节 包含设置 kv config配置,获取kv config,删除kv config,注册broker,下线broker,根据Topic获取Broker Name、队列数,获取注册到Name Server的所有Broker集群信息
3.5 switch (request.getCode()) {
case RequestCode.PUT_KV_CONFIG:
return this.putKVConfig(ctx, request);
case RequestCode.GET_KV_CONFIG:
return this.getKVConfig(ctx, request);
case RequestCode.DELETE_KV_CONFIG:
return this.deleteKVConfig(ctx, request);
case RequestCode.REGISTER_BROKER:
Version brokerVersion = MQVersion.value2Version(request.getVersion());
// 新版本Broker,支持Filter Server
if (brokerVersion.ordinal() >= MQVersion.Version.V3_0_11.ordinal()) {
return this.registerBrokerWithFilterServer(ctx, request);
}
// 低版本Broker,不支持Filter Server
else {
return this.registerBroker(ctx, request);
}
case RequestCode.UNREGISTER_BROKER:
return this.unregisterBroker(ctx, request);
case RequestCode.GET_ROUTEINTO_BY_TOPIC:
return this.getRouteInfoByTopic(ctx, request);
case RequestCode.GET_BROKER_CLUSTER_INFO:
return this.getBrokerClusterInfo(ctx, request);
case RequestCode.WIPE_WRITE_PERM_OF_BROKER:
return this.wipeWritePermOfBroker(ctx, request);
case RequestCode.GET_ALL_TOPIC_LIST_FROM_NAMESERVER:
return getAllTopicListFromNameserver(ctx, request);
case RequestCode.DELETE_TOPIC_IN_NAMESRV:
return deleteTopicInNamesrv(ctx, request);
case RequestCode.GET_KV_CONFIG_BY_VALUE:
return getKVConfigByValue(ctx, request);
case RequestCode.DELETE_KV_CONFIG_BY_VALUE:
return deleteKVConfigByValue(ctx, request);
case RequestCode.GET_KVLIST_BY_NAMESPACE:
return this.getKVListByNamespace(ctx, request);
case RequestCode.GET_TOPICS_BY_CLUSTER:
return this.getTopicsByCluster(ctx, request);
case RequestCode.GET_SYSTEM_TOPIC_LIST_FROM_NS:
return this.getSystemTopicListFromNs(ctx, request);
case RequestCode.GET_UNIT_TOPIC_LIST:
return this.getUnitTopicList(ctx, request);
case RequestCode.GET_HAS_UNIT_SUB_TOPIC_LIST:
return this.getHasUnitSubTopicList(ctx, request);
case RequestCode.GET_HAS_UNIT_SUB_UNUNIT_TOPIC_LIST:
return this.getHasUnitSubUnUnitTopicList(ctx, request);
default:
break;
}
根据请求code做响应的操作,比如注册broker信息
case RequestCode.REGISTER_BROKER:
Version brokerVersion = MQVersion.value2Version(request.getVersion());
// 新版本Broker,支持Filter Server
if (brokerVersion.ordinal() >= MQVersion.Version.V3_0_11.ordinal()) {
return this.registerBrokerWithFilterServer(ctx, request);
}
// 低版本Broker,不支持Filter Server
else {
return this.registerBroker(ctx, request);
}
3.6 final RemotingCommand response =
RemotingCommand.createResponseCommand(RegisterBrokerResponseHeader.class);
创建response响应RemotingCommand
final RegisterBrokerResponseHeader responseHeader =
(RegisterBrokerResponseHeader) response.readCustomHeader();
获取响应请求头
final RegisterBrokerRequestHeader requestHeader =
(RegisterBrokerRequestHeader) request
.decodeCommandCustomHeader(RegisterBrokerRequestHeader.class);
解码定制的头信息
RegisterBrokerBody registerBrokerBody = new RegisterBrokerBody();
if (request.getBody() != null) {
registerBrokerBody = RegisterBrokerBody.decode(request.getBody(), RegisterBrokerBody.class);
解码请求body包含topic config
}
else {
registerBrokerBody.getTopicConfigSerializeWrapper().getDataVersion()
.setCounter(new AtomicLong(0));
registerBrokerBody.getTopicConfigSerializeWrapper().getDataVersion().setTimestatmp(0);
}
RegisterBrokerResult result = this.namesrvController.getRouteInfoManager().registerBroker(//
requestHeader.getClusterName(), // 1集群名称
requestHeader.getBrokerAddr(), // 2broker地址
requestHeader.getBrokerName(), // 3broker名称
requestHeader.getBrokerId(), // 4brokerID
requestHeader.getHaServerAddr(),// 5HA地址
registerBrokerBody.getTopicConfigSerializeWrapper(), // 6topic config配置
registerBrokerBody.getFilterServerList(),//获取过滤服务
ctx.channel()// 7获取broker channel
);
注册broker信息
responseHeader.setHaServerAddr(result.getHaServerAddr());
设置HA地址,broker启动后 端口+1作为HA备用地址
responseHeader.setMasterAddr(result.getMasterAddr());
设置主地址
// 获取顺序消息 topic 列表
byte[] jsonValue =
this.namesrvController.getKvConfigManager().getKVListByNamespace(
NamesrvUtil.NAMESPACE_ORDER_TOPIC_CONFIG);
response.setBody(jsonValue);
response.setCode(ResponseCode.SUCCESS);
response.setRemark(null);
return response;
转载于:https://blog.51cto.com/haqiaolong/1631306