Java的并发部分快要看完了, 看完NIO之后就剩下Java 8 函数式编程中的并发内容了. 当然, Java 9中的java.util.concurrent还加了一个反应式编程的Flow. 加上Spring的反应式编程还一直没有仔细的看过, 还是得有机会看看.

在上一个NIO服务器中, 已经实现了不会阻塞等待TCP连接传数据, 不过在doRead()和doWrite()中, 实际的I/O操作还是阻塞的, 比如返回一个文件给客户端, 那么在没有完成文件的读操作之前, 线程依然会阻塞在等待文件I/O操作处.

NIO的最后一部分API, 也就是Channel类中Asynchronous开头的类, 就是为了彻底实现异步IO而准备的. 这些API不仅在Key(文件描述符)没有准备好的时候不阻塞, 就连正常的IO操作, 也不阻塞, 而是等待操作系统完成IO的时候注册一个回调函数来执行. 这样就实现了彻底的异步.

  1. AIO服务器
  2. 启动AIO服务器
  3. AIO客户端

AIO服务器

这四个异步API再回看一下, 第一个是表示一组AIO, 第二个是异步文件, 第三个和第四个就是TCP的AIO API.

  1. AsynchronousChannelGroup
  2. AsynchronousFileChannel
  3. AsynchronousServerSocketChannel
  4. AsynchronousSocketChannel

创建AIO服务器的代码如下:

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousServerSocketChannel;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
import java.util.Arrays;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class AIOServer {

    private final int port;
    private AsynchronousServerSocketChannel serverSocketChannel;

    public AIOServer(int port) {
        this.port = port;
        try {
            //启动服务器和绑定端口
            this.serverSocketChannel = AsynchronousServerSocketChannel.open().bind(new InetSocketAddress(port));
        } catch (IOException e) {
            System.out.println("服务器启动失败");
            System.exit(1);
        }
    }

    public void start() {
        System.out.println("服务器启动在: " + port + "端口");
        //非常关键的一步, 注册一个CompletionHandler, 这个是java.nio.channels中的一个接口, 专门用于AIO的回调
        //第一个参数是一个attachment, 这个attachment会在后边的两个方法内作为第二个参数被使用
        serverSocketChannel.accept(42, new CompletionHandler<AsynchronousSocketChannel, Object>() {
            //这个是必须实现的方法之一, 用于成功的时候如何做
            final ByteBuffer byteBuffer = ByteBuffer.allocate(2048);

            @Override
            public void completed(AsynchronousSocketChannel result, Object attachment) {

                //由于纯异步, 需要搭配Future对象, 先定义一个要返回给客户端的结果, 然后放在Future对象中

                Future<Integer> writeResult = null;
                byteBuffer.clear();

                //重要, AsynchronousSocketChannel的read方法返回一个Future<Integer>对象, 表示后来读到了几个字节, 因为这个read方法不阻塞.
                //非Asynchronous的Channel比如上一次的SocketChannel的read方法返回一个int, 这是因为read是阻塞的.
                writeResult =  result.read(byteBuffer);

                //睡眠1秒来模拟做其他事情
                try {
                    System.out.println("服务端开始做1秒钟的其他事情...");
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }

                //只要不调用Future对象的.get()方法, 就不会等待Future对象完成, 所以到这里都不会阻塞
                try {
                    System.out.println("实际读取到了: "+ writeResult.get(100, TimeUnit.SECONDS) + "个字节.");
                    System.out.println("读取到的是: " + Arrays.toString(byteBuffer.array()));
                    byteBuffer.flip();

                    //这个write()方法也一样返回一个Future对象, 然后立刻返回
                    writeResult = result.write(byteBuffer);

                } catch (InterruptedException | TimeoutException | ExecutionException e) {
                    System.out.println("Future结果读取错误.");
                } finally {
                    serverSocketChannel.accept(null, this);
                    //等待写完
                    try {
                        //确认写完, 然后关闭当前连接的AsyncSocketChannel
                        writeResult.get();
                        result.close();

                    } catch (InterruptedException | ExecutionException | IOException e) {
                        e.printStackTrace();
                    }
                }

            }

            //这个是另外一个必须实现的方法, 用于失败的时候如何做
            @Override
            public void failed(Throwable exc, Object attachment) {
                System.out.println("连接失败. 异常是: " + exc);
            }
        });

    }
}

这个服务器唯一的阻塞点, 就是连接创建成功之后, 对读取的Future对象调用.get()方法, 以及在写入的时候, 保证写完, 调用.get()方法.

其他的所有方法, 完全都不阻塞, 确实给力.

启动AIO服务器

启动AIO服务器就不像原来的服务器中有select()阻塞, 启动了就可以, AIO服务器在没有连接进来的时候, 所有方法都是不阻塞的, 相当于用一个主线程启动了在另外一个线程运行的AIO服务器. 如果没有连接进来, 主线程运行结束之后, 整个程序也结束了, 因为不阻塞, 全部运行到底了. 又没有while(true)循环, 所以主线程可以做自己的事情, 然后睡眠等待.

public static void main(String[] args) throws IOException, InterruptedException {

    //纯异步, 直接结束
    new AIOServer(8000).start();

    //这里让主线程等待, 后台服务器已经在另外一个线程中开启. 由于所有方法都不阻塞, 主线程退出的时候服务器也关闭了.
    while (true) {
        Thread.sleep(1000);
    }
}

AIO客户端

看了AIO服务端, 再来看看客户端的编写. 客户端的编写与服务端非常类似, 只不过是直接使用AsyncSocketChannel来进行连接, 也一样需要注册回调对象.

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

public class AIOClient {

    private final AsynchronousSocketChannel client = AsynchronousSocketChannel.open();

    public AIOClient() throws IOException {
    }


    public void connect(int port) {
        //关键的connect函数, 第二个参数是attachment, 最后是一个回调对象
        client.connect(new InetSocketAddress("localhost", port), null, new CompletionHandler<Void, Object>() {

            //连接成功后执行这个方法
            @Override
            public void completed(Void result, Object attachment) {
                System.out.println("连接成功");
                //内部对于写入, 有一个参数的返回Future对象的方法, 也有三参数就像这里的, 继续回调的方法, 因为连接成功后需要发送数据, 所以继续回调
                client.write(ByteBuffer.wrap("Hello!".getBytes(StandardCharsets.UTF_8)), null, new CompletionHandler<Integer, Object>() {

                    //到这里是写入成功, 所以继续回调, 要进行读取
                    @Override
                    public void completed(Integer result, Object attachment) {
                        System.out.println("写入成功");
                        ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
                        byteBuffer.clear();
                        client.read(byteBuffer, null, new CompletionHandler<Integer, Object>() {
                            //运行到这里说明读取成功
                            @Override
                            public void completed(Integer result, Object attachment) {
                                System.out.println("读取成功");
                                byteBuffer.flip();
                                System.out.println("from server: " + Arrays.toString(byteBuffer.array()));
                                //读取成功后关闭客户端连接
                                try {
                                    client.close();
                                } catch (IOException e) {
                                    e.printStackTrace();
                                }

                            }

                            //读取失败
                            @Override
                            public void failed(Throwable exc, Object attachment) {
                                System.out.println("从服务器读取失败");

                            }
                        });
                    }

                    //这里是写入失败
                    @Override
                    public void failed(Throwable exc, Object attachment) {
                        System.out.println("向服务器写入失败");
                    }
                });

            }

            //这里是连接失败
            @Override
            public void failed(Throwable exc, Object attachment) {
                System.out.println("连接失败");
            }
        });
    }

}

由于是纯异步, 所以这里其实是一个回调地狱. Java写着写着写出JavaScript的感觉来了.AsynchronousSocketChannel的connect, read和write方法, 全部都可以继续注册回调对象, 表明成功之后的下一步动作.

因此这个客户端的逻辑是: 先进行连接, 连接成功后有一个回调去尝试发送数据, 在发送数据的write()中再注册回调, 如果发送成功, 就尝试读取. 在读取函数中再注册回调, 如果读取成功就打印出来.

虽然是异步, 但是整个过程中, 主线程都不阻塞, 可以做其他事情. 就像这样调用:

public static void main(String[] args) throws IOException, InterruptedException {
    AIOClient client = new AIOClient();
    client.connect(8000);
    while (true) {
        System.out.println("做其他事情");
        Thread.sleep(1000);
    }
}

这种纯粹的不阻塞的IO方式看完了, 其实感觉主要还是依赖Future对象, 用来实现异步的调用, 而Future其实本质就是一个线程池加上Callable对象的封装. 与底层操作系统提供的异步IO也有着直接关系.

接下来就是函数式编程并发的部分了.