网络io


1.连接建立流程

利用socketServer创建服务端连接,使用tcp协议

public class SocketIoProperties {
    /**
     * server socket listen properties
     */
    //server socket listen property:
    private static final int RECEIVE_BUFFER = 10;
    private static final int SO_TIMEOUT = 0;
    private static final boolean REUSE_ADDR = false;
    private static final int BACK_LOG = 2;
    //client socket listen property on server endpoint:
    private static final boolean CLI_KEEPALIVE = false;
    private static final boolean CLI_OOB = false;
    private static final int CLI_REC_BUF = 20;
    private static final boolean CLI_REUSE_ADDR = false;
    private static final int CLI_SEND_BUF = 20;
    private static final boolean CLI_LINGER = true;
    private static final int CLI_LINGER_N = 0;
    private static final int CLI_TIMEOUT = 0;
    private static final boolean CLI_NO_DELAY = false;

    public static void main(String[] args) {
        ServerSocket socket = null;
        try {
            socket = new ServerSocket();
            socket.bind(new InetSocketAddress(9090), BACK_LOG);
            socket.setReceiveBufferSize(RECEIVE_BUFFER);
            socket.setReuseAddress(REUSE_ADDR);
            socket.setSoTimeout(SO_TIMEOUT);
        } catch (IOException e) {
            e.printStackTrace();
        }
        System.out.println("server up use 9090");
        try {
            while (true) {
                System.in.read();
                Socket client = socket.accept();
                System.out.println("client port: " + client.getPort());
                client.setKeepAlive(CLI_KEEPALIVE);
                client.setOOBInline(CLI_OOB);
                client.setReceiveBufferSize(CLI_REC_BUF);
                client.setReuseAddress(CLI_REUSE_ADDR);
                client.setSendBufferSize(CLI_SEND_BUF);
                client.setSoLinger(CLI_LINGER, CLI_LINGER_N);
                client.setSoTimeout(CLI_TIMEOUT);
                client.setTcpNoDelay(CLI_NO_DELAY);

                new Thread(
                        () -> {
                            try {
                                InputStream in = client.getInputStream();
                                BufferedReader reader = new BufferedReader(new InputStreamReader(in));
                                char[] data = new char[1024];
                                while (true) {
                                    int num = reader.read(data);
                                    if (num > 0) {
                                        System.out.println("client read some data is :" + num + " val :" + new String(data, 0, num));
                                    } else if (num == 0) {
                                        System.out.println("client readed nothing!");
                                        continue;
                                    } else {
                                        System.out.println("client readed -1...");
                                        System.in.read();
                                        client.close();
                                        break;
                                    }
                                }

                            } catch (IOException e) {
                                e.printStackTrace();
                            }

                        }
                ).start();
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

使用socket创建客户端连接:

public class SocketClient {
    public static void main(String[] args) {
        try {
            Socket socket = new Socket("", 9090);
            OutputStream outputStream = socket.getOutputStream();
            socket.setSendBufferSize(20);
            socket.setOOBInline(false);
            socket.setTcpNoDelay(true);
            BufferedReader bufferedReader=new BufferedReader(new InputStreamReader(System.in));
            while(true){
                String s = bufferedReader.readLine();
                if(!Optional.ofNullable(s).isPresent()){
                    byte[] by=s.getBytes();
                    for(byte bb:by){
                        outputStream.write(bb);
                    }
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

1.使用 netstat -natp  监控tcp端口占用:

Active Internet connections (servers and established)
Proto Recv-Q Send-Q Local Address           Foreign Address         State       PID/Program name    
tcp        0      0 0.0.0.0:22              0.0.0.0:*               LISTEN      1175/sshd           
tcp        0      0 127.0.0.1:25            0.0.0.0:*               LISTEN      1720/master         
tcp        0      0 192.168.72.181:22       192.168.72.1:56913      ESTABLISHED 2618/sshd: root@pts 
tcp        0      0 192.168.72.181:22       192.168.72.1:56423      ESTABLISHED 2406/sshd: root@pts 
tcp        0      0 192.168.72.181:22       192.168.72.1:56455      ESTABLISHED 2508/sshd: root@pts 
tcp6       0      0 192.168.72.181:3888     :::*                    LISTEN      1288/java           
tcp6       0      0 :::22                   :::*                    LISTEN      1175/sshd           
tcp6       0      0 ::1:25                  :::*                    LISTEN      1720/master         
tcp6       0      0 :::2181                 :::*                    LISTEN      1288/java           
tcp6       0      0 :::38055                :::*                    LISTEN      1288/java           
tcp6       0      0 192.168.72.181:3888     192.168.72.182:35717    ESTABLISHED 1288/java           
tcp6       0      0 192.168.72.181:60022    192.168.72.183:2888     ESTABLISHED 1288/java           
tcp6       0      0 192.168.72.181:3888     192.168.72.183:37578    ESTABLISHED 1288/java 

2.使用tcpdump 监听9090端口并抓取包,其中eno16777736为当前主机网卡名称

[root@hadoop01 ~]# tcpdump -nn -i eno16777736 port 9090
tcpdump: verbose output suppressed, use -v or -vv for full protocol decode
listening on eno16777736, link-type EN10MB (Ethernet), capture size 262144 bytes

3.将两个文件上传linux,然后启动服务端:

 javac ./SocketIoProperties.java && java SocketIoProperties

当程序正常启动之后:输出如下,程序正常,因为运行到System.in.read了,程序暂时处于阻塞状态。

[root@hadoop01 socket]# java SocketIoProperties
server up use 9090

首先tcp端口监听增加了一个监听进程,端口号为9090,pid为2644

 此时tcpdump抓包并没有抓到任何东西,因为还没有任何客户端和其交互

 使用lsof命令来查看文件描述符,linux提供了两种帮助命令  --help 和man ,其中--help表示查看命令有哪些参数可以使用,man 则是直接查看该命令时做什么的,比如

 man lsof :表示list 出打开的文件

 lsof  -p 2641 查看该进程的文件描述符,可看出最后一行中有个5u的文件描述符指向了监听接口

COMMAND  PID USER   FD   TYPE             DEVICE  SIZE/OFF      NODE NAME
java    2641 root  cwd    DIR              253,0      4096  37750610 /root/io/socket
java    2641 root  rtd    DIR              253,0      4096       128 /
java    2641 root  txt    REG              253,0      8712 103858189 /opt/software/jdk1.8.0_281/bin/java
java    2641 root  mem    REG              253,0 106065056  67165411 /usr/lib/locale/locale-archive
java    2641 root  mem    REG              253,0    113008  33705275 /opt/software/jdk1.8.0_281/jre/lib/amd64/libnet.so
java    2641 root  mem    REG              253,0  67063338    868659 /opt/software/jdk1.8.0_281/jre/lib/rt.jar
java    2641 root  mem    REG              253,0    127016  33705273 /opt/software/jdk1.8.0_281/jre/lib/amd64/libzip.so
java    2641 root  mem    REG              253,0     57824  67165382 /usr/lib64/libnss_files-2.17.so
java    2641 root  mem    REG              253,0    231960  33705272 /opt/software/jdk1.8.0_281/jre/lib/amd64/libjava.so
java    2641 root  mem    REG              253,0     66112  33705123 /opt/software/jdk1.8.0_281/jre/lib/amd64/libverify.so
java    2641 root  mem    REG              253,0     44088  67165394 /usr/lib64/librt-2.17.so
java    2641 root  mem    REG              253,0   1141552  67165372 /usr/lib64/libm-2.17.so
java    2641 root  mem    REG              253,0  17105112  71390141 /opt/software/jdk1.8.0_281/jre/lib/amd64/server/libjvm.so
java    2641 root  mem    REG              253,0   2107816  67165364 /usr/lib64/libc-2.17.so
java    2641 root  mem    REG              253,0     19512  67165370 /usr/lib64/libdl-2.17.so
java    2641 root  mem    REG              253,0    109416 100838985 /opt/software/jdk1.8.0_281/lib/amd64/jli/libjli.so
java    2641 root  mem    REG              253,0    142296  67165390 /usr/lib64/libpthread-2.17.so
java    2641 root  mem    REG              253,0    164432  67165357 /usr/lib64/ld-2.17.so
java    2641 root  mem    REG              253,0     32768  35410124 /tmp/hsperfdata_root/2641
java    2641 root    0u   CHR              136,0       0t0         3 /dev/pts/0
java    2641 root    1u   CHR              136,0       0t0         3 /dev/pts/0
java    2641 root    2u   CHR              136,0       0t0         3 /dev/pts/0
java    2641 root    3r   REG              253,0  67063338    868659 /opt/software/jdk1.8.0_281/jre/lib/rt.jar
java    2641 root    4u  unix 0xffff8804231df800       0t0     20177 socket
java    2641 root    5u  IPv6              20179       0t0       TCP *:websm (LISTEN)

此时,在别的节点【192.168.72.183】启动一个客户端:java SocketClient

查看tcpdump抓包命令,发现三次握手,首先客户端ip向服务端发送消息,服务端向客户端发送消息ack,客户端收到之后向服务端继续发送ack消息

tcpdump: verbose output suppressed, use -v or -vv for full protocol decode
listening on eno16777736, link-type EN10MB (Ethernet), capture size 262144 bytes
20:21:34.530881 IP 192.168.72.183.42601 > 192.168.72.181.9090: Flags [S], seq 3452114734, win 29200, options [mss 1460,sackOK,TS val 3805578 ecr 0,nop,wscale 7], length 0
20:21:34.530918 IP 192.168.72.181.9090 > 192.168.72.183.42601: Flags [S.], seq 3488810473, ack 3452114735, win 1152, options [mss 1460,sackOK,TS val 3832062 ecr 3805578,nop,wscale 0], length 0
20:21:34.531147 IP 192.168.72.183.42601 > 192.168.72.181.9090: Flags [.], ack 1, win 229, options [nop,nop,TS val 3805579 ecr 3832062], length 0

再看之前netstat监听的9090端口,发现内核中增加了一条建立连接的信息,但是并没有本机上那个进程去对接它

 截止当前,双方已经建立了连接,但是服务端并没有进程去对接客户端过来的消息,当服务端有对接之后,就会多一个文件描述符

客户端发送消息,发现抓包中有信息出现,三次握手,然后netstat -natp查看端口发现并没有什么变化,但是,recv-Q却有数字变化,比如客户端发送了10个字节,数字将变成10

由此可见:客户端服务端建立连接对于服务端来讲,其实就是在内核中开辟一段资源,当连接建立之后,发送过来的数据就放在内核的该资源当中。

回车服务端,服务端accept方法执行

这时候,查看监听端口,发现如下:表示已经有一个java进程接受监听的端口所发送过来的信息

tcp6       0      0 192.168.72.181:9090     192.168.72.183:42601    ESTABLISHED 2641/java

再查看文件描述符:

[root@hadoop01 socket]# lsof -p 2641
COMMAND PID USER FD TYPE DEVICE SIZE/OFF NODE NAME
java 2641 root cwd DIR 253,0 4096 37750610 /root/io/socket
java 2641 root rtd DIR 253,0 4096 128 /
java 2641 root txt REG 253,0 8712 103858189 /opt/software/jdk1.8.0_281/bin/java
java 2641 root mem REG 253,0 106065056 67165411 /usr/lib/locale/locale-archive
java 2641 root mem REG 253,0 113008 33705275 /opt/software/jdk1.8.0_281/jre/lib/amd64/libnet.so
java 2641 root mem REG 253,0 67063338 868659 /opt/software/jdk1.8.0_281/jre/lib/rt.jar
java 2641 root mem REG 253,0 127016 33705273 /opt/software/jdk1.8.0_281/jre/lib/amd64/libzip.so
java 2641 root mem REG 253,0 57824 67165382 /usr/lib64/libnss_files-2.17.so
java 2641 root mem REG 253,0 231960 33705272 /opt/software/jdk1.8.0_281/jre/lib/amd64/libjava.so
java 2641 root mem REG 253,0 66112 33705123 /opt/software/jdk1.8.0_281/jre/lib/amd64/libverify.so
java 2641 root mem REG 253,0 44088 67165394 /usr/lib64/librt-2.17.so
java 2641 root mem REG 253,0 1141552 67165372 /usr/lib64/libm-2.17.so
java 2641 root mem REG 253,0 17105112 71390141 /opt/software/jdk1.8.0_281/jre/lib/amd64/server/libjvm.so
java 2641 root mem REG 253,0 2107816 67165364 /usr/lib64/libc-2.17.so
java 2641 root mem REG 253,0 19512 67165370 /usr/lib64/libdl-2.17.so
java 2641 root mem REG 253,0 109416 100838985 /opt/software/jdk1.8.0_281/lib/amd64/jli/libjli.so
java 2641 root mem REG 253,0 142296 67165390 /usr/lib64/libpthread-2.17.so
java 2641 root mem REG 253,0 164432 67165357 /usr/lib64/ld-2.17.so
java 2641 root mem REG 253,0 32768 35410124 /tmp/hsperfdata_root/2641
java 2641 root 0u CHR 136,0 0t0 3 /dev/pts/0
java 2641 root 1u CHR 136,0 0t0 3 /dev/pts/0
java 2641 root 2u CHR 136,0 0t0 3 /dev/pts/0
java 2641 root 3r REG 253,0 67063338 868659 /opt/software/jdk1.8.0_281/jre/lib/rt.jar
java 2641 root 4u unix 0xffff8804231df800 0t0 20177 socket
java 2641 root 5u IPv6 20179 0t0 TCP *:websm (LISTEN)
java 2641 root 6u IPv6 24722 0t0 TCP hadoop01:websm->hadoop03:42601 (ESTABLISHED)

表示多了一个文件描述符,也就是说当服务端调用accept方法之后,服务端程序才接收客户端发送过来的消息,而接收消息是通过调用内核accept方法,内核打开了文件,返回一个文件描述符,该文件描述符表示服务端通信。

 整个通信流程如下:

2.服务端参数

      backlog:当backlog设置为2时,服务端最多可以建立三个连接,当有新的链接进来时,三次握手,第一次访问服务端,但是服务端不会搭理它或者发送返回失败,也就是三次握手不会进行完成,相当于放弃了该连接

      netstat查看结果显示:listen:监听    established:建立连接    syn_recy:三次连接建立失败,放弃本次连接

   

    timeout:时间超时,调用accept之后,如果时间超时,会发生异常,但并不影响整个程序运行

    SendBufferSize:发送的缓存大小,开启优化之后其实包的大小时可以大于该值的
    setOOBInline:首字母是否提前发送
    setTcpNoDelay:是否优化,比如设置为false,表示优化,如果数据比较大,会一批批发送,如果不优化,会尽快发送数据

    setKeepAlive:如果开启,定时会发送消息确认连接是否正常,如果不正常,就除掉

3.三次握手发送信息

 包的大小:ifconfig查看mtu值,该值为1500,表示包的总大小,上图中option中的mss表示除去ip,port等之外的数据大小

 窗口:窗口是客户端和服务端协商的结果,每次发包都会汇报窗口大小,比如当服务端的窗口比较大时,客户端可以发送多个包过去,当服务端的窗口满了,客户端阻塞,不再发送数据,当服务端处理一部分之后,客户端才继续发(UC控制)

第一次

第一次握手:建立连接时,客户端发送syn包(seq=j)到服务器,并进入SYN_SENT状态,等待服务器确认;SYN:同步序列编号(Synchronize Sequence Numbers)。 

第二次

第二次握手:服务器收到syn包,必须确认客户端的SYN(ack=j+1),同时自己也发送一个SYN包(seq=k),即SYN+ACK包,此时服务器进入SYN_RECV状态。

第三次

第三次握手:客户端收到服务器的SYN+ACK包,向服务器发送确认包ACK(ack=k+1),此包发送完毕,客户端和服务器进入ESTABLISHED(TCP连接成功)状态,完成三次握手。

4.程序建立监听步骤

java 2641 root 5u IPv6 20179 0t0 TCP *:websm (LISTEN)

 在linux内核的具体实现:

   1.创建文件描述符 socket=fd5

        2.绑定bind(fd5,20179)

        3.listen(fd5)

 

5.BIO和NIO的区别

  上述代码就是普通的BIO,在while(true)循环中接收连接,然后每次接收到一个连接之后,创建一个多线程,在多线程中循环获取该连接的数据,这样做的目的是因为该代码中有两个地方阻塞,一个时accept()方法,一个时read()方法,假设只有一个,比如accpet(),只要放在一个while(true)循环中就行了,但是有了read(),当read()阻塞之后,accept()方法无法执行,也就无法创建文件标识符,代码无法向后续走,所以只能使用多线程处理

       即便这样,还是有很大缺点:c10k问题,每有一个连接,就需要clone一个线程,需要一次系统调用,当10k个连接时,就需要创建1万个线程,消耗资源严重

       

       由此可见 :BIO效率低下是因为服务端对于每个连接需要增加一个系统调用clone线程,而造成这种情况的根本原因是kernel的提供的api中,accept()方法和read()方法都是阻塞的。

       一个普通的建立连接的clinet端代码:

public static void main(String[] args) {
        LinkedList clients = new LinkedList<>();
        InetSocketAddress serverAddr = new InetSocketAddress("192.168.150.11", 9090);

        //端口号的问题:65535
        //  windows
        for (int i = 10000; i < 65000; i++) {
            try {
                SocketChannel client1 = SocketChannel.open();

                SocketChannel client2 = SocketChannel.open();

                /*
                linux中你看到的连接就是:
                client...port: 10508
                client...port: 10508
                 */

                client1.bind(new InetSocketAddress("192.168.150.1", i));
                //  192.168.150.1:10000   192.168.150.11:9090
                client1.connect(serverAddr);
                clients.add(client1);

                client2.bind(new InetSocketAddress("192.168.110.100", i));
                //  192.168.110.100:10000  192.168.150.11:9090
                client2.connect(serverAddr);
                clients.add(client2);

            } catch (IOException e) {
                e.printStackTrace();
            }


        }
        System.out.println("clients "+ clients.size());

        try {
            System.in.read();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    当使用这种方式连接服务端时,发现192.168.110.100并没有建立连接,原因是当第一次握手成功之后,第二次握手时,发现由于客户端和服务端不是同一网段,所以经过路由跳的时候起始点变成了linux服务器的网卡地址,这样客户端收到信息之后,没法处理(和自己发送的目标地址不一致)

 处理方法:查看路由表,发现默认

添加网关:

 使用NIO解决c10k问题:

linux提供了新的api,NIO中的n既有新io的意思,又有no block的意思,代码实现如下,在同一个线程中:

public static void main(String[] args) throws Exception {
        LinkedList clients = new LinkedList<>();
        ServerSocketChannel ss = ServerSocketChannel.open();  //服务端开启监听:接受客户端
        ss.bind(new InetSocketAddress(9090));
        ss.configureBlocking(false); //重点  OS  NONBLOCKING!!!  //只让接受客户端  不阻塞
         while (true) {
            //接受客户端的连接
            Thread.sleep(1000);
            SocketChannel client = ss.accept(); //不会阻塞?  -1 NULL
            //accept  调用内核了:1,没有客户端连接进来,返回值?在BIO 的时候一直卡着,但是在NIO ,不卡着,返回-1,NULL
            //如果来客户端的连接,accept 返回的是这个客户端的fd  5,client  object
            //NONBLOCKING 就是代码能往下走了,只不过有不同的情况
            if (client == null) {
             //   System.out.println("null.....");
            } else {
                client.configureBlocking(false); //重点  socket(服务端的listen socket<连接请求三次握手后,往我这里扔,我去通过accept 得到  连接的socket>,连接socket<连接后的数据读写使用的> )
                int port = client.socket().getPort();
                System.out.println("client..port: " + port);
                clients.add(client);
            }
            ByteBuffer buffer = ByteBuffer.allocateDirect(4096);  //可以在堆里   堆外
            //遍历已经链接进来的客户端能不能读写数据
            for (SocketChannel c : clients) {   //串行化!!!!  多线程!!
                int num = c.read(buffer);  // >0  -1  0   //不会阻塞
                if (num > 0) {
                    buffer.flip();
                    byte[] aaa = new byte[buffer.limit()];
                    buffer.get(aaa);
                    String b = new String(aaa);
                    System.out.println(c.socket().getPort() + " : " + b);
                    buffer.clear();
                }
            }
        }
    }

通过设置:configureBlocking(false),表示内核的api,accept()方法不再阻塞,当没有连接时,返回-1,通过这种方式,提高了网络连接效率,但是还是有缺点就是如果一次进来多个连接,每次循环还是只能一个一个便利拿取连接

另外:持续创建文件描述符有可能超出文件描述符的最大值,第一幅中为默认值,os文件描述符的最大值

 

 文件描述符的大小和内存有关

 NIO的优势和缺点:

6.多路复用器的引入

 当我们使用NIO时,虽然它有个有个优势就是使用一个线程或者多个线程处理多个io连接,但是同样它也有问题,同样都是c10k问题,就是当连接达到10K时,每次accept一个线程,就需要便利所有连接,看看有哪些线程有哪些连接有数据返回,但事实上,这一万个连接中其实只有几个连接有数据返回,而且每次便利这一万个连接,都需要系统调用去处理,为了解决这样的问题,引出了多路复用器,多路复用器指的是内核提供了一套api,当一次系统调用将所有的文件描述符传入内核的select方法,该方法返回连接状态,这样程序只要访问那些有状态的连接获取数据就行了

具体流程如下:

setect多路复用器,其实就是内核提供的方法,查看该方法的内容可以使用命令:man 2 select 该命令中的2解释如下:

1、Standard commands (标准命令)
2、System calls (系统调用)
3、Library functions (库函数)
4、Special devices (设备说明)
5、File formats (文件格式)
6、Games and toys (游戏和娱乐)
7、Miscellaneous (杂项)
8、Administrative Commands (管理员命令)
9 其他(Linux特定的), 用来存放内核例行程序的文档。

man 2 select 可以查看到如下等信息:

SELECT(2)                                                                              Linux Programmer's Manual                                                                              SELECT(2)

NAME
       select, pselect, FD_CLR, FD_ISSET, FD_SET, FD_ZERO - synchronous I/O multiplexing

SYNOPSIS
       /* According to POSIX.1-2001 */
       #include select.h>

       /* According to earlier standards */
       #include 
       #include 
       #include 

       int select(int nfds, fd_set *readfds, fd_set *writefds,
                  fd_set *exceptfds, struct timeval *timeout);

       void FD_CLR(int fd, fd_set *set);
       int  FD_ISSET(int fd, fd_set *set);
       void FD_SET(int fd, fd_set *set);
       void FD_ZERO(fd_set *set);

       #include select.h>

       int pselect(int nfds, fd_set *readfds, fd_set *writefds,
                   fd_set *exceptfds, const struct timespec *timeout,
                   const sigset_t *sigmask);

   Feature Test Macro Requirements for glibc (see feature_test_macros(7)):

       pselect(): _POSIX_C_SOURCE >= 200112L || _XOPEN_SOURCE >= 600

DESCRIPTION
       select()  and pselect() allow a program to monitor multiple file descriptors, waiting until one or more of the file descriptors become "ready" for some class of I/O operation (e.g., input pos‐
       sible).  A file descriptor is considered ready if it is possible to perform the corresponding I/O operation (e.g., read(2)) without blocking.
。。。。。。。。。。。。。。。。。。

 select方法的部分理解:

 7.同步,异步,阻塞,非阻塞

        阻塞和非阻塞是对于内核的方法比如accept和read来说的,同步和异步指的是内核有没有自己读取数据,并将数据放入内核buffer,程序只和buffer交互,不自己去调用内核的read()方法获取数据。

 

 8.poll,select,epoll的区别

         无论是NIO还是select还是poll,都需要便利每个IO,拿取状态,但是NIO的便利成本在系统调用,select 和poll是发生了一次系统调用之后,内核根据用户传过来的fd进行自己便利,返回状态

而这种select和poll 的方式有这样的弊端:1 便利所有传入的fd,2 每次都需要传入一次fd,为了解决这个问题,产生了epoll,epoll是将传入的fd进行保存在内核中的开辟空间中,当网卡中传入数据,cpu发生中断,网卡对应的传入数据的fd在内核中找到之前开辟的空间中的对应的fd,然后将该fd加入链表,这样每次客户端读取数据的时候直接在链表中拿到fd,对拿到的fd进行读取操作,不需要再进行循环遍历

 如上图,左侧为epoll,右侧为select或者poll

 多路复用器使用:

 private ServerSocketChannel server = null;
    private Selector selector = null;   //linux 多路复用器(select poll    epoll kqueue) nginx  event{}
    int port = 9090;

    public void initServer() {
        try {
            server = ServerSocketChannel.open();
            server.configureBlocking(false);
            server.bind(new InetSocketAddress(port));
            //如果在epoll模型下,open--》  epoll_create -> fd3
            selector = Selector.open();  //  select  poll  *epoll  优先选择:epoll  但是可以 -D修正
            //server 约等于 listen状态的 fd4
            /**
            register  如果:select,poll:jvm里开辟一个数组 fd4 放进去 epoll:  epoll_ctl(fd3,ADD,fd4,EPOLLIN  */
            server.register(selector, SelectionKey.OP_ACCEPT);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    public void start() {
        initServer();
        System.out.println("服务器启动了。。。。。");
        try {
            while (true) {  //死循环
                Set keys = selector.keys();
                System.out.println(keys);
                System.out.println(keys.size()+"   size");
                //1,调用多路复用器(select,poll  or  epoll  (epoll_wait))
                /**
                select()是啥意思:
                1,select,poll  其实  内核的select(fd4)  poll(fd4)
                2,epoll:  其实 内核的 epoll_wait() , 参数可以带时间:没有时间,0  :  阻塞,有时间设置一个超时
                selector.wakeup()  结果返回0
                 懒加载:
                其实再触碰到selector.select()调用的时候触发了epoll_ctl的调用*/
                while (selector.select() > 0) {
                    Set selectionKeys = selector.selectedKeys();  //返回的有状态的fd集合
                    Iterator iter = selectionKeys.iterator();
                    /**管你啥多路复用器,你呀只能给我状态,我还得一个一个的去处理他们的R/W。同步好辛苦!!!!!!!!
                     NIO  自己对着每一个fd调用系统调用,浪费资源,那么你看,这里是不是调用了一次select方法,知道具体的那些可以R/W了?
                    //我前边可以强调过,socket:  listen   通信 R/W*/
                    while (iter.hasNext()) {
                        SelectionKey key = iter.next();
                        iter.remove(); //set  不移除会重复循环处理
                        if (key.isAcceptable()) {
                            /**看代码的时候,这里是重点,如果要去接受一个新的连接 语义上,accept接受连接且返回新连接的FD对吧?
                            //那新的FD怎么办?
                            //select,poll,因为他们内核没有空间,那么在jvm中保存和前边的fd4那个listen的一起
                            //epoll: 我们希望通过epoll_ctl把新的客户端fd注册到内核空间*/
                            acceptHandler(key);
                        } else if (key.isReadable()) {
                            readHandler(key);  //连read 还有 write都处理了
                            //在当前线程,这个方法可能会阻塞  ,如果阻塞了十年,其他的IO早就没电了。。。
                            //所以,为什么提出了 IO THREADS
                            //redis  是不是用了epoll,redis是不是有个io threads的概念 ,redis是不是单线程的
                            //tomcat 8,9  异步的处理方式  IO  和   处理上  解耦
                        }
                    }
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    public void acceptHandler(SelectionKey key) {
        try {
            ServerSocketChannel ssc = (ServerSocketChannel) key.channel();
            SocketChannel client = ssc.accept(); //来啦,目的是调用accept接受客户端  fd7
            client.configureBlocking(false);
            ByteBuffer buffer = ByteBuffer.allocate(8192);  //前边讲过了
            /***   你看,调用了register
            select,poll:jvm里开辟一个数组 fd7 放进去
            epoll:  epoll_ctl(fd3,ADD,fd7,EPOLLIN
             */
            client.register(selector, SelectionKey.OP_READ, buffer);
            System.out.println("-------------------------------------------");
            System.out.println("新客户端:" + client.getRemoteAddress());
            System.out.println("-------------------------------------------");
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
    public void readHandler(SelectionKey key) {
        SocketChannel client = (SocketChannel) key.channel();
        ByteBuffer buffer = (ByteBuffer) key.attachment();
        buffer.clear();
        int read = 0;
        try {
            while (true) {
                read = client.read(buffer);
                if (read > 0) {
                    buffer.flip();
                    while (buffer.hasRemaining()) {
                        client.write(buffer);   //读入的信息从新写回去
                    }
                    buffer.clear();
                } else if (read == 0) {
                    break;
                } else {
                    client.close();
                    break;
                }
            }
        } catch (IOException e) {
            e.printStackTrace();

        }
    }

    public static void main(String[] args) {
        SocketMultiplexingSingleThreadv1 service = new SocketMultiplexingSingleThreadv1();
        service.start();
    }
-Djava.nio.channels.spi.SelectorProvider=sun.nio.ch.EPollSelectorProvider
-Djava.nio.channels.spi.SelectorProvider=sun.nio.ch.PollSelectorProvider

  同一个代码,使用不同的方式启动,就会使用不同的多路复用器处理

       代码分析:启动程序,指定使用poll多路复用器运行程序,strace命令是跟踪程序,跟踪程序以及程序运行启动的子程序发生的系统调用

 javac SocketMultiplexingSingleThreadv1.java && strace -ff -o p java -Djava.nio.channels.spi.SelectorProvider=sun.nio.ch.PollSelectorProvider   SocketMultiplexingSingleThreadv1

      命令执行之后:

    

    服务端启动正常,且查strace命令的跟踪,发现生成 了许多文件,且都以pid结尾

    

    netstat -natp 查看pid状况,发现9090端口被4144程序进行监听

   

   查看文件描述符

   首先用jps 查看当前java的pid,为4144

    losf -p 4144

    

 以上初始状态完成之后,使用nc localhost  9090 连接服务器

 一旦执行,服务端就会有程序连接显示,因为在服务端打印了连接信息

   

从新查看tcp端口信息:

文件描述符

   

   查看系统调用信息:

   vim p.4108   查看4108文件中系统调用(一般选择文件比较大的,一般我们程序发生系统调用会在文件大的里面)

   :set nu  显示文件行数在行的开头

    /9090 查找9090

    

   可以查看到具体的系统调用过程,先建立socket连接,创建监听文件描述符,监听文件描述符,并将监听文件

   

    当程序发生调用时,如上,传入多个文件描述符,返回的是发生变化的文件描述符7,而7在前面发生了写入操作

    pool和epoll系统调用对比如下:

9.网络连接四次分手 

关闭客户端nc连接:

   

   发现客户端和服务端的连接已经结束,但是客户端还未结束,并且状态为TIME_WAIT状态

   原因是客户端和服务端结束连接需要经过四次分手环节:简单来讲,客户端发送消息说自己需要关闭,服务端ack响应,然后服务端发送消息说自己也要和客户端断开连接,然后客户端发送ack响应,所以正常流程是谁先发送关闭消息,谁最后处于的状态就是TIME_WAIT状态,但是这个状态有时间限制,当过了这个时间,整个连接就会结束(如下图下方)。如果服务端代码中没有写clent.close()方法,那么服务端和客户端会同时出现TIME_WAIT状态,如下图上方,客户端发送关闭请求,服务端响应,然后没有然后了,这种情况下会浪费资源,浪费的资源来自于四元组,客户端不挂掉,客户端就永远不能在同一机子上启动同一端口的连接

   

10.多线程处理select

public class SelectorThreadHandler {
    Selector selector;
    ServerSocketChannel serverSocketChannel;
    @Before
    public void init() throws IOException {
        selector=Selector.open();
        serverSocketChannel=ServerSocketChannel.open();
        serverSocketChannel.bind(new InetSocketAddress(9999));
        serverSocketChannel.configureBlocking(false);
        serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
    }
    @Test
    public void executorTest() throws IOException {
        Set keys= selector.keys();
        System.out.println(String.format("当前有的SelectionKey的个数:%s,他们分别是:%s",keys.size(),keys));
        while (selector.select()>0){
            Set keySet=selector.selectedKeys();
            Iteratoriterator= keySet.iterator();
            SelectionKey select= iterator.next();
            iterator.remove();
            if(select.isAcceptable()){
              /**个人理解:服务这边可以监听多路复用器过来的连接和数据*/
              ServerSocketChannel serverSocketChannel=(ServerSocketChannel)select.channel();
              serverSocketChannel.configureBlocking(false);
              SocketChannel clinetConnect= serverSocketChannel.accept();
              clinetConnect.configureBlocking(false);
              clinetConnect.register(selector,SelectionKey.OP_READ,ByteBuffer.allocateDirect(10));
            }else if(select.isReadable()){ 
                System.out.println("reader");
              new Thread(()->{
                  try {
                      SocketChannel channel= (SocketChannel)select.channel();
                      ByteBuffer byteBuffer=(ByteBuffer)select.attachment();
                      byteBuffer.clear();
                      while(channel.read(byteBuffer)>0){
                          byteBuffer.flip();
                          channel.write(byteBuffer);
                      }
                      channel.configureBlocking(false);
                  } catch (IOException e) {
                      e.printStackTrace();
                  }
              }).start();
            }else {
                System.out.println("其他");
            }

        }
    }
}

发现一个问题:

     客户端正常输入和输出,但是每次当Thread线程中还在处理的时候reader已经输出了好几遍,网上有使用cancle()方法取消key的,public abstract void cancel()方法的作用是请求取消此键的通道到其选择器的注册。一旦返回,该键就是无效的,并且将被添加到其选择器的已取消键集中。在进行下一次选择操作时,将从所有选择器的键集中移除该键。如果已取消了此键,则调用此方法无效。一旦取消某个键,SelectionKey.isValid) 方法返回false。可在任意时间调用cancel()方法。此方法与选择器的已取消键集保持同步,因此,如果通过涉及同一选择器的取消或选择操作并发调用它,则它可能会暂时受阻塞。当使用cancel时,客户端再次输入信息,服务端将不再有收获,所以这种方法并没有什么卵用,还是一个selector一个线程,将读来的数据写回去,正常读写!

public class MultiThread {
    public static void main(String[] args) {
        /**主方法,提供一个selector分组,在初始化该分组时创建n个selector,每个selector绑定一个线程*/
        MyThreadGroup  threadGroup=new MyThreadGroup(3);
        /**提供绑定服务的方法,调用该方法时,将该端口对象的服务注册到selector上*/
        threadGroup.bind(9999);
    }
}
public class MyThreadGroup {
    ThreadSelector[] selector;
    AtomicInteger integer=new AtomicInteger(0);
    public MyThreadGroup(int num) {
        selector=new ThreadSelector[num];
       for(int j=0;j){
           selector[j]= new ThreadSelector(this);
           new Thread(selector[j]).start();
       }
    }

    public void bind(int i) {
        try {
            ServerSocketChannel serverSocketChannel=ServerSocketChannel.open();
            serverSocketChannel.configureBlocking(false);
            serverSocketChannel.bind(new InetSocketAddress(i));
            nextSelector(serverSocketChannel);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    public void nextSelector(Channel channel) {
        if(channel instanceof ServerSocketChannel){
            /**当是服务时,使用第一个selector*/
           selector[0].blockingQueue.add(channel);
           selector[0].selector.wakeup();
        }else {
            /**其他的channel放置在除第一个之外的selector上*/
        ThreadSelector threadSelector=next();
        /**将服务注册添加到当前选择之后的Selector中的,然后在该selector中自己进行注册*/
        threadSelector.blockingQueue.add(channel);
        /**如下方式,我们无法保证服务threadSelector中的select方法的阻塞在唤醒之后与我们主线程中的服务的注册的前后*/
        threadSelector.selector.wakeup();
        /*ServerSocketChannel serverSocketChannel=(ServerSocketChannel)channel;
        try {
            System.out.println("如果当前线程阻塞,则唤醒");
            threadSelector.selector.wakeup();*//**如果当前selector,则会唤醒阻塞状态*//*
            System.out.println("开始注册server,当前线程:"+Thread.currentThread().getName());
            serverSocketChannel.register(threadSelector.selector, SelectionKey.OP_ACCEPT);
        } catch (Exception e) {
            e.printStackTrace();
        }*/

        }
    }

    /**轮训获取一个selector所在的对象*/
    private ThreadSelector next() {
        /***注:首次使用integer.getAndIncrement()获取的最终值就是定义时的默认值为0,后期逐渐加一*/
        int index=integer.getAndIncrement()%(selector.length-1);
        return selector[index+1];

    }
}
public class ThreadSelector implements Runnable{
   Selector selector=null;
   MyThreadGroup group=null;
   LinkedBlockingQueue blockingQueue=new LinkedBlockingQueue<>();
   public ThreadSelector(MyThreadGroup group){
       try {
           this.group=group;
           this.selector=Selector.open();
       } catch (IOException e) {
           e.printStackTrace();
       }
   }
   public void run() {
        while (true){
            try {
              //  System.out.println(Thread.currentThread().getName()+"运行一个selector,检测状态改变之前,当前keys为:"+selector.keys().size());
                int num=selector.select();
             //   System.out.println(Thread.currentThread().getName()+"检测状态改变之后:"+selector.keys().size());
                if (num>0){
                    Set keys= selector.selectedKeys();
                    Iteratoriterator=keys.iterator();
                    while(iterator.hasNext()){
                        SelectionKey key= iterator.next();
                        keys.remove(key);
                        if(key.isAcceptable()){
                            acceptHander(key);
                        }else if(key.isReadable()){
                            readHander(key);
                        }else {

                        }
                    }
                }
                if(!blockingQueue.isEmpty()){
                    Channel channel=  blockingQueue.take();
                    if(channel instanceof ServerSocketChannel){
                        ServerSocketChannel listen=(ServerSocketChannel)channel;
                        System.out.println(Thread.currentThread().getName()+"服务端注册listen");
                        listen.register(this.selector,SelectionKey.OP_ACCEPT);
                    }else if(channel instanceof SocketChannel){
                        SocketChannel socketChannel=(SocketChannel)channel;
                        ByteBuffer byteBuffer=ByteBuffer.allocateDirect(2048);
                        System.out.println(Thread.currentThread().getName()+"当前客户端注册在了该selector上");
                        socketChannel.register(this.selector,SelectionKey.OP_READ,byteBuffer);
                    }
                }
            } catch (IOException e) {
                e.printStackTrace();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    private void readHander(SelectionKey key) {
       System.out.println(Thread.currentThread().getName()+"开始read数据。。。。。。。");
       try {
            SocketChannel client=(SocketChannel) key.channel();
            client.configureBlocking(false);
            ByteBuffer  byteBuffer=(ByteBuffer)key.attachment();
            byteBuffer.clear();
            while (true){
                 int num= client.read(byteBuffer);
                 if(num>0){
                     byteBuffer.flip();
                     client.write(byteBuffer);
                 }else if(num==0){
                     break;
                 }else if(num<0){
                     System.out.println("client:"+client.getRemoteAddress().toString()+" 已经断开。。。。。");
                     key.cancel();/**当客户端断开之后,状态变化一直存在,除非用cancle剔除*/
                     break;
                 }
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    private void acceptHander(SelectionKey key) {
       try {
            ServerSocketChannel server=(ServerSocketChannel)key.channel();
            SocketChannel client= server.accept();
            client.configureBlocking(false);
            group.nextSelector(client);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

 代码微调:

public class MultiThread {
    public static void main(String[] args) {
        /**主方法,提供一个selector分组,在初始化该分组时创建n个selector,每个selector绑定一个线程*/
        MyThreadGroup  listen=new MyThreadGroup(3);
        MyThreadGroup  worker=new MyThreadGroup(3);
        listen.setWorker(worker);
        /**提供绑定服务的方法,调用该方法时,将该端口对象的服务注册到selector上*/
        listen.bind(9999);
        listen.bind(8888);
        listen.bind(7777);
        listen.bind(6666);
    }
}
public class MyThreadGroup {
    ThreadSelector[] selector;
    MyThreadGroup workerGroup=this;
    AtomicInteger integer=new AtomicInteger(0);
    public MyThreadGroup(int num) {
       selector=new ThreadSelector[num];
       for(int j=0;j){
           selector[j]= new ThreadSelector(this);
           new Thread(selector[j]).start();
       }
    }

    public void bind(int i) {
        try {
            ServerSocketChannel serverSocketChannel=ServerSocketChannel.open();
            serverSocketChannel.configureBlocking(false);
            serverSocketChannel.bind(new InetSocketAddress(i));
            nextSelector(serverSocketChannel);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    public void nextSelector(Channel channel) {
          if(channel instanceof ServerSocketChannel){
              ThreadSelector listenSelector= commonNext();
              listenSelector.blockingQueue.add(channel);
              listenSelector.selector.wakeup();
          }else {
              ThreadSelector workSelector= workerNext();
              workSelector.blockingQueue.add(channel);
              workSelector.selector.wakeup();
          }




        /*if(channel instanceof ServerSocketChannel){
            *//**当是服务时,使用第一个selector*//*
           selector[0].blockingQueue.add(channel);
           selector[0].selector.wakeup();
        }else {
            *//**其他的channel放置在除第一个之外的selector上*//*
        ThreadSelector threadSelector=next();
        *//**将服务注册添加到当前选择之后的Selector中的,然后在该selector中自己进行注册*//*
        threadSelector.blockingQueue.add(channel);
        *//**如下方式,我们无法保证服务threadSelector中的select方法的阻塞在唤醒之后与我们主线程中的服务的注册的前后*//*
        threadSelector.selector.wakeup();*/
        /*ServerSocketChannel serverSocketChannel=(ServerSocketChannel)channel;
        try {
            System.out.println("如果当前线程阻塞,则唤醒");
            threadSelector.selector.wakeup();*//**如果当前selector,则会唤醒阻塞状态*//*
            System.out.println("开始注册server,当前线程:"+Thread.currentThread().getName());
            serverSocketChannel.register(threadSelector.selector, SelectionKey.OP_ACCEPT);
        } catch (Exception e) {
            e.printStackTrace();
        }

        }*/
    }

    /**轮训获取一个selector所在的对象*/
    private ThreadSelector next() {
        /***注:首次使用integer.getAndIncrement()获取的最终值就是定义时的默认值为0,后期逐渐加一*/
        int index=integer.getAndIncrement()%(selector.length-1);
        return selector[index+1];

    }

    private ThreadSelector commonNext() {
        /***注:首次使用integer.getAndIncrement()获取的最终值就是定义时的默认值为0,后期逐渐加一*/
        int index=integer.getAndIncrement()%selector.length;
        return selector[index];

    }

    private ThreadSelector workerNext() {
        /***注:首次使用integer.getAndIncrement()获取的最终值就是定义时的默认值为0,后期逐渐加一*/
        int index=integer.getAndIncrement()%this.workerGroup.selector.length;
        return this.workerGroup.selector[index];

    }

    public void setWorker(MyThreadGroup worker) {
          this.workerGroup=worker;
    }
}
public class ThreadSelector extends ThreadLocal> implements Runnable {
   Selector selector=null;
   MyThreadGroup group=null;

    @Override
    protected LinkedBlockingQueue initialValue() {
        return new LinkedBlockingQueue<>();
    }

    LinkedBlockingQueue blockingQueue=get();
   public ThreadSelector(MyThreadGroup group){
       try {
           this.group=group;
           this.selector=Selector.open();
       } catch (IOException e) {
           e.printStackTrace();
       }
   }
   public void run() {
        while (true){
            try {
              //  System.out.println(Thread.currentThread().getName()+"运行一个selector,检测状态改变之前,当前keys为:"+selector.keys().size());
                int num=selector.select();
             //   System.out.println(Thread.currentThread().getName()+"检测状态改变之后:"+selector.keys().size());
                if (num>0){
                    Set keys= selector.selectedKeys();
                    Iteratoriterator=keys.iterator();
                    while(iterator.hasNext()){
                        SelectionKey key= iterator.next();
                        keys.remove(key);
                        if(key.isAcceptable()){
                            acceptHander(key);
                        }else if(key.isReadable()){
                            readHander(key);
                        }else {

                        }
                    }
                }
                if(!blockingQueue.isEmpty()){
                    Channel channel=  blockingQueue.take();
                    if(channel instanceof ServerSocketChannel){
                        ServerSocketChannel listen=(ServerSocketChannel)channel;
                        System.out.println(Thread.currentThread().getName()+"服务端注册listen");
                        listen.register(this.selector,SelectionKey.OP_ACCEPT);
                    }else if(channel instanceof SocketChannel){
                        SocketChannel socketChannel=(SocketChannel)channel;
                        ByteBuffer byteBuffer=ByteBuffer.allocateDirect(2048);
                        System.out.println(Thread.currentThread().getName()+"当前客户端注册在了该selector上");
                        socketChannel.register(this.selector,SelectionKey.OP_READ,byteBuffer);
                    }
                }
            } catch (IOException e) {
                e.printStackTrace();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    private void readHander(SelectionKey key) {
       System.out.println(Thread.currentThread().getName()+"开始read数据。。。。。。。");
       try {
            SocketChannel client=(SocketChannel) key.channel();
            client.configureBlocking(false);
            ByteBuffer  byteBuffer=(ByteBuffer)key.attachment();
            byteBuffer.clear();
            while (true){
                 int num= client.read(byteBuffer);
                 if(num>0){
                     byteBuffer.flip();
                     client.write(byteBuffer);
                 }else if(num==0){
                     break;
                 }else if(num<0){
                     System.out.println("client:"+client.getRemoteAddress().toString()+" 已经断开。。。。。");
                     key.cancel();/**当客户端断开之后,状态变化一直存在,除非用cancle剔除*/
                     break;
                 }
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    private void acceptHander(SelectionKey key) {
       System.out.println(Thread.currentThread().getName()+"进入accept方法");
       try {
            ServerSocketChannel server=(ServerSocketChannel)key.channel();
            SocketChannel client= server.accept();
            client.configureBlocking(false);
            group.nextSelector(client);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

 执行结果:

Thread-0服务端注册listen
Thread-1服务端注册listen
Thread-2服务端注册listen
Thread-0服务端注册listen
Thread-0进入accept方法
Thread-4当前客户端注册在了该selector上
Thread-4开始read数据。。。。。。。
Thread-4开始read数据。。。。。。。
Thread-4开始read数据。。。。。。。
Thread-0进入accept方法
Thread-5当前客户端注册在了该selector上
Thread-5开始read数据。。。。。。。
Thread-5开始read数据。。。。。。。
Thread-1进入accept方法
Thread-3当前客户端注册在了该selector上
Thread-3开始read数据。。。。。。。
Thread-3开始read数据。。。。。。。
 
IO