I/O的四种模型

  • 同步阻塞(Synchronous blocking I/O)

  • 同步非阻塞(Synchronous non-blocking I/0)

  • 异步阻塞(Asynchronous blocking I/0)

  • 异步非阻塞(Asynchronous non-blocking I/0)

I/O多路复用

I/O多路复用是指使用一个线程来检查多个文件描述符(Socket)的就绪状态,比如调用select和poll函数,传入多个文件描述符,如果有一个文件描述符就绪,则返回,否则阻塞直到超时。得到就绪状态后进行真正的操作可以在同一个线程里执行,也可以启动线程执行(比如使用线程池)。

一般情况下,I/O 复用机制需要事件分发器。 事件分发器的作用,将那些读写事件源分发给各读写事件的处理者。

涉及到事件分发器的两种模式称为:Reactor和Proactor。 Reactor模式是基于同步I/O的,而Proactor模式是和异步I/O相关的。本文主要介绍的就是 Reactor模式相关的知识。

Reactor模式

Reactor模式也叫反应器模式

IO的发展历史

单线程阻塞

1
2
3
4
while(true){
socket = accept();
handle(socket)
}

这种方法的最大问题是无法并发,效率太低,如果当前的请求没有处理完,那么后面的请求只能被阻塞,服务器的吞吐量太低。

多线程阻塞

之后,想到了使用多线程,也就是很经典的connection per thread,每一个连接用一个线程处理,类似:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
import java.io.IOException;
import java.net.ServerSocket;
import java.net.Socket;

class BasicModel implements Runnable {
public void run() {
try {
ServerSocket ss =
new ServerSocket(SystemConfig.SOCKET_SERVER_PORT);
while (!Thread.interrupted())
new Thread(new Handler(ss.accept())).start();
//创建新线程来handle
// or, single-threaded, or a thread pool
} catch (IOException ex) { /* ... */ }
}

static class Handler implements Runnable {
final Socket socket;
Handler(Socket s) { socket = s; }
public void run() {
try {
byte[] input = new byte[SystemConfig.INPUT_SIZE];
socket.getInputStream().read(input);
byte[] output = process(input);
socket.getOutputStream().write(output);
} catch (IOException ex) { /* ... */ }
}
private byte[] process(byte[] input) {
byte[] output=null;
/* ... */
return output;
}
}
}

对于每一个请求都分发给一个线程,每个线程中都独自处理上面的流程。

tomcat服务器的早期版本确实是这样实现的。

优点

定程度上极大地提高了服务器的吞吐量,因为之前的请求在read阻塞以后,不会影响到后续的请求,因为他们在不同的线程中。这也是为什么通常会讲“一个线程只能对应一个socket”的原因。

缺点

缺点在于资源要求太高,系统中创建线程是需要比较高的系统资源的,如果连接数太高,系统无法承受,而且,线程的反复创建-销毁也需要代价。

改进

采用基于事件驱动的设计,当有事件触发时,才会调用处理器进行数据处理。使用Reactor模式,对线程的数量进行控制,一个线程处理大量的事件。

单线程NIO模型

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;

class Server {
public static final int SOCKET_SERVER_PORT = 8088;

public static void testServer() throws IOException {
// 1、获取Selector选择器
Selector selector = Selector.open();
// 2、获取通道
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
// 3.设置为非阻塞
serverSocketChannel.configureBlocking(false);
// 4、绑定连接
serverSocketChannel.bind(new InetSocketAddress(SOCKET_SERVER_PORT));
// 5、将通道注册到选择器上,并注册的操作为:“接收”操作
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
// 6、采用轮询的方式,查询获取“准备就绪”的注册过的操作
while (selector.select() > 0) {
// 7、获取当前选择器中所有注册的选择键(“已经准备就绪的操作”)
Iterator<SelectionKey> selectedKeys = selector.selectedKeys().iterator();
while (selectedKeys.hasNext()) {
// 8、获取“准备就绪”的时间
SelectionKey selectedKey = selectedKeys.next();
// 9、判断key是具体的什么事件
if (selectedKey.isAcceptable()) {
// 10、若接受的事件是“接收就绪” 操作,就获取客户端连接
SocketChannel socketChannel = serverSocketChannel.accept();
// 11、切换为非阻塞模式
socketChannel.configureBlocking(false);
// 12、将该通道注册到selector选择器上
socketChannel.register(selector, SelectionKey.OP_READ);
} else if (selectedKey.isReadable()) {
// 13、获取该选择器上的“读就绪”状态的通道
SocketChannel socketChannel = (SocketChannel) selectedKey.channel();
// 14、读取数据
ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
int length = 0;
while ((length = socketChannel.read(byteBuffer)) != -1) {
byteBuffer.flip();
System.out.println(new String(byteBuffer.array(), 0, length));
byteBuffer.clear();
}
socketChannel.close();
}
// 15、移除选择键
selectedKeys.remove();
}
}
// 7、关闭连接
serverSocketChannel.close();
}

public static void main(String[] args) throws IOException {
testServer();
}
}

实际上的Reactor模式,是基于Java NIO的,在他的基础上,抽象出来两个组件——Reactor和Handler两个组件:

(1)Reactor:负责响应IO事件,当检测到一个新的事件,将其发送给相应的Handler去处理;新的事件包含连接建立就绪、读就绪、写就绪等。

(2)Handler:将自身(handler)与事件绑定,负责事件的处理,完成channel的读入,完成处理业务逻辑后,负责将结果写出channel。

单线程Reactor参考

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;

class Reactor implements Runnable {
final Selector selector;
final ServerSocketChannel serverSocket;

Reactor(int port) throws IOException { //Reactor初始化
selector = Selector.open();
serverSocket = ServerSocketChannel.open();
serverSocket.socket().bind(new InetSocketAddress(port));
//非阻塞
serverSocket.configureBlocking(false);
//分步处理,第一步,接收accept事件
SelectionKey sk = serverSocket.register(selector, SelectionKey.OP_ACCEPT);
//attach callback object, Acceptor
sk.attach(new Acceptor());
}

public void run() {
try {
while (!Thread.interrupted()) {
selector.select();
Set selected = selector.selectedKeys();
Iterator it = selected.iterator();
while (it.hasNext()) {
//Reactor负责dispatch收到的事件
dispatch((SelectionKey) (it.next()));
}
selected.clear();
}
} catch (IOException ex) { /* ... */ }
}

void dispatch(SelectionKey k) {
Runnable r = (Runnable) (k.attachment());
//调用之前注册的callback对象
if (r != null) {
r.run();
}
}

// inner class
class Acceptor implements Runnable {
public void run() {
try {
SocketChannel channel = serverSocket.accept();
if (channel != null)
new Handler(selector, channel);
} catch (IOException ex) { /* ... */ }
}
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;

class Handler implements Runnable {
public static final int INPUT_SIZE = 1024;
public static final int SEND_SIZE = 1024;
final SocketChannel channel;
final SelectionKey sk;
ByteBuffer input = ByteBuffer.allocate(INPUT_SIZE);
ByteBuffer output = ByteBuffer.allocate(SEND_SIZE);
static final int READING = 0, SENDING = 1;
int state = READING;

Handler(Selector selector, SocketChannel c) throws IOException {
channel = c;
channel.configureBlocking(false);
// Optionally try first read now
sk = channel.register(selector, 0);
// 将Handler作为callback对象
sk.attach(this);
// 第二步,注册Read就绪事件
sk.interestOps(SelectionKey.OP_READ);
selector.wakeup();
}

boolean inputIsComplete() {
/* ... */
return false;
}

boolean outputIsComplete() {
/* ... */
return false;
}

void process() {
/* ... */
return;
}

public void run() {
try {
if (state == READING) {
read();
} else if (state == SENDING) {
send();
}
} catch (IOException ex) { /* ... */ }
}

void read() throws IOException {
channel.read(input);
if (inputIsComplete()) {
process();
state = SENDING;
// Normally also do first write now
//第三步,接收write就绪事件
sk.interestOps(SelectionKey.OP_WRITE);
}
}

void send() throws IOException {
channel.write(output);
//write完就结束了, 关闭select key
if (outputIsComplete()) {
sk.cancel();
}
}
}

1、 当其中某个 handler 阻塞时, 会导致其他所有的 client 的 handler 都得不到执行, 并且更严重的是, handler 的阻塞也会导致整个服务不能接收新的 client 请求(因为 acceptor 也被阻塞了)。 因为有这么多的缺陷, 因此单线程Reactor 模型用的比较少。这种单线程模型不能充分利用多核资源,所以实际使用的不多。

2、因此,单线程模型仅仅适用于handler 中业务处理组件能快速完成的场景。

多线程Reactor参考

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
import nio.single.Handler;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;

class MthreadReactor implements Runnable {

//subReactors集合, 一个selector代表一个subReactor
Selector[] selectors = new Selector[2];
int next = 0;
final ServerSocketChannel serverSocket;

MthreadReactor(int port) throws IOException { //Reactor初始化
selectors[0] = Selector.open();
selectors[1] = Selector.open();
serverSocket = ServerSocketChannel.open();
serverSocket.socket().bind(new InetSocketAddress(port));
//非阻塞
serverSocket.configureBlocking(false);
//分步处理,第一步,接收accept事件
SelectionKey sk = serverSocket.register(selectors[0], SelectionKey.OP_ACCEPT);
//attach callback object, Acceptor
sk.attach(new Acceptor());
}

public void run() {
try {
while (!Thread.interrupted()) {
for (int i = 0; i < 2; i++) {
selectors[i].select();
Set selected = selectors[i].selectedKeys();
Iterator it = selected.iterator();
while (it.hasNext()) {
//Reactor负责dispatch收到的事件
dispatch((SelectionKey) (it.next()));
}
selected.clear();
}

}
} catch (IOException ex) { /* ... */ }
}

void dispatch(SelectionKey k) {
Runnable r = (Runnable) (k.attachment());
//调用之前注册的callback对象
if (r != null) {
r.run();
}
}

class Acceptor { // ...
public synchronized void run() throws IOException {
SocketChannel connection = serverSocket.accept(); //主selector负责accept
if (connection != null) {
new Handler(selectors[next], connection); //选个subReactor去负责接收到的connection
}
if (++next == selectors.length) next = 0;
}
}
}

https://www.cnblogs.com/crazymakercircle/p/9833847.html

https://www.cnblogs.com/winner-0715/p/8733787.html

https://blog.csdn.net/weixin_37778801/article/details/86699341