天天看点

初学netty

pom.xml

<dependency>
			<groupId>io.netty</groupId>
			<artifactId>netty-all</artifactId>
			<version>5.0.0.Alpha2</version>
		</dependency>
           

编写服务端

复写public class SimpleChatServerHandler extends SimpleChannelInboundHandler<String>

长连接连接调用函数

@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {  // (2)
    Channel incoming = ctx.channel();
    for (Channel channel : channels) {
        channel.writeAndFlush("[SERVER] - " + incoming.remoteAddress() + " 加入\n");
    }
    channels.add(ctx.channel());
}
           

长连接掉线调用函数

@Override
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {  // (3)
    Channel incoming = ctx.channel();
    for (Channel channel : channels) {
        channel.writeAndFlush("[SERVER] - " + incoming.remoteAddress() + " 离开\n");
    }
    channels.remove(ctx.channel());
}
           

长连接在线活跃的函数

@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception { // (5)
    Channel incoming = ctx.channel();
    String uuid = ctx.channel().id().asLongText();
    //map.put(uuid, (SocketChannel) ctx.channel());
    new GatewayService().addGatewayChannel(uuid, (SocketChannel) incoming);
    map=GatewayService.getChannels();
    System.out.println("a new connect come in: " + uuid);
    System.out.println("map:"+map);
    System.out.println("SimpleChatClient:"+incoming.remoteAddress()+"在线");
}
           

连接出现异常调用函数

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { // (7)
    Channel incoming = ctx.channel();
    System.out.println("SimpleChatClient:"+incoming.remoteAddress()+"异常");
    // 当出现异常就关闭连接
    cause.printStackTrace();
    ctx.close();
}
           

服务端收到信息

@Override
    protected void messageReceived(ChannelHandlerContext channelHandlerContext, String s) throws Exception {
        Channel incoming = channelHandlerContext.channel();
        //Thread.sleep(1000000);
        System.out.println(LocalTime.now()+" "+incoming.remoteAddress()+":"+s);
        /*if(s.equals("hi")){
            SimpleChatServerHandler.send();
        }*/
        for (Channel channel : channels) {
            if (channel != incoming){
                channel.writeAndFlush("[" + incoming.remoteAddress() + "]" + s + "\n");

//                stringBuilder./*StringBuilder stringBuilder=new StringBuilder();
//                String[] s1=s.split("\\|",3);
//                stringBuilder.append(s1[0]);
//                channel.writeAndFlush("[" + stringBuilder.toString() + "]" + stringBuilder.toString().substring(s1[0].length()+1,stringBuilder.toString().length()) + "\n");*/append();
            } else {
                channel.writeAndFlush("[you]" + s + "\n");
            }
        }
    }
           

配置服务端参数:

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.ChannelPromise;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.DelimiterBasedFrameDecoder;
import io.netty.handler.codec.Delimiters;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.handler.timeout.IdleStateHandler;
import io.netty.handler.timeout.ReadTimeoutHandler;
import io.netty.handler.timeout.WriteTimeoutHandler;

import java.util.concurrent.TimeUnit;

public class SimpleChatServerInitializer extends
        ChannelInitializer<SocketChannel> {

    @Override
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
        super.write(ctx, msg, promise);
        System.out.println("write handler");
        ctx.writeAndFlush(msg);
    }

    @Override
    public void initChannel(SocketChannel ch) throws Exception {
        ChannelPipeline pipeline = ch.pipeline();

        pipeline.addLast("framer", new DelimiterBasedFrameDecoder(8192, Delimiters.lineDelimiter()));
        pipeline.addLast("decoder", new StringDecoder());
        pipeline.addLast("encoder", new StringEncoder());
        pipeline.addLast("handler", new SimpleChatServerHandler());
        //超时机制设置
        pipeline.addLast(new IdleStateHandler(20, 0, 0, TimeUnit.MINUTES));
        //心跳机制
        pipeline.addLast(new HeartbeatServerHandler());
        System.out.println("SimpleChatClient:"+ch.remoteAddress() +"连接上");
    }
}
           

编写客户端

复写public class SimpleChatClientHandler extends SimpleChannelInboundHandler<String>

//收到服务端发送过来的消息
@Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        super.channelRead(ctx, msg);
        String message = (String) msg;
        if (message.equals("Heartbeat")) {
            ctx.write("has read message from server");
            ctx.flush();
        }
        ReferenceCountUtil.release(msg);

        System.out.println(msg);
    }

    @Override
    protected void messageReceived(ChannelHandlerContext channelHandlerContext, String s) throws Exception {

    }
//下面是和心跳机制有关系的

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("激活时间是:"+new Date());
        System.out.println("HeartBeatClientHandler channelActive");
        ctx.fireChannelActive();
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("停止时间是:"+new Date());
        System.out.println("HeartBeatClientHandler channelInactive");
    }
           

配置netty客户端

import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.DelimiterBasedFrameDecoder;
import io.netty.handler.codec.Delimiters;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.handler.timeout.IdleStateHandler;

import java.util.concurrent.TimeUnit;

public class SimpleChatClientInitializer extends ChannelInitializer<SocketChannel> {

    @Override
    public void initChannel(SocketChannel ch) throws Exception {
        ChannelPipeline pipeline = ch.pipeline();

        pipeline.addLast("framer", new DelimiterBasedFrameDecoder(8192, Delimiters.lineDelimiter()));
        pipeline.addLast("decoder", new StringDecoder());
        pipeline.addLast("encoder", new StringEncoder());
        pipeline.addLast("handler", new SimpleChatClientHandler());
        pipeline.addLast(new IdleStateHandler(0, 15, 0, TimeUnit.MINUTES));
        pipeline.addLast(new HeartBeatClientHandler());
    }
}

启动netty服务端
           
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;

import java.io.BufferedReader;
import java.io.InputStreamReader;

public class SimpleChatClient {
    public static void main(String[] args) throws Exception{
        new SimpleChatClient("localhost", 8090).run();
    }

    private final String host;
    private final int port;

    public SimpleChatClient(String host, int port){
        this.host = host;
        this.port = port;
    }

    public void run() throws Exception{
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            Bootstrap bootstrap  = new Bootstrap()
                    .group(group)
                    .channel(NioSocketChannel.class)
                    .handler(new SimpleChatClientInitializer());
            Channel channel = bootstrap.connect(host, port).sync().channel();
            BufferedReader in = new BufferedReader(new InputStreamReader(System.in));
            while(true){
                channel.writeAndFlush(in.readLine() + "\r\n");
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            group.shutdownGracefully();
        }

    }
}
           

接下来是心跳机制

服务端:

public class HeartbeatServerHandler extends ChannelInboundHandlerAdapter {


    private int loss_connect_time = 0;

    private static final ByteBuf HEARTBEAT_SEQUENCE = Unpooled
            .unreleasableBuffer(Unpooled.copiedBuffer("Heartbeat",
                    CharsetUtil.UTF_8));

    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt)
            throws Exception {
        if(evt instanceof IdleStateEvent){
            //服务端对应着读事件,当为READER_IDLE时触发
            IdleStateEvent event = (IdleStateEvent)evt;
            if(event.state() == IdleState.READER_IDLE){
                loss_connect_time++;
                System.out.println("接收消息超时");
                if(loss_connect_time > 2){
                    System.out.println("关闭不活动的链接");
                    ctx.channel().close();
                }
            }else{
                super.userEventTriggered(ctx,evt);
            }
        }
    }
}
           

客户端

public class HeartBeatClientHandler extends ChannelInboundHandlerAdapter {

    private static final ByteBuf HEARTBEAT_SEQUENCE = Unpooled.unreleasableBuffer(Unpooled.copiedBuffer("Heartbeat",
            CharsetUtil.UTF_8));

    private static final int TRY_TIMES = 4;

    private int currentTime = 0;

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("激活时间是:"+new Date());
        System.out.println("链接已经激活");
        ctx.fireChannelActive();
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("停止时间是:"+new Date());
        System.out.println("关闭链接");
    }

    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        System.out.println("当前轮询时间:"+new Date());
        if (evt instanceof IdleStateEvent) {
            IdleStateEvent event = (IdleStateEvent) evt;
            if (event.state() == IdleState.WRITER_IDLE) {
                if(currentTime <= TRY_TIMES){
                    System.out.println("currentTime:"+currentTime);
                    currentTime++;
                    ctx.channel().writeAndFlush(HEARTBEAT_SEQUENCE.duplicate());
                }
            }
        }
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        String message = (String) msg;
        System.out.println(message);
        if (message.equals("Heartbeat")) {
            ctx.write("has read message from server");
            ctx.flush();
        }
        ReferenceCountUtil.release(msg);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        ctx.close();
    }
}
           

github:https://github.com/dajitui/netty

新增知识:

之前有个需求是做图文直播的,我想用netty实现,一直纠结如何解决房间问题,天啊,后来发现原来这么简单

一般是web应用,前端js会调用

//webSocket的链接
            socket.onopen = function(data) {
                console.log("socket.onopen:")
                console.log(data);
                user.table = num;
                user.message = '10001';
                delete user.pwd;
                console.log(user);
                //链接成功后发送用户信息进入聊天室
                socket.send(JSON.stringify(user));
            }
           

前台把房间号,发给后端,后端绑定即可

@Override
    protected void messageReceived(ChannelHandlerContext ctx, Object o) throws Exception {
}
           

发现一个蛮好的项目可以让大家初步认识nettyhttps://blog.csdn.net/qq_36994771/article/details/80085876

我把他改进跑起来,带入地址:https://github.com/dajitui/nettytest