网络io
1.连接建立流程
利用socketServer创建服务端连接,使用tcp协议
public class SocketIoProperties { /** * server socket listen properties */ //server socket listen property: private static final int RECEIVE_BUFFER = 10; private static final int SO_TIMEOUT = 0; private static final boolean REUSE_ADDR = false; private static final int BACK_LOG = 2; //client socket listen property on server endpoint: private static final boolean CLI_KEEPALIVE = false; private static final boolean CLI_OOB = false; private static final int CLI_REC_BUF = 20; private static final boolean CLI_REUSE_ADDR = false; private static final int CLI_SEND_BUF = 20; private static final boolean CLI_LINGER = true; private static final int CLI_LINGER_N = 0; private static final int CLI_TIMEOUT = 0; private static final boolean CLI_NO_DELAY = false; public static void main(String[] args) { ServerSocket socket = null; try { socket = new ServerSocket(); socket.bind(new InetSocketAddress(9090), BACK_LOG); socket.setReceiveBufferSize(RECEIVE_BUFFER); socket.setReuseAddress(REUSE_ADDR); socket.setSoTimeout(SO_TIMEOUT); } catch (IOException e) { e.printStackTrace(); } System.out.println("server up use 9090"); try { while (true) { System.in.read(); Socket client = socket.accept(); System.out.println("client port: " + client.getPort()); client.setKeepAlive(CLI_KEEPALIVE); client.setOOBInline(CLI_OOB); client.setReceiveBufferSize(CLI_REC_BUF); client.setReuseAddress(CLI_REUSE_ADDR); client.setSendBufferSize(CLI_SEND_BUF); client.setSoLinger(CLI_LINGER, CLI_LINGER_N); client.setSoTimeout(CLI_TIMEOUT); client.setTcpNoDelay(CLI_NO_DELAY); new Thread( () -> { try { InputStream in = client.getInputStream(); BufferedReader reader = new BufferedReader(new InputStreamReader(in)); char[] data = new char[1024]; while (true) { int num = reader.read(data); if (num > 0) { System.out.println("client read some data is :" + num + " val :" + new String(data, 0, num)); } else if (num == 0) { System.out.println("client readed nothing!"); continue; } else { System.out.println("client readed -1..."); System.in.read(); client.close(); break; } } } catch (IOException e) { e.printStackTrace(); } } ).start(); } } catch (IOException e) { e.printStackTrace(); } } }
使用socket创建客户端连接:
public class SocketClient { public static void main(String[] args) { try { Socket socket = new Socket("", 9090); OutputStream outputStream = socket.getOutputStream(); socket.setSendBufferSize(20); socket.setOOBInline(false); socket.setTcpNoDelay(true); BufferedReader bufferedReader=new BufferedReader(new InputStreamReader(System.in)); while(true){ String s = bufferedReader.readLine(); if(!Optional.ofNullable(s).isPresent()){ byte[] by=s.getBytes(); for(byte bb:by){ outputStream.write(bb); } } } } catch (IOException e) { e.printStackTrace(); } } }
1.使用 netstat -natp 监控tcp端口占用:
Active Internet connections (servers and established) Proto Recv-Q Send-Q Local Address Foreign Address State PID/Program name tcp 0 0 0.0.0.0:22 0.0.0.0:* LISTEN 1175/sshd tcp 0 0 127.0.0.1:25 0.0.0.0:* LISTEN 1720/master tcp 0 0 192.168.72.181:22 192.168.72.1:56913 ESTABLISHED 2618/sshd: root@pts tcp 0 0 192.168.72.181:22 192.168.72.1:56423 ESTABLISHED 2406/sshd: root@pts tcp 0 0 192.168.72.181:22 192.168.72.1:56455 ESTABLISHED 2508/sshd: root@pts tcp6 0 0 192.168.72.181:3888 :::* LISTEN 1288/java tcp6 0 0 :::22 :::* LISTEN 1175/sshd tcp6 0 0 ::1:25 :::* LISTEN 1720/master tcp6 0 0 :::2181 :::* LISTEN 1288/java tcp6 0 0 :::38055 :::* LISTEN 1288/java tcp6 0 0 192.168.72.181:3888 192.168.72.182:35717 ESTABLISHED 1288/java tcp6 0 0 192.168.72.181:60022 192.168.72.183:2888 ESTABLISHED 1288/java tcp6 0 0 192.168.72.181:3888 192.168.72.183:37578 ESTABLISHED 1288/java
2.使用tcpdump 监听9090端口并抓取包,其中eno16777736为当前主机网卡名称
[root@hadoop01 ~]# tcpdump -nn -i eno16777736 port 9090 tcpdump: verbose output suppressed, use -v or -vv for full protocol decode listening on eno16777736, link-type EN10MB (Ethernet), capture size 262144 bytes
3.将两个文件上传linux,然后启动服务端:
javac ./SocketIoProperties.java && java SocketIoProperties
当程序正常启动之后:输出如下,程序正常,因为运行到System.in.read了,程序暂时处于阻塞状态。
[root@hadoop01 socket]# java SocketIoProperties server up use 9090
首先tcp端口监听增加了一个监听进程,端口号为9090,pid为2644
此时tcpdump抓包并没有抓到任何东西,因为还没有任何客户端和其交互
使用lsof命令来查看文件描述符,linux提供了两种帮助命令 --help 和man ,其中--help表示查看命令有哪些参数可以使用,man 则是直接查看该命令时做什么的,比如
man lsof :表示list 出打开的文件
lsof -p 2641 查看该进程的文件描述符,可看出最后一行中有个5u的文件描述符指向了监听接口
COMMAND PID USER FD TYPE DEVICE SIZE/OFF NODE NAME java 2641 root cwd DIR 253,0 4096 37750610 /root/io/socket java 2641 root rtd DIR 253,0 4096 128 / java 2641 root txt REG 253,0 8712 103858189 /opt/software/jdk1.8.0_281/bin/java java 2641 root mem REG 253,0 106065056 67165411 /usr/lib/locale/locale-archive java 2641 root mem REG 253,0 113008 33705275 /opt/software/jdk1.8.0_281/jre/lib/amd64/libnet.so java 2641 root mem REG 253,0 67063338 868659 /opt/software/jdk1.8.0_281/jre/lib/rt.jar java 2641 root mem REG 253,0 127016 33705273 /opt/software/jdk1.8.0_281/jre/lib/amd64/libzip.so java 2641 root mem REG 253,0 57824 67165382 /usr/lib64/libnss_files-2.17.so java 2641 root mem REG 253,0 231960 33705272 /opt/software/jdk1.8.0_281/jre/lib/amd64/libjava.so java 2641 root mem REG 253,0 66112 33705123 /opt/software/jdk1.8.0_281/jre/lib/amd64/libverify.so java 2641 root mem REG 253,0 44088 67165394 /usr/lib64/librt-2.17.so java 2641 root mem REG 253,0 1141552 67165372 /usr/lib64/libm-2.17.so java 2641 root mem REG 253,0 17105112 71390141 /opt/software/jdk1.8.0_281/jre/lib/amd64/server/libjvm.so java 2641 root mem REG 253,0 2107816 67165364 /usr/lib64/libc-2.17.so java 2641 root mem REG 253,0 19512 67165370 /usr/lib64/libdl-2.17.so java 2641 root mem REG 253,0 109416 100838985 /opt/software/jdk1.8.0_281/lib/amd64/jli/libjli.so java 2641 root mem REG 253,0 142296 67165390 /usr/lib64/libpthread-2.17.so java 2641 root mem REG 253,0 164432 67165357 /usr/lib64/ld-2.17.so java 2641 root mem REG 253,0 32768 35410124 /tmp/hsperfdata_root/2641 java 2641 root 0u CHR 136,0 0t0 3 /dev/pts/0 java 2641 root 1u CHR 136,0 0t0 3 /dev/pts/0 java 2641 root 2u CHR 136,0 0t0 3 /dev/pts/0 java 2641 root 3r REG 253,0 67063338 868659 /opt/software/jdk1.8.0_281/jre/lib/rt.jar java 2641 root 4u unix 0xffff8804231df800 0t0 20177 socket java 2641 root 5u IPv6 20179 0t0 TCP *:websm (LISTEN)
此时,在别的节点【192.168.72.183】启动一个客户端:java SocketClient
查看tcpdump抓包命令,发现三次握手,首先客户端ip向服务端发送消息,服务端向客户端发送消息ack,客户端收到之后向服务端继续发送ack消息
tcpdump: verbose output suppressed, use -v or -vv for full protocol decode listening on eno16777736, link-type EN10MB (Ethernet), capture size 262144 bytes 20:21:34.530881 IP 192.168.72.183.42601 > 192.168.72.181.9090: Flags [S], seq 3452114734, win 29200, options [mss 1460,sackOK,TS val 3805578 ecr 0,nop,wscale 7], length 0 20:21:34.530918 IP 192.168.72.181.9090 > 192.168.72.183.42601: Flags [S.], seq 3488810473, ack 3452114735, win 1152, options [mss 1460,sackOK,TS val 3832062 ecr 3805578,nop,wscale 0], length 0 20:21:34.531147 IP 192.168.72.183.42601 > 192.168.72.181.9090: Flags [.], ack 1, win 229, options [nop,nop,TS val 3805579 ecr 3832062], length 0
再看之前netstat监听的9090端口,发现内核中增加了一条建立连接的信息,但是并没有本机上那个进程去对接它
截止当前,双方已经建立了连接,但是服务端并没有进程去对接客户端过来的消息,当服务端有对接之后,就会多一个文件描述符
客户端发送消息,发现抓包中有信息出现,三次握手,然后netstat -natp查看端口发现并没有什么变化,但是,recv-Q却有数字变化,比如客户端发送了10个字节,数字将变成10
由此可见:客户端服务端建立连接对于服务端来讲,其实就是在内核中开辟一段资源,当连接建立之后,发送过来的数据就放在内核的该资源当中。
回车服务端,服务端accept方法执行
这时候,查看监听端口,发现如下:表示已经有一个java进程接受监听的端口所发送过来的信息
tcp6 0 0 192.168.72.181:9090 192.168.72.183:42601 ESTABLISHED 2641/java
再查看文件描述符:
[root@hadoop01 socket]# lsof -p 2641
COMMAND PID USER FD TYPE DEVICE SIZE/OFF NODE NAME
java 2641 root cwd DIR 253,0 4096 37750610 /root/io/socket
java 2641 root rtd DIR 253,0 4096 128 /
java 2641 root txt REG 253,0 8712 103858189 /opt/software/jdk1.8.0_281/bin/java
java 2641 root mem REG 253,0 106065056 67165411 /usr/lib/locale/locale-archive
java 2641 root mem REG 253,0 113008 33705275 /opt/software/jdk1.8.0_281/jre/lib/amd64/libnet.so
java 2641 root mem REG 253,0 67063338 868659 /opt/software/jdk1.8.0_281/jre/lib/rt.jar
java 2641 root mem REG 253,0 127016 33705273 /opt/software/jdk1.8.0_281/jre/lib/amd64/libzip.so
java 2641 root mem REG 253,0 57824 67165382 /usr/lib64/libnss_files-2.17.so
java 2641 root mem REG 253,0 231960 33705272 /opt/software/jdk1.8.0_281/jre/lib/amd64/libjava.so
java 2641 root mem REG 253,0 66112 33705123 /opt/software/jdk1.8.0_281/jre/lib/amd64/libverify.so
java 2641 root mem REG 253,0 44088 67165394 /usr/lib64/librt-2.17.so
java 2641 root mem REG 253,0 1141552 67165372 /usr/lib64/libm-2.17.so
java 2641 root mem REG 253,0 17105112 71390141 /opt/software/jdk1.8.0_281/jre/lib/amd64/server/libjvm.so
java 2641 root mem REG 253,0 2107816 67165364 /usr/lib64/libc-2.17.so
java 2641 root mem REG 253,0 19512 67165370 /usr/lib64/libdl-2.17.so
java 2641 root mem REG 253,0 109416 100838985 /opt/software/jdk1.8.0_281/lib/amd64/jli/libjli.so
java 2641 root mem REG 253,0 142296 67165390 /usr/lib64/libpthread-2.17.so
java 2641 root mem REG 253,0 164432 67165357 /usr/lib64/ld-2.17.so
java 2641 root mem REG 253,0 32768 35410124 /tmp/hsperfdata_root/2641
java 2641 root 0u CHR 136,0 0t0 3 /dev/pts/0
java 2641 root 1u CHR 136,0 0t0 3 /dev/pts/0
java 2641 root 2u CHR 136,0 0t0 3 /dev/pts/0
java 2641 root 3r REG 253,0 67063338 868659 /opt/software/jdk1.8.0_281/jre/lib/rt.jar
java 2641 root 4u unix 0xffff8804231df800 0t0 20177 socket
java 2641 root 5u IPv6 20179 0t0 TCP *:websm (LISTEN)
java 2641 root 6u IPv6 24722 0t0 TCP hadoop01:websm->hadoop03:42601 (ESTABLISHED)
表示多了一个文件描述符,也就是说当服务端调用accept方法之后,服务端程序才接收客户端发送过来的消息,而接收消息是通过调用内核accept方法,内核打开了文件,返回一个文件描述符,该文件描述符表示服务端通信。
整个通信流程如下:
2.服务端参数
backlog:当backlog设置为2时,服务端最多可以建立三个连接,当有新的链接进来时,三次握手,第一次访问服务端,但是服务端不会搭理它或者发送返回失败,也就是三次握手不会进行完成,相当于放弃了该连接
netstat查看结果显示:listen:监听 established:建立连接 syn_recy:三次连接建立失败,放弃本次连接
timeout:时间超时,调用accept之后,如果时间超时,会发生异常,但并不影响整个程序运行
SendBufferSize:发送的缓存大小,开启优化之后其实包的大小时可以大于该值的
setOOBInline:首字母是否提前发送
setTcpNoDelay:是否优化,比如设置为false,表示优化,如果数据比较大,会一批批发送,如果不优化,会尽快发送数据
setKeepAlive:如果开启,定时会发送消息确认连接是否正常,如果不正常,就除掉
3.三次握手发送信息
包的大小:ifconfig查看mtu值,该值为1500,表示包的总大小,上图中option中的mss表示除去ip,port等之外的数据大小
窗口:窗口是客户端和服务端协商的结果,每次发包都会汇报窗口大小,比如当服务端的窗口比较大时,客户端可以发送多个包过去,当服务端的窗口满了,客户端阻塞,不再发送数据,当服务端处理一部分之后,客户端才继续发(UC控制)
第一次
第一次握手:建立连接时,客户端发送syn包(seq=j)到服务器,并进入SYN_SENT状态,等待服务器确认;SYN:同步序列编号(Synchronize Sequence Numbers)。第二次
第二次握手:服务器收到syn包,必须确认客户端的SYN(ack=j+1),同时自己也发送一个SYN包(seq=k),即SYN+ACK包,此时服务器进入SYN_RECV状态。第三次
第三次握手:客户端收到服务器的SYN+ACK包,向服务器发送确认包ACK(ack=k+1),此包发送完毕,客户端和服务器进入ESTABLISHED(TCP连接成功)状态,完成三次握手。4.程序建立监听步骤
java 2641 root 5u IPv6 20179 0t0 TCP *:websm (LISTEN)
在linux内核的具体实现:
1.创建文件描述符 socket=fd5
2.绑定bind(fd5,20179)
3.listen(fd5)
5.BIO和NIO的区别
上述代码就是普通的BIO,在while(true)循环中接收连接,然后每次接收到一个连接之后,创建一个多线程,在多线程中循环获取该连接的数据,这样做的目的是因为该代码中有两个地方阻塞,一个时accept()方法,一个时read()方法,假设只有一个,比如accpet(),只要放在一个while(true)循环中就行了,但是有了read(),当read()阻塞之后,accept()方法无法执行,也就无法创建文件标识符,代码无法向后续走,所以只能使用多线程处理
即便这样,还是有很大缺点:c10k问题,每有一个连接,就需要clone一个线程,需要一次系统调用,当10k个连接时,就需要创建1万个线程,消耗资源严重
由此可见 :BIO效率低下是因为服务端对于每个连接需要增加一个系统调用clone线程,而造成这种情况的根本原因是kernel的提供的api中,accept()方法和read()方法都是阻塞的。
一个普通的建立连接的clinet端代码:
public static void main(String[] args) { LinkedListclients = new LinkedList<>(); InetSocketAddress serverAddr = new InetSocketAddress("192.168.150.11", 9090); //端口号的问题:65535 // windows for (int i = 10000; i < 65000; i++) { try { SocketChannel client1 = SocketChannel.open(); SocketChannel client2 = SocketChannel.open(); /* linux中你看到的连接就是: client...port: 10508 client...port: 10508 */ client1.bind(new InetSocketAddress("192.168.150.1", i)); // 192.168.150.1:10000 192.168.150.11:9090 client1.connect(serverAddr); clients.add(client1); client2.bind(new InetSocketAddress("192.168.110.100", i)); // 192.168.110.100:10000 192.168.150.11:9090 client2.connect(serverAddr); clients.add(client2); } catch (IOException e) { e.printStackTrace(); } } System.out.println("clients "+ clients.size()); try { System.in.read(); } catch (IOException e) { e.printStackTrace(); } }
当使用这种方式连接服务端时,发现192.168.110.100并没有建立连接,原因是当第一次握手成功之后,第二次握手时,发现由于客户端和服务端不是同一网段,所以经过路由跳的时候起始点变成了linux服务器的网卡地址,这样客户端收到信息之后,没法处理(和自己发送的目标地址不一致)
处理方法:查看路由表,发现默认
添加网关:
使用NIO解决c10k问题:
linux提供了新的api,NIO中的n既有新io的意思,又有no block的意思,代码实现如下,在同一个线程中:
public static void main(String[] args) throws Exception { LinkedListclients = new LinkedList<>(); ServerSocketChannel ss = ServerSocketChannel.open(); //服务端开启监听:接受客户端 ss.bind(new InetSocketAddress(9090)); ss.configureBlocking(false); //重点 OS NONBLOCKING!!! //只让接受客户端 不阻塞 while (true) { //接受客户端的连接 Thread.sleep(1000); SocketChannel client = ss.accept(); //不会阻塞? -1 NULL //accept 调用内核了:1,没有客户端连接进来,返回值?在BIO 的时候一直卡着,但是在NIO ,不卡着,返回-1,NULL //如果来客户端的连接,accept 返回的是这个客户端的fd 5,client object //NONBLOCKING 就是代码能往下走了,只不过有不同的情况 if (client == null) { // System.out.println("null....."); } else { client.configureBlocking(false); //重点 socket(服务端的listen socket<连接请求三次握手后,往我这里扔,我去通过accept 得到 连接的socket>,连接socket<连接后的数据读写使用的> ) int port = client.socket().getPort(); System.out.println("client..port: " + port); clients.add(client); } ByteBuffer buffer = ByteBuffer.allocateDirect(4096); //可以在堆里 堆外 //遍历已经链接进来的客户端能不能读写数据 for (SocketChannel c : clients) { //串行化!!!! 多线程!! int num = c.read(buffer); // >0 -1 0 //不会阻塞 if (num > 0) { buffer.flip(); byte[] aaa = new byte[buffer.limit()]; buffer.get(aaa); String b = new String(aaa); System.out.println(c.socket().getPort() + " : " + b); buffer.clear(); } } } }
通过设置:configureBlocking(false),表示内核的api,accept()方法不再阻塞,当没有连接时,返回-1,通过这种方式,提高了网络连接效率,但是还是有缺点就是如果一次进来多个连接,每次循环还是只能一个一个便利拿取连接
另外:持续创建文件描述符有可能超出文件描述符的最大值,第一幅中为默认值,os文件描述符的最大值
文件描述符的大小和内存有关
NIO的优势和缺点:
6.多路复用器的引入
当我们使用NIO时,虽然它有个有个优势就是使用一个线程或者多个线程处理多个io连接,但是同样它也有问题,同样都是c10k问题,就是当连接达到10K时,每次accept一个线程,就需要便利所有连接,看看有哪些线程有哪些连接有数据返回,但事实上,这一万个连接中其实只有几个连接有数据返回,而且每次便利这一万个连接,都需要系统调用去处理,为了解决这样的问题,引出了多路复用器,多路复用器指的是内核提供了一套api,当一次系统调用将所有的文件描述符传入内核的select方法,该方法返回连接状态,这样程序只要访问那些有状态的连接获取数据就行了
具体流程如下:
setect多路复用器,其实就是内核提供的方法,查看该方法的内容可以使用命令:man 2 select 该命令中的2解释如下:
1、Standard commands (标准命令) 2、System calls (系统调用) 3、Library functions (库函数) 4、Special devices (设备说明) 5、File formats (文件格式) 6、Games and toys (游戏和娱乐) 7、Miscellaneous (杂项) 8、Administrative Commands (管理员命令) 9 其他(Linux特定的), 用来存放内核例行程序的文档。
man 2 select 可以查看到如下等信息:
SELECT(2) Linux Programmer's Manual SELECT(2) NAME select, pselect, FD_CLR, FD_ISSET, FD_SET, FD_ZERO - synchronous I/O multiplexing SYNOPSIS /* According to POSIX.1-2001 */ #includeselect.h> /* According to earlier standards */ #include #include #include int select(int nfds, fd_set *readfds, fd_set *writefds, fd_set *exceptfds, struct timeval *timeout); void FD_CLR(int fd, fd_set *set); int FD_ISSET(int fd, fd_set *set); void FD_SET(int fd, fd_set *set); void FD_ZERO(fd_set *set); #include select.h> int pselect(int nfds, fd_set *readfds, fd_set *writefds, fd_set *exceptfds, const struct timespec *timeout, const sigset_t *sigmask); Feature Test Macro Requirements for glibc (see feature_test_macros(7)): pselect(): _POSIX_C_SOURCE >= 200112L || _XOPEN_SOURCE >= 600 DESCRIPTION select() and pselect() allow a program to monitor multiple file descriptors, waiting until one or more of the file descriptors become "ready" for some class of I/O operation (e.g., input pos‐ sible). A file descriptor is considered ready if it is possible to perform the corresponding I/O operation (e.g., read(2)) without blocking. 。。。。。。。。。。。。。。。。。。
select方法的部分理解:
7.同步,异步,阻塞,非阻塞
阻塞和非阻塞是对于内核的方法比如accept和read来说的,同步和异步指的是内核有没有自己读取数据,并将数据放入内核buffer,程序只和buffer交互,不自己去调用内核的read()方法获取数据。
8.poll,select,epoll的区别
无论是NIO还是select还是poll,都需要便利每个IO,拿取状态,但是NIO的便利成本在系统调用,select 和poll是发生了一次系统调用之后,内核根据用户传过来的fd进行自己便利,返回状态
而这种select和poll 的方式有这样的弊端:1 便利所有传入的fd,2 每次都需要传入一次fd,为了解决这个问题,产生了epoll,epoll是将传入的fd进行保存在内核中的开辟空间中,当网卡中传入数据,cpu发生中断,网卡对应的传入数据的fd在内核中找到之前开辟的空间中的对应的fd,然后将该fd加入链表,这样每次客户端读取数据的时候直接在链表中拿到fd,对拿到的fd进行读取操作,不需要再进行循环遍历
如上图,左侧为epoll,右侧为select或者poll
多路复用器使用:
private ServerSocketChannel server = null; private Selector selector = null; //linux 多路复用器(select poll epoll kqueue) nginx event{} int port = 9090; public void initServer() { try { server = ServerSocketChannel.open(); server.configureBlocking(false); server.bind(new InetSocketAddress(port)); //如果在epoll模型下,open--》 epoll_create -> fd3 selector = Selector.open(); // select poll *epoll 优先选择:epoll 但是可以 -D修正 //server 约等于 listen状态的 fd4 /** register 如果:select,poll:jvm里开辟一个数组 fd4 放进去 epoll: epoll_ctl(fd3,ADD,fd4,EPOLLIN */ server.register(selector, SelectionKey.OP_ACCEPT); } catch (IOException e) { e.printStackTrace(); } } public void start() { initServer(); System.out.println("服务器启动了。。。。。"); try { while (true) { //死循环 Setkeys = selector.keys(); System.out.println(keys); System.out.println(keys.size()+" size"); //1,调用多路复用器(select,poll or epoll (epoll_wait)) /** select()是啥意思: 1,select,poll 其实 内核的select(fd4) poll(fd4) 2,epoll: 其实 内核的 epoll_wait() , 参数可以带时间:没有时间,0 : 阻塞,有时间设置一个超时 selector.wakeup() 结果返回0 懒加载: 其实再触碰到selector.select()调用的时候触发了epoll_ctl的调用*/ while (selector.select() > 0) { Set selectionKeys = selector.selectedKeys(); //返回的有状态的fd集合 Iterator iter = selectionKeys.iterator(); /**管你啥多路复用器,你呀只能给我状态,我还得一个一个的去处理他们的R/W。同步好辛苦!!!!!!!! NIO 自己对着每一个fd调用系统调用,浪费资源,那么你看,这里是不是调用了一次select方法,知道具体的那些可以R/W了? //我前边可以强调过,socket: listen 通信 R/W*/ while (iter.hasNext()) { SelectionKey key = iter.next(); iter.remove(); //set 不移除会重复循环处理 if (key.isAcceptable()) { /**看代码的时候,这里是重点,如果要去接受一个新的连接 语义上,accept接受连接且返回新连接的FD对吧? //那新的FD怎么办? //select,poll,因为他们内核没有空间,那么在jvm中保存和前边的fd4那个listen的一起 //epoll: 我们希望通过epoll_ctl把新的客户端fd注册到内核空间*/ acceptHandler(key); } else if (key.isReadable()) { readHandler(key); //连read 还有 write都处理了 //在当前线程,这个方法可能会阻塞 ,如果阻塞了十年,其他的IO早就没电了。。。 //所以,为什么提出了 IO THREADS //redis 是不是用了epoll,redis是不是有个io threads的概念 ,redis是不是单线程的 //tomcat 8,9 异步的处理方式 IO 和 处理上 解耦 } } } } } catch (IOException e) { e.printStackTrace(); } } public void acceptHandler(SelectionKey key) { try { ServerSocketChannel ssc = (ServerSocketChannel) key.channel(); SocketChannel client = ssc.accept(); //来啦,目的是调用accept接受客户端 fd7 client.configureBlocking(false); ByteBuffer buffer = ByteBuffer.allocate(8192); //前边讲过了 /*** 你看,调用了register select,poll:jvm里开辟一个数组 fd7 放进去 epoll: epoll_ctl(fd3,ADD,fd7,EPOLLIN */ client.register(selector, SelectionKey.OP_READ, buffer); System.out.println("-------------------------------------------"); System.out.println("新客户端:" + client.getRemoteAddress()); System.out.println("-------------------------------------------"); } catch (IOException e) { e.printStackTrace(); } } public void readHandler(SelectionKey key) { SocketChannel client = (SocketChannel) key.channel(); ByteBuffer buffer = (ByteBuffer) key.attachment(); buffer.clear(); int read = 0; try { while (true) { read = client.read(buffer); if (read > 0) { buffer.flip(); while (buffer.hasRemaining()) { client.write(buffer); //读入的信息从新写回去 } buffer.clear(); } else if (read == 0) { break; } else { client.close(); break; } } } catch (IOException e) { e.printStackTrace(); } } public static void main(String[] args) { SocketMultiplexingSingleThreadv1 service = new SocketMultiplexingSingleThreadv1(); service.start(); }
-Djava.nio.channels.spi.SelectorProvider=sun.nio.ch.EPollSelectorProvider -Djava.nio.channels.spi.SelectorProvider=sun.nio.ch.PollSelectorProvider
同一个代码,使用不同的方式启动,就会使用不同的多路复用器处理
代码分析:启动程序,指定使用poll多路复用器运行程序,strace命令是跟踪程序,跟踪程序以及程序运行启动的子程序发生的系统调用
javac SocketMultiplexingSingleThreadv1.java && strace -ff -o p java -Djava.nio.channels.spi.SelectorProvider=sun.nio.ch.PollSelectorProvider SocketMultiplexingSingleThreadv1
命令执行之后:
服务端启动正常,且查strace命令的跟踪,发现生成 了许多文件,且都以pid结尾
netstat -natp 查看pid状况,发现9090端口被4144程序进行监听
查看文件描述符
首先用jps 查看当前java的pid,为4144
losf -p 4144
以上初始状态完成之后,使用nc localhost 9090 连接服务器
一旦执行,服务端就会有程序连接显示,因为在服务端打印了连接信息
从新查看tcp端口信息:
文件描述符
查看系统调用信息:
vim p.4108 查看4108文件中系统调用(一般选择文件比较大的,一般我们程序发生系统调用会在文件大的里面)
:set nu 显示文件行数在行的开头
/9090 查找9090
可以查看到具体的系统调用过程,先建立socket连接,创建监听文件描述符,监听文件描述符,并将监听文件
当程序发生调用时,如上,传入多个文件描述符,返回的是发生变化的文件描述符7,而7在前面发生了写入操作
pool和epoll系统调用对比如下:
9.网络连接四次分手
关闭客户端nc连接:
发现客户端和服务端的连接已经结束,但是客户端还未结束,并且状态为TIME_WAIT状态
原因是客户端和服务端结束连接需要经过四次分手环节:简单来讲,客户端发送消息说自己需要关闭,服务端ack响应,然后服务端发送消息说自己也要和客户端断开连接,然后客户端发送ack响应,所以正常流程是谁先发送关闭消息,谁最后处于的状态就是TIME_WAIT状态,但是这个状态有时间限制,当过了这个时间,整个连接就会结束(如下图下方)。如果服务端代码中没有写clent.close()方法,那么服务端和客户端会同时出现TIME_WAIT状态,如下图上方,客户端发送关闭请求,服务端响应,然后没有然后了,这种情况下会浪费资源,浪费的资源来自于四元组,客户端不挂掉,客户端就永远不能在同一机子上启动同一端口的连接
10.多线程处理select
public class SelectorThreadHandler { Selector selector; ServerSocketChannel serverSocketChannel; @Before public void init() throws IOException { selector=Selector.open(); serverSocketChannel=ServerSocketChannel.open(); serverSocketChannel.bind(new InetSocketAddress(9999)); serverSocketChannel.configureBlocking(false); serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT); } @Test public void executorTest() throws IOException { Setkeys= selector.keys(); System.out.println(String.format("当前有的SelectionKey的个数:%s,他们分别是:%s",keys.size(),keys)); while (selector.select()>0){ Set keySet=selector.selectedKeys(); Iterator iterator= keySet.iterator(); SelectionKey select= iterator.next(); iterator.remove(); if(select.isAcceptable()){ /**个人理解:服务这边可以监听多路复用器过来的连接和数据*/ ServerSocketChannel serverSocketChannel=(ServerSocketChannel)select.channel(); serverSocketChannel.configureBlocking(false); SocketChannel clinetConnect= serverSocketChannel.accept(); clinetConnect.configureBlocking(false); clinetConnect.register(selector,SelectionKey.OP_READ,ByteBuffer.allocateDirect(10)); }else if(select.isReadable()){ System.out.println("reader"); new Thread(()->{ try { SocketChannel channel= (SocketChannel)select.channel(); ByteBuffer byteBuffer=(ByteBuffer)select.attachment(); byteBuffer.clear(); while(channel.read(byteBuffer)>0){ byteBuffer.flip(); channel.write(byteBuffer); } channel.configureBlocking(false); } catch (IOException e) { e.printStackTrace(); } }).start(); }else { System.out.println("其他"); } } } }
发现一个问题:
客户端正常输入和输出,但是每次当Thread线程中还在处理的时候reader已经输出了好几遍,网上有使用cancle()方法取消key的,public abstract void cancel()方法的作用是请求取消此键的通道到其选择器的注册。一旦返回,该键就是无效的,并且将被添加到其选择器的已取消键集中。在进行下一次选择操作时,将从所有选择器的键集中移除该键。如果已取消了此键,则调用此方法无效。一旦取消某个键,SelectionKey.isValid) 方法返回false。可在任意时间调用cancel()方法。此方法与选择器的已取消键集保持同步,因此,如果通过涉及同一选择器的取消或选择操作并发调用它,则它可能会暂时受阻塞。当使用cancel时,客户端再次输入信息,服务端将不再有收获,所以这种方法并没有什么卵用,还是一个selector一个线程,将读来的数据写回去,正常读写!
public class MultiThread { public static void main(String[] args) { /**主方法,提供一个selector分组,在初始化该分组时创建n个selector,每个selector绑定一个线程*/ MyThreadGroup threadGroup=new MyThreadGroup(3); /**提供绑定服务的方法,调用该方法时,将该端口对象的服务注册到selector上*/ threadGroup.bind(9999); } }
public class MyThreadGroup { ThreadSelector[] selector; AtomicInteger integer=new AtomicInteger(0); public MyThreadGroup(int num) { selector=new ThreadSelector[num]; for(int j=0;j){ selector[j]= new ThreadSelector(this); new Thread(selector[j]).start(); } } public void bind(int i) { try { ServerSocketChannel serverSocketChannel=ServerSocketChannel.open(); serverSocketChannel.configureBlocking(false); serverSocketChannel.bind(new InetSocketAddress(i)); nextSelector(serverSocketChannel); } catch (IOException e) { e.printStackTrace(); } } public void nextSelector(Channel channel) { if(channel instanceof ServerSocketChannel){ /**当是服务时,使用第一个selector*/ selector[0].blockingQueue.add(channel); selector[0].selector.wakeup(); }else { /**其他的channel放置在除第一个之外的selector上*/ ThreadSelector threadSelector=next(); /**将服务注册添加到当前选择之后的Selector中的,然后在该selector中自己进行注册*/ threadSelector.blockingQueue.add(channel); /**如下方式,我们无法保证服务threadSelector中的select方法的阻塞在唤醒之后与我们主线程中的服务的注册的前后*/ threadSelector.selector.wakeup(); /*ServerSocketChannel serverSocketChannel=(ServerSocketChannel)channel; try { System.out.println("如果当前线程阻塞,则唤醒"); threadSelector.selector.wakeup();*//**如果当前selector,则会唤醒阻塞状态*//* System.out.println("开始注册server,当前线程:"+Thread.currentThread().getName()); serverSocketChannel.register(threadSelector.selector, SelectionKey.OP_ACCEPT); } catch (Exception e) { e.printStackTrace(); }*/ } } /**轮训获取一个selector所在的对象*/ private ThreadSelector next() { /***注:首次使用integer.getAndIncrement()获取的最终值就是定义时的默认值为0,后期逐渐加一*/ int index=integer.getAndIncrement()%(selector.length-1); return selector[index+1]; } }
public class ThreadSelector implements Runnable{ Selector selector=null; MyThreadGroup group=null; LinkedBlockingQueueblockingQueue=new LinkedBlockingQueue<>(); public ThreadSelector(MyThreadGroup group){ try { this.group=group; this.selector=Selector.open(); } catch (IOException e) { e.printStackTrace(); } } public void run() { while (true){ try { // System.out.println(Thread.currentThread().getName()+"运行一个selector,检测状态改变之前,当前keys为:"+selector.keys().size()); int num=selector.select(); // System.out.println(Thread.currentThread().getName()+"检测状态改变之后:"+selector.keys().size()); if (num>0){ Set keys= selector.selectedKeys(); Iterator iterator=keys.iterator(); while(iterator.hasNext()){ SelectionKey key= iterator.next(); keys.remove(key); if(key.isAcceptable()){ acceptHander(key); }else if(key.isReadable()){ readHander(key); }else { } } } if(!blockingQueue.isEmpty()){ Channel channel= blockingQueue.take(); if(channel instanceof ServerSocketChannel){ ServerSocketChannel listen=(ServerSocketChannel)channel; System.out.println(Thread.currentThread().getName()+"服务端注册listen"); listen.register(this.selector,SelectionKey.OP_ACCEPT); }else if(channel instanceof SocketChannel){ SocketChannel socketChannel=(SocketChannel)channel; ByteBuffer byteBuffer=ByteBuffer.allocateDirect(2048); System.out.println(Thread.currentThread().getName()+"当前客户端注册在了该selector上"); socketChannel.register(this.selector,SelectionKey.OP_READ,byteBuffer); } } } catch (IOException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } } } private void readHander(SelectionKey key) { System.out.println(Thread.currentThread().getName()+"开始read数据。。。。。。。"); try { SocketChannel client=(SocketChannel) key.channel(); client.configureBlocking(false); ByteBuffer byteBuffer=(ByteBuffer)key.attachment(); byteBuffer.clear(); while (true){ int num= client.read(byteBuffer); if(num>0){ byteBuffer.flip(); client.write(byteBuffer); }else if(num==0){ break; }else if(num<0){ System.out.println("client:"+client.getRemoteAddress().toString()+" 已经断开。。。。。"); key.cancel();/**当客户端断开之后,状态变化一直存在,除非用cancle剔除*/ break; } } } catch (IOException e) { e.printStackTrace(); } } private void acceptHander(SelectionKey key) { try { ServerSocketChannel server=(ServerSocketChannel)key.channel(); SocketChannel client= server.accept(); client.configureBlocking(false); group.nextSelector(client); } catch (IOException e) { e.printStackTrace(); } } }
代码微调:
public class MultiThread { public static void main(String[] args) { /**主方法,提供一个selector分组,在初始化该分组时创建n个selector,每个selector绑定一个线程*/ MyThreadGroup listen=new MyThreadGroup(3); MyThreadGroup worker=new MyThreadGroup(3); listen.setWorker(worker); /**提供绑定服务的方法,调用该方法时,将该端口对象的服务注册到selector上*/ listen.bind(9999); listen.bind(8888); listen.bind(7777); listen.bind(6666); } }
public class MyThreadGroup { ThreadSelector[] selector; MyThreadGroup workerGroup=this; AtomicInteger integer=new AtomicInteger(0); public MyThreadGroup(int num) { selector=new ThreadSelector[num]; for(int j=0;j){ selector[j]= new ThreadSelector(this); new Thread(selector[j]).start(); } } public void bind(int i) { try { ServerSocketChannel serverSocketChannel=ServerSocketChannel.open(); serverSocketChannel.configureBlocking(false); serverSocketChannel.bind(new InetSocketAddress(i)); nextSelector(serverSocketChannel); } catch (IOException e) { e.printStackTrace(); } } public void nextSelector(Channel channel) { if(channel instanceof ServerSocketChannel){ ThreadSelector listenSelector= commonNext(); listenSelector.blockingQueue.add(channel); listenSelector.selector.wakeup(); }else { ThreadSelector workSelector= workerNext(); workSelector.blockingQueue.add(channel); workSelector.selector.wakeup(); } /*if(channel instanceof ServerSocketChannel){ *//**当是服务时,使用第一个selector*//* selector[0].blockingQueue.add(channel); selector[0].selector.wakeup(); }else { *//**其他的channel放置在除第一个之外的selector上*//* ThreadSelector threadSelector=next(); *//**将服务注册添加到当前选择之后的Selector中的,然后在该selector中自己进行注册*//* threadSelector.blockingQueue.add(channel); *//**如下方式,我们无法保证服务threadSelector中的select方法的阻塞在唤醒之后与我们主线程中的服务的注册的前后*//* threadSelector.selector.wakeup();*/ /*ServerSocketChannel serverSocketChannel=(ServerSocketChannel)channel; try { System.out.println("如果当前线程阻塞,则唤醒"); threadSelector.selector.wakeup();*//**如果当前selector,则会唤醒阻塞状态*//* System.out.println("开始注册server,当前线程:"+Thread.currentThread().getName()); serverSocketChannel.register(threadSelector.selector, SelectionKey.OP_ACCEPT); } catch (Exception e) { e.printStackTrace(); } }*/ } /**轮训获取一个selector所在的对象*/ private ThreadSelector next() { /***注:首次使用integer.getAndIncrement()获取的最终值就是定义时的默认值为0,后期逐渐加一*/ int index=integer.getAndIncrement()%(selector.length-1); return selector[index+1]; } private ThreadSelector commonNext() { /***注:首次使用integer.getAndIncrement()获取的最终值就是定义时的默认值为0,后期逐渐加一*/ int index=integer.getAndIncrement()%selector.length; return selector[index]; } private ThreadSelector workerNext() { /***注:首次使用integer.getAndIncrement()获取的最终值就是定义时的默认值为0,后期逐渐加一*/ int index=integer.getAndIncrement()%this.workerGroup.selector.length; return this.workerGroup.selector[index]; } public void setWorker(MyThreadGroup worker) { this.workerGroup=worker; } }
public class ThreadSelector extends ThreadLocal> implements Runnable { Selector selector=null; MyThreadGroup group=null; @Override protected LinkedBlockingQueue initialValue() { return new LinkedBlockingQueue<>(); } LinkedBlockingQueue blockingQueue=get(); public ThreadSelector(MyThreadGroup group){ try { this.group=group; this.selector=Selector.open(); } catch (IOException e) { e.printStackTrace(); } } public void run() { while (true){ try { // System.out.println(Thread.currentThread().getName()+"运行一个selector,检测状态改变之前,当前keys为:"+selector.keys().size()); int num=selector.select(); // System.out.println(Thread.currentThread().getName()+"检测状态改变之后:"+selector.keys().size()); if (num>0){ Set keys= selector.selectedKeys(); Iterator iterator=keys.iterator(); while(iterator.hasNext()){ SelectionKey key= iterator.next(); keys.remove(key); if(key.isAcceptable()){ acceptHander(key); }else if(key.isReadable()){ readHander(key); }else { } } } if(!blockingQueue.isEmpty()){ Channel channel= blockingQueue.take(); if(channel instanceof ServerSocketChannel){ ServerSocketChannel listen=(ServerSocketChannel)channel; System.out.println(Thread.currentThread().getName()+"服务端注册listen"); listen.register(this.selector,SelectionKey.OP_ACCEPT); }else if(channel instanceof SocketChannel){ SocketChannel socketChannel=(SocketChannel)channel; ByteBuffer byteBuffer=ByteBuffer.allocateDirect(2048); System.out.println(Thread.currentThread().getName()+"当前客户端注册在了该selector上"); socketChannel.register(this.selector,SelectionKey.OP_READ,byteBuffer); } } } catch (IOException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } } } private void readHander(SelectionKey key) { System.out.println(Thread.currentThread().getName()+"开始read数据。。。。。。。"); try { SocketChannel client=(SocketChannel) key.channel(); client.configureBlocking(false); ByteBuffer byteBuffer=(ByteBuffer)key.attachment(); byteBuffer.clear(); while (true){ int num= client.read(byteBuffer); if(num>0){ byteBuffer.flip(); client.write(byteBuffer); }else if(num==0){ break; }else if(num<0){ System.out.println("client:"+client.getRemoteAddress().toString()+" 已经断开。。。。。"); key.cancel();/**当客户端断开之后,状态变化一直存在,除非用cancle剔除*/ break; } } } catch (IOException e) { e.printStackTrace(); } } private void acceptHander(SelectionKey key) { System.out.println(Thread.currentThread().getName()+"进入accept方法"); try { ServerSocketChannel server=(ServerSocketChannel)key.channel(); SocketChannel client= server.accept(); client.configureBlocking(false); group.nextSelector(client); } catch (IOException e) { e.printStackTrace(); } } }
执行结果:
Thread-0服务端注册listen Thread-1服务端注册listen Thread-2服务端注册listen Thread-0服务端注册listen Thread-0进入accept方法 Thread-4当前客户端注册在了该selector上 Thread-4开始read数据。。。。。。。 Thread-4开始read数据。。。。。。。 Thread-4开始read数据。。。。。。。 Thread-0进入accept方法 Thread-5当前客户端注册在了该selector上 Thread-5开始read数据。。。。。。。 Thread-5开始read数据。。。。。。。 Thread-1进入accept方法 Thread-3当前客户端注册在了该selector上 Thread-3开始read数据。。。。。。。 Thread-3开始read数据。。。。。。。