天天看点

Netty中集成Protobuf实现Java对象数据传递

场景

Netty的Socket编程详解-搭建服务端与客户端并进行数据传输:

https://blog.csdn.net/BADAO_LIUMANG_QIZHI/article/details/108615023

ProtoBuf的介绍以及在Java中使用protobuf将对象进行序列化与反序列化:

https://blog.csdn.net/BADAO_LIUMANG_QIZHI/article/details/108667427

在上面两篇博客基础上,对于Socket搭建服务端和客户端进行通信以及

在Java中Protobuf的使用已经有了初步的理解。

那么怎样使用Netty进行服务端和客户端的通信时使用protobuf进行数据传递。

注:

博客:

https://blog.csdn.net/badao_liumang_qizhi

关注公众号

霸道的程序猿

获取编程相关电子书、教程推送与免费下载。

实现

在src下新建protobuf目录,目录下新建Person.prpto描述文件

syntax = "proto3"; package com.badao.protobuf; option optimize_for =SPEED;option java_package = "com.badao.NettyProtobuf";option java_outer_classname = "BadaoDataInfo"; message Person {    string name = 1;    int32 age = 2;    string address = 3;}      

具体的讲解可以见上面的博客。

然后在src/main/java下新建包com.badao.NettyProtobuf

打开终端Ternimal,使用protoc进行编译代码

protoc --java_out=src/main/java src/protobuf/Person.proto      

然后就会在上面NettyProtobuf包下生成BadaoDataInfo类

上面具体的流程与讲解参照上面的博客。

然后新建服务端类NettyProtobufServer

package com.badao.NettyProtobuf; import io.netty.bootstrap.ServerBootstrap;import io.netty.channel.ChannelFuture;import io.netty.channel.EventLoopGroup;import io.netty.channel.nio.NioEventLoopGroup;import io.netty.channel.socket.nio.NioServerSocketChannel;import io.netty.handler.logging.LogLevel;import io.netty.handler.logging.LoggingHandler; public class NettyProtobufServer {    public static void main(String[] args) throws  Exception    {        EventLoopGroup bossGroup = new NioEventLoopGroup();        EventLoopGroup workerGroup = new NioEventLoopGroup();        try{            ServerBootstrap serverBootstrap = new ServerBootstrap();            serverBootstrap.group(bossGroup,workerGroup).channel(NioServerSocketChannel.class)                    .handler(new LoggingHandler(LogLevel.INFO))                    .childHandler(new NettyProtobufInitializer());            //绑定端口            ChannelFuture channelFuture = serverBootstrap.bind(70).sync();            channelFuture.channel().closeFuture().sync();        }finally {            //关闭事件组            bossGroup.shutdownGracefully();            workerGroup.shutdownGracefully();        }    }}      

其中用到了自定义的初始化器NettyProtoInitilizer

package com.badao.NettyProtobuf; import io.netty.channel.ChannelInitializer;import io.netty.channel.ChannelPipeline;import io.netty.channel.socket.SocketChannel;import io.netty.handler.codec.protobuf.ProtobufDecoder;import io.netty.handler.codec.protobuf.ProtobufEncoder;import io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder;import io.netty.handler.codec.protobuf.ProtobufVarint32LengthFieldPrepender; public class NettyProtobufInitializer extends ChannelInitializer<SocketChannel> {    @Override    protected void initChannel(SocketChannel ch) throws Exception {        ChannelPipeline pipeline = ch.pipeline();         pipeline.addLast(new ProtobufVarint32FrameDecoder());        pipeline.addLast(new ProtobufDecoder(BadaoDataInfo.Person.getDefaultInstance()));        pipeline.addLast(new ProtobufVarint32LengthFieldPrepender());        pipeline.addLast(new ProtobufEncoder());         pipeline.addLast(new NettyProtobufHandler());    }}      

注意这里不同的是添加的处理器是Netty自带的关于Protobuf的解码编码的处理器。

其中Decode处理器的参数就是上面生成的代码的默认的实例。

然后最后又添加了自定义的处理器NettyProtobufHandler

package com.badao.NettyProtobuf;

import io.netty.channel.ChannelHandlerContext;

import io.netty.channel.SimpleChannelInboundHandler;

public class NettyProtobufHandler extends SimpleChannelInboundHandler<BadaoDataInfo.Person> {

    @Override

    protected void channelRead0(ChannelHandlerContext ctx, BadaoDataInfo.Person msg) throws Exception {

        System.out.println(msg.getName());

        System.out.println(msg.getAge());

        System.out.println(msg.getAddress());

    }

}

自定义处理器中在收到发送的消息时将消息进行输出,这里这里在继承SimpleChannelInboundHandler

的泛型就是上面生成的类。

然后新建客户端类

package com.badao.NettyProtobuf; import io.netty.bootstrap.Bootstrap;import io.netty.channel.ChannelFuture;import io.netty.channel.EventLoopGroup;import io.netty.channel.nio.NioEventLoopGroup;import io.netty.channel.socket.nio.NioSocketChannel; public class NettyProtobufClient {    public static void main(String[] args) throws  Exception {        EventLoopGroup eventLoopGroup = new NioEventLoopGroup();        try {             Bootstrap bootstrap = new Bootstrap();             bootstrap.group(eventLoopGroup).channel(NioSocketChannel.class)                    .handler(new NettyProtoBufClientInitializer());            //绑定端口            ChannelFuture channelFuture = bootstrap.connect("localhost", 70);            channelFuture.channel().closeFuture().sync();        } finally {            //关闭事件组            eventLoopGroup.shutdownGracefully();         }    }}      

编写其用到的初始化器

package com.badao.NettyProtobuf; import io.netty.channel.ChannelInitializer;import io.netty.channel.ChannelPipeline;import io.netty.channel.socket.SocketChannel;import io.netty.handler.codec.protobuf.ProtobufDecoder;import io.netty.handler.codec.protobuf.ProtobufEncoder;import io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder;import io.netty.handler.codec.protobuf.ProtobufVarint32LengthFieldPrepender; public class NettyProtoBufClientInitializer extends ChannelInitializer<SocketChannel> {    @Override    protected void initChannel(SocketChannel ch) throws Exception {        ChannelPipeline pipeline = ch.pipeline();         pipeline.addLast(new ProtobufVarint32FrameDecoder());        pipeline.addLast(new ProtobufDecoder(BadaoDataInfo.Person.getDefaultInstance()));        pipeline.addLast(new ProtobufVarint32LengthFieldPrepender());        pipeline.addLast(new ProtobufEncoder());         pipeline.addLast(new NettyProtobufClientHandler());    }}      

和服务端的初始化器用到的处理器一致,最后也要有自定义的处理器

package com.badao.NettyProtobuf; import io.netty.channel.ChannelHandlerContext;import io.netty.channel.SimpleChannelInboundHandler; public class NettyProtobufClientHandler extends SimpleChannelInboundHandler<BadaoDataInfo.Person> {    @Override    protected void channelRead0(ChannelHandlerContext ctx, BadaoDataInfo.Person msg) throws Exception {     }     @Override    public void channelActive(ChannelHandlerContext ctx) throws Exception {        BadaoDataInfo.Person person = BadaoDataInfo.Person.newBuilder()                .setName("公众号:霸道的程序猿")                .setAge(100)                .setAddress("中国").build();        ctx.channel().writeAndFlush(person);    }}      

在与服务端建立连接即通道激活的回调方法中向服务端发送数据。

然后运行服务端的main方法再运行客户端的main方法。