Java核心技术读书笔记11-6 Java AIO(NIO 2.0)


1.什么是Java AIO?

在Java SE 7 API中NIO相关库得到了扩充,新的NIO库被称为NIO 2.0,在这一版中Java提供了异步IO的能力,包括异步处理文件以及异步访问Socket数据。

异步的IO操作是IO模型的一种(关于IO模型,可以参考:),它采用的是一种“订阅-通知”模式。即应用程序线程向操作系统注册IO监听,当应用程序线程发出IO请求后便可以不在关心IO处理,内核进行系统调用,当数据准备好时进行接收数据,接收完毕后还会自行将数据写入用户地址空间。写入完毕之后由内核主动通知应用程序。因此,AIO是异步非阻塞的。

对于Java的AIO来说,数据的读写操作直接调用API的read或write方法即可。
对于读操作来说,数据源中有数据可读,内核会将数据读入read方法的缓冲区(用户空间)然后通知应用程序。对于写操作来说,内核会将write方法传递到数据源,传递完毕后通知应用程序。两个方法都是异步的,这样的异步操作不是由发起I/O请求的线程来处理,而是通过线程池分配一个新的线程来进行处理。最后,对于异步操作的结果处理分为两种:返回一个Future对象来跟踪异步任务或注册时使用CompletionHandler在异步I/O完成后执行回调函数。
与第一版NIO的四种通道类似,Java SE 7为Java.nio.channels包新增了四种异步通道:
AsynchronousFileChannel
AsynchronousServerSocketChannel
AsynchronousSocketChannel
AsynchronousDatagramChannel
分别对应文件、TCP请求、TCP连接以及UDP的数据传输。

2.异步执行流程与异步通道组

2.1 异步执行的详细流程

1.首先,Java的异步任务分为两种:进行异步的IO操作与为IO结果执行回调函数。
2.Java是通过异步通道组中的线程池为异步任务分配线程,这个线程组可以在创建四种异步通道时指定,如果不指定则使用系统默认的组。
3.在用户线程执行任务时,会将IO请求提交给操作系统内核,如果存在CompletionHandler则一同注册到内核,然后立即返回继续执行线程的其余代码。
4.内核会使用系统调用完成相应的IO操作,例如读操作为:从数据源(文件、Socket)读数据写入用户线程的缓冲区。
5.IO完毕之后内核指定操作系统合适的Proactor(完成分发器——completion dispatcher)完成分发任务,即执行CompletionHandler指定的回调函数。

若不在创建通道时指定组,则会使用Java虚拟机维护的一个自动构造的系统范围的默认组,默认组有一个关联的线程池,可以根据需要创建新线程。如果没有配置默认组的ThreadFactory,则该组的线程池中的线程为守护线程。这意味着,如果你在主线程中构建了一个使用未配置的默认组的通道,有可能主线程会在异步任务执行完毕前退出,而此时守护线程也将因为无其他线程运行而退出,该任务将立即终止。

2.2 异步通道组

异步通道组关联一个线程池,在创建时指定。随后可以将该组作为参数传入一个异步通道。使用一个组的通道共享该组的线程池。

//        ExecutorService executorService = Executors.newSingleThreadExecutor(); //一个单线程线程池
//        AsynchronousChannelGroup group = AsynchronousChannelGroup.withThreadPool(executorService);
//        AsynchronousChannelGroup group = AsynchronousChannelGroup.withCachedThreadPool(Executors.newCachedThreadPool(), 10);//缓存线程池
        AsynchronousChannelGroup group = AsynchronousChannelGroup.withFixedThreadPool(10, Executors.defaultThreadFactory());//固定线程池
        AsynchronousSocketChannel channel = AsynchronousSocketChannel.open(group); //建立通道并指定通道组

3.异步读写文件

和同步的NIO类似,AIO也需要使用buffer来读取数据。异步读写文件的方式分两种分别是使用Future对象和

3.1 使用Future对象接收异步任务

读数据
首先,需要使用AsynchronousFileChannel类的静态方法open来返回一个AsynchronousFileChannel对象,接下来是将管道连接至一个缓冲区并将数据读到缓冲区内。
与同步读写File的NIO不同,该方法不会返回读入了多少字节,而是会立即返回一个Future对象,该对象即为在多线程编程中遇到过的java.util.concurrent.Future,该类可以处理异步任务,并且可以使用isDone实例方法检查任务是否完毕以及get方法获取结果。下面为示例代码:

        AsynchronousFileChannel channel = AsynchronousFileChannel.open(Paths.get("file/Test.txt"), StandardOpenOption.WRITE, StandardOpenOption.READ); //获得一个异步文件管道
        ByteBuffer buffer = ByteBuffer.allocate(1024);
        Future future = channel.read(buffer, 0); //读入数据到缓冲区,第二个参数为下标,指示文件的开始读取位置
        while(!future.isDone()); //如果异步任务未完成则进行阻塞
        buffer.flip(); 
        FirstNIO.printBufByte(buffer, "AIOTestOne"); //一个自定义的打印缓冲区结果方法

读数据
写操作类似,写这个过程也是一个任务,同样返回Future对象并根据这个对象判断任务执行的状态。

        buffer.put("abcdef".getBytes());
        buffer.flip();
        Future future = channel.write(buffer, channel.size()); //从缓冲区写出数据到文件,第二个参数为下标,指示文件的开始写出位置
        while (!future.isDone()) ; //如果异步任务未完成则进行阻塞
        System.out.println("写入完成");
3.2 使用CompletionHandler

读数据
第二种方式是使用CompletionHandler,需要构建一个CompletionHandler的实现类对象,然后将该对象作为参数传入read方法。CompletionHandler接口的两个抽象方法分别是completed与failed,这两个方法承担了回调方法的作用,在read完毕后如果成功会调用completed,失败则会调用failed方法。因此可以在实现接口时重写的completed方法内部编写接收数据的处理逻辑。
使用CompletionHandler后,本线程发起一个I/O请求,调用read方法将这个请求提交给内核,内核处理完毕之后由分发器使用JVM分配的一个新的线程处理回调函数。

        AsynchronousFileChannel channel = AsynchronousFileChannel.open(Paths.get("file/Test.txt"), StandardOpenOption.WRITE, StandardOpenOption.READ); //获得一个异步文件管道
        ByteBuffer buffer = ByteBuffer.allocate(1024);
        /**
         * 第一个参数buffer会接收数据,第二个参数指定文件起始操作位置,第三个参数attachment也可以接收数据并可传递给completed方法
         */
        channel.read(buffer, 0, buffer, new CompletionHandler() { 
            @Override
            public void completed(Integer result, ByteBuffer attachment) { //第一个参数为读取字节数,第二个参数为attachment接收数据
                System.out.println(Thread.currentThread().getName()); //一个新线程
                System.out.println("读入了" + result + "个字节");
                attachment.flip();
                FirstNIO.printBufByte(attachment, "任务自动处理");
                attachment.clear();
            }

            @Override
            public void failed(Throwable exc, ByteBuffer attachment) {
                System.out.println("读入数据失败");
            }
        });
        Thread.sleep(10); //防止本线程结束时IO线程还未创建而无法接受数据。

写数据
写出数据类似,可以在completed方法中查看写出的字节数与写出数据。

AsynchronousFileChannel channel = AsynchronousFileChannel.open(Paths.get("file/Test.txt"), StandardOpenOption.WRITE, StandardOpenOption.READ); //获得一个异步文件管道
        ByteBuffer buffer = ByteBuffer.allocate(1024);
        buffer.put("\nnewStr".getBytes());
        buffer.flip();
        /**
         * 第一个参数buffer会接收数据,第二个参数指定文件起始操作位置,第三个参数attachment也可以查看写出数据并可传递给completed方法
         * 下述IO操作都由一个新线程执行
         */
        channel.write(buffer, channel.size(), buffer, new CompletionHandler() {
            @Override
            public void completed(Integer result, ByteBuffer attachment) { //第一个参数为写出字节数,第二个参数为attachment写出数据
                System.out.println(Thread.currentThread().getName()); //一个新线程
                System.out.println("写出了" + result + "个字节"); //写出字节数量
                attachment.flip(); //可以查看写出的字节
                FirstNIO.printBufByte(attachment, "任务自动处理");
                attachment.clear();
            }

            @Override
            public void failed(Throwable exc, ByteBuffer attachment) {
                System.out.println("写出数据失败");
            }
        });
        Thread.sleep(100); //防止本线程结束时IO线程还未创建而无法写出数据。

4.异步建立TCP连接与读写Socket数据

异步建立TCP连接与读写Socket数据与使用多路复用NIO的步骤非常相似,使用的分别是AsynchronousServerSocketChannel与AsynchronousSocketChannel,前者监听端口并通过accept方法获取TCP连接,并返回连接所用Socket。后者的一端连接有TCP所用的Socket用以传输数据。

4.1 使用Future对象接收异步任务

使用Future对象接收异步任务,对于异步Channel的建立连接、读、写都会返回Future对象,如果需要确保I/O确实完成了,需要该对象的isDone方法返回true。下面为对应的客户端与服务器端的代码。

//客户端
public class AIOClient {
    public static void main(String[] args) throws IOException, ExecutionException, InterruptedException {
        ByteBuffer buffer = ByteBuffer.allocate(1024);
        InetSocketAddress address = new InetSocketAddress("localhost", 8189);
        AsynchronousSocketChannel channel = AsynchronousSocketChannel.open();
        Future connectFuture = channel.connect(address);
        while(!connectFuture.isDone()); //等待直到建立连接
        Future writeFuture;
        Future readFuture;
        if(connectFuture.get() == null){
            System.out.println("已成功连接服务器");
            readFuture = channel.read(buffer);
            while (!readFuture.isDone());
            while (true){
                buffer.flip();
                FirstNIO.printBufByte(buffer, "服务器端");
                Scanner scanner = new Scanner(System.in);
                String message = "";
                message = scanner.nextLine();
                buffer.put(message.getBytes());
                buffer.flip();
                writeFuture = channel.write(buffer);
                while(!writeFuture.isDone());
                buffer.clear();
                readFuture = channel.read(buffer);
                while (!readFuture.isDone());
            }
        }else{
            System.out.println("无法建立连接");
        }
    }
}

//服务器端
public class AIOServer {
    public static void main(String[] args) throws IOException, ExecutionException, InterruptedException {
        ByteBuffer buffer = ByteBuffer.allocate(1024);
        AsynchronousServerSocketChannel server = AsynchronousServerSocketChannel.open();
        InetSocketAddress address = new InetSocketAddress(8189);
        server.bind(address);
        System.out.println("服务器启动成功,正在等待连接...");
        while (true){
            Future future = server.accept();
            while(!future.isDone());
            System.out.println("建立新连接成功");
            Future writeFuture;
            Future readFuture;
            AsynchronousSocketChannel channel = future.get();
            buffer.put("Hello, here is server, welcome!".getBytes());
            buffer.flip();
            writeFuture = channel.write(buffer);
            while(!writeFuture.isDone());
            buffer.clear();
            readFuture = channel.read(buffer);
            while (!readFuture.isDone());
            while (true){
                buffer.flip();
                FirstNIO.printBufByte(buffer, "客户端");
                buffer.put("message received".getBytes());
                buffer.flip();
                writeFuture = channel.write(buffer);
                while(!writeFuture.isDone());
                buffer.clear();
                readFuture = channel.read(buffer);
                while (!readFuture.isDone());
            }
        }
    }
}
4.2 使用CompletionHandler

与读写文件类似,异步的ServerSocketChannel与SocketChannel的连接、读、写方法都有指定CompletionHandler对象的版本,在操作成功时会使用completed方法处理,否则会交给failed方法处理。
需要注意的是,与异步FileChannel不同,网络异步操作的连接、读写的操作可能是不断的,所以为了处理不断的操作,应该在completed与failed方法内再调用一次相关的方法以便继续处理下次操作。
下面是将服务器端的连接操作改造为CompletionHandler版本的示例代码,注意server.accept(null, this);

public class AIOServer2 {
    public static void main(String[] args) throws IOException, ExecutionException, InterruptedException {

        AsynchronousServerSocketChannel server = AsynchronousServerSocketChannel.open();
        InetSocketAddress address = new InetSocketAddress(8193);
        server.bind(address);
        System.out.println("服务器启动成功,正在等待连接...");
        server.accept(null, new CompletionHandler() {
            ByteBuffer buffer = ByteBuffer.allocate(1024);

            @Override
            public void completed(AsynchronousSocketChannel result, Object attachment) {
                server.accept(null, this); //处理接续进入的连接
                System.out.println("建立新连接成功");
                Future writeFuture;
                Future readFuture;
                AsynchronousSocketChannel channel = result;
                buffer.put("Hello, here is server, welcome!".getBytes());
                buffer.flip();
                writeFuture = channel.write(buffer);
                while (!writeFuture.isDone()) ;
                buffer.clear();
                readFuture = channel.read(buffer);
                while (!readFuture.isDone()) ;
                while (true) {
                    buffer.flip();
                    FirstNIO.printBufByte(buffer, "客户端");
                    buffer.put("message received".getBytes());
                    buffer.flip();
                    writeFuture = channel.write(buffer);
                    while (!writeFuture.isDone()) ;
                    buffer.clear();
                    readFuture = channel.read(buffer);
                    while (!readFuture.isDone()) ;
                }
            }

            @Override
            public void failed(Throwable exc, Object attachment) {
                server.accept(null, this); //处理接续进入的连接
                System.out.println("建立连接失败");
                System.out.println(exc.getMessage());
            }
        });
        while (true){
            Thread.sleep(5000);
        }
    }
}

参考:

17. Java NIO AsynchronousFileChannel异步文件通道
Proactor模式详解