天天看点

Netty(十四) Netty实现HTTP与WebSocket代理服务器

目前想实现一个HTTP代理服务,能够支持HTTP和Websocket的代理。最开始的实现思路是使用Jetty服务+Http-proxy框架实现的,后来发现支持Websocket比较困难。换了另一种思路,使用Netty来实现。

github源码:

https://github.com/zhuquanwen/netty-http-websocket-proxy

1、引入依赖

dependencies {
    compile project(":common-redis")
    compile project(":common")
    compile project(":templet")
//    compile 'org.eclipse.jetty:jetty-server:9.4.11.v20180605'
//    compile 'org.eclipse.jetty:jetty-servlet:9.4.11.v20180605'
//    compile 'org.mitre.dsmiley.httpproxy:smiley-http-proxy-servlet:1.10'
    // https://mvnrepository.com/artifact/org.apache.httpcomponents/httpclient
    compile group: 'org.apache.httpcomponents', name: 'httpclient', version: '4.5.3'

    compile 'ch.qos.logback:logback-classic:1.2.3'
    compile 'org.slf4j:slf4j-api:1.7.25'
    compile "com.squareup.okhttp3:okhttp:3.10.0"
    // https://mvnrepository.com/artifact/quartz/quartz
    compile group: 'quartz', name: 'quartz', version: '1.5.2'

    // https://mvnrepository.com/artifact/com.fasterxml.jackson.core/jackson-core
    compile group: 'com.fasterxml.jackson.core', name: 'jackson-core', version:'2.9.8'
    // https://mvnrepository.com/artifact/com.fasterxml.jackson.core/jackson-databind
    compile group: 'com.fasterxml.jackson.core', name: 'jackson-databind', version:'2.9.8'

    compile group: 'org.jsoup', name: 'jsoup', version: "1.11.3"

    // https://mvnrepository.com/artifact/com.yahoo.platform.yui/yuicompressor
    compile group: 'com.yahoo.platform.yui', name: 'yuicompressor', version: '2.4.8'

    compile 'org.apache.rocketmq:rocketmq-client:4.4.0'
    compile 'org.apache.rocketmq:rocketmq-common:4.4.0'
    compile 'org.apache.rocketmq:rocketmq-remoting:4.4.0'
    compile 'org.apache.rocketmq:rocketmq-logging:4.4.0'
    compile group: 'org.apache.rocketmq', name: 'rocketmq-tools', version: '4.4.0'

    compile ("org.fusesource:sigar:1.6.4")
    compile group: 'io.netty', name: 'netty-all', version:'4.1.33.Final'

    // https://mvnrepository.com/artifact/org.java-websocket/Java-WebSocket
    compile group: 'org.java-websocket', name: 'Java-WebSocket', version: '1.4.0'


    compile fileTree(dir: 'lib', include: '*.jar')
}
           

重点是netty和websocket客户端的依赖,其他的可以无视,项目需要

2、Netty服务端注册

@Slf4j
@Component
public class NettyServer {
    @Autowired
    private SocketHandler socketHandler;

    public void start() {
        ServerBootstrap bootstrap = new ServerBootstrap();
        EventLoopGroup boss = new NioEventLoopGroup();
        EventLoopGroup worker = new NioEventLoopGroup();
        try{
            bootstrap.group(boss, worker);
            bootstrap.option(ChannelOption.SO_BACKLOG, 2048);
            bootstrap.childOption(ChannelOption.TCP_NODELAY, true);
            bootstrap.childOption(ChannelOption.SO_KEEPALIVE, true);
            bootstrap.channel(NioServerSocketChannel.class);
            bootstrap.childHandler(new ChannelInitializer<Channel>() {
                @Override
                protected void initChannel(Channel ch) throws Exception {

                    ChannelPipeline pipeline = ch.pipeline();
                    pipeline.addLast(new HttpServerCodec());// Http消息编码解码
                    pipeline.addLast(new HttpObjectAggregator(1024*1024*1024));
                    pipeline.addLast(new ChunkedWriteHandler());
                    pipeline.addLast(socketHandler);//自定义处理类

                }
            });
            ChannelFuture future = bootstrap.bind(8180);
            log.debug("Netty服务已启动");
            future.channel().closeFuture().sync();

        }catch (Exception e){
            e.printStackTrace();
        } finally {
            boss.shutdownGracefully();
            worker.shutdownGracefully();
        }
    }
}
           

重点注意的是pipeline中引入的几个处理器

pipeline.addLast(new HttpServerCodec()); //引入HTTP编解码器
           
pipeline.addLast(new HttpObjectAggregator(1024*1024*1024));//引入请求内容处理器并限定请求内容大小
           
pipeline.addLast(new ChunkedWriteHandler()); //引入大数据流处理器,防止内存溢出
           
pipeline.addLast(socketHandler);//自定义处理类,也是重点的代理逻辑处理
           

3、socket处理器

@Slf4j
@Component
@ChannelHandler.Sharable
public class SocketHandler extends BaseHttpHandler {
    @Autowired
    private WsHandler wsHandler;
    @Autowired
    private HttpHandler httpHandler;
//    private WebSocketServerHandshaker handshaker;
//    private final String wsUri = "/demo/websocket";
    /*
     * channelAction
     *
     * channel 通道 action 活跃的
     *
     * 当客户端主动链接服务端的链接后,这个通道就是活跃的了。也就是客户端与服务端建立了通信通道并且可以传输数据
     *
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        log.debug(ctx.channel().localAddress().toString() + " 通道已激活!");
    }
    /*
     * channelInactive
     *
     * channel 通道 Inactive 不活跃的
     *
     * 当客户端主动断开服务端的链接后,这个通道就是不活跃的。也就是说客户端与服务端的关闭了通信通道并且不可以传输数据
     *
     */
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        System.out.println(ctx.channel().localAddress().toString() + " 通道不活跃!");
        // 关闭流
    }
    private String getMessage(ByteBuf buf) {
        byte[] con = new byte[buf.readableBytes()];
        buf.readBytes(con);
        try {
            return new String(con, "UTF-8");
        } catch (UnsupportedEncodingException e) {
            e.printStackTrace();
            return null;
        }
    }
    /**
     * 功能:读取服务器发送过来的信息
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        if (msg instanceof FullHttpRequest) {// 如果是HTTP请求,进行HTTP操作
            log.debug("进入http处理");
            httpHandler.handleHttpRequest(ctx, (FullHttpRequest) msg);
        } else if (msg instanceof WebSocketFrame) {// 如果是Websocket请求,则进行websocket操作
            log.debug("进入websocket处理");
            wsHandler.handleWebSocketFrame(ctx, (WebSocketFrame) msg);
        }
    }



    // 握手请求不成功时返回的应答
    private static void sendHttpResponse(ChannelHandlerContext ctx, FullHttpRequest req, FullHttpResponse res) {
        // 返回应答给客户端
        if (res.getStatus().code() != 200) {
            ByteBuf buf = Unpooled.copiedBuffer(res.getStatus().toString(), CharsetUtil.UTF_8);
            res.content().writeBytes(buf);
            buf.release();
        }
        // 如果是非Keep-Alive,关闭连接
        ChannelFuture f = ctx.channel().writeAndFlush(res);
    }

    /**
     * 功能:服务端发生异常的操作
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        log.error("出现异常", cause);
        //TODO
        ctx.close();


//        System.out.println("异常信息:\r\n" + cause.getMessage());
    }
}
           

重点是read处的判断,如果是Http请求进入HTTP处理,如果是websocket请求 ,进入Websocket处理

if (msg instanceof FullHttpRequest) {// 如果是HTTP请求,进行HTTP操作
            log.debug("进入http处理");
            httpHandler.handleHttpRequest(ctx, (FullHttpRequest) msg);
        } else if (msg instanceof WebSocketFrame) {// 如果是Websocket请求,则进行websocket操作
            log.debug("进入websocket处理");
            wsHandler.handleWebSocketFrame(ctx, (WebSocketFrame) msg);
        }
           

4、Http处理器

@Component
@Slf4j
public class HttpHandler {

    @Autowired
    private WsHandler wsHandler;
    @Autowired
    private CacheKeyHandler cacheKeyHandler;
    @Autowired
    private RequestHandler requestHandler;
    @Autowired
    private PlaybackHandler playbackHandler;
    @Autowired
    private ResponseHandler responseHandler;
    @Autowired
    private RemoteDataHandler remoteDataHandler;

    // 处理HTTP的代码
    public void handleHttpRequest(ChannelHandlerContext ctx, FullHttpRequest req) throws UnsupportedEncodingException {
        log.debug("handleHttpRequest method==========" + req.method());
        log.debug("handleHttpRequest uri==========" + req.uri());
        // 如果HTTP解码失败,返回HHTP异常
        Map<String, String> parmMap = new HashMap<>();
        if (req instanceof HttpRequest) {
            //处理本身服务的URL
            ProxySetting servletSetting = HttpUtils.getRouteSetting(req);
            if (servletSetting == null) {
                boolean handle = UnproxyHandler.getInstance().handle(req, ctx);
                if (handle) {
                    //如果此服务能够处理,直接return
                    return;
                } else {
                    //TODO 如果不能处理,如果CND代理是个重定向的话,再重定向回去,从refer取
                    return;
                }
            }
            HttpMethod method = req.method();
            log.debug("进入代理http连接");
            // 如果是websocket请求就握手升级
            if (!wsHandler.wsUpgradle(req, ctx)) {
                try {
                    log.debug("请求URL:{}", req.uri());
                    byte[] keyBytes = null;
                    try {
                        log.debug("获取请求生成的缓存key");
                        keyBytes = cacheKeyHandler.getKeyBytes(req);

                    } catch (Exception e) {
                        log.error("获取缓存Key出错", e);
                        throw new RuntimeException("获取缓存Key出错", e);
                    }
                    boolean flag = requestHandler.requestHandle(req, keyBytes, ctx);
                    if (!flag) {
                        boolean resultFlag = remoteDataHandler.response(req, keyBytes, ctx);
                        if (!resultFlag) {
                            throw new RuntimeException("访问远程服务器出错");
                        }

                    }
                } catch (Exception e) {
                    String exceptionInfo = ExceptionUtils.getExceptionInfo(e);
                    HttpUtils.createHttpResponse(req, HttpStatusEnum.CACHE_SERVER_ERROR, exceptionInfo, ctx);
                } finally {
                    if (playbackHandler.backRequestLocal != null) {
                        playbackHandler.backRequestLocal.remove();
                    }
                }

            }

        }
    }
}
           

判断此HTTP服务是否是websocket请求协议升级,如果是协议升级,进入协议升级处理

!wsHandler.wsUpgradle(req, ctx)
           

如果是正常HTTP请求,进行处理,源码中很多冗余代码,因为我们在代理时还做了缓存处理,其实代理过程也很简单,就是用OkHttp向远程服务发请求,然后组装Response返回给请求者。

5、Websocket处理器

@Component
@Slf4j
public class WsHandler {
    public WsHandler() {

    }

    /**
     * 处理http请求为websocket握手时升级为websocket
     * */
    public boolean wsUpgradle(FullHttpRequest req, ChannelHandlerContext ctx) {
        boolean flag = false;
        HttpHeaders headers = req.headers();
        if (headers == null) {
            return flag;
        }
        boolean wsFlag = false;
        String connection = headers.get("Connection");
        String upgrade = headers.get("Upgrade");
        if (Objects.equals("Upgrade", connection) &&
                Objects.equals("websocket", upgrade)) {
            wsFlag = true;
        }
        if (wsFlag) {
            log.debug("websocket 请求接入");
            WebSocketServerHandshakerFactory wsFactory = new WebSocketServerHandshakerFactory(
                    req.uri(), null, false);
            WebSocketServerHandshaker handshaker = wsFactory.newHandshaker(req);
            if (handshaker == null) {
                flag = false;
                WebSocketServerHandshakerFactory.sendUnsupportedVersionResponse(ctx.channel());
            } else {
                //与远程的websocket建立连接
                boolean b = connectToRemoteWs(req, ctx);
                if (b) {
                    //本机websocket建联
                    handshaker.handshake(ctx.channel(), req);
                    WsConstant.wsHandshakerMap.put(req.uri(), handshaker);
                    WsConstant.wsCtx.put(req.uri(), ctx);
                    WsConstant.ctxWs.put(ctx, handshaker);
                    flag = true;
                } else {
                    //TODO 暂时先返回这样的错误提示
                    WebSocketServerHandshakerFactory.sendUnsupportedVersionResponse(ctx.channel());
                }

            }
        }
        return flag;
    }

    // 处理Websocket的代码
    public void handleWebSocketFrame(ChannelHandlerContext ctx, WebSocketFrame frame) {
        // 判断是否是关闭链路的指令
        log.debug("接收到websocket信息");
        if (frame instanceof CloseWebSocketFrame) {
            //先关闭远程websocket的连接
            MyWebsocketClient myWebsocketClient = WsConstant.wsCtxClient.get(ctx);
            if (myWebsocketClient != null) {
                myWebsocketClient.close();
            }
            WsConstant.ctxWs.get(ctx).close(ctx.channel(), (CloseWebSocketFrame) frame.retain());
            return;
        }
        // 判断是否是Ping消息
        if (frame instanceof PingWebSocketFrame) {
            ctx.channel().write(new PongWebSocketFrame(frame.content().retain()));
            MyWebsocketClient myWebsocketClient = WsConstant.wsCtxClient.get(ctx);
            if (myWebsocketClient != null) {
                myWebsocketClient.sendPing();
            } else {
                WsConstant.ctxWs.get(ctx).close(ctx.channel(), (CloseWebSocketFrame) frame.retain());
            }
            return;
        }
        // 文本消息,不支持二进制消息
        if (frame instanceof TextWebSocketFrame) {
            // 返回应答消息
            String request = ((TextWebSocketFrame) frame).text();
            MyWebsocketClient myWebsocketClient = WsConstant.wsCtxClient.get(ctx);
            if (myWebsocketClient != null) {
                myWebsocketClient.send(request);
            } else {
                WsConstant.ctxWs.get(ctx).close(ctx.channel(), (CloseWebSocketFrame) frame.retain());
            }
//            ctx.channel().writeAndFlush(new TextWebSocketFrame(
//                    request + " , 欢迎使用Netty WebSocket服务,现在时刻:" + new java.util.Date().toString()));
            return;
        }

    }

    /**
     * 与远程websocket建立连接
     * */

    public boolean connectToRemoteWs(FullHttpRequest req, ChannelHandlerContext ctx) {
        boolean flag = false;
        ProxySetting servletSetting = HttpUtils.getRouteSetting(req);
        String targetUrl = servletSetting.getTargetUrl();
        //远程websocket的地址
        URI targetUriObj = null;
        try {
            targetUriObj = new URI(targetUrl);
        } catch (URISyntaxException e) {
            e.printStackTrace();
        }
        HttpHost httpHost = URIUtils.extractHost(targetUriObj);
        String hostStr = httpHost.toString();
        String websocketStr = hostStr.concat(req.uri());

        try {
            MyWebsocketClient myWebsocketClient = new MyWebsocketClient(websocketStr, ctx);
            myWebsocketClient.connect();
            for (int i = 0; i < 10 ; i++) {
                if (myWebsocketClient.getReadyState().equals(ReadyState.OPEN)) {
                    flag = true;
                    WsConstant.wsClientCtx.put(myWebsocketClient, ctx);
                    WsConstant.wsCtxClient.put(ctx, myWebsocketClient);
                    break;
                }
                TimeUnit.MILLISECONDS.sleep(200);
            }
            if (!flag) {
                myWebsocketClient.close();
            }
        } catch (URISyntaxException | InterruptedException e) {
            e.printStackTrace();
        }

        return flag;
    }

}
           

websocket处理与http处理有些类似,http使用okhttp作为http访问客户端,这里使用websocketclient向远程服务建立websocket实现代理。