I/O的四种模型
同步阻塞(Synchronous blocking I/O)
同步非阻塞(Synchronous non-blocking I/0)
异步阻塞(Asynchronous blocking I/0)
异步非阻塞(Asynchronous non-blocking I/0)
I/O多路复用 I/O多路复用是指使用一个线程来检查多个文件描述符(Socket)的就绪状态,比如调用select和poll函数,传入多个文件描述符,如果有一个文件描述符就绪,则返回,否则阻塞直到超时。得到就绪状态后进行真正的操作可以在同一个线程里执行,也可以启动线程执行(比如使用线程池)。
一般情况下,I/O 复用机制需要事件分发器。 事件分发器的作用,将那些读写事件源分发给各读写事件的处理者。
涉及到事件分发器的两种模式称为:Reactor和Proactor。 Reactor模式是基于同步I/O的,而Proactor模式是和异步I/O相关的。本文主要介绍的就是 Reactor模式相关的知识。
Reactor模式 Reactor模式也叫反应器模式
IO的发展历史 单线程阻塞 1 2 3 4 while (true ){ socket = accept(); handle(socket) }
这种方法的最大问题是无法并发,效率太低,如果当前的请求没有处理完,那么后面的请求只能被阻塞,服务器的吞吐量太低。
多线程阻塞 之后,想到了使用多线程,也就是很经典的connection per thread,每一个连接用一个线程处理,类似:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 import java.io.IOException;import java.net.ServerSocket;import java.net.Socket;class BasicModel implements Runnable { public void run () { try { ServerSocket ss = new ServerSocket(SystemConfig.SOCKET_SERVER_PORT); while (!Thread.interrupted()) new Thread(new Handler(ss.accept())).start(); } catch (IOException ex) { } } static class Handler implements Runnable { final Socket socket; Handler(Socket s) { socket = s; } public void run () { try { byte [] input = new byte [SystemConfig.INPUT_SIZE]; socket.getInputStream().read(input); byte [] output = process(input); socket.getOutputStream().write(output); } catch (IOException ex) { } } private byte [] process(byte [] input) { byte [] output=null ; return output; } } }
对于每一个请求都分发给一个线程,每个线程中都独自处理上面的流程。
tomcat服务器的早期版本确实是这样实现的。
优点 定程度上极大地提高了服务器的吞吐量,因为之前的请求在read阻塞以后,不会影响到后续的请求,因为他们在不同的线程中。这也是为什么通常会讲“一个线程只能对应一个socket”的原因。
缺点 缺点在于资源要求太高,系统中创建线程是需要比较高的系统资源的,如果连接数太高,系统无法承受,而且,线程的反复创建-销毁也需要代价。
改进 采用基于事件驱动的设计,当有事件触发时,才会调用处理器进行数据处理。使用Reactor模式,对线程的数量进行控制,一个线程处理大量的事件。
单线程NIO模型 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 import java.io.IOException;import java.net.InetSocketAddress;import java.nio.ByteBuffer;import java.nio.channels.SelectionKey;import java.nio.channels.Selector;import java.nio.channels.ServerSocketChannel;import java.nio.channels.SocketChannel;import java.util.Iterator;class Server { public static final int SOCKET_SERVER_PORT = 8088 ; public static void testServer () throws IOException { Selector selector = Selector.open(); ServerSocketChannel serverSocketChannel = ServerSocketChannel.open(); serverSocketChannel.configureBlocking(false ); serverSocketChannel.bind(new InetSocketAddress(SOCKET_SERVER_PORT)); serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT); while (selector.select() > 0 ) { Iterator<SelectionKey> selectedKeys = selector.selectedKeys().iterator(); while (selectedKeys.hasNext()) { SelectionKey selectedKey = selectedKeys.next(); if (selectedKey.isAcceptable()) { SocketChannel socketChannel = serverSocketChannel.accept(); socketChannel.configureBlocking(false ); socketChannel.register(selector, SelectionKey.OP_READ); } else if (selectedKey.isReadable()) { SocketChannel socketChannel = (SocketChannel) selectedKey.channel(); ByteBuffer byteBuffer = ByteBuffer.allocate(1024 ); int length = 0 ; while ((length = socketChannel.read(byteBuffer)) != -1 ) { byteBuffer.flip(); System.out.println(new String(byteBuffer.array(), 0 , length)); byteBuffer.clear(); } socketChannel.close(); } selectedKeys.remove(); } } serverSocketChannel.close(); } public static void main (String[] args) throws IOException { testServer(); } }
实际上的Reactor模式,是基于Java NIO的,在他的基础上,抽象出来两个组件——Reactor和Handler两个组件:
(1)Reactor:负责响应IO事件,当检测到一个新的事件,将其发送给相应的Handler去处理;新的事件包含连接建立就绪、读就绪、写就绪等。
(2)Handler:将自身(handler)与事件绑定,负责事件的处理,完成channel的读入,完成处理业务逻辑后,负责将结果写出channel。
单线程Reactor参考 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 import java.io.IOException;import java.net.InetSocketAddress;import java.nio.channels.SelectionKey;import java.nio.channels.Selector;import java.nio.channels.ServerSocketChannel;import java.nio.channels.SocketChannel;import java.util.Iterator;import java.util.Set;class Reactor implements Runnable { final Selector selector; final ServerSocketChannel serverSocket; Reactor(int port) throws IOException { selector = Selector.open(); serverSocket = ServerSocketChannel.open(); serverSocket.socket().bind(new InetSocketAddress(port)); serverSocket.configureBlocking(false ); SelectionKey sk = serverSocket.register(selector, SelectionKey.OP_ACCEPT); sk.attach(new Acceptor()); } public void run () { try { while (!Thread.interrupted()) { selector.select(); Set selected = selector.selectedKeys(); Iterator it = selected.iterator(); while (it.hasNext()) { dispatch((SelectionKey) (it.next())); } selected.clear(); } } catch (IOException ex) { } } void dispatch (SelectionKey k) { Runnable r = (Runnable) (k.attachment()); if (r != null ) { r.run(); } } class Acceptor implements Runnable { public void run () { try { SocketChannel channel = serverSocket.accept(); if (channel != null ) new Handler(selector, channel); } catch (IOException ex) { } } } }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 import java.io.IOException;import java.nio.ByteBuffer;import java.nio.channels.SelectionKey;import java.nio.channels.Selector;import java.nio.channels.SocketChannel;class Handler implements Runnable { public static final int INPUT_SIZE = 1024 ; public static final int SEND_SIZE = 1024 ; final SocketChannel channel; final SelectionKey sk; ByteBuffer input = ByteBuffer.allocate(INPUT_SIZE); ByteBuffer output = ByteBuffer.allocate(SEND_SIZE); static final int READING = 0 , SENDING = 1 ; int state = READING; Handler(Selector selector, SocketChannel c) throws IOException { channel = c; channel.configureBlocking(false ); sk = channel.register(selector, 0 ); sk.attach(this ); sk.interestOps(SelectionKey.OP_READ); selector.wakeup(); } boolean inputIsComplete () { return false ; } boolean outputIsComplete () { return false ; } void process () { return ; } public void run () { try { if (state == READING) { read(); } else if (state == SENDING) { send(); } } catch (IOException ex) { } } void read () throws IOException { channel.read(input); if (inputIsComplete()) { process(); state = SENDING; sk.interestOps(SelectionKey.OP_WRITE); } } void send () throws IOException { channel.write(output); if (outputIsComplete()) { sk.cancel(); } } }
1、 当其中某个 handler 阻塞时, 会导致其他所有的 client 的 handler 都得不到执行, 并且更严重的是, handler 的阻塞也会导致整个服务不能接收新的 client 请求(因为 acceptor 也被阻塞了)。 因为有这么多的缺陷, 因此单线程Reactor 模型用的比较少。这种单线程模型不能充分利用多核资源,所以实际使用的不多。
2、因此,单线程模型仅仅适用于handler 中业务处理组件能快速完成的场景。
多线程Reactor参考 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 import nio.single.Handler;import java.io.IOException;import java.net.InetSocketAddress;import java.nio.channels.SelectionKey;import java.nio.channels.Selector;import java.nio.channels.ServerSocketChannel;import java.nio.channels.SocketChannel;import java.util.Iterator;import java.util.Set;class MthreadReactor implements Runnable { Selector[] selectors = new Selector[2 ]; int next = 0 ; final ServerSocketChannel serverSocket; MthreadReactor(int port) throws IOException { selectors[0 ] = Selector.open(); selectors[1 ] = Selector.open(); serverSocket = ServerSocketChannel.open(); serverSocket.socket().bind(new InetSocketAddress(port)); serverSocket.configureBlocking(false ); SelectionKey sk = serverSocket.register(selectors[0 ], SelectionKey.OP_ACCEPT); sk.attach(new Acceptor()); } public void run () { try { while (!Thread.interrupted()) { for (int i = 0 ; i < 2 ; i++) { selectors[i].select(); Set selected = selectors[i].selectedKeys(); Iterator it = selected.iterator(); while (it.hasNext()) { dispatch((SelectionKey) (it.next())); } selected.clear(); } } } catch (IOException ex) { } } void dispatch (SelectionKey k) { Runnable r = (Runnable) (k.attachment()); if (r != null ) { r.run(); } } class Acceptor { public synchronized void run () throws IOException { SocketChannel connection = serverSocket.accept(); if (connection != null ) { new Handler(selectors[next], connection); } if (++next == selectors.length) next = 0 ; } } }
https://www.cnblogs.com/crazymakercircle/p/9833847.html
https://www.cnblogs.com/winner-0715/p/8733787.html
https://blog.csdn.net/weixin_37778801/article/details/86699341