基于Java NIO实现服务端与客户端的非阻塞通信

Java从JDK 1.4开始支持NIO(New IO),与传统IO相比,NIO是非阻塞的,它包含三个核心组件:

  1. Channel,类似于Stream,但不同于Stream单向,Channel是双向的,从同一个Channel既可以读取数据,也可以写入数据;
  2. Buffer,向Channel写入数据或从Channel读取数据时,数据需先写入Buffer或读入Buffer;
  3. Selector,多个Channel可以注册到一个Selector中,Selector可以通过单线程侦听这些Channel,当有读、写、连接事件时(而不是阻塞等待读、写或连接),执行相应的操作。

基于Java NIO实现服务端与客户端的非阻塞通信如图所示:
java nio

服务端代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
package com.magicwt;

import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.concurrent.atomic.AtomicInteger;

public class Server {

private static AtomicInteger count = new AtomicInteger(1);

public static void main(String[] args) throws Exception {
// 新建ServerSocketChannel实例,设置为非阻塞,绑定端口8081;
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.configureBlocking(false);
serverSocketChannel.socket().bind(new InetSocketAddress(8081));
// 新建Selector实例,并将ServerSocketChannel实例注册到Selector实例中,侦听ACCEPT事件
Selector selector = Selector.open();
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
while (true) {
// 从Selector实例中获取事件
selector.select();
Iterator iterator = selector.selectedKeys().iterator();
while (iterator.hasNext()) {
SelectionKey selectionKey = (SelectionKey) iterator.next();
iterator.remove();
if (selectionKey.isAcceptable()) {
// 对于ACCEPT事件,新建SocketChannel实例,并将SocketChannel实例注册到Selector实例中,侦听READ事件,通过SocketChannel实例向客户端返回消息
serverSocketChannel = (ServerSocketChannel) selectionKey.channel();
SocketChannel socketChannel = serverSocketChannel.accept();
socketChannel.configureBlocking(false);
socketChannel.register(selector, SelectionKey.OP_READ);
System.out.println("连接请求已收到,服务端建立连接");
socketChannel.write(ByteBuffer.wrap(new String("服务端消息" + count.getAndIncrement()).getBytes()));
} else if (selectionKey.isReadable()) {
// 对于READ事件,通过SocketChannel实例读取客户端消息并向客户端返回消息
SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
ByteBuffer byteBuffer = ByteBuffer.allocate(128);
socketChannel.read(byteBuffer);
System.out.println("已读取信息:" + new String(byteBuffer.array()));
socketChannel.write(ByteBuffer.wrap(new String("服务端消息" + count.getAndIncrement()).getBytes()));
}
}
}
}
}

客户端代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
package com.magicwt;

import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.concurrent.atomic.AtomicInteger;

public class Client {

private static AtomicInteger count = new AtomicInteger(1);

public static void main(String[] args) throws Exception {
// 新建SocketChannel实例,设置为非阻塞,连接127.0.0.1:8080;
SocketChannel socketChannel = SocketChannel.open();
socketChannel.configureBlocking(false);
socketChannel.connect(new InetSocketAddress("127.0.0.1", 8081));
// 新建Selector实例,并将SocketChannel实例注册到Selector实例中,侦听CONNECT事件
Selector selector = Selector.open();
socketChannel.register(selector, SelectionKey.OP_CONNECT);
while (true) {
// 从Selector实例中获取事件
selector.select();
Iterator iterator = selector.selectedKeys().iterator();
while (iterator.hasNext()) {
SelectionKey selectionKey = (SelectionKey) iterator.next();
iterator.remove();
if (selectionKey.isConnectable()) {
// 对于CONNECT事件,新建SocketChannel实例,并将SocketChannel实例注册到Selector实例中,侦听READ事件,通过SocketChannel实例向服务端返回消息
socketChannel = (SocketChannel) selectionKey.channel();
if (socketChannel.isConnectionPending()) {
socketChannel.finishConnect();
}
socketChannel.configureBlocking(false);
socketChannel.register(selector, SelectionKey.OP_READ);
System.out.println("客户端建立连接");
socketChannel.write(ByteBuffer.wrap(new String("客户端消息" + count.getAndIncrement()).getBytes()));
} else if (selectionKey.isReadable()) {
// 对于READ事件,通过SocketChannel实例读取服务端消息并向服务端返回消息
socketChannel = (SocketChannel) selectionKey.channel();
ByteBuffer byteBuffer = ByteBuffer.allocate(128);
socketChannel.read(byteBuffer);
System.out.println("已读取信息:" + new String(byteBuffer.array()));
socketChannel.write(ByteBuffer.wrap(new String("客户端信息" + count.getAndIncrement()).getBytes()));
}
}
}
}
}

分别启动服务端和客户端程序,输出如下所示。
服务端:
1
客户端:
2