Java核心技术读书笔记11-4 Java NIO Buffer、Channel、Selector与通信服务器、客户端简单实现


4.四种Channel实现类

4.1 FileChannel
FileChannel是一端连接硬盘文件的Channel,想要获得这个FileChannel必须通过InputStream、OutputStream或RandomAccessFile。

需要注意的是由不同的流获取的FileChannel的方向性会有所区别。使用InputStream与模式为"r"的RandomAccessFile获得的FileChannel只能从文件写出数据,使用OutputStream获得的FileChannel只能向文件写入数据。只有使用"rw"、"rwd"与"rws"模式的RandomAccessFile获得的FileChannel具有双向的功能。

position、size与truncate
FileChannel除了前文演示过得读写功能,还具有随机访问文件的功能。与Buffer的position方法类似,使用position方法可以指定在文件中进行下一次读写操作的位置。position初值为0,文件的大小(字节数)可以由size方法得到,若将position设置为超过文件大小且进行写入的话,文件将自动进行扩大,同时在写入位置之前会用字节0进行填充。
FileChannel还具有一个方法truncate,其参数可以指定文件中的一个字节位置,这个位置之后的所有字节都将被删除。

force
force方法可以保证通过该通道写入文件的数据不经过内核缓存而直接写到硬盘中。其boolean参数指定是否将文件的元数据信息一并写入。

close
Channel在使用完毕之后必须调用close方法或使用try-with-resource语句关闭Channel。

4.2 SocketChannel
SocketChannel即是一个连接TCP网络套接字的通道,获得该通道的方式有两种,一种是手动使用open()方法打开一个通道并指定连接的IP与端口号。另外一种是使用ServerSocketChannel,一个新连接到达后会自动创建一个相应的SocketChannel。
默认状态下SocketChannel为阻塞的,也就是说,当使用connect、write与read方法时将会阻塞直到建立连接、写出、读入完毕。可以使用configureBlocking方法,并传入参数false设置Channel为非阻塞的。不过此时如果想判断某个操作是否完成,则要在循环中持续检查了。
一个简单客户端与服务器的实例代码如下:

    public static void main(String[] args) throws IOException {
        SocketChannel channel = SocketChannel.open();
        InetSocketAddress simpleServer = new InetSocketAddress("localhost", 8189); //一个正在运行的简易服务器
        ByteBuffer buffer = ByteBuffer.allocate(1024);
        channel.configureBlocking(false); //设置channel为非阻塞
        channel.connect(simpleServer); //使用通道建立TCP连接

        while(!channel.finishConnect()){ //该方法可以返回是否已经建立了连接
            //剩下的代码是建立连接之后才可以进行的,所以这里必须等待连接完毕
        }

        int length = 0;
        //读取服务器端发送的数据到缓冲区
        while(length == 0){
            length = channel.read(buffer);//由于是非阻塞状态,可能未读完数据就返回,所以读到的字节数为0则需要持续读取
        }
        FirstNIO.printBufByte(buffer, "server"); //一个静态方法,可以打印缓冲区中数据

        buffer.clear();
        buffer.put("BYE\n".getBytes()); //向缓冲区中放置数据
        buffer.flip();
        while(buffer.hasRemaining()){
            channel.write(buffer); //将缓冲区的数据写出到通道中,必须使用循环以保证能够全部写出
        }

        buffer.clear();
        length = 0;
        //再次读取
        while(length == 0){
            length = channel.read(buffer); //循环直到读取到数据
        }
        FirstNIO.printBufByte(buffer, "server"); //读出返回数据
        channel.close();
    }

4.3 ServerSocketChannel
ServerSocketChannel是一个可以监听新连接信道的Channel,将该通道与监听端口绑定可以为每一个新连接分配一个SocketChannel,然后使用该通道与之通信。
同样的ServerSocketChannel默认是阻塞的,使用accept方法可以阻塞等待连接进入。使用configureBlocking(false)将ServerSocketChannel设置为非阻塞的,这时accept会立即返回一个SocketChannel,使用时需要先判断该SocketChannel是否不为null。
一个使用ServerSocketChannel作为服务器端和SocketChannel作为客户端的简单通信示例代码如下:

public class SocketChannelServer { //服务器端
    public static void main(String[] args) throws IOException {
        System.out.println("这里是服务器端");
        ServerSocketChannel server = ServerSocketChannel.open(); //监听信道
        InetSocketAddress inetSocketAddress = new InetSocketAddress(8189); //指定端口号
        server.bind(inetSocketAddress); //绑定端口号
        server.configureBlocking(false); //设为非阻塞
        System.out.println("服务器启动成功,正在等待新连接...");
        while (true) {
            SocketChannel socketChannel = null;
            while ((socketChannel = server.accept()) == null) {
                //连接为空则等待
            }
            System.out.println("连接成功");
            ByteBuffer buffer = ByteBuffer.allocate(1024);
            buffer.put("Hello, this is the server. You have connected successfully!\n".getBytes());
            buffer.put("If you want to disconnect, send BYE".getBytes());
            buffer.flip(); //向客户端发送消息
            socketChannel.write(buffer);
            buffer.clear();

            socketChannel.read(buffer); //接收客户端的消息
            String message = FirstNIO.printBufByte(buffer, "client");//该自定义方法将返回buffer中数据组成的字符串,并且打印出来
            while (!"BYE".equals(message)) {
                buffer.clear();
                socketChannel.read(buffer);
                FirstNIO.printBufByte(buffer, "client");
                buffer.rewind();
                buffer.flip();
                socketChannel.write(buffer); //这个服务端会将客户端的所有消息再发送回去
            }
            buffer.clear();
            buffer.put("BYE~".getBytes());
            buffer.flip();
            socketChannel.write(buffer);
            socketChannel.close();
            System.out.println("连接断开");
        }
    }
}

public class SocketChannelClient { //客户端
    public static void main(String[] args) throws IOException {
        System.out.println("这里是客户端");
        SocketChannel channel = SocketChannel.open();
        InetSocketAddress simpleServer = new InetSocketAddress("localhost", 8189); //服务器IP与端口
        ByteBuffer buffer = ByteBuffer.allocate(1024);
        channel.configureBlocking(false); //设置channel为非阻塞
        channel.connect(simpleServer); //使用通道建立TCP连接

        while(!channel.finishConnect()){ //该方法可以返回是否已经建立了连接
            //剩下的代码是建立连接之后才可以进行的,所以这里必须等待连接完毕
        }

        int length = 0;
        //读取服务器端发送的数据到缓冲区
        while(length == 0){
            length = channel.read(buffer);//由于是非阻塞状态,可能未读完数据就返回,所以读到的字节数为0则需要持续读取
        }
        FirstNIO.printBufByte(buffer, "server"); //一个静态方法,可以打印缓冲区中数据

        buffer.clear();
        buffer.put("BYE".getBytes()); //向缓冲区中放置数据
        buffer.flip();
        while(buffer.hasRemaining()){
            channel.write(buffer); //将缓冲区的数据写出到通道中,必须使用循环以保证能够全部写出
        }

        buffer.clear();
        length = 0;
        //再次读取
        while(length == 0){
            length = channel.read(buffer); //循环直到读取到数据
        }
        FirstNIO.printBufByte(buffer, "server"); //读出返回数据
        channel.close();
    }
}


4.4 DatagramChannel
DatagramChannel使用UDP传输数据,由于传输的数据时是数据包而不是流,所以不能多次接收数据。在一次接收数据时,若缓冲区长度不够则会导致其它数据丢失。同时由于UDP无连接,所以没有一个提供检查连接功能的Channel,DatagramChannel既可以监听端口又可以作为客户端发送数据。最后UDP也可以指定成非阻塞的。示例代码如下:

    public static void main(String[] args) throws IOException {
        //这里是服务器端,负责监听端口
        DatagramChannel server = DatagramChannel.open(); //打开一个DatagramChannel
        server.bind(new InetSocketAddress(8191)); //Java 7的方式绑定一个监听端口
//        server.socket().bind(new InetSocketAddress(8191)); //Java 1.4的方式
        ByteBuffer buffer = ByteBuffer.allocate(18);
        while (buffer.position() == 0){
            server.receive(buffer); //接收消息
            FirstNIO.printBufByte(buffer, "client1");
            buffer.clear();
        }
        server.close();
    }

    public static void main(String[] args) throws IOException {
        //这里是客户端
        InetSocketAddress serverAddress = new InetSocketAddress("localhost", 8191);

        DatagramChannel client = DatagramChannel.open(); //打开一个DatagramChannel
        ByteBuffer buffer = ByteBuffer.allocate(1024);
        buffer.put("hello, I am client".getBytes());
        buffer.flip();
        client.send(buffer, serverAddress); //发送消息的方法1:使用send方法指定IP与端口号,将buffer数据写出

        client.connect(serverAddress); //发送消息的方法2:先使用connect方法指定IP与端口号,与TCP不同,UDP无连接,此方法只是为channel指示发送数据时的网络地址而已
        buffer.clear();
        buffer.put("I like you very very very very very much".getBytes()); //数据长度大于接收方缓冲区大小,超出数据将会被截断
        buffer.flip();
        client.write(buffer); //在使用write方法就可以像其它流一样正常写出缓冲区数据
    }

5.Selector

在Java NIO中,Selector对多个Socket状态的跟踪即是对Channel的跟踪。想要完成这一步需要将Channel对象注册到Selector上,然后标记希望跟踪的事件。
一般使用Selector的流程如下:
1.使用open方法打开一个Selector
2.向Selector使用register方法注册Channel,并指定感兴趣的事件
3.循环检查Selector中是否有Channel产生了感兴趣的事件
4.若有则对其按相应的兴趣事件进行处理

只能对Selector注册非阻塞Channel,因为FileChannel是阻塞的,所以不能将其注册到Selector上。通过检查是否实现SelectableChannel接口可以判断Channels是否可以非阻塞。
5.1 构建选择器与注册通道

Selector selector = Selector.open();
SelectionKey register = server.register(selector, 一个兴趣或多个兴趣通过位或运算返回的集合);

5.2 兴趣集合与SelectionKey
兴趣集合中包含四种兴趣,都是SelectionKey类的常量,兴趣集合实际上是一个位掩码,所以你可以通过位元素对其操作

其中:
OP_ACCEPT 代表对应Channel的接收就绪,可用于SocketServerChannel,意为当前Channel准备好接收其它连接
OP_READ 代表对应Channel的读就绪,可用于SocketChannel,意为当前Channel有数据可读
OP_WRITE 代表对应Channel的写就绪,可用于SocketChannel,意为当前Channel可以写出数据
OP_CONNECT 代表对应Channel的连接就绪,可用于SocketChannel,意为当前Channel完成连接
若注册的通道需要对多个时间感兴趣,则使用位或运算返回一个集合:

int interestSet = SelectionKey.OP_READ | SelectionKey.OP_CONNECT;

注册完毕后会返回一个SelectionKey对象,可以通过这个对象得到注册信息和当前通道的状态。

                SelectionKey key = server.register(selector, SelectionKey.OP_ACCEPT);

        /**
         * 返回一个对象,该对象可以在注册时指定server.register(selector, SelectionKey.OP_ACCEPT, new int[10]);
         * 或使用SelectionKey对象的attach方法附上
         */
        Object someObj = key.attachment(); //可以使用附上的对象更方便的识别通道或进行相关操作
        SelectableChannel channel = key.channel();//返回通道
        Selector selector1 = key.selector();//返回选择器
        int interSet = key.interestOps(); //返回注册时的兴趣集合
        int readyInterSet = key.readyOps(); //返回注册时的兴趣集合中现在已经就绪的兴趣组成的集合
        System.out.println((readyInterSet & SelectionKey.OP_ACCEPT) == SelectionKey.OP_ACCEPT); //对于已经就绪的兴趣集合可以通过位与运算检查某个兴趣是否满足
        System.out.println(key.isAcceptable()); //还可以通过调用isXxx()方法返回的布尔值判断 Xxx可以为Acceptable、Connectable、Readable或Writable分别对应四种兴趣
        key.cancel(); //使用此方法将该key从所有Selector中清除掉:调用时会将key放入Selector的cancelled-key set中,下次进行选择时进行清除
        key.isValid(); //验证key是否已经被取消

5.3 使用Selector得到通道
Selector维护三个集合:
Registered key set:通过keys()方法返回所有注册且未被cancel清除到Cancelled key set的通道key集合。
Selected key set:通过selectedKeys()方法返回所有满足注册时填入兴趣的集合,由于这个集合不会自动移除通道,所以每次处理完该集合中的通道时必须使用迭代器手动移除,防止重复操作。
Cancelled key set:所有被cancel清除到的通道key集合。

使用Selector的目的就是跟踪注册通道的状态。当一些通道满足其注册时填写的兴趣后,我们应该可以得到它,下面是相应功能的代码:

        int nums = selector.select(); //轮询,返回距离上次调用select又有多少通道就绪,该方法阻塞到至少有一个通道就绪
        selector.select(1000); //增加了阻塞时间,最长阻塞参数时间
        selector.selectNow(); //立即返回当前就绪通道的数量,没有则返回0

        Set selectionKeys = selector.selectedKeys(); //返回当前就绪的通道键集

5.4 停止Selector的阻塞
wakeUp():该方法使之后第一个阻塞的select操作立即返回
close():该方法关闭Selector并使所有阻塞的select操作立即返回,但不会影响注册的Channel本身

5.5 使用场景
NIO主要适用于作为服务器使用多路复用技术来处理多个网络Socket I/O的场景。因此,其主要进行的处理过程为:
1.对ServerSocketChannel关心其有无新连接到达
2.若有新连接到达则使用一个ServerSocket建立连接,对于连接可以向其写入相应数据
3.对于已建立的连接,需要关心其有无数据传来
下面为使用Selector构建的服务器示例代码:

//服务器端

public class NIOServer {
    public static void main(String[] args) {
        InetSocketAddress address = new InetSocketAddress(8989); //端口号
        ByteBuffer write2Client = ByteBuffer.allocate(1024);
        ByteBuffer read2Server = ByteBuffer.allocate(1024);
        try {
            ServerSocketChannel server = ServerSocketChannel.open();
            server.configureBlocking(false);
            server.bind(address);
            Selector selector = Selector.open();
            server.register(selector, SelectionKey.OP_ACCEPT);
            System.out.println("服务器启动成功,等待连接...");
            while(true){
                int nums = selector.select(); //若有返回说明有通道就绪
                Set selectionKeys = selector.selectedKeys(); //就绪通道
                Iterator iterator = selectionKeys.iterator(); //迭代器
                while(iterator.hasNext()){
                    SelectionKey key = iterator.next();
                    iterator.remove();
                    if(!key.isValid()) continue; //验证key是否有效
                    if(key.isAcceptable()){ //处理可进行连接通道-建立连接并注册这个通道
                        System.out.println("建立连接");
                        ServerSocketChannel channel = (ServerSocketChannel) key.channel();
                        SocketChannel socketChannel = channel.accept(); //建立连接通道
                        socketChannel.configureBlocking(false);
                        write2Client.put("Hello, this is the server. You have connected successfully!\n".getBytes());
                        write2Client.put("If you want to disconnect, send BYE".getBytes());
                        write2Client.flip();
                        while (write2Client.hasRemaining()){
                            socketChannel.write(write2Client); //写出初始信息
                        }
                        write2Client.clear();
                        socketChannel.register(selector, SelectionKey.OP_READ); //将其注册到Selector,并准备读出其数据,所以标记为对读感兴趣
                    }else if(key.isWritable()){ //处理可写通道-回复消息
                        SocketChannel socketChannel = (SocketChannel) key.channel();
                        write2Client.put("message received".getBytes());
                        write2Client.flip();
                        while(write2Client.hasRemaining()){
                            socketChannel.write(write2Client); //写出数据到客户端
                        }
                        write2Client.clear();
                        key.interestOps(SelectionKey.OP_READ);
                    }else if(key.isReadable()){ //处理可读通道-读入客户端消息并打印
                        SocketChannel socketChannel = (SocketChannel) key.channel();
                        try {
                            socketChannel.read(read2Server);
                            read2Server.flip();
                            String message = FirstNIO.printBufByte(read2Server, "client");//读入客户端信息
                            read2Server.clear();
                            if("BYE".equals(message)){
                                key.cancel();
                                socketChannel.close();
                            }else{
                                key.interestOps(SelectionKey.OP_WRITE); //使其对写关心,准备回复其消息
                            }
                        }catch (IOException e){
                            System.out.println("连接已经断开");
                            key.cancel();
                            socketChannel.close();
                        }
                    }
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
//客户端
public class SocketChannelClient {
    public static void main(String[] args) throws IOException {
        System.out.println("这里是客户端");
        SocketChannel channel = SocketChannel.open();
        InetSocketAddress simpleServer = new InetSocketAddress("localhost", 8989); //服务器IP与端口
        ByteBuffer buffer = ByteBuffer.allocate(1024);
        channel.configureBlocking(false); //设置channel为非阻塞
        channel.connect(simpleServer); //使用通道建立TCP连接
        while(!channel.finishConnect()){ //该方法可以返回是否已经建立了连接
            //剩下的代码是建立连接之后才可以进行的,所以这里必须等待连接完毕
        }
        while (true){
            try{
                FirstNIO.receiveMessage(buffer, channel, "server"); //接收服务器消息并打印
                Scanner in = new Scanner(System.in);
                String message = in.nextLine();
                FirstNIO.sendMessage(message, buffer, channel); //接收控制台消息并发送给服务器
                if("BYE".equals(message)){
                    break;
                }
            }catch (IOException e){
                System.out.println("主机断开连接");
                channel.close();
                break;
            }
        }
}

参考
Java NIO系列教程(六) Selector