天天看点

rocketmq namesrv 第二章接收处理过程

  大家好,很高兴在这里跟大家分享下rocketmq源码实现,如有不对的地方欢迎指正。

接着上篇文章继续展开namesrv注册过程。

rocketmq namesrv 第二章接收处理过程

1 public void initChannel(SocketChannel ch) throws Exception {

           // Installs the server-side handler chain on the given channel.
           // The first argument to addLast is an executor group; per Netty's
           // ChannelPipeline contract the handlers listed after it are invoked
           // on that group instead of the channel's own event loop.
           ch.pipeline().addLast(

                  defaultEventExecutorGroup,

                  // Presumably serializes outgoing commands to bytes -- class not shown here.
                  new NettyEncoder(),

                  // Turns inbound bytes into RemotingCommand objects (see NettyDecoder.decode).
                  new NettyDecoder(),

                  // Reader-idle and writer-idle checks are passed 0 (disabled in Netty);
                  // only the all-idle timeout is set, taken from the server config.
                  new IdleStateHandler(0, 0, nettyServerConfig

                                    .getServerChannelMaxIdleTimeSeconds()),

                                // Connection lifecycle callbacks (channelRegistered, channelActive, ...).
                                new NettyConnetManageHandler(),

                                // Last in the chain; NOTE(review): expected to dispatch decoded
                                // commands to request/response processing -- confirm in its source.
                                new NettyServerHandler());

   }

   当broker启动的时候指定namesrv地址后,由netty接收连接,分为以下几个步骤:

   1.1 第一步channel注册 NettyConnetManageHandler

        /**
         * Logs the remote address of a channel that has just been registered and
         * then forwards the event to the superclass implementation.
         *
         * @param ctx handler context whose channel was registered
         * @throws Exception propagated from the superclass call
         */
        public void channelRegistered(ChannelHandlerContext ctx) throws Exception {

            // Resolve the peer address once so it can be used in the log line.
            final String remoteAddress =                    RemotingHelper.parseChannelRemoteAddr(ctx.channel());

            log.info("NETTY SERVER PIPELINE: channelRegistered {}", remoteAddress);

            super.channelRegistered(ctx);

        }

        打印出远程注册地址,调用父类注册channel

   1.2 第二步 channel激活

        /**
         * Logs the remote address of a channel that became active, forwards the
         * event to the superclass, and, when a channel event listener is
         * configured on the enclosing NettyRemotingServer, enqueues a CONNECT event.
         *
         * @param ctx handler context whose channel became active
         * @throws Exception propagated from the superclass call
         */
        public void channelActive(ChannelHandlerContext ctx) throws Exception {

            final String remoteAddress = RemotingHelper.parseChannelRemoteAddr(ctx.channel());

            log.info("NETTY SERVER PIPELINE: channelActive, the channel[{}]", remoteAddress);

            super.channelActive(ctx);

            // Only publish the event when someone is listening for channel events.
            if (NettyRemotingServer.this.channelEventListener != null) {

                // The event carries the peer address and the channel itself.
                // NOTE(review): remoteAddress is already a String, so toString() is redundant.
                NettyRemotingServer.this.putNettyEvent(new NettyEvent(NettyEventType.CONNECT, remoteAddress

                    .toString(), ctx.channel()));

            }

        }

        打印出channel激活地址,调用父类激活操作

        判断NettyRemotingServer channelEventListener不为空的时候,放入Netty连接事件

        事件类型包括 CONNECT、CLOSE、IDLE、EXCEPTION,主要用于在Channel被关闭时下线对应的broker

2 public Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception {

        // Decodes one inbound frame into a RemotingCommand.
        // Returns the decoded command, or null when no complete frame is
        // available yet or when decoding failed (the channel is closed in that case).
        ByteBuf frame = null;

        try {

            // Frame extraction is delegated to the superclass decoder
            // (presumably a length-field based decoder -- superclass not shown here).
            frame = (ByteBuf) super.decode(ctx, in);

            if (null == frame) {

                // Nothing to decode yet.
                return null;

            }

            // Expose the frame as a java.nio.ByteBuffer for RemotingCommand.decode.
            ByteBuffer byteBuffer = frame.nioBuffer();

            return RemotingCommand.decode(byteBuffer);

        } catch (Exception e) {

            // Any decoding failure is logged with the peer address and the channel is closed.
            log.error("decode exception, " + RemotingHelper.parseChannelRemoteAddr(ctx.channel()), e);

            RemotingUtil.closeChannel(ctx.channel());

        } finally {

            // Release the extracted frame on every path, including the successful return above.
            if (null != frame) {

                frame.release();

            }

        }

        // Reached only after an exception was caught.
        return null;

      }

      NettyDecoder.decode将ByteBuf解码并转换成RemotingCommand,服务器与客户端通过传递RemotingCommand来交互,其中包含Header和Body,主要字段如下:

      private int code; code码

      private LanguageCode language = LanguageCode.JAVA;交互语言

      private int version = 0;版本

      private int opaque = RequestId.getAndIncrement();请求标识,响应会带回相同的opaque以便与请求对应

      private int flag = 0;标示

      private String remark;

      private HashMap<String, String> extFields;扩展字段

      private transient CommandCustomHeader customHeader;用户header

      private transient byte[] body;消息体

3 NettyServerHandler.channelRead0 netty服务处理

      public void processMessageReceived(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception {

        final RemotingCommand cmd = msg;

        if (cmd != null) {

            switch (cmd.getType()) { // 根据command type判断

            case REQUEST_COMMAND: // 请求命令

                processRequestCommand(ctx, cmd);

                break;

            case RESPONSE_COMMAND: // 响应命令

                processResponseCommand(ctx, cmd);

                break;

            default:

                break;

            }

        }

      }

      消息接收后的处理,分为以下步骤:

      3.1 final Pair<NettyRequestProcessor, ExecutorService> matched = this.processorTable.get(cmd.getCode());

        final Pair<NettyRequestProcessor, ExecutorService> pair =

                null == matched ? this.defaultRequestProcessor : matched;

      先根据请求code从processorTable中查找对应的处理器,找不到时使用默认的请求处理器

      3.2 if (pair != null) {

            Runnable run = new Runnable() {

                @Override

                public void run() {

                    try {

                        RPCHook rpcHook = NettyRemotingAbstract.this.getRPCHook();

                        这是RPC钩子(hook),作用是在处理之前或者处理之后扩展一些操作

                        if (rpcHook != null) {

                            rpcHook

                                .doBeforeRequest(RemotingHelper.parseChannelRemoteAddr(ctx.channel()), cmd);

                        }

                        final RemotingCommand response = pair.getObject1().processRequest(ctx, cmd);

                        真正的处理在这里,稍后再做详细解释

                        if (rpcHook != null) {

                            rpcHook.doAfterResponse(RemotingHelper.parseChannelRemoteAddr(ctx.channel()),

                                cmd, response);

                        }

                        if (!cmd.isOnewayRPC()) {

                            if (response != null) {

                                response.setOpaque(cmd.getOpaque());

                                response.markResponseType();

                                try {

                                    ctx.writeAndFlush(response);

                                    写回response并刷新缓冲区

                                }

                                catch (Throwable e) {

                                    plog.error("process request over, but response failed", e);

                                    plog.error(cmd.toString());

                                    plog.error(response.toString());

                                }

                            }

                            else {

                                // 收到请求,但是没有返回应答,可能是processRequest中进行了应答,忽略这种情况

                            }

                        }

                    }

                    catch (Throwable e) {

                        plog.error("process request exception", e);

                        plog.error(cmd.toString());

                        if (!cmd.isOnewayRPC()) {

                            final RemotingCommand response =

                                    RemotingCommand.createResponseCommand(

                                        RemotingSysResponseCode.SYSTEM_ERROR,//

                                        RemotingHelper.exceptionSimpleDesc(e));

                            response.setOpaque(cmd.getOpaque());

                            ctx.writeAndFlush(response);

                        }

                    }

                }

            };

            try {

                // 这里需要做流控,要求线程池对应的队列必须是有大小限制的

                pair.getObject2().submit(run);

            }

            catch (RejectedExecutionException e) {

                // 意图是每个线程10s打印一次;注意下面的条件只有在当前毫秒时间戳恰好是10000的整数倍时才成立,因此实际上很少打印

                if ((System.currentTimeMillis() % 10000) == 0) {

                    plog.warn(RemotingHelper.parseChannelRemoteAddr(ctx.channel()) //

                            + ", too many requests and system thread pool busy, RejectedExecutionException " //

                            + pair.getObject2().toString() //

                            + " request code: " + cmd.getCode());

                }

                if (!cmd.isOnewayRPC()) {

                    final RemotingCommand response =

                            RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_BUSY,

                                "too many requests and system thread pool busy, please try another server");

                    response.setOpaque(cmd.getOpaque());

                    ctx.writeAndFlush(response);

                }

                如果线程池已满,发生拒绝exception的时候,返回线程池忙碌

            }

        }

        这里需要特别注意:采用线程池对请求做限流的处理

      3.3 String error = " request type " + cmd.getCode() + " not supported";

            final RemotingCommand response =

                  RemotingCommand.createResponseCommand(RemotingSysResponseCode.REQUEST_CODE_NOT_SUPPORTED,

                        error);

            response.setOpaque(cmd.getOpaque());

            ctx.writeAndFlush(response);

            plog.error(RemotingHelper.parseChannelRemoteAddr(ctx.channel()) + error);

          如果既没有匹配的处理器、也没有默认处理器(即pair为null),则返回请求码不支持(REQUEST_CODE_NOT_SUPPORTED)的响应

     3.4 final RemotingCommand response = pair.getObject1().processRequest(ctx, cmd);

          这里分析下真正处理的细节 包含设置 kv config配置,获取kv config,删除kv config,注册broker,下线broker,根据Topic获取Broker Name、队列数,获取注册到Name Server的所有Broker集群信息  

     3.5 switch (request.getCode()) {

        case RequestCode.PUT_KV_CONFIG:

            return this.putKVConfig(ctx, request);

        case RequestCode.GET_KV_CONFIG:

            return this.getKVConfig(ctx, request);

        case RequestCode.DELETE_KV_CONFIG:

            return this.deleteKVConfig(ctx, request);

        case RequestCode.REGISTER_BROKER:

            Version brokerVersion = MQVersion.value2Version(request.getVersion());

            // 新版本Broker,支持Filter Server

            if (brokerVersion.ordinal() >= MQVersion.Version.V3_0_11.ordinal()) {

                return this.registerBrokerWithFilterServer(ctx, request);

            }

            // 低版本Broker,不支持Filter Server

            else {

                return this.registerBroker(ctx, request);

            }

        case RequestCode.UNREGISTER_BROKER:

            return this.unregisterBroker(ctx, request);

        case RequestCode.GET_ROUTEINTO_BY_TOPIC:

            return this.getRouteInfoByTopic(ctx, request);

        case RequestCode.GET_BROKER_CLUSTER_INFO:

            return this.getBrokerClusterInfo(ctx, request);

        case RequestCode.WIPE_WRITE_PERM_OF_BROKER:

            return this.wipeWritePermOfBroker(ctx, request);

        case RequestCode.GET_ALL_TOPIC_LIST_FROM_NAMESERVER:

            return getAllTopicListFromNameserver(ctx, request);

        case RequestCode.DELETE_TOPIC_IN_NAMESRV:

            return deleteTopicInNamesrv(ctx, request);

        case RequestCode.GET_KV_CONFIG_BY_VALUE:

            return getKVConfigByValue(ctx, request);

        case RequestCode.DELETE_KV_CONFIG_BY_VALUE:

            return deleteKVConfigByValue(ctx, request);

        case RequestCode.GET_KVLIST_BY_NAMESPACE:

            return this.getKVListByNamespace(ctx, request);

        case RequestCode.GET_TOPICS_BY_CLUSTER:

            return this.getTopicsByCluster(ctx, request);

        case RequestCode.GET_SYSTEM_TOPIC_LIST_FROM_NS:

            return this.getSystemTopicListFromNs(ctx, request);

        case RequestCode.GET_UNIT_TOPIC_LIST:

            return this.getUnitTopicList(ctx, request);

        case RequestCode.GET_HAS_UNIT_SUB_TOPIC_LIST:

            return this.getHasUnitSubTopicList(ctx, request);

        case RequestCode.GET_HAS_UNIT_SUB_UNUNIT_TOPIC_LIST:

            return this.getHasUnitSubUnUnitTopicList(ctx, request);

        default:

            break;

        }

       根据请求code做相应的操作,比如注册broker信息

       case RequestCode.REGISTER_BROKER:

            Version brokerVersion = MQVersion.value2Version(request.getVersion());

            // 新版本Broker,支持Filter Server

            if (brokerVersion.ordinal() >= MQVersion.Version.V3_0_11.ordinal()) {

                return this.registerBrokerWithFilterServer(ctx, request);

            }

            // 低版本Broker,不支持Filter Server

            else {

                return this.registerBroker(ctx, request);

            }

    3.6 final RemotingCommand response =

                RemotingCommand.createResponseCommand(RegisterBrokerResponseHeader.class);

        创建response响应RemotingCommand

        final RegisterBrokerResponseHeader responseHeader =

                (RegisterBrokerResponseHeader) response.readCustomHeader();

        获取响应头

        final RegisterBrokerRequestHeader requestHeader =

                (RegisterBrokerRequestHeader) request

                    .decodeCommandCustomHeader(RegisterBrokerRequestHeader.class);

        解码定制的头信息

        RegisterBrokerBody registerBrokerBody = new RegisterBrokerBody();

        if (request.getBody() != null) {

            registerBrokerBody = RegisterBrokerBody.decode(request.getBody(), RegisterBrokerBody.class);

            解码请求body包含topic config

        }

        else {

            registerBrokerBody.getTopicConfigSerializeWrapper().getDataVersion()

                .setCounter(new AtomicLong(0));

            registerBrokerBody.getTopicConfigSerializeWrapper().getDataVersion().setTimestatmp(0);

        }

        RegisterBrokerResult result = this.namesrvController.getRouteInfoManager().registerBroker(//

            requestHeader.getClusterName(), // 1集群名称

            requestHeader.getBrokerAddr(), // 2broker地址

            requestHeader.getBrokerName(), // 3broker名称

            requestHeader.getBrokerId(), // 4brokerID

            requestHeader.getHaServerAddr(),// 5HA地址

            registerBrokerBody.getTopicConfigSerializeWrapper(), // 6topic config配置

            registerBrokerBody.getFilterServerList(),// 7过滤服务列表

            ctx.channel()// 8broker channel

            );

        注册broker信息

        responseHeader.setHaServerAddr(result.getHaServerAddr());

        设置HA地址,broker启动后 端口+1作为HA备用地址

        responseHeader.setMasterAddr(result.getMasterAddr());

        设置主地址

        // 获取顺序消息 topic 列表

        byte[] jsonValue =

                this.namesrvController.getKvConfigManager().getKVListByNamespace(

                    NamesrvUtil.NAMESPACE_ORDER_TOPIC_CONFIG);

        response.setBody(jsonValue);

        response.setCode(ResponseCode.SUCCESS);

        response.setRemark(null);

        return response;

转载于:https://blog.51cto.com/haqiaolong/1631306