pom.xml
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>5.0.0.Alpha2</version>
</dependency>
编写服务端
复写public class SimpleChatServerHandler extends SimpleChannelInboundHandler<String>
长连接建立时调用的函数
/**
 * Announces a newly attached connection to every channel currently held in
 * {@code channels}, then adds the new channel to that collection.
 *
 * @param ctx context of the handler that has just been added
 * @throws Exception declared by the overridden callback
 */
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception { // (2)
    final Channel joined = ctx.channel();
    final String notice = "[SERVER] - " + joined.remoteAddress() + " 加入\n";
    // Broadcast before registering the newcomer, so it is not part of this loop.
    for (Channel peer : channels) {
        peer.writeAndFlush(notice);
    }
    channels.add(joined);
}
长连接掉线调用函数
/**
 * Announces a departing connection to every channel currently held in
 * {@code channels}, then removes the departing channel from that collection.
 *
 * @param ctx context of the handler that has just been removed
 * @throws Exception declared by the overridden callback
 */
@Override
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception { // (3)
    final Channel departed = ctx.channel();
    final String notice = "[SERVER] - " + departed.remoteAddress() + " 离开\n";
    // The broadcast runs before the removal, so the departing channel is still
    // iterated if `channels` contains it at this point.
    for (Channel peer : channels) {
        peer.writeAndFlush(notice);
    }
    channels.remove(departed);
}
长连接在线活跃的函数
/**
 * Registers a newly active connection with {@code GatewayService} under the
 * channel's long-text id, refreshes the local {@code map} reference from the
 * service, logs the event, and forwards the activation to the next handler.
 *
 * @param ctx context whose channel has just become active
 * @throws Exception declared by the overridden callback
 */
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception { // (5)
    Channel incoming = ctx.channel();
    String uuid = incoming.id().asLongText();
    // NOTE(review): a fresh GatewayService is created per connection while the
    // registry is read back statically — presumably the registry is static; confirm.
    new GatewayService().addGatewayChannel(uuid, (SocketChannel) incoming);
    map = GatewayService.getChannels();
    System.out.println("a new connect come in: " + uuid);
    System.out.println("map:" + map);
    System.out.println("SimpleChatClient:" + incoming.remoteAddress() + "在线");
    // Forward the event; otherwise handlers added after this one in the
    // pipeline never observe the channel becoming active.
    ctx.fireChannelActive();
}
连接出现异常调用函数
/**
 * Logs the failing peer and the stack trace, then closes the connection.
 *
 * @param ctx   context of the channel on which the error surfaced
 * @param cause the exception that was raised
 */
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { // (7)
    final Channel failed = ctx.channel();
    final String report = "SimpleChatClient:" + failed.remoteAddress() + "异常";
    System.out.println(report);
    cause.printStackTrace();
    // Any exception is treated as fatal for this connection.
    ctx.close();
}
服务端收到信息
/**
 * Logs an incoming chat line and relays it to every channel in
 * {@code channels}: the sender receives it prefixed with {@code [you]},
 * every other channel receives it prefixed with the sender's remote address.
 *
 * @param channelHandlerContext context of the channel that sent the line
 * @param s                     the decoded text line
 * @throws Exception declared by the overridden callback
 */
@Override
protected void messageReceived(ChannelHandlerContext channelHandlerContext, String s) throws Exception {
    final Channel sender = channelHandlerContext.channel();
    System.out.println(LocalTime.now() + " " + sender.remoteAddress() + ":" + s);

    final String forOthers = "[" + sender.remoteAddress() + "]" + s + "\n";
    final String forSender = "[you]" + s + "\n";
    for (Channel peer : channels) {
        // Identity comparison: the sender is the very same Channel instance.
        peer.writeAndFlush(peer == sender ? forSender : forOthers);
    }
}
配置服务端参数:
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.ChannelPromise;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.DelimiterBasedFrameDecoder;
import io.netty.handler.codec.Delimiters;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.handler.timeout.IdleStateHandler;
import io.netty.handler.timeout.ReadTimeoutHandler;
import io.netty.handler.timeout.WriteTimeoutHandler;
import java.util.concurrent.TimeUnit;
/**
 * Builds the server-side pipeline for each accepted connection:
 * idle detection, line framing, string codec, the chat handler and the
 * heartbeat handler that reacts to idle events.
 */
public class SimpleChatServerInitializer extends
        ChannelInitializer<SocketChannel> {

    /**
     * Logs the write and passes the message on exactly once, flushing it.
     *
     * <p>The previous version delegated to {@code super.write(...)} and then
     * called {@code ctx.writeAndFlush(msg)} again, which sent the same message
     * twice.
     *
     * @param ctx     handler context
     * @param msg     outbound message
     * @param promise promise completed when the write finishes
     * @throws Exception declared by the overridden callback
     */
    @Override
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
        System.out.println("write handler");
        ctx.writeAndFlush(msg, promise);
    }

    /**
     * Installs the handlers for a newly accepted channel.
     *
     * @param ch the accepted socket channel
     * @throws Exception declared by the overridden callback
     */
    @Override
    public void initChannel(SocketChannel ch) throws Exception {
        ChannelPipeline pipeline = ch.pipeline();
        // Idle detection goes first so it sees every inbound read. Placed after
        // the chat handler it would never see reads, because that handler is a
        // SimpleChannelInboundHandler<String> and does not pass decoded strings
        // on — the reader-idle timer would fire even on busy connections.
        pipeline.addLast(new IdleStateHandler(20, 0, 0, TimeUnit.MINUTES));
        pipeline.addLast("framer", new DelimiterBasedFrameDecoder(8192, Delimiters.lineDelimiter()));
        pipeline.addLast("decoder", new StringDecoder());
        pipeline.addLast("encoder", new StringEncoder());
        pipeline.addLast("handler", new SimpleChatServerHandler());
        // Reacts to the idle events emitted by the IdleStateHandler above.
        pipeline.addLast(new HeartbeatServerHandler());
        System.out.println("SimpleChatClient:" + ch.remoteAddress() + "连接上");
    }
}
编写客户端
复写public class SimpleChatClientHandler extends SimpleChannelInboundHandler<String>
/**
 * Handles a message received from the server: acknowledges a
 * {@code "Heartbeat"} line, prints any text message, then hands the message
 * to the superclass.
 *
 * <p>The message is inspected before delegating, because the superclass
 * either releases it or forwards it to the next handler. The previous version
 * used it afterwards, released it a second time, and threw a
 * {@code ClassCastException} for non-text messages.
 *
 * @param ctx handler context
 * @param msg the inbound message (a {@code String} once decoded)
 * @throws Exception declared by the overridden callback
 */
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    if (msg instanceof String) {
        if ("Heartbeat".equals(msg)) {
            // NOTE(review): the reply carries no line delimiter; if the peer
            // frames input by line it will not see this as a complete message.
            ctx.write("has read message from server");
            ctx.flush();
        }
        System.out.println(msg);
    }
    // Dispatches to messageReceived(...) and takes care of releasing/forwarding.
    super.channelRead(ctx, msg);
}
/**
 * Required typed-message callback; intentionally empty in this handler.
 *
 * @param channelHandlerContext handler context
 * @param s                     the decoded text message
 * @throws Exception declared by the overridden callback
 */
@Override
protected void messageReceived(ChannelHandlerContext channelHandlerContext, String s) throws Exception {
}
// The callbacks below relate to the heartbeat mechanism.
/**
 * Logs the activation time and forwards the event down the pipeline.
 *
 * @param ctx handler context of the channel that became active
 * @throws Exception declared by the overridden callback
 */
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
    final Date activatedAt = new Date();
    System.out.println("激活时间是:" + activatedAt);
    System.out.println("HeartBeatClientHandler channelActive");
    // Let the handlers further down the pipeline observe the activation too.
    ctx.fireChannelActive();
}
/**
 * Logs the time the connection went down and forwards the event.
 *
 * <p>Forwarding mirrors {@code channelActive}; without it the handlers added
 * after this one never learn that the channel became inactive.
 *
 * @param ctx handler context of the channel that became inactive
 * @throws Exception declared by the overridden callback
 */
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
    System.out.println("停止时间是:" + new Date());
    System.out.println("HeartBeatClientHandler channelInactive");
    ctx.fireChannelInactive();
}
配置netty客户端
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.DelimiterBasedFrameDecoder;
import io.netty.handler.codec.Delimiters;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.handler.timeout.IdleStateHandler;
import java.util.concurrent.TimeUnit;
/**
 * Builds the client-side pipeline: line framing, string codec, the chat
 * handler, a writer-idle detector and the heartbeat handler.
 */
public class SimpleChatClientInitializer extends ChannelInitializer<SocketChannel> {

    /**
     * Installs the handlers on a freshly created client channel.
     *
     * @param ch the client socket channel
     * @throws Exception declared by the overridden callback
     */
    @Override
    public void initChannel(SocketChannel ch) throws Exception {
        // Handlers are appended in the same order as before, using fluent chaining.
        ch.pipeline()
                .addLast("framer", new DelimiterBasedFrameDecoder(8192, Delimiters.lineDelimiter()))
                .addLast("decoder", new StringDecoder())
                .addLast("encoder", new StringEncoder())
                .addLast("handler", new SimpleChatClientHandler())
                // Writer-idle timeout of 15 minutes; read and all-idle timers disabled.
                .addLast(new IdleStateHandler(0, 15, 0, TimeUnit.MINUTES))
                .addLast(new HeartBeatClientHandler());
    }
}
启动netty客户端
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import java.io.BufferedReader;
import java.io.InputStreamReader;
/**
 * Console chat client: connects to the server and sends every line typed on
 * standard input, terminated with {@code \r\n}.
 */
public class SimpleChatClient {

    /**
     * Starts a client against {@code localhost:8090}.
     *
     * @param args ignored
     * @throws Exception if the client fails to start
     */
    public static void main(String[] args) throws Exception{
        new SimpleChatClient("localhost", 8090).run();
    }

    private final String host;
    private final int port;

    /**
     * @param host server host name or address
     * @param port server TCP port
     */
    public SimpleChatClient(String host, int port){
        this.host = host;
        this.port = port;
    }

    /**
     * Connects, then relays standard input line by line until end of input,
     * and finally closes the channel and shuts the event loop down.
     *
     * @throws Exception declared for callers; failures inside are printed
     */
    public void run() throws Exception{
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            Bootstrap bootstrap = new Bootstrap()
                    .group(group)
                    .channel(NioSocketChannel.class)
                    .handler(new SimpleChatClientInitializer());
            Channel channel = bootstrap.connect(host, port).sync().channel();
            BufferedReader in = new BufferedReader(new InputStreamReader(System.in));
            String line;
            // readLine() returns null at end of input; the previous endless loop
            // then kept sending the literal text "null" forever.
            while ((line = in.readLine()) != null) {
                channel.writeAndFlush(line + "\r\n");
            }
            // Input is exhausted: close the connection before shutting down.
            channel.close().sync();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            group.shutdownGracefully();
        }
    }
}
接下来是心跳机制
服务端:
/**
 * Server-side heartbeat watchdog: counts reader-idle events and closes the
 * connection once more than two have been seen.
 */
public class HeartbeatServerHandler extends ChannelInboundHandlerAdapter {

    /** Number of reader-idle events seen so far on this connection. */
    // NOTE(review): this counter is never reset when data arrives, so three
    // idle events over the whole lifetime of the connection close it — confirm intent.
    private int lossConnectTime = 0;

    /** Pre-encoded heartbeat payload; not referenced by the code in this class. */
    private static final ByteBuf HEARTBEAT_SEQUENCE = Unpooled
            .unreleasableBuffer(Unpooled.copiedBuffer("Heartbeat",
                    CharsetUtil.UTF_8));

    /**
     * Handles reader-idle events and forwards every other user event.
     *
     * <p>Previously the {@code else} branch was attached to the inner
     * {@code READER_IDLE} check, so events that were not an
     * {@code IdleStateEvent} were silently dropped instead of being forwarded.
     *
     * @param ctx handler context
     * @param evt the user event
     * @throws Exception declared by the overridden callback
     */
    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt)
            throws Exception {
        boolean readerIdle = evt instanceof IdleStateEvent
                && ((IdleStateEvent) evt).state() == IdleState.READER_IDLE;
        if (!readerIdle) {
            // Not ours to handle: pass it to the next handler.
            super.userEventTriggered(ctx, evt);
            return;
        }
        lossConnectTime++;
        System.out.println("接收消息超时");
        if (lossConnectTime > 2) {
            System.out.println("关闭不活动的链接");
            ctx.channel().close();
        }
    }
}
客户端
/**
 * Client-side heartbeat handler: sends a heartbeat payload when the connection
 * has been write-idle, and acknowledges {@code "Heartbeat"} text messages.
 */
public class HeartBeatClientHandler extends ChannelInboundHandlerAdapter {

    /** Pre-encoded heartbeat payload, shared by all instances. */
    private static final ByteBuf HEARTBEAT_SEQUENCE = Unpooled.unreleasableBuffer(Unpooled.copiedBuffer("Heartbeat",
            CharsetUtil.UTF_8));

    /** Upper bound compared against {@link #currentTime} before sending. */
    // NOTE(review): the `<=` comparison below allows TRY_TIMES + 1 heartbeats and
    // the counter is never reset — confirm whether that is intended.
    private static final int TRY_TIMES = 4;

    /** Number of heartbeats sent so far on this connection. */
    private int currentTime = 0;

    /**
     * Logs the activation time and forwards the event.
     *
     * @param ctx handler context
     * @throws Exception declared by the overridden callback
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("激活时间是:"+new Date());
        System.out.println("链接已经激活");
        ctx.fireChannelActive();
    }

    /**
     * Logs the time the connection went down and forwards the event, mirroring
     * {@link #channelActive(ChannelHandlerContext)}.
     *
     * @param ctx handler context
     * @throws Exception declared by the overridden callback
     */
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("停止时间是:"+new Date());
        System.out.println("关闭链接");
        ctx.fireChannelInactive();
    }

    /**
     * Sends a heartbeat on writer-idle events (while the attempt budget lasts)
     * and forwards every event this handler does not handle itself.
     *
     * @param ctx handler context
     * @param evt the user event
     * @throws Exception declared by the overridden callback
     */
    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        System.out.println("当前轮询时间:"+new Date());
        boolean writerIdle = evt instanceof IdleStateEvent
                && ((IdleStateEvent) evt).state() == IdleState.WRITER_IDLE;
        if (!writerIdle) {
            // Previously such events were dropped; pass them on instead.
            super.userEventTriggered(ctx, evt);
            return;
        }
        if (currentTime <= TRY_TIMES) {
            System.out.println("currentTime:"+currentTime);
            currentTime++;
            // duplicate() gives an independent reader index over the shared payload.
            ctx.channel().writeAndFlush(HEARTBEAT_SEQUENCE.duplicate());
        }
    }

    /**
     * Prints text messages, acknowledges {@code "Heartbeat"}, and always
     * releases the message.
     *
     * <p>The type check replaces an unchecked cast that threw before the
     * message was released when something other than a {@code String} arrived.
     *
     * @param ctx handler context
     * @param msg the inbound message
     * @throws Exception declared by the overridden callback
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        try {
            if (msg instanceof String) {
                String message = (String) msg;
                System.out.println(message);
                if (message.equals("Heartbeat")) {
                    ctx.write("has read message from server");
                    ctx.flush();
                }
            }
        } finally {
            // This handler consumes the message, so it is responsible for releasing it.
            ReferenceCountUtil.release(msg);
        }
    }

    /**
     * Prints the failure and closes the connection.
     *
     * @param ctx   handler context
     * @param cause the exception that was raised
     * @throws Exception declared by the overridden callback
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        // Report the cause instead of discarding it, as the server handler does.
        cause.printStackTrace();
        ctx.close();
    }
}
github:https://github.com/dajitui/netty
新增知识:
之前有个需求是做图文直播的,我想用netty实现,一直纠结如何解决房间问题,天啊,后来发现原来这么简单
一般是web应用,前端js会调用
// WebSocket connection: this callback runs once the socket has been opened.
socket.onopen = function(data) {
console.log("socket.onopen:")
console.log(data);
// Attach `num` to the user object as `table` — presumably the room number
// mentioned in the surrounding text; confirm against the page that defines `num`.
user.table = num;
// NOTE(review): '10001' looks like a message-type code; its meaning is not visible here.
user.message = '10001';
// Strip the `pwd` property so it is not included in the payload sent below.
delete user.pwd;
console.log(user);
// After the connection succeeds, send the user info to enter the chat room.
socket.send(JSON.stringify(user));
}
前台把房间号,发给后端,后端绑定即可
/**
 * Server-side callback for a message sent by the front end.
 *
 * <p>Left empty in this snippet. NOTE(review): per the surrounding text, this is
 * presumably where the room number from the payload gets bound to the channel.
 *
 * @param ctx handler context
 * @param o   the received message
 * @throws Exception declared by the overridden callback
 */
@Override
protected void messageReceived(ChannelHandlerContext ctx, Object o) throws Exception {
}
发现一个蛮好的项目,可以让大家初步认识netty:https://blog.csdn.net/qq_36994771/article/details/80085876
我把它改进后跑了起来,代码地址:https://github.com/dajitui/nettytest