天天看点

学习socket nio 之 mina实例(2)

IoFilter:过滤器层

        这里我们做一个解码的编码的过滤层,这也是mina中最常用的。首先我们需要定义属于我们自己的协议,也就是数据包的格式:别以为这很复杂,其实很简单的。

       我们知道数据都是字节类型的,那么我们的协议格式如下:前两位表示数据包的长度(一个short类型正好两个字节),第三位是闲置位,后面的是数据。长度是闲置位和

数据长度的和。这样我们就可以根据前两位确定,我们的数据包到那里结束。那么我们循环这么读,就会取得所有的数据包。是不是很简单啊,这个格式就是我们的协议。

     为了更简单,这里我们客户端发往服务端的数据进行编码和解码,服务端发往客户端的就不编码了,客户端也就不用解码。服务端使用mina,客户端我们就使用基本的socket nio。

编码工厂类:

public class CodecFactory extends DemuxingProtocolCodecFactory{
    public CodecFactory(){
        super.addMessageEncoder(String.class, Encoder.class);
        super.addMessageDecoder(Decoder.class);
    }
}      

解码类:

import java.util.ArrayList;
import java.util.List;
 
import org.apache.mina.core.buffer.IoBuffer;
import org.apache.mina.core.session.IoSession;
import org.apache.mina.filter.codec.ProtocolDecoderOutput;
import org.apache.mina.filter.codec.demux.MessageDecoder;
import org.apache.mina.filter.codec.demux.MessageDecoderResult;
public class Decoder implements MessageDecoder {
    
    private byte[] r_curPkg = null;
    private int r_pos = -1; // 包计数器
    static private final int PKG_SIZE_BYTES = 2;//包长度
    
    public Decoder() { }
 
    @Override
    public MessageDecoderResult decodable(IoSession session, IoBuffer in) {
        return MessageDecoderResult.OK;
    }
    
    @Override
    public MessageDecoderResult decode(IoSession session, IoBuffer in,ProtocolDecoderOutput out) throws Exception {
        List<String> list = new ArrayList<String>();
        while (in.remaining() >= PKG_SIZE_BYTES || (r_pos >= 0 && in.hasRemaining())) {// 循环接收包,4为一个整型,表示包长度b, 如果上一个包未接收完成时,继续接收
            // 如果上个包已收完整,则创建新的包
            if (r_pos == -1) {
                //得到下一个包的长度,长度不包括前两位,即包的长度=压缩位长度+数据位长度
                int pkgLen = in.getShort(); 
                
                //如果包长度小于0,那么此包错误,解码失败,返回。
                if (pkgLen < 0) {
                    return MessageDecoderResult.NOT_OK;
                }
                in.get(); 
                r_curPkg = new byte[pkgLen-1]; //数组长度为数据长度
                r_pos = 0;
            }
            int need = r_curPkg.length - r_pos; //需要读取的数据长度
            int length = in.remaining();//缓冲区中可读的数据长度
            if (length >= need) {// 可以把当前包读完整
                in.get(r_curPkg, r_pos, need); // 复制缓冲区中的数据到r_curPkg中
                // 处理接收到一个完整的包数据后,把包添加到池中,判断是否需要需要解压
                byte[] data = r_curPkg;
                String str = new String(data);
                list.add(str);
                r_curPkg = null;
                r_pos = -1;
            } else {
                // 如果剩下的字节数,不够一个包则
                int remainBytes = in.remaining();
                in.get(r_curPkg, r_pos, remainBytes);
                r_pos += remainBytes;
                return MessageDecoderResult.NEED_DATA;
            }
        }
        for (String protocol : list) {
            out.write(protocol);
        }
        return MessageDecoderResult.OK;
    }
 
    @Override
    public void finishDecode(IoSession session, ProtocolDecoderOutput out) {
 
    }
    
    
}      

编码类:(没有进行编码,只进行了数据发送)

import org.apache.mina.core.buffer.IoBuffer;
import org.apache.mina.core.session.IoSession;
import org.apache.mina.filter.codec.ProtocolEncoderOutput;
import org.apache.mina.filter.codec.demux.MessageEncoder;
public class Encoder implements MessageEncoder<String>{
 
    public Encoder(){
        
    }
 
    @Override
    public void encode(IoSession session, String message, ProtocolEncoderOutput out)
            throws Exception {
            System.out.println("encode..................");
            String value = (String) message;  
            IoBuffer buf = IoBuffer.allocate(value.getBytes().length);  
            buf.setAutoExpand(true);  
            if (value != null){
                buf.put(value.trim().getBytes());  
            }  
            buf.flip();  
            out.write(buf); 
            out.flush();
    }
}      

IoService层:

import java.io.IOException;
import java.net.InetSocketAddress;
 
import org.apache.mina.core.service.IoAcceptor;
import org.apache.mina.core.session.IdleStatus;
import org.apache.mina.filter.codec.ProtocolCodecFilter;
import org.apache.mina.filter.logging.LoggingFilter;
import org.apache.mina.transport.socket.nio.NioSocketAcceptor;
 
public class MinaServer {
 
    private static final int PORT = 9123;
    
    public static void main(String [] args) throws IOException{
        
        IoAcceptor acceptor = new NioSocketAcceptor();
        acceptor.getFilterChain().addLast( "logger", new LoggingFilter() );
                acceptor.getFilterChain().addLast( "codec", new ProtocolCodecFilter(new CodecFactory()));
        acceptor.setHandler(new ServerHandler());
        
        
        acceptor.getSessionConfig().setReadBufferSize( 3 );
                acceptor.getSessionConfig().setIdleTime( IdleStatus.BOTH_IDLE, 10 );
        acceptor.bind( new InetSocketAddress(PORT) );
    }
}      

到这里我们的服务端代码就写完了,

客户端实现

<span style="font-size:12px">public class SocketClient {
 
    public static void main(String...args)throws Exception{
        SocketChannel socketChannel = SocketChannel.open();
        socketChannel.connect(new InetSocketAddress("localhost",9123));
        byte [] bytes = "aaaa".getBytes();
        
        //对数据包进行编码
        ByteBuffer buffer = ByteBuffer.allocate(bytes.length+3);
        buffer.putShort((short)(bytes.length+1)); //包长度
        buffer.put((byte)1);//闲置位
        buffer.put(bytes);//数据
        buffer.flip();
        socketChannel.write(buffer);
        socketChannel.socket().shutdownOutput();
        
        String obj = receive(socketChannel);
        System.out.println(obj);
    }
    
    private static String receive(SocketChannel socketChannel)throws Exception{
        ByteBuffer buffer = ByteBuffer.allocate(1024);
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        int size = 0;
        byte [] bytes = null;
        while((size = socketChannel.read(buffer))>=0){
            buffer.flip();
            bytes = new byte[size];
            buffer.get(bytes);
            baos.write(bytes);
            buffer.clear();
        }
        bytes = baos.toByteArray();
        baos.close();
        return new String(bytes);
    }
}
</span>      

所有的代码都写完了,先启动服务端的MinaServer,然后再启动客户端,我们就会看到结果。

继续阅读