- package niocommunicate;
- import java.io.IOException;
- import java.net.InetSocketAddress;
- import java.nio.ByteBuffer;
- import java.nio.channels.CancelledKeyException;
- import java.nio.channels.SelectionKey;
- import java.nio.channels.Selector;
- import java.nio.channels.ServerSocketChannel;
- import java.nio.channels.SocketChannel;
- import java.util.Arrays;
- import java.util.Iterator;
- import java.util.Map;
- import java.util.concurrent.ArrayBlockingQueue;
- import java.util.concurrent.ConcurrentHashMap;
- import java.util.concurrent.ThreadPoolExecutor;
- import java.util.concurrent.TimeUnit;
/**
 * Non-blocking TCP server listening on port 5555.
 *
 * <p>One pool thread runs the selector loop; reads and writes for individual
 * connections are handed to the same pool. Every non-empty request is answered
 * with the fixed payload {@code "response"}.
 *
 * <p>Text is converted with the platform default charset on purpose, so the
 * bytes on the wire stay compatible with peers that do the same.
 */
public class Server {
    private Selector selector = getSelector();
    private ServerSocketChannel ss = null;
    // CallerRunsPolicy: when the bounded queue is full the selector thread runs the
    // task itself instead of throwing RejectedExecutionException, which used to
    // propagate out of the selector loop and shut the whole server down.
    private final ThreadPoolExecutor threadPool = new ThreadPoolExecutor(10, 10, 500, TimeUnit.MILLISECONDS,
            new ArrayBlockingQueue<Runnable>(20), new ThreadPoolExecutor.CallerRunsPolicy());
    // Keys that currently have a read handler in flight. Keyed by the SelectionKey
    // itself (not its hashCode) so two keys can never collide.
    private final Map<SelectionKey, Boolean> selectionKeyMap = new ConcurrentHashMap<>();
    // Cleared by close(); stops the selector loop and blocks a late start().
    private volatile boolean running = true;

    /**
     * Opens a new selector.
     *
     * @return the selector, or {@code null} when it could not be opened
     */
    public Selector getSelector() {
        try {
            return Selector.open();
        } catch (IOException e) {
            e.printStackTrace();
        }
        return null;
    }

    /**
     * Creates the non-blocking server socket, binds it to port 5555 and
     * registers it for accept events. On failure the server is closed.
     */
    public Server() {
        try {
            ss = ServerSocketChannel.open();
            ss.bind(new InetSocketAddress(5555));
            ss.configureBlocking(false);
            if (selector == null) {
                selector = Selector.open();
            }
            ss.register(selector, SelectionKey.OP_ACCEPT);
        } catch (Exception e) {
            e.printStackTrace();
            close();
        }
    }

    /**
     * Shuts the server down: stops the loop, shuts the pool down and closes the
     * listening channel and the selector.
     */
    private void close() {
        running = false;
        threadPool.shutdown();
        // Close each resource in its own try so a failure on the first one
        // cannot leak the second.
        try {
            if (ss != null) {
                ss.close();
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
        try {
            if (selector != null) {
                selector.close();
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * Starts the selector loop on a pool thread. Does nothing when the server
     * has already been closed (for example because construction failed).
     */
    private void start() {
        if (!running) {
            return;
        }
        threadPool.execute(new Runnable() {
            @Override
            public void run() {
                selectLoop();
            }
        });
    }

    /** Polls the selector and dispatches every ready key until the server is closed. */
    private void selectLoop() {
        try {
            while (running) {
                if (selector.select(10) == 0) {
                    continue;
                }
                Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
                while (iterator.hasNext()) {
                    SelectionKey selectedKey = iterator.next();
                    iterator.remove();
                    try {
                        dispatch(selectedKey);
                    } catch (CancelledKeyException cc) {
                        // A handler closed the connection while the key was still selected.
                        selectedKey.cancel();
                        selectionKeyMap.remove(selectedKey);
                    }
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
            close();
        }
    }

    /** Handles one ready key: read, write or accept (checked in that order). */
    private void dispatch(SelectionKey selectedKey) throws IOException {
        if (selectedKey.isReadable()) {
            // The key stays readable until the handler has drained the socket, so
            // only start a handler when none is in flight for this key.
            if (!selectionKeyMap.containsKey(selectedKey)) {
                selectionKeyMap.put(selectedKey, Boolean.TRUE);
                threadPool.execute(new ReadClientSocketHandler(selectedKey));
            }
        } else if (selectedKey.isWritable()) {
            Object responseMessage = selectedKey.attachment();
            SocketChannel clientChannel = (SocketChannel) selectedKey.channel();
            // Go back to read interest; the pending response is handed to a writer.
            selectedKey.interestOps(SelectionKey.OP_READ);
            if (responseMessage != null) {
                threadPool.execute(new WriteClientSocketHandler(clientChannel, responseMessage));
            }
        } else if (selectedKey.isAcceptable()) {
            ServerSocketChannel ssc = (ServerSocketChannel) selectedKey.channel();
            SocketChannel clientSocket = ssc.accept();
            if (clientSocket != null) {
                clientSocket.configureBlocking(false);
                // Read interest only: write interest is enabled by the read handler
                // once there is a response to send.
                clientSocket.register(selector, SelectionKey.OP_READ);
            }
        }
    }

    /**
     * Task that sends one response to a client.
     *
     * @author haoguo
     */
    private class WriteClientSocketHandler implements Runnable {
        SocketChannel client;
        Object respnoseMessage;

        WriteClientSocketHandler(SocketChannel client, Object respnoseMessage) {
            this.client = client;
            this.respnoseMessage = respnoseMessage;
        }

        @Override
        public void run() {
            byte[] responseByteData = null;
            String logResponseString = "";
            if (respnoseMessage instanceof byte[]) {
                responseByteData = (byte[]) respnoseMessage;
                logResponseString = new String(responseByteData);
            } else if (respnoseMessage instanceof String) {
                logResponseString = (String) respnoseMessage;
                responseByteData = logResponseString.getBytes();
            }
            if (responseByteData == null || responseByteData.length == 0) {
                System.out.println("响应的数据为空");
                return;
            }
            // keyFor returns null once the channel is no longer registered.
            SelectionKey registeredKey = client.keyFor(selector);
            String clientId = registeredKey == null ? "?" : String.valueOf(registeredKey.hashCode());
            try {
                ByteBuffer out = ByteBuffer.wrap(responseByteData);
                // A non-blocking write may accept only part of the buffer.
                while (out.hasRemaining()) {
                    if (client.write(out) == 0) {
                        Thread.yield(); // send buffer full; give the peer time to drain it
                    }
                }
                System.out.println("server响应客户端[" + clientId + "]数据 :[" + logResponseString + "]");
            } catch (IOException e) {
                e.printStackTrace();
                try {
                    client.close();
                } catch (IOException e1) {
                    e1.printStackTrace();
                }
            }
        }
    }

    /**
     * Task that reads everything currently available from a client and queues
     * the response. Closes the connection when the peer has disconnected.
     *
     * @author haoguo
     */
    private class ReadClientSocketHandler implements Runnable {
        private SocketChannel client;
        private ByteBuffer tmp = ByteBuffer.allocate(1024);
        private SelectionKey selectionKey;

        ReadClientSocketHandler(SelectionKey selectionKey) {
            this.selectionKey = selectionKey;
            this.client = (SocketChannel) selectionKey.channel();
        }

        @Override
        public void run() {
            try {
                tmp.clear();
                byte[] data = new byte[0];
                int len;
                while ((len = client.read(tmp)) > 0) {
                    data = Arrays.copyOf(data, data.length + len);
                    System.arraycopy(tmp.array(), 0, data, data.length - len, len);
                    tmp.clear();
                }
                if (data.length > 0) {
                    System.out.println("接收到客户端[" + selectionKey.hashCode() + "]数据 :[" + new String(data) + "]");
                }
                if (len < 0) {
                    // read() == -1 means the peer closed the connection. Without this
                    // the key stays readable forever and the selector loop spins.
                    System.out.println("客户端[" + selectionKey.hashCode() + "]关闭了连接");
                    closeClient();
                    return;
                }
                if (data.length == 0) {
                    return;
                }
                // dosomthing
                byte[] response = "response".getBytes();
                // Re-registering updates the existing key: write interest plus the response.
                client.register(selector, SelectionKey.OP_WRITE, response);
            } catch (IOException | CancelledKeyException e) {
                System.out.println("客户端[" + selectionKey.hashCode() + "]关闭了连接");
                closeClient();
            } finally {
                selectionKeyMap.remove(selectionKey);
            }
        }

        /** Cancels the registration (if any) and closes the client channel. */
        private void closeClient() {
            try {
                SelectionKey registeredKey = client.keyFor(selector);
                if (registeredKey != null) {
                    registeredKey.cancel();
                }
                client.close();
            } catch (IOException e1) {
                e1.printStackTrace();
            }
        }
    }

    public static void main(String[] args) {
        Server server = new Server();
        server.start();
    }
}
- package niocommunicate;
- import java.io.IOException;
- import java.net.InetAddress;
- import java.net.InetSocketAddress;
- import java.nio.ByteBuffer;
- import java.nio.channels.ClosedChannelException;
- import java.nio.channels.SelectionKey;
- import java.nio.channels.Selector;
- import java.nio.channels.SocketChannel;
- import java.util.Arrays;
- import java.util.Iterator;
- import java.util.LinkedList;
- import java.util.List;
/**
 * Non-blocking TCP client that connects to port 5555 on the local host.
 *
 * <p>A background thread runs the selector loop: it completes the connection,
 * sends the message attached by {@link #writeData(String)} and prints whatever
 * the server sends back. The loop ends, and the selector is closed, once
 * {@link #close()} has been called or the server disconnects.
 *
 * <p>Text is converted with the platform default charset on purpose, so the
 * bytes on the wire stay compatible with peers that do the same.
 */
public class Client {
    SocketChannel client;
    Selector selctor = getSelector();
    private volatile boolean run = true;
    private List<Object> messageQueue = new LinkedList<>();

    /**
     * Opens a new selector.
     *
     * @return the selector, or {@code null} when it could not be opened
     */
    public Selector getSelector() {
        try {
            return Selector.open();
        } catch (IOException e) {
            e.printStackTrace();
        }
        return null;
    }

    /**
     * Starts connecting to the server, launches the selector thread and waits
     * 200 ms to give the connection time to complete.
     */
    public Client() {
        try {
            if (selctor == null) {
                // The field initializer failed; retry so the failure surfaces as an IOException here.
                selctor = Selector.open();
            }
            client = SocketChannel.open();
            client.configureBlocking(false);
            client.connect(new InetSocketAddress(InetAddress.getLocalHost(), 5555));
            client.register(selctor, SelectionKey.OP_CONNECT);
        } catch (IOException e) {
            e.printStackTrace();
            close();
        }
        new Thread(new Runnable() {
            @Override
            public void run() {
                eventLoop();
            }
        }).start();
        try {
            Thread.sleep(200);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    /** Selector loop; runs on the background thread until {@link #close()} is called. */
    private void eventLoop() {
        if (selctor == null) {
            return;
        }
        try {
            while (run) {
                try {
                    if (selctor.select(20) == 0) {
                        continue;
                    }
                    Iterator<SelectionKey> iterator = selctor.selectedKeys().iterator();
                    while (iterator.hasNext()) {
                        SelectionKey selectionKey = iterator.next();
                        iterator.remove();
                        handleKey(selectionKey);
                    }
                } catch (IOException e1) {
                    e1.printStackTrace();
                    close();
                } catch (java.nio.channels.CancelledKeyException cancelled) {
                    // close() ran on another thread while this key was being handled.
                    close();
                }
            }
        } finally {
            // The selector is closed by the thread that selects on it, so select()
            // never runs against a selector closed from another thread.
            try {
                selctor.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    /** Handles one ready key: connect, write or read (checked in that order). */
    private void handleKey(SelectionKey selectionKey) throws IOException {
        if (selectionKey.isConnectable()) {
            SocketChannel sc = (SocketChannel) selectionKey.channel();
            // Only switch to read interest once the connection is really established.
            if (sc.finishConnect()) {
                sc.register(selctor, SelectionKey.OP_READ);
            }
        } else if (selectionKey.isWritable()) {
            sendAttachment(selectionKey);
        } else if (selectionKey.isReadable()) {
            receive(selectionKey);
        }
    }

    /** Sends the message attached to the key and switches the key back to read interest. */
    private void sendAttachment(SelectionKey selectionKey) {
        selectionKey.interestOps(SelectionKey.OP_READ);
        Object requestMessage = selectionKey.attachment();
        if (requestMessage == null) {
            return; // nothing attached, nothing to send
        }
        SocketChannel writeSocketChannel = (SocketChannel) selectionKey.channel();
        byte[] requestByteData = null;
        if (requestMessage instanceof byte[]) {
            requestByteData = (byte[]) requestMessage;
        } else if (requestMessage instanceof String) {
            requestByteData = ((String) requestMessage).getBytes();
            System.out.println("client send Message:[" + requestMessage + "]");
        } else {
            System.out.println("unsupport send Message Type" + requestMessage.getClass());
        }
        System.out.println("requestMessage:" + requestMessage);
        if (requestByteData != null && requestByteData.length > 0) {
            try {
                ByteBuffer out = ByteBuffer.wrap(requestByteData);
                // A non-blocking write may accept only part of the buffer.
                while (out.hasRemaining()) {
                    if (writeSocketChannel.write(out) == 0) {
                        Thread.yield(); // send buffer full; give the peer time to drain it
                    }
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    /** Reads up to 1024 bytes and prints them; closes the client when the server has disconnected. */
    private void receive(SelectionKey selectionKey) throws IOException {
        SocketChannel readSocketChannel = (SocketChannel) selectionKey.channel();
        ByteBuffer tmp = ByteBuffer.allocate(1024);
        int len = readSocketChannel.read(tmp);
        if (len > 0) {
            byte[] data = Arrays.copyOf(tmp.array(), len);
            System.out.println("客户端接收到数据:[" + new String(data) + "]");
        } else if (len < 0) {
            // read() == -1 means the server closed the connection. Without this
            // the key stays readable forever and the loop spins.
            close();
        }
    }

    /**
     * Stops the selector thread and closes the connection. Safe to call more
     * than once and from any thread.
     */
    public void close() {
        // Clear the flag first so the loop stops even if closing the channel fails.
        run = false;
        try {
            if (client != null) {
                SelectionKey selectionKey = selctor == null ? null : client.keyFor(selctor);
                if (selectionKey != null) {
                    selectionKey.cancel();
                }
                client.close();
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * Queues one message for sending and waits 40 ms so the selector thread can
     * write it before the next message replaces the attachment.
     *
     * @param data the text to send
     */
    public void writeData(String data) {
        messageQueue.add(data);
        while (messageQueue.size() > 0) {
            Object firstSendData = messageQueue.remove(0);
            try {
                client.register(selctor, SelectionKey.OP_WRITE, firstSendData);
            } catch (ClosedChannelException e) {
                e.printStackTrace();
            }
            try {
                Thread.sleep(40);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }

    public static void main(String[] args) {
        Client client = new Client();
        long t1 = System.currentTimeMillis();
        for (int i = 10; i < 200; i++) {
            client.writeData(i + "nimddddddddddsssssssssssssssssssssssssssssssssssscccccccccccccccccccccccc"
                    + "ccccccccccccccccccccccccccccccccccccccccccccccccccccccccdddddddddddd"
                    + "dddddddddddddddddwwwwwwwwwwwwwwwwwwwwwwwwwwwwwwwwwwwwwaaaaaaaaaaaaaa"
                    + "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaddddddddddddddddddddddddddddddd"
                    + "ddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddrrrr"
                    + "jjjjjjjjjjjjjjjjjjjjjjjjjjjjrrrrrrrrrrrrrrrrrrrrrrrrrrrkkkkkkkkkkkkkkkkkkkk"
                    + "kkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkjjjjkkkkkklllllllllllllllllllllllllll"
                    + "lllllllldddddddddddddmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmddaei"
                    + "nimddddddddddsssssssssssssssssssssssssssssssssssscccccccccccccccccccccccc"
                    + "ccccccccccccccccccccccccccccccccccccccccccccccccccccccccdddddddddddd"
                    + "dddddddddddddddddwwwwwwwwwwwwwwwwwwwwwwwwwwwwwwwwwwwwwaaaaaaaaaaaaaa"
                    + "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaddddddddddddddddddddddddddddddd"
                    + "ddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddrrrr"
                    + "jjjjjjjjjjjjjjjjjjjjjjjjjjjjrrrrrrrrrrrrrrrrrrrrrrrrrrrkkkkkkkkkkkkkkkkkkkk"
                    + "kkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkjjjjkkkkkklllllllllllllllllllllllllll"
                    + "lllllllldddddddddddddmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmddaei" + i);
        }
        long t2 = System.currentTimeMillis();
        System.out.println("总共耗时:" + (t2 - t1) + "ms");
        client.close();
    }
}
- package niocommunicate;
- import java.io.IOException;
- import java.net.InetSocketAddress;
- import java.nio.ByteBuffer;
- import java.nio.channels.CancelledKeyException;
- import java.nio.channels.SelectionKey;
- import java.nio.channels.Selector;
- import java.nio.channels.ServerSocketChannel;
- import java.nio.channels.SocketChannel;
- import java.util.Arrays;
- import java.util.Iterator;
- import java.util.LinkedList;
- import java.util.List;
- import java.util.Map;
- import java.util.concurrent.ArrayBlockingQueue;
- import java.util.concurrent.ConcurrentHashMap;
- import java.util.concurrent.ThreadPoolExecutor;
- import java.util.concurrent.TimeUnit;
/**
 * Non-blocking TCP server listening on port 5555, with a per-connection queue
 * of pending responses.
 *
 * <p>One pool thread runs the selector loop; reads and writes for individual
 * connections are handed to the same pool. Every non-empty request is answered
 * with {@code "response"} followed by the first (up to) three characters of the
 * request.
 *
 * <p>Text is converted with the platform default charset on purpose, so the
 * bytes on the wire stay compatible with peers that do the same.
 */
public class Server {
    private Selector selector = getSelector();
    private ServerSocketChannel ss = null;
    // CallerRunsPolicy: when the bounded queue is full the selector thread runs the
    // task itself instead of throwing RejectedExecutionException, which used to
    // propagate out of the selector loop and shut the whole server down.
    private final ThreadPoolExecutor threadPool = new ThreadPoolExecutor(10, 10, 500, TimeUnit.MILLISECONDS,
            new ArrayBlockingQueue<Runnable>(20), new ThreadPoolExecutor.CallerRunsPolicy());
    // Keys that currently have a read handler in flight. Keyed by the SelectionKey
    // itself (not its hashCode) so two keys can never collide.
    private final Map<SelectionKey, Boolean> selectionKeyMap = new ConcurrentHashMap<>();
    // Pending responses per connection. Declared as ConcurrentHashMap so the atomic
    // putIfAbsent is available without relying on Java 8 Map defaults.
    private final ConcurrentHashMap<SelectionKey, List<Object>> responseMessageQueue = new ConcurrentHashMap<>();
    private volatile boolean run = true;
    private volatile boolean isClose = false;

    /**
     * Opens a new selector.
     *
     * @return the selector, or {@code null} when it could not be opened
     */
    public Selector getSelector() {
        try {
            return Selector.open();
        } catch (IOException e) {
            e.printStackTrace();
        }
        return null;
    }

    /**
     * Creates the non-blocking server socket, binds it to port 5555 and
     * registers it for accept events. On failure the server is closed.
     */
    public Server() {
        try {
            ss = ServerSocketChannel.open();
            ss.bind(new InetSocketAddress(5555));
            ss.configureBlocking(false);
            if (selector == null) {
                selector = Selector.open();
            }
            ss.register(selector, SelectionKey.OP_ACCEPT);
        } catch (Exception e) {
            e.printStackTrace();
            close();
        }
    }

    /**
     * @return {@code true} once the server has been closed
     */
    public boolean isClose() {
        return isClose;
    }

    /**
     * Shuts the server down: stops the loop, shuts the pool down and closes the
     * listening channel and the selector.
     */
    private void close() {
        run = false;
        isClose = true;
        threadPool.shutdown();
        // Close each resource in its own try so a failure on the first one
        // cannot leak the second.
        try {
            if (ss != null) {
                ss.close();
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
        try {
            if (selector != null) {
                selector.close();
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * Starts the selector loop on a pool thread. Does nothing when the server
     * has already been closed (for example because construction failed).
     */
    private void start() {
        if (isClose) {
            return;
        }
        threadPool.execute(new Runnable() {
            @Override
            public void run() {
                selectLoop();
            }
        });
    }

    /** Polls the selector and dispatches every ready key until the server is closed. */
    private void selectLoop() {
        try {
            while (run) {
                if (selector.select(10) == 0) {
                    continue;
                }
                Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
                while (iterator.hasNext()) {
                    SelectionKey selectedKey = iterator.next();
                    iterator.remove();
                    try {
                        dispatch(selectedKey);
                    } catch (CancelledKeyException cc) {
                        // A handler closed the connection while the key was still selected.
                        selectedKey.cancel();
                        forget(selectedKey);
                    }
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
            close();
        }
    }

    /** Handles one ready key: read, write or accept (checked in that order). */
    private void dispatch(SelectionKey selectedKey) throws IOException {
        if (selectedKey.isReadable()) {
            // The key stays readable until the handler has drained the socket, so
            // only start a handler when none is in flight for this key.
            if (!selectionKeyMap.containsKey(selectedKey)) {
                selectionKeyMap.put(selectedKey, Boolean.TRUE);
                threadPool.execute(new ReadClientSocketHandler(selectedKey));
            }
        } else if (selectedKey.isWritable()) {
            SocketChannel clientChannel = (SocketChannel) selectedKey.channel();
            // Go back to read interest before draining, so a response queued while
            // draining re-arms write interest and is picked up on the next pass.
            selectedKey.interestOps(SelectionKey.OP_READ);
            List<Object> pending = responsesFor(selectedKey);
            while (!pending.isEmpty()) {
                Object responseMessage = pending.remove(0);
                if (responseMessage != null) {
                    threadPool.execute(new WriteClientSocketHandler(clientChannel, responseMessage));
                }
            }
        } else if (selectedKey.isAcceptable()) {
            ServerSocketChannel ssc = (ServerSocketChannel) selectedKey.channel();
            SocketChannel clientSocket = ssc.accept();
            if (clientSocket != null) {
                clientSocket.configureBlocking(false);
                // Read interest only: write interest is enabled by the read handler
                // once there is a response to send.
                clientSocket.register(selector, SelectionKey.OP_READ);
            }
        }
    }

    /**
     * Returns the pending-response list of a connection, creating it atomically
     * on first use. The list is synchronized because the selector thread removes
     * from it while read handlers append to it.
     */
    private List<Object> responsesFor(SelectionKey key) {
        List<Object> pending = responseMessageQueue.get(key);
        if (pending == null) {
            List<Object> created = java.util.Collections.synchronizedList(new LinkedList<Object>());
            pending = responseMessageQueue.putIfAbsent(key, created);
            if (pending == null) {
                pending = created;
            }
        }
        return pending;
    }

    /** Drops all per-connection bookkeeping for a key that is no longer in use. */
    private void forget(SelectionKey key) {
        selectionKeyMap.remove(key);
        responseMessageQueue.remove(key);
    }

    /**
     * Task that sends one response to a client.
     *
     * @author haoguo
     */
    private class WriteClientSocketHandler implements Runnable {
        SocketChannel client;
        Object respnoseMessage;

        WriteClientSocketHandler(SocketChannel client, Object respnoseMessage) {
            this.client = client;
            this.respnoseMessage = respnoseMessage;
        }

        @Override
        public void run() {
            byte[] responseByteData = null;
            String logResponseString = "";
            if (respnoseMessage instanceof byte[]) {
                responseByteData = (byte[]) respnoseMessage;
                logResponseString = new String(responseByteData);
            } else if (respnoseMessage instanceof String) {
                logResponseString = (String) respnoseMessage;
                responseByteData = logResponseString.getBytes();
            }
            if (responseByteData == null || responseByteData.length == 0) {
                System.out.println("响应的数据为空");
                return;
            }
            // keyFor returns null once the channel is no longer registered.
            SelectionKey registeredKey = client.keyFor(selector);
            String clientId = registeredKey == null ? "?" : String.valueOf(registeredKey.hashCode());
            try {
                ByteBuffer out = ByteBuffer.wrap(responseByteData);
                // A non-blocking write may accept only part of the buffer.
                while (out.hasRemaining()) {
                    if (client.write(out) == 0) {
                        Thread.yield(); // send buffer full; give the peer time to drain it
                    }
                }
                System.out.println("server响应客户端[" + clientId + "]数据 :[" + logResponseString + "]");
            } catch (IOException e) {
                e.printStackTrace();
                try {
                    if (registeredKey != null) {
                        registeredKey.cancel();
                        responseMessageQueue.remove(registeredKey);
                    }
                    client.close();
                } catch (IOException e1) {
                    e1.printStackTrace();
                }
            }
        }
    }

    /**
     * Task that reads everything currently available from a client and queues
     * the response. Closes the connection when the peer has disconnected.
     *
     * @author haoguo
     */
    private class ReadClientSocketHandler implements Runnable {
        private SocketChannel client;
        private ByteBuffer tmp = ByteBuffer.allocate(1024);
        private SelectionKey selectionKey;
        int hashCode;

        ReadClientSocketHandler(SelectionKey selectionKey) {
            this.selectionKey = selectionKey;
            this.client = (SocketChannel) selectionKey.channel();
            this.hashCode = selectionKey.hashCode();
        }

        @Override
        public void run() {
            try {
                tmp.clear();
                byte[] data = new byte[0];
                int len;
                while ((len = client.read(tmp)) > 0) {
                    data = Arrays.copyOf(data, data.length + len);
                    System.arraycopy(tmp.array(), 0, data, data.length - len, len);
                    tmp.clear();
                }
                String prefix = "";
                if (data.length > 0) {
                    String readData = new String(data);
                    // Requests shorter than three characters must not throw.
                    prefix = readData.substring(0, Math.min(3, readData.length()));
                    System.out.println("接收到客户端[" + hashCode + "]数据 :[" + prefix + "]");
                }
                if (len < 0) {
                    // read() == -1 means the peer closed the connection. Without this
                    // the key stays readable forever and the selector loop spins.
                    System.out.println("客户端[" + hashCode + "]关闭了连接");
                    closeClient();
                    return;
                }
                if (data.length == 0) {
                    return;
                }
                // dosomthing
                byte[] response = ("response" + prefix).getBytes();
                responsesFor(selectionKey).add(response);
                // Re-registering updates the existing key to write interest.
                client.register(selector, SelectionKey.OP_WRITE);
            } catch (IOException | CancelledKeyException e) {
                System.out.println("客户端[" + hashCode + "]关闭了连接");
                closeClient();
            } finally {
                selectionKeyMap.remove(selectionKey);
            }
        }

        /** Cancels the registration, drops queued responses and closes the client channel. */
        private void closeClient() {
            try {
                selectionKey.cancel();
                responseMessageQueue.remove(selectionKey);
                client.close();
            } catch (IOException e1) {
                e1.printStackTrace();
            }
        }
    }

    public static void main(String[] args) {
        Server server = new Server();
        server.start();
    }
}
来源: http://www.phpxs.com/code/1001543/