IO多路复用通过NIO的Selector相关API来实现, 先看一下基础的操作, 然后编写两个服务器来进行对比. 这里还涉及到Java的网络编程, 在后边也要专门看一下了.
明天就要发布高考分数了, 这里还是大声疾呼一下, 不管多少分, 能报名计算机专业的报名计算机, 报不上和考文科的, 暑假里就报班学Java, 别等了, 真的.
IO多路复用的基础结构
IO多路复用的基础结构, 就是之前提到过的NIO中的Selector相关的API. 包括:
- 支持Selector特性的Channel:
SelectableChannel
Seletor
, 用于监听多路IO的选择器, 其名字来源于操作系统的系统调用select()函数SelectionKey
, 用于让监听多路IO的选择器知道监听哪一路Channel(文件描述符)的对象, 代表被选择器监听的某一个Channel.
在之前看过操作系统, Java里的操作逻辑实际上和C语言使用系统调用是一样的:
- 创建Selector对象和需要被其监听的一系列Channel对象
- 将Channel对象注册到Selector对象中, 每次注册都可以获取一个独立的SelectionKey, 用于标识某个Channel
- 调用select()方法, 这个方法会阻塞, 直到被监听的Channel中有至少一个可用, 然后返回所有可用的Channel
- 遍历Channel, 对每个Channel进行处理
所以这个逻辑和系统调用select()返回可用的文件描述符的集合如出一辙, 只不过因为是面向对象包了很多东西, 所以在使用的时候还有一些小细节需要注意. 来看看具体的代码吧.
首先是需要知道哪些Channel支持Selector特性, 通过查看文档可以知道, java.nio.channels中有一个类SelectableChannel, 是支持Selector(can be multiplexed via a Selector)的.
然后这个类有一个子类也是一个抽象类AbstractSelectableChannel, 这个抽象类有五个实现类: DatagramChannel, Pipe.SinkChannel, Pipe.SourceChannel, ServerSocketChannel, SocketChannel.
对于网络IO来说, 我们关心其中的DatagramChannel, ServerSocketChannel, SocketChannel, 也就是UDP端, TCP服务端和TCP客户端.
然后是Selector类, java.nio.Channels中的Selector类, 有一个静态方法.open(), 用来创建一个Selector. 就使用这个方法来创建Selector.
有了Channel和Selector之后, 就是将Channel注册到Selector上, 也就是调用channel.ssc.register(selector, SelectionKey.OP_ACCEPT);
.
这里的第二个参数指的是监听什么事件, 对于Socket通信来说唯一可以监听的就是ACCEPT, 也就是连接有数据到达的事件. 这个注册会返回一个SelectionKey对象, 通过这个对象就可以知道是哪个Channel出现事件.
这样三大组件即Channel, Selector, SelectorKey都知道如何创建和使用了, 下边就来看一下细节:
第一步, 准备若干个Channel
//新创建一个ServerSocketChannel, 也就是TCP服务端 ServerSocketChannel serverSocketChannelAt8000 = ServerSocketChannel.open(); //很重要, 将其设置为异步模式, 否则还是同步模式 serverSocketChannelAt8000.configureBlocking(false); //只有了TCP服务端Channel, 还需要从中获取其内部包装的ServerSocket对象用来绑定端口 ServerSocket socket = serverSocketChannelAt8000.socket(); InetSocketAddress address = new InetSocketAddress(8000); socket.bind(address); //继续创建一个绑定7000, 8888端口的channel ServerSocketChannel serverSocketChannelAt7000 = ServerSocketChannel.open(); serverSocketChannelAt7000.socket().bind(new InetSocketAddress(7000)); serverSocketChannelAt7000.configureBlocking(false); ServerSocketChannel serverSocketChannelAt8888 = ServerSocketChannel.open(); serverSocketChannelAt8888.socket().bind(new InetSocketAddress(8888)); serverSocketChannelAt8888.configureBlocking(false);
到这里其实可以在后边加一句serverSocketChannel.accept();
, 运行会发现程序直接结束, 这是因为设置了异步, 并没有等待TCP连接进入.
如果改成同步, 则serverSocketChannel.accept();
这句就会一直阻塞到有访问进来.
第二步, 配置Selector和进行注册绑定.
//创建一个selector用于监听 Selector selector = Selector.open(); //channel调用自己的register方法向selector中注册, 并得到一个SelectionKey对象, 当然此时这个Key没有什么用, 实际用的是每个连接进来的描述符对应的Key //将三个Channel都注册到selector中 SelectionKey key8000 = serverSocketChannelAt8000.register(selector, SelectionKey.OP_ACCEPT); SelectionKey key7000 = serverSocketChannelAt7000.register(selector, SelectionKey.OP_ACCEPT); SelectionKey key8888 = serverSocketChannelAt8888.register(selector, SelectionKey.OP_ACCEPT);
第三步, 启动服务器的主循环, 监听所有Channel, 然后每次对就绪的Channel进行操作:
while (true) { //Selector进行监听, 返回一个int, 这个指有几个Channel出现了事件, 即IO可用 //注意这个方法是阻塞的, 也就是如果程序执行到了此行之后的语句, 一定会有可用的IO出现 int number = selector.select(); //获取所有可用的keys, 类似于获取所有就绪的Channel Set<SelectionKey> availableKeys = selector.selectedKeys(); //获取迭代器, 用于所有就绪的Channel Iterator<SelectionKey> iterable = availableKeys.iterator(); while (iterable.hasNext()) { SelectionKey selectionKey = iterable.next(); //这里要注意, 处理完一个之后, 需要立刻将其从迭代中去除, 否则下一次还会继续监听到这个端口 iterable.remove(); if (selectionKey.isAcceptable()) { //从SelectionKey中获取channel对象, 因为知道类型, 所以强制转换, 然后可以从其中获取连接的信息 ServerSocketChannel newChannel = (ServerSocketChannel) selectionKey.channel(); System.out.println("接受连接来自: "+ newChannel.socket().getLocalPort()); //从ServerSocketChannel中获取SocketChannel. 也就是TCP连接 SocketChannel socketChannel = newChannel.accept(); //将内容一次性读入到2048长度的字节中 ByteBuffer byteBufferIn = ByteBuffer.allocate(2048); System.out.println(socketChannel.read(byteBufferIn)); byteBufferIn.flip(); //这一块也是NIO的用法, 和Charset相关 //将其按照UTF-8进行解码然后放到CharBuffer中, 打印出来 Charset utf8 = Charset.forName("UTF-8"); CharsetDecoder decoder = utf8.newDecoder(); CharBuffer charBufferin = decoder.decode(byteBufferIn); System.out.println(charBufferin.array()); //直接关闭连接 socketChannel.close(); } else if (selectionKey.isValid() && selectionKey.isReadable()) { System.out.println("可以读"); } else if (selectionKey.isValid() && selectionKey.isWritable()) { System.out.println("可以写"); } } }
然后就可以启动程序了, 启动之后, 这个selector会同时监听7000,8000,8888端口的三个Channel, 哪个有连接进来, 就会创建连接然后打印出传来的内容.
用浏览器访问http://localhost:8888/
, http://localhost:8000/
, http://localhost:7000/
. 不管哪一个访问进来, selector都可以创建连接.
这样一个线程就可以监听很多IO通道, 而不用每来一个IO都用一个线程去响应. 这里还是使用了单线程进行处理, 实际上, 还可以在每一个SocketServerChannel出现ON_ACCEPT事件的时候, 将每一个Socket连接交给一个线程去处理, 这样就是一个线程监听多个Channel, 然后进行分发, 效率更高. 而且不会等待IO.
不使用NIO的多线程Echo服务器
这一节跟着书上写一个多线程不使用NIO的服务器看看, 看来Java 网络编程得马上补上了. 计算机网络这本书还没看, 好在马上就要把并发干完了.
先是服务器, 这是不使用NIO的传统多线程服务器, 每进来一个新连接, 就将这个连接交给一个新线程去处理, 这也是学C语言的时候写过的老套路了, 直接上代码:
import java.io.BufferedReader; import java.io.IOException; import java.io.InputStreamReader; import java.io.PrintWriter; import java.net.ServerSocket; import java.net.Socket; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; public class MuiltThreadEchoServer { //服务器端口与构造器 int port; public MuiltThreadEchoServer(int port) { this.port = port; } //服务器的线程池 private final ExecutorService pool = Executors.newCachedThreadPool(); //处理每个客户端连接的线程类 static class HandleMsg implements Runnable { //私有变量与构造器, 保存当前Socket连接 Socket clientSocket; public HandleMsg(Socket clientSocket) { this.clientSocket = clientSocket; } //核心的run()方法 @Override public void run() { BufferedReader reader = null; PrintWriter writer = null; //设置好reader和writer try { reader = new BufferedReader(new InputStreamReader(clientSocket.getInputStream())); writer = new PrintWriter(clientSocket.getOutputStream(), true); //尝试读出客户端发来的内容, 然后原样写入 String inputLine = null; long b = System.currentTimeMillis(); while ((inputLine = reader.readLine()) != null) { writer.println(inputLine); } long e = System.currentTimeMillis(); System.out.println("spend: " + (e - b) + " ms"); } catch (IOException e) { e.printStackTrace(); }finally { if (reader != null) { try { reader.close(); } catch (IOException e) { e.printStackTrace(); } } if (writer != null) { writer.close(); } try { clientSocket.close(); } catch (IOException e) { e.printStackTrace(); } } } } //启动服务器 public void start() { ServerSocket echoServer = null; Socket clientSocket = null; //创建服务端Socket, 绑定8000端口, 启动失败直接退出 try{ echoServer = new ServerSocket(port); } catch (IOException e) { System.out.println("服务器启动失败"); System.exit(1); } while (true) { try { //新进来连接就创建一个新线程交给线程池 clientSocket = echoServer.accept(); System.out.println(clientSocket.getRemoteSocketAddress() + " connected."); pool.execute(new HandleMsg(clientSocket)); } catch (IOException e) { e.printStackTrace(); } } } }
然后是客户端, 这里的客户端就比较搞, 连接成功之后不会释放连接, 而是故意一个一个字的输入, 这样每一个新创建的线程就会卡在等待数据上很久.
import java.io.BufferedReader; import java.io.IOException; import java.io.InputStreamReader; import java.io.PrintWriter; import java.net.InetSocketAddress; import java.net.Socket; public class Client implements Runnable { @Override public void run() { Socket client = null; PrintWriter writer = null; BufferedReader reader = null; try { client = new Socket(); client.connect(new InetSocketAddress("localhost", 8000)); writer = new PrintWriter(client.getOutputStream()); //这里把要写入的信息分6次发送, 发送间隔一秒, 一次发送一个字符 String msg = "Hello!"; for (int i = 0; i < msg.length(); i++) { Thread.sleep(1000); writer.print(msg.charAt(i)); } //要写一个println(), 相当于对面收到EOF, 才能完成一次发送 writer.println(); writer.flush(); reader = new BufferedReader(new InputStreamReader(client.getInputStream())); System.out.println("from server: " + reader.readLine()); } catch (IOException | InterruptedException e) { e.printStackTrace(); } finally { if (reader != null) { try { reader.close(); } catch (IOException e) { e.printStackTrace(); } } if (writer != null) { writer.close(); } if (client != null) { try { client.close(); } catch (IOException e) { e.printStackTrace(); } } } } //这里启动10个线程去连接, 可以发现都连接成功, 但是都耗时6秒钟. 有没有办法让线程不等待呢. public static void main(String[] args) { for (int i = 0; i < 10; i++) { new Thread(new Client()).start(); } } }
将多线程Echo服务器使用NIO改造
上边的那个例子的核心在于, 如果有大量线程连接之后不做什么事情, 那么服务器没有从客户端连接中获取数据, 无法做使用, 同时又不能杀死连接线程, 就会导致同一时刻有大量的线程被创建但是在等待IO, 会拖慢系统.
现在就用全新的NIO来改造线程服务器, 不发给我数据, 我就不等待, 来了一个数据, 响应一次, 不来, 就当做没事一样.
根据上边的NIO知识, 监听所有可用的I/O对象, 也就是客户端连接, 只需要使用一个线程就行了, 然后每有客户端连接进来数据, 就交给一个线程去处理就行. 如果没有新进来的数据, 那么也没有线程被创建出来进行工作.
import java.io.IOException; import java.net.InetSocketAddress; import java.net.Socket; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; import java.util.HashMap; import java.util.Iterator; import java.util.Map; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; public class NIOEchoServer { private Selector selector; private final ExecutorService pool = Executors.newCachedThreadPool(); private final Map<Socket, Long> time = new HashMap<>(); int port; public NIOEchoServer(int port) { this.port = port; } public void startServer() throws IOException { //创建Selector Selector selector = Selector.open(); //创建TCP服务器Channel ServerSocketChannel serverSocketChannel = ServerSocketChannel.open(); //设置为非阻塞 serverSocketChannel.configureBlocking(false); //绑定到端口 serverSocketChannel.socket().bind(new InetSocketAddress("localhost", port)); //这个套路都知道了, 注册给selector serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT); //服务器主循环 while (true) { //阻塞在此, 监听channel selector.select(); Iterator<SelectionKey> iterator = selector.selectedKeys().iterator(); long e = 0; //这里其实就一个Channel, 只要有连接或者发送信息, 就是出现响应 while (iterator.hasNext()) { SelectionKey key = iterator.next(); iterator.remove(); //这个判断是有新连接进来 if (key.isAcceptable()) { doAccept(key); } //可读, 相当于客户端发送信息过来, 记录时间后执行读取动作 else if (key.isReadable() && key.isValid()) { //向记录时间的Map中写入时间 if (!time.containsKey(((SocketChannel) key.channel()).socket())) { time.put(((SocketChannel) key.channel()).socket(), System.currentTimeMillis()); } doRead(key); //可写, 相当于客户端发完了信息, 在等待响应 } else if (key.isWritable() && key.isValid()) { doWrite(key); e = (System.currentTimeMillis()); //从记录时间的map中取出对应的时间, 然后比较一下耗时 long b = time.remove(((SocketChannel) key.channel()).socket()); System.out.println("耗时: " + (e - b) + " ms"); } } } } private void doWrite(SelectionKey key) { } private void doRead(SelectionKey key) { } private void doAccept(SelectionKey key) { } }
这个逻辑基本上和非NIO的一样, NIO的selector就监听了一个端口的SocketServerChannel, 具体还是要放到方法里去执行从SocketServerChannel获取SocketChannel的方法.
下边来实现一下三个方法, 首先是doAccept():
private void doAccept(SelectionKey key) throws IOException { ServerSocketChannel serverSocketChannel = (ServerSocketChannel) key.channel(); SocketChannel clientSocket = serverSocketChannel.accept(); //凡是支持selectableChannel的, 都要设置成非阻塞 clientSocket.configureBlocking(false); //这里很关键, 把获取的具体ClientChannel, 也注册到同一个selector上, 注册事件为可读, 此时已经连接完毕, 下一个事件就是可读了. //这样同一个selector, 就可以既监听TCP进来连接, 也可以监听已经创建的连接了, 前边的startServer()中的方法的三种分支就有用武之地了. SelectionKey clientKey = clientSocket.register(selector, SelectionKey.OP_READ); //由于客户端不是一次性发完全部数据, 而是慢慢发送, 因此给key attach一个对象, 就像是MVC里的model一样, 以后再使用这个key, 都可以共享这个数据对象 ClientData data = new ClientData(); clientKey.attach(data); //doAccept()的作用就是获取TCP连接, 将连接加入到Selector的监听中, 然后附加上一个数据对象用于和客户端的通信, 之后就结束了. }
然后是doRead()方法, 这个是指一个SocketChannel可读之后, 会被Selector监听到, 然后在主循环中交给doRead()方法来使用:
private void doRead(SelectionKey key) throws IOException { //注意, 进到这里的SelectionKey不是ServerSocketChannel的Key, 而是SocketChannel的Key, 这是因为能够监听到READ事件的是在Accept()方法里注册SocketChannel的Key SocketChannel clientSocket = (SocketChannel) key.channel(); //创建8K大小的缓冲区 ByteBuffer byteBuffer = ByteBuffer.allocate(8192); int len; //尝试读取 try { len = clientSocket.read(byteBuffer); //如果读取结果是-1, 说明读取完毕, 直接结束 if (len < 0) { disconnect(key); return; } } catch (IOException e) { //出错也关闭channel System.out.println("Failed to read from client."); e.printStackTrace(); disconnect(key); } byteBuffer.flip(); //这里使用了一个处理消息的类, 交给线程池进行处理 pool.execute(new HandleMsg(key, byteBuffer)); }
来马上看一下数据对象ClientData和对应的HandleMsg类:
import java.nio.ByteBuffer; import java.util.LinkedList; public class ClientData { private LinkedList<ByteBuffer> outQueue; public ClientData() { outQueue = new LinkedList<>(); } public LinkedList<ByteBuffer> getOutQueue() { return outQueue; } public void enqueue(ByteBuffer buffer) { outQueue.addFirst(buffer); } }
private class HandleMsg implements Runnable { ByteBuffer byteBuffer; SelectionKey key; public HandleMsg(SelectionKey key, ByteBuffer byteBuffer) { this.key = key; this.byteBuffer = byteBuffer; } @Override public void run() { ClientData data = (ClientData) key.attachment(); data.enqueue(byteBuffer); key.interestOps(SelectionKey.OP_READ | SelectionKey.OP_WRITE); selector.wakeup(); } }
数据对象内部是一个队列, 保存每次从客户端读到的8K长度的ByteBuffer, HandleMsg则是获取数据对象然后把读到的消息放进去. 因为读过了消息, 所以将那个key的感兴趣事件从只是READ变成READ+WRITE.
然后强制去唤醒selector, 如果selector阻塞, 这个时候会重新再开始一轮监听, 此时就可以监听key新增的WRITE事件了.
最后是doWrite()函数, 与doRead()一样, 进来的key是SocketChannel的key, 而不是ServerSocketChannel的key:
private void doWrite(SelectionKey key) throws IOException {
//注意, 进到这里的SelectionKey不是ServerSocketChannel的Key, 而是SocketChannel的Key, 这是因为能够监听到READ事件的是在Accept()方法里注册SocketChannel的Key
SocketChannel clientSocket = (SocketChannel) key.channel();
ClientData data = (ClientData) key.attachment();
LinkedList<ByteBuffer> buffers = data.getOutQueue();
ByteBuffer lastBuffer = buffers.getLast();
try {
int len = clientSocket.write(lastBuffer);
if (len == -1) {
disconnect(key);
return;
}
//完整的写了一个Buffer, 就移除
if (lastBuffer.remaining() == 0) {
buffers.removeLast();
}
} catch (IOException e) {
System.out.println("向客户端写入失败.");
e.printStackTrace();
disconnect(key);
}
//很重要, 如果全部写完, 就要取消OP_WRITE事件监听
if (buffers.size() == 0) {
key.interestOps(SelectionKey.OP_READ);
}
}
write函数的逻辑比较简单, 每次写一个ByteBuffer, 特别要注意的就是每次写完之后, 一定要取消对WRITE事件的监听.
然后是辅助的disconnect()方法, 关闭那个Key对应的Channel, 这样就再也不会出来事件被监听到了.
private void disconnect(SelectionKey key) throws IOException { SelectableChannel channel = key.channel(); channel.close(); }
对于最后的disconnect函数我还有一点暂时没搞明白, 就是如何从selector中去掉要监听的Key, 这样才可以算完成的取消连接.
写完了整个服务器, 发现确实响应很快, 只会在有IO的时候进行操作, 因此每个连接的实际服务时间也只有15ms左右. 与原来的6000ms有天壤之别. 即使客户端非常慢, 服务器也不会花费资源在等待IO上, 这就是NIO的改进.
总结
这个NIO服务器, 一个Selector先监听服务端Socket, 然后出现连接的时候, 将连接也注册同一个Selector内进行监听.
每次监听的时候, 由于服务端Socket和Socket连接的特性不同, 所以可以分为可连接, 可读和可写状态. 可连接就将新连接加入监听. 可读就读一次然后通过attach来在key中共享数据. 可写就进行写入.
其中读通过多线程进行,每次读入到一个ByteBuffer中, 加入到共享的数据对象中. 每次只要有数据传送, 就会读一次, 然后就结束, 不会一直卡在连接上.
写每次写一个ByteBuffer, 但是由于可写的时候不阻塞, 因此会反复写完之前已经收到的数据.
这样服务器对于IO的实际处理和等待时间就大幅下降了. 这个服务器给我的启示还是不少的, 其中用一个Selector监听所有Channel确实是个好办法, 也体验到了多态, 无论SocketServerChannel还是SocketChannel, 都是SelectableChannel, 所以可以被Selector监听.