package com.clj;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;
/**
* NIO用戶端線程類
*
* @author chenlujun
* @version [版本号, 2014年12月18日]
* @see [相關類/方法]
* @since [産品/子產品版本]
*/
public class NIOSocketClient extends Thread {
private String IP="192.168.1.85";//伺服器IP
private int PORT=8989;//伺服器端口
private SocketChannel socketChannel;
private Selector selector;
/** 不帶參數構造函數
*/
NIOSocketClient()
{
try {
initClient();
} catch (ClosedChannelException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
}
}
/** 帶參數構造函數
* @param url 伺服器IP
* @param port 用戶端端口
*/
NIOSocketClient(String serverIp,int port)
{
this.IP=serverIp;
this.PORT=port;
try {
initClient();
} catch (ClosedChannelException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
}
}
public void run() {
while (true) {
try {
// 寫消息到伺服器端
writeMessage();
int select = selector.select();
if (select > 0) {
Set<SelectionKey> keys = selector.selectedKeys();
Iterator<SelectionKey> iter = keys.iterator();
while (iter.hasNext()) {
SelectionKey sk = iter.next();
if (sk.isReadable()) {
readMessage(sk);
}
iter.remove();
}
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
//從服務端接收消息
public void readMessage(SelectionKey sk) throws IOException,
UnsupportedEncodingException {
SocketChannel curSc = (SocketChannel) sk.channel();
ByteBuffer buffer = ByteBuffer.allocate(8);
while (curSc.read(buffer) > 0) {
buffer.flip();
System.out.println("Receive from server:"
+ new String(buffer.array(), "UTF-8"));
buffer.clear();
}
}
//向服務端發送消息
public void writeMessage() throws IOException {
while(true)
{
//每隔1秒向服務端發送一次資料
try {
Thread.sleep(1000);
} catch (InterruptedException e1) {
e1.printStackTrace();
}
try {
String ss = "Server,how are you?";
ByteBuffer buffer = ByteBuffer.wrap(ss.getBytes("UTF-8"));
while (buffer.hasRemaining()) {
System.out.println("buffer.hasRemaining() is true.");
socketChannel.write(buffer);
}
} catch (IOException e) {
if (socketChannel.isOpen()) {
socketChannel.close();
}
e.printStackTrace();
}
}
}
//初始化用戶端
public void initClient() throws IOException, ClosedChannelException {
InetSocketAddress addr = new InetSocketAddress(IP,PORT);
socketChannel = SocketChannel.open();
selector = Selector.open();
socketChannel.configureBlocking(false);
socketChannel.register(selector, SelectionKey.OP_READ);
// 連接配接到server
socketChannel.connect(addr);
while (!socketChannel.finishConnect()) {
System.out.println("check finish connection");
}
}
/**
* 停止用戶端
*/
public void stopServer() {
try {
if (selector != null && selector.isOpen()) {
selector.close();
}
if (socketChannel != null && socketChannel.isOpen()) {
socketChannel.close();
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
package com.clj;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class Main {
public static void main(String[] args) {
//線程數
int worker_num = 1000;
//伺服器IP
String serverIp="192.168.1.85";
//伺服器端口
int port=8989;
//線程池
ExecutorService threadPool = Executors.newFixedThreadPool(worker_num);
for (int n = 0; n < worker_num; n++) {
NIOSocketClient client=new NIOSocketClient(serverIp,port);
threadPool.execute(client);
}
}
}