天天看點

NIO用戶端(多線程)

package com.clj;

import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;

/**
 * NIO client task: connects to a server, sends a fixed greeting once per
 * second and prints whatever the server sends back.
 *
 * <p>Each instance owns one {@link SocketChannel} and one {@link Selector}.
 * {@link #run()} only relies on {@code Thread.currentThread()}, so an instance
 * can be started as a thread or handed to an executor as a plain
 * {@code Runnable}.
 *
 * @author chenlujun
 * @version 2014-12-18
 */
public class NIOSocketClient extends Thread {

	/** Server IP used by the no-argument constructor. */
	private static final String DEFAULT_IP = "192.168.1.85";
	/** Server port used by the no-argument constructor. */
	private static final int DEFAULT_PORT = 8989;
	private static final long NANOS_PER_MILLI = 1000000L;
	/** Pause between two outgoing messages (one second). */
	private static final long SEND_INTERVAL_NANOS = 1000L * NANOS_PER_MILLI;
	/** Size of the receive buffer, in bytes. */
	private static final int READ_BUFFER_SIZE = 1024;

	private final String serverIp;
	private final int port;
	private SocketChannel socketChannel;
	private Selector selector;

	/**
	 * Creates a client for the default server address and connects immediately.
	 * A connection failure is printed, not thrown; {@link #run()} then returns
	 * without doing anything.
	 */
	NIOSocketClient()
	{
		this(DEFAULT_IP, DEFAULT_PORT);
	}

	/**
	 * Creates a client for the given server address and connects immediately.
	 * A connection failure is printed, not thrown; {@link #run()} then returns
	 * without doing anything.
	 *
	 * @param serverIp server IP or host name
	 * @param port server port
	 */
	NIOSocketClient(String serverIp,int port)
	{
		this.serverIp = serverIp;
		this.port = port;
		try {
			initClient();
		} catch (IOException e) {
			// ClosedChannelException is an IOException, so one handler covers both.
			e.printStackTrace();
		}
	}

	/**
	 * Event loop: sends one message per second and prints server data as it
	 * arrives. The selector timeout doubles as the send timer, so reading and
	 * writing share a single thread without sleeping.
	 *
	 * <p>The loop ends when the current thread is interrupted, when the channel
	 * or selector is closed (for example by {@link #stopServer()} or because the
	 * server hung up), or on an I/O error. Resources are released on exit.
	 */
	@Override
	public void run() {
		if (selector == null || socketChannel == null) {
			// initClient() failed in the constructor; there is nothing to drive.
			return;
		}
		long nextSendAt = System.nanoTime() + SEND_INTERVAL_NANOS;
		try {
			while (!Thread.currentThread().isInterrupted()
					&& selector.isOpen() && socketChannel.isOpen()) {
				long waitMillis = (nextSendAt - System.nanoTime()) / NANOS_PER_MILLI;
				if (waitMillis <= 0) {
					// Never call select(0): a zero timeout means "block forever".
					writeMessage();
					nextSendAt = System.nanoTime() + SEND_INTERVAL_NANOS;
					continue;
				}
				if (selector.select(waitMillis) == 0) {
					continue;
				}
				Set<SelectionKey> keys = selector.selectedKeys();
				Iterator<SelectionKey> iter = keys.iterator();
				while (iter.hasNext()) {
					SelectionKey sk = iter.next();
					iter.remove();
					if (sk.isValid() && sk.isReadable()) {
						readMessage(sk);
					}
				}
			}
		} catch (java.nio.channels.ClosedSelectorException ignored) {
			// The selector was closed from another thread: treat as a normal stop.
		} catch (java.nio.channels.CancelledKeyException ignored) {
			// The channel was closed from another thread between isValid() and isReadable().
		} catch (IOException e) {
			e.printStackTrace();
		} finally {
			stopServer();
		}
	}

	/**
	 * Reads everything currently available on the key's channel and prints it.
	 * Only the bytes actually received are decoded. If the server has closed
	 * the connection (end of stream), the channel is closed.
	 *
	 * <p>Each chunk is decoded on its own, so a multi-byte UTF-8 character that
	 * straddles two chunks is not reassembled.
	 *
	 * @param sk selection key whose channel is a readable {@link SocketChannel}
	 * @throws IOException if reading from or closing the channel fails
	 * @throws UnsupportedEncodingException if UTF-8 is not supported
	 */
	public void readMessage(SelectionKey sk) throws IOException,
			UnsupportedEncodingException {
		SocketChannel curSc = (SocketChannel) sk.channel();
		ByteBuffer buffer = ByteBuffer.allocate(READ_BUFFER_SIZE);
		int bytesRead;
		while ((bytesRead = curSc.read(buffer)) > 0) {
			buffer.flip();
			// Decode only [0, limit): the rest of the array holds stale data.
			System.out.println("Receive from server:"
					+ new String(buffer.array(), 0, buffer.limit(), "UTF-8"));
			buffer.clear();
		}
		if (bytesRead < 0) {
			// End of stream: without closing, the key would stay readable forever.
			System.out.println("Server closed the connection.");
			curSc.close();
		}
	}

	/**
	 * Sends the greeting to the server once. Pacing (one message per second)
	 * is handled by {@link #run()}.
	 *
	 * <p>If the write fails, the channel is closed and the error is printed.
	 * The channel is non-blocking, so the write loop retries until the whole
	 * message has been handed to the socket.
	 *
	 * @throws IOException if closing the channel after a failed write fails
	 */
	public void writeMessage() throws IOException {
		try {
			String ss = "Server,how are you?";
			ByteBuffer buffer = ByteBuffer.wrap(ss.getBytes("UTF-8"));
			while (buffer.hasRemaining()) {
				System.out.println("buffer.hasRemaining() is true.");
				socketChannel.write(buffer);
			}
		} catch (IOException e) {
			if (socketChannel.isOpen()) {
				socketChannel.close();
			}
			e.printStackTrace();
		}
	}

	/**
	 * Opens the channel, connects to the server and registers the channel with
	 * a new selector for read events.
	 *
	 * <p>The connect is done in blocking mode and the channel is switched to
	 * non-blocking afterwards, which avoids polling {@code finishConnect()} in
	 * a tight loop. If any step fails, whatever was opened is closed again.
	 *
	 * @throws IOException if opening, connecting or registering fails
	 * @throws ClosedChannelException if the channel is closed during registration
	 */
	public void initClient() throws IOException, ClosedChannelException {
		InetSocketAddress addr = new InetSocketAddress(serverIp, port);
		boolean ready = false;
		socketChannel = SocketChannel.open();
		try {
			socketChannel.connect(addr);
			socketChannel.configureBlocking(false);
			selector = Selector.open();
			socketChannel.register(selector, SelectionKey.OP_READ);
			ready = true;
		} finally {
			if (!ready) {
				// Also covers unchecked failures such as an unresolved address.
				stopServer();
			}
		}
	}

	/**
	 * Stops the client by closing the selector and the socket channel.
	 * Each resource is closed independently, so a failure on one does not
	 * leave the other open. Errors are printed, not thrown.
	 */
	public void stopServer() {
		if (selector != null && selector.isOpen()) {
			try {
				selector.close();
			} catch (IOException e) {
				e.printStackTrace();
			}
		}
		if (socketChannel != null && socketChannel.isOpen()) {
			try {
				socketChannel.close();
			} catch (IOException e) {
				e.printStackTrace();
			}
		}
	}
}
           
package com.clj;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * Entry point: creates a fixed number of {@code NIOSocketClient} instances
 * pointed at one server and runs them on a thread pool with one worker thread
 * per client.
 */
public class Main {

	/** Number of clients, and therefore of pool threads. */
	private static final int WORKER_COUNT = 1000;
	/** Server IP. */
	private static final String SERVER_IP = "192.168.1.85";
	/** Server port. */
	private static final int SERVER_PORT = 8989;

	/**
	 * Creates the clients and submits each one to the pool.
	 *
	 * @param args command-line arguments (unused)
	 */
	public static void main(String[] args) {
		// Thread pool
		ExecutorService threadPool = Executors.newFixedThreadPool(WORKER_COUNT);
		try {
			for (int n = 0; n < WORKER_COUNT; n++) {
				NIOSocketClient client = new NIOSocketClient(SERVER_IP, SERVER_PORT);
				threadPool.execute(client);
			}
		} finally {
			// shutdown() does not cancel submitted tasks; it only lets the pool's
			// worker threads exit once every submitted client task has returned,
			// so idle workers no longer keep the JVM alive.
			threadPool.shutdown();
		}
	}
}