05-netty读书笔记
本文是《Netty、Redis、Zookeeper高并发实战》的读书笔记。
包含Netty部分的内容。未包含Redis和zookeeper的内容。
IO
Socket,文件的IO都是进程缓存区和Kernel缓存区的交换,当内核缓存需要进行真实的物理IO的时候,会发生系统中断(读中断,写中断),此时会发生进程的信息和状态保护。完成物理IO后,还需要回复进程的数据和状态信息。
4中主要的IO模型
1、同步阻塞IO(Blocking IO)
阻塞与非阻塞:kernel IO完成后才能返回用户控件执行用户的操作,否则,用户控件处于“卡死”状态(其实是在等待IO完成。阻塞指的是用户空间的状态。
同步与异步:是一种用户空间与内核空间的IO发起方式。
同步IO是指用户空间的线程是主动发起IO请求的一方,内核空间是被动接受方。
异步IO则反过来,是指系统内核是主动发起IO请求的一方,用户空间的线程是被动接受方。
传统的IO模型都是同步阻塞IO。在Java中,默认创建的socket都是阻塞的。
详细解释
直来直去的。属于过程编程。
优点:开发简单;阻塞线程会被挂起,基本不会占用CPU的资源。
2、同步非阻塞IO(Non-blocking IO)
非阻塞IO:指的是用户空间的线程不需要等待内核IO操作彻底完成,可以立即返回用户空间执行接下来的操作,即处于非阻塞状态。与此同时,内核会立即返回给用户一个状态值。
非阻塞IO要求socket被设置为NONBLOCKING。
此处的NIO(同步非阻塞IO)模型不是Java中的NIO(New IO)库。
详细解释
NIO中,一旦开始IO系统调用,会出现两种情况:
- 内核缓冲区中无数据,系统调用会立即返回,返回一个失败的信息
- 内核缓冲区中有数据,是阻塞的,直到数据从内核缓冲区复制到用户进程缓冲区。完成后,系统调用返回成功,用户进程开始处理用户空间的缓存数据。
特点
- 用户程序的线程需要不断的进行IO系统调用,轮询数据是否已经准备好,如果没有准备好,就继续轮询,直到按成IO系统调用为止。
- 每次发起IO系统调用,在内核等待数据过程中可以立即返回,用户线程不会阻塞,实时性较好
- 不断的轮询,占用大量的CPU时间,效率低下。
- 高并发场景下,同步非阻塞IO不可以用
3、IO多路复用(IO Multiplexing)
即经典的Reactor反应器设计模式,有时也称为异步阻塞IO,Java中的Selector选择器和Linux的epoll都是这种模型。
详细解释
IO多路复用,可以解决同步非阻塞IO中的轮询等待问题。
引入一种新的系统调用:查询IO的就绪状态。Linux中是select/epoll系统调用。通过该系统调用,一个线程可以见识多个文件描述符,一旦某个描述符就绪(一般是内核缓冲区可读/可写),内容能够将就绪的状态返回给应用程序。随后应用程序根据就绪状态进行相应的IO系统调用。
在IO多路复用模型中通过select/epoll系统调用,单个应用程序的线程可以不断地轮询成百上千的socket连接,当某个连接有IO就绪的状态,就返回对应的可以执行的读写操作。
过程
- 1,选择器注册。讲需要操作的目标socket连接提前注册到select/epoll的选择器中。
- 2.就绪状态的轮询。通过选择器的查询方法,查询注册过的所有socket连接的就绪状态。通过查询的系统调用,内核会返回一个就绪的socket列表。放任何一个注册过的socket中的数据准备好了,内核缓冲区有数据了,内核会降该socket加入到就绪列表中。
- 当用户线程调用了select查询方法,那么这个线程就会被阻塞掉
- 3,用户线程获得了就绪状态的列表后,根据其中的socket连接,发起read系统调用,用户线程阻塞。内核开始复制数据到用户空间缓冲区。
- 4、复制完成后,内核返回结果,用户线程解除阻塞。用户线程读取到了数据,就继续执行。
特点
-
IO多路复用模型涉及到两种系统调用:
- select/epoll的就绪查询
- IO操作(read.write)
-
与NIO相似,IO多路复用模型也需要轮询。线程需要不断的进行select/epoll轮询,查找出达到IO操作就绪的socket连接。
-
IO多路复用与NIO有密切关系,对于注册在选择器上的每一个可以查询的socket连接,一般都设置为同步非阻塞模型。
-
优点:
- 与一个线程维护一个连接的阻塞IO相比,使用select/epoll的最大优势在于,一个选择器查询线程可以同时处理成千上万的连接。大大减小了系统开销。
-
缺点:
- 本质上,select/epoll系统调用是阻塞的,属于同步IO。都需要在读写事件就绪后,由系统调用本身负责读写,也就是说这个读写过程是阻塞的。
4、异步IO(Asynchronous IO,AIO)
异步IO,指的是用户空间与内核空间的调用方式反过来。用户空间的线程变成被动接受者,而内核空间成了主动调用者。类似于Java中的回调模式,用户空间的线程向内核空间注册了各种IO事件的回调函数,由内核去主动调用。
详细解释
AIO基本流程:用户线程通过系统调用,向内核注册某个IO操作。内核在整个IO操作完成后(数据从网卡复制到用户内存缓冲区),通知用户线程继续执行后续的业务操作。
举个例子
- 1、用户线程发起一个read系统调用,然后就去做用户线程接下来的其他事情,用户线程不阻塞
- 2、内核开始IO操作,完成数据准备,复制到用户内存空间
- 3、内核给用户线程发生一个信号或者回调用户线程注册的回调接口,告诉用户线程read操作完成了
- 用户线程读取用户缓冲区的数据,完成后续的业务操作。
特点
- 用户线程非阻塞
- 用户线程需要接收内核IO完成的事件,或者用户线程需要注册一个IO操作完成的回调函数
- AIO也被称为信号驱动IO
- 缺点:
- 需要底层内核支持
- 用户程序仅需要进行事件的注册与接收,其他事情交给了操作系统
Netty使用的是IO多路复用模型。Linux底层仍使用的事epoll。
Java NIO基础详解
Java NIO
核心组件
- Channel(通道)
- Buffer(缓冲区)
- Selector(选择器)
属于IO多路复用模型。
Old IO(OIO)是面向流的,NIO是面向缓冲区的。
- OIO是面向字节流或者字符流的,一个个读取,NIO是面向缓冲区的,通过Channel和Buffer进行读取数据,可以随意读取Buffer中任意位置的数据
- OIO是阻塞的,NIO是非阻塞的
Channel
OIO中会有输入流和输出流,而NIO中的Channel可以进行读取与写入。
Selector
IO多路复用模型,指的是一个线程或者进程可以同时监视多个文件描述符,一旦其中一个或者多个文件描述符可读或者可写,系统内核就通知该线程。Selector选择器是一个IO事件的查询器。通过选择器,一个线程可以查询多个通道的IO事件的就绪状态。
首先把通道注册到Selector中,然后通过Selector内部的机制,可以查询注册的通道是否有就绪的IO事件(读/写/连接完成)。
一个选择器只需要一个线程进行监控。即,可以通过一个线程去管理多个通道。(背后是IO多路复用的支持)
Buffer
应用程序与Channel的主要交互操作就是进行数据的读写。通道的读取,就是把数据从Channel中读取到缓冲区,通道的写入,就是把数据从缓冲区写入到Channel中。
Buffer的本质是一个内存块(数组)。与普通的内存块(数组)不同的地方是,NIO Buffer对象提供了一组更加有效的读取写入的方法(可以交替读写)。
Buffer类是一个非线程安全的类。
有8中Buffer。ByteBuffer(使用最多),CharBuffer,DoubleBuffer,FloatBuffer,IntBuffer,LongBuffer,ShortBuffer,MappedByteBuffer。
Buffer类
内部有一个byte[]数组内存块,作为缓冲区。
三个重要属性:capacity(容量),position(读写位置),limit(读写限制)
标记属性:mark,可以将当前position临时存入mark中,需要的时候再恢复。
capacity:指的是写入对象的数量,不是byte[]的大小。比如写入的是IntBuffer,capacity是10,那么只能写入10个对象。
position是当前指针的位置。写入时候,position从0开始,最大值是limit,limit会被置为capacity大小。
当需要读取写入的Buffer时候,调用flip(),把写模式转换成读模式,这个时候,limit会被置为position,表示可以被读取的最大值,接着position会被重置为0,即为通道的开头了。
Buffer类几个重要方法
- allocate()创建缓冲区
intBuffer = IntBuffer.allocate(20);
- 创建后的intBuffer,处于写入模式,大小为20*4个字节
- put(),写入到缓冲区,
intBuffer.put(8);
- flip(),翻转模式;写模式翻转到读模式,会清楚mark中的值,每个模式下应该使用自己的mark,否则会混乱啊
- 读模式返回写模式:
intBuffer.clear()或者intBuffer.compact()
- get()
- rewind() 倒带,可以重复读,主要调整position的位置,mark被清除。
- mark()和reset(),mark()讲当前的position放入mark,reset是把mark恢复到position位置。
使用Buffer类基本步骤:
- 创建子类实例对象:使用allocate()
- 调用put
- 调用flip()
- 调用get()
- 调用clear()或者compact()
Channel类
一个Channel对于一个底层的文件描述符。还可以对应某个网络传输协议,Java中实现了几个,比如FileChannel(文件IO),SocketChannel(TCP连接),ServerSocketChannel(TCP连接监听),DatagramChannel(对于UDP)。
FileChannel
-
专门操作文件的
-
阻塞模式,不可设置非阻塞模式
-
文件获取,读取,写入,关闭
-
获取
String scrFile = "D:\\ok.txt"; try { FileInputStream fis = new FileInputStream(scrFile); FileChannel channel = fis.getChannel(); } catch (FileNotFoundException e) { e.printStackTrace(); } try { FileOutputStream fileOutputStream = new FileOutputStream(scrFile); FileChannel channel = fileOutputStream.getChannel(); } catch (FileNotFoundException e) { e.printStackTrace(); } try { RandomAccessFile rw = new RandomAccessFile(scrFile, "rw"); FileChannel channel = rw.getChannel(); } catch (FileNotFoundException e) { e.printStackTrace(); }
-
读取
从通道中读取数据,读取的数据是要写入buffer中的,所以:
channel.read(byteBuffer);是把通道中的数据以ByteBuffer的格式写入到通道中的。
-
写入通道
buf.flip();处于读模式 channel.write(buf);这个方法是把buffer中的数据写入通道,此时buffer应该处于读状态。返回值是写入通道的字节数,长度。
-
关闭通道
channel.close();
-
强制刷新到磁盘
channel.fore(true);
SocketChannel与ServerSocketChannel
客户端:socketChannel
服务端:SocketChannel和ServerSocketChannel
都支持阻塞和非阻塞模式。
socketChannel.configureBlocking(false|true);
非阻塞模式下的通道操作是异步的。
-
获取SocketChannel
SocketChannel sc = SocketChannel.open();静态获取 sc.configureBlocking(false);设置非阻塞 sc.connect(new InetSocketAddress("127.0.0.1",8080));发起连接
由于非阻塞情况下,可能连接还没有建立,connect方法就返回了。
while(!sc.finishConnect()){ .... }
-
读取与写入
读取通道的内容,写入到buf中,与上面一致。
-
关闭通道
sc.shutdownOutput();
-
DatagramChannel
DatagramChannel.open();//创建一个udp通道
datagramChannel.configureBlocking(false);//设置为非阻塞
channel.socket().bind(new InetSocketAddress(8090));//绑定upd端口
读取数据使用receive()方法。channel.receive(buf);返回的事SocketAddress类型
写入。使用send(buf, new InetSocketAddress(IP,Port));
关闭。channel.close();
selector类
selector的使命是完成IO的多路复用。
一个通道代表一条连接通路,通过选择器可以同时监控多个通道的IO状况。选择器和通道是监控和被监控的关系。
一个单线程处理一个selector,一个selector可以监控多个Channel。
Channel和Selector之间的关系是通过注册的方式完成的。
Channel.register(Selector sel, int ops)方法可以把一个通道实例注册到一个Selector上。第一个参数是Selector实例,第二参数是Selector需要监控的IO事件类型。
IO事件类型:
- 可读:SelectionKey.OP_READ
- 可写:SelectionKey.OP_WRITE
- 连接:SelectionKey.OP_CONNECT
- 接收:SelectionKey.OP_ACCEPT
如果监控的事件是上面四种的组合,可以用“按位或”运算符来实现。
int key = SelectionKey.OP_READ | SelectionKey.OP_WRITE;
IO操作不是对通道的IO操作,而是通道的某个IO操作的一种就绪状态,表示通道具备完成某个IO操作的条件。
比如:某个SocketChannel通道,完成了和对端的握手连接,那么该通道则处于“连接就绪”(OP_CONNECT)的状态。
通道是否可以监控
FileChannel不可以监控。能不能被监控(也就是被选择),要看该Channel是否集成SelectableChannel类。
SelectableChannel类实现了通道的可选择性所需要的的公共方法。
选择键SelectionKey
通道和选择器通过注册完成监控与被监控的关系后,就可以选择就绪事件。怎么选择呢?调用选择器的select();方法完成。
选择键:就是那些被选择器选中的IO事件(这些事件是之前被注册到Selector上的)。选择键集合中包含的都是被选择中的且注册过的IO事件。
选择键的功能强大:通过SelectionKey可以获得通道的IO事件类型,可以获得发生IO事件所在的通道,还可获得选出该选择键的选择器的实例。
选择器使用流程
- 1、获取选择器实例
- 2、把通道注册到选择器中
- 3、轮询感兴趣的IO就绪事件(选择键集合)
1、Selector selector = Selector.open();
2、将通道注册到选择器中
//获取通道
ServerSocketChannel ssc = ServerSocketChannel.open();
//设置非阻塞
ssc.configureBlocking(false);
//绑定链接
ssc.bind(new InetSocketAddress(port));
//注册到选择器上,并制定监听事件为“接收连接”事件
ssc.register(selector, SelectionKey.OP_ACCEPT);
3、选择出感兴趣的IO就绪事件
//轮询,选择感兴趣的IO就绪事件
while(selector.select() > 0){
Set selectedKeys = selector.selectedKeys();
Iterator keyIterator = selectedKeys.iterator();
while(keyIterator.hasNext()){
SelectionKey key = keyIterator.next();
//根据IO事件类型进行业务处理
if(key.isAcceptable()){
...
}else if(){
;;;;
}
//处理完后移除key
keyIterator.remove();
}
}
select()有多个重载版本。
select();//阻塞调用,一直到至少有一个通道发生了注册的IO事件
select(long timeout);//指定超时时间,同上
selectNow();//非阻塞,不管有没有IO事件,都立即返回
select();//返回值是int,表示发生了IO事件的通道数量。上次select到本次select之间发生的数量。
反应器模式(reactor模式)
反应器模式由两大角色组成:
- 1、Reactor反应器线程:负责响应IO事件,并且分发到Handlers处理器
- 2、handlers处理器:非阻塞当然执行业务处理逻辑
单线程的Reactor
reactor反应器模式类似于事件驱动模型。
当事件触发时,事件源会将事件dispatch分发到handler处理器进行处理。这个reactor的角色就类似于dispatch事件分发器。
多线程的Reactor
产生多个子反应器,和多个selector。每个线程负责一个选择器的查询和选择。
handler用到线程池。
java中的异步回调
join异步阻塞
join操作原理:阻塞当前进程,直到准备合并的目标线程执行完成。
主线程A,B线程,C线程。
A开始运行--> new B();new C(); --> B.start(); C.start();
接着A调用了B.join(); 有调用了C.join(); A 继续其他的事情。
分析:A从开始调用B.jpin();的时候,A就卡主了(阻塞了),需要B完成后,A才会继续执行C.join();这个时候,A又卡主了。C.join();执行完后,A才会继续执行。而且,B.join();C.join();执行是被合并到A线程里的,A却无法知道B,C的执行结果,不能获取B,C的返回值。
FutureTask类可以。
FutureTask异步回调之重武器
先说一下Runnable接口和Callable接口的区别。Runnable接口的run();方法没有返回值,Callable的call()有返回值,返回值是一个泛型。
FutureTask的构造函数需要传入一个Callable类型的实例,它有一个实例方法get来获取Callable的结果。
线程只有一个类Thread。那这个Callable的实例怎么执行run方法呢?
那就是FutureTask类的作用。FutureTask类内部有一个run方法。
public void run() {
if (state != NEW ||
!UNSAFE.compareAndSwapObject(this, runnerOffset,
null, Thread.currentThread()))
return;
try {
Callable c = callable;//在FutureTask实例化的时候传入的Callable实例
if (c != null && state == NEW) {
V result;
boolean ran;
try {
result = c.call();//执行实例的call方法,结果存储到result上
ran = true;
} catch (Throwable ex) {
result = null;
ran = false;
setException(ex);
}
if (ran)
set(result);//这里把call()的结果设置到FutureTask类的成员outcome中去,可以方便get到
}
} finally {
// runner must be non-null until state is settled to
// prevent concurrent calls to run()
runner = null;
// state must be re-read after nulling runner to prevent
// leaked interrupts
int s = state;
if (s >= INTERRUPTING)
handlePossibleCancellationInterrupt(s);
}
}
这里有个事,FutureTask的实例的get方法还是阻塞的,要等call执行完才能get到,不然就会一直等待(也可以设置超时时间)。所以这个FutureTask的get方法还是异步阻塞的。谷歌的Guava框架实现了异步非阻塞调用获取线程执行结果。
Guava
继承了Java的Future接口。对异步回调机制做了增强
- 引入新的接口:ListenableFuture。使得Java的Future异步任务在Guava中能背监控和获得非阻塞异步执行的结果.
- ListenableFuture
extends Future - 增加一个方法:addListener(Runnable r, Executor e);作用是把FutureCallback的回调工作封装成一个内部的Runnable异步回调任务,在Callable异步任务完成后,回调FutureCallback进行onSuccess或者onFailure处理。
- ListenableFuture
- 引入新的接口:FutureCallback。独立的新接口。在异步任务完成后,根据结果,完成不同的回调处理,并且可以处理异步结果。
- 异步任务成功后,回调onSuccess(V result);
- 异步任务失败后,回调onFailure(Throwable var);
Guava异步回调流程
-
1、实现Java的Callable接口,创建异步执行逻辑(如果不需要返回值,那就实现Runnable接口)
-
2、创建Guava线程池
-
3、把第一步创建的Callable/Runnable异步执行逻辑的实例通过submit提交给Guava线程池,从而获得ListenableFuture异步任务实例
-
4、创建FutureCallback回调实例,通过FutureCallback讲回调实例绑定到ListenFuture异步任务上
package com.stat;
import com.google.common.util.concurrent.*;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class GuavaDemo {
public static final int SLEEP_GAP = 500;
public static String getCurThreadNmae() {
return Thread.currentThread().getName();
}
//业务逻辑烧水
static class HotWaterJob implements Callable {
@Override
public Boolean call() throws Exception {
System.out.println("shaoshui....1秒");
Thread.sleep(SLEEP_GAP);
return true;
}
}
static class WashJbo implements Callable {
@Override
public Boolean call() throws Exception {
System.out.println("洗杯子。。。1秒");
return true;
}
}
//新建一个一步业务类型,作为泡茶喝的主线程类
static class MainJbob implements Runnable {
boolean waterok = false;
boolean cupok = false;
int gap = SLEEP_GAP / 10;
@Override
public void run() {
while (true) {
try {
Thread.sleep(gap);
System.out.println("Doing other things");
} catch (InterruptedException e) {
e.printStackTrace();
}
if (waterok && cupok) {
drinkTea(waterok, cupok);//都准备好了,开始泡茶喝
System.out.println("喝上了茶了。。。。");
}
}
}
public void drinkTea(Boolean waterok, Boolean cupok) {
if (waterok && cupok) {
System.out.println("泡茶喝、。。。");
this.waterok = false;
this.cupok = false;
} else if (!waterok) {
System.out.println("烧水失败");
} else if (!cupok) {
System.out.println("被子未准备ok。");
}
}
}
public static void main(String[] args) {
final MainJbob mainJbob = new MainJbob();
Thread mainThread = new Thread(mainJbob);
mainThread.setName("主线程");
mainThread.start();
HotWaterJob hotWaterJob = new HotWaterJob();
WashJbo washJbo = new WashJbo();
//创建java线程池
ExecutorService jPool = Executors.newFixedThreadPool(10);
//包装java线程池,狗仔Guava线程池
ListeningExecutorService gPool = MoreExecutors.listeningDecorator(jPool);
ListenableFuture hotFuture = gPool.submit(hotWaterJob);
Futures.addCallback(hotFuture, new FutureCallback() {
@Override
public void onSuccess(Boolean aBoolean) {
if (aBoolean) {
mainJbob.waterok = true;
}
}
@Override
public void onFailure(Throwable throwable) {
System.out.println("烧水失败了。。。。。。。1111");
}
});
ListenableFuture washFuture = gPool.submit(washJbo);
Futures.addCallback(washFuture, new FutureCallback() {
@Override
public void onSuccess(Boolean aBoolean) {
if (aBoolean) {
mainJbob.cupok = true;
}
}
@Override
public void onFailure(Throwable throwable) {
System.out.println("洗杯子失败111111111111111");
}
});
}
}