Netty源码分析之Reactor线程模型详解
上一篇文章,分析了Netty服务端启动的初始化过程,今天我们来分析一下Netty中的Reactor线程模型
在分析源码之前,我们先分析,哪些地方用到了EventLoop?
- NioServerSocketChannel的连接监听注册
- NioSocketChannel的IO事件注册
NioServerSocketChannel连接监听
在AbstractBootstrap类的initAndRegister()方法中,当NioServerSocketChannel初始化完成后,会调用case
标记位置的代码进行注册。
final ChannelFuture initAndRegister() {
Channel channel = null;
try {
channel = channelFactory.newChannel();
init(channel);
} catch (Throwable t) {
}
//注册到boss线程的selector上。
ChannelFuture regFuture = config().group().register(channel);
if (regFuture.cause() != null) {
if (channel.isRegistered()) {
channel.close();
} else {
channel.unsafe().closeForcibly();
}
}
return regFuture;
}
AbstractNioChannel.doRegister
按照代码的执行逻辑,最终会执行到AbstractNioChannel的doRegister()
方法中。
@Override
protected void doRegister() throws Exception {
boolean selected = false;
for (;;) {
try {
//调用ServerSocketChannel的register方法,把当前服务端对象注册到boss线程的selector上
selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
return;
} catch (CancelledKeyException e) {
if (!selected) {
// Force the Selector to select now as the "canceled" SelectionKey may still be
// cached and not removed because no Select.select(..) operation was called yet.
eventLoop().selectNow();
selected = true;
} else {
// We forced a select operation on the selector before but the SelectionKey is still cached
// for whatever reason. JDK bug ?
throw e;
}
}
}
}
NioEventLoop的启动过程
NioEventLoop是一个线程,它的启动过程如下。
在AbstractBootstrap的doBind0方法中,获取了NioServerSocketChannel中的NioEventLoop,然后使用它来执行绑定端口的任务。
private static void doBind0(
final ChannelFuture regFuture, final Channel channel,
final SocketAddress localAddress, final ChannelPromise promise) {
//启动
channel.eventLoop().execute(new Runnable() {
@Override
public void run() {
if (regFuture.isSuccess()) {
channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
} else {
promise.setFailure(regFuture.cause());
}
}
});
}
SingleThreadEventExecutor.execute
然后一路执行到SingleThreadEventExecutor.execute方法中,调用startThread()
方法启动线程。
private void execute(Runnable task, boolean immediate) {
boolean inEventLoop = inEventLoop();
addTask(task);
if (!inEventLoop) {
startThread(); //启动线程
if (isShutdown()) {
boolean reject = false;
try {
if (removeTask(task)) {
reject = true;
}
} catch (UnsupportedOperationException e) {
// The task queue does not support removal so the best thing we can do is to just move on and
// hope we will be able to pick-up the task before its completely terminated.
// In worst case we will log on termination.
}
if (reject) {
reject();
}
}
}
if (!addTaskWakesUp && immediate) {
wakeup(inEventLoop);
}
}
startThread
private void startThread() {
if (state == ST_NOT_STARTED) {
if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {
boolean success = false;
try {
doStartThread(); //执行启动过程
success = true;
} finally {
if (!success) {
STATE_UPDATER.compareAndSet(this, ST_STARTED, ST_NOT_STARTED);
}
}
}
}
}
接着调用doStartThread()方法,通过executor.execute
执行一个任务,在该任务中启动了NioEventLoop线程
private void doStartThread() {
assert thread == null;
executor.execute(new Runnable() { //通过线程池执行一个任务
@Override
public void run() {
thread = Thread.currentThread();
if (interrupted) {
thread.interrupt();
}
boolean success = false;
updateLastExecutionTime();
try {
SingleThreadEventExecutor.this.run(); //调用boss的NioEventLoop的run方法,开启轮询
}
//省略....
}
});
}
NioEventLoop的轮询过程
当NioEventLoop线程被启动后,就直接进入到NioEventLoop的run方法中。
protected void run() {
int selectCnt = 0;
for (;;) {
try {
int strategy;
try {
strategy = selectStrategy.calculateStrategy(selectNowSupplier, hasTasks());
switch (strategy) {
case SelectStrategy.CONTINUE:
continue;
case SelectStrategy.BUSY_WAIT:
case SelectStrategy.SELECT:
long curDeadlineNanos = nextScheduledTaskDeadlineNanos();
if (curDeadlineNanos == -1L) {
curDeadlineNanos = NONE; // nothing on the calendar
}
nextWakeupNanos.set(curDeadlineNanos);
try {
if (!hasTasks()) {
strategy = select(curDeadlineNanos);
}
} finally {
// This update is just to help block unnecessary selector wakeups
// so use of lazySet is ok (no race condition)
nextWakeupNanos.lazySet(AWAKE);
}
// fall through
default:
}
} catch (IOException e) {
// If we receive an IOException here its because the Selector is messed up. Let's rebuild
// the selector and retry. https://github.com/netty/netty/issues/8566
rebuildSelector0();
selectCnt = 0;
handleLoopException(e);
continue;
}
selectCnt++;
cancelledKeys = 0;
needsToSelectAgain = false;
final int ioRatio = this.ioRatio;
boolean ranTasks;
if (ioRatio == 100) {
try {
if (strategy > 0) {
processSelectedKeys();
}
} finally {
// Ensure we always run tasks.
ranTasks = runAllTasks();
}
} else if (strategy > 0) {
final long ioStartTime = System.nanoTime();
try {
processSelectedKeys();
} finally {
// Ensure we always run tasks.
final long ioTime = System.nanoTime() - ioStartTime;
ranTasks = runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
}
} else {
ranTasks = runAllTasks(0); // This will run the minimum number of tasks
}
if (ranTasks || strategy > 0) {
if (selectCnt > MIN_PREMATURE_SELECTOR_RETURNS && logger.isDebugEnabled()) {
logger.debug("Selector.select() returned prematurely {} times in a row for Selector {}.",
selectCnt - 1, selector);
}
selectCnt = 0;
} else if (unexpectedSelectorWakeup(selectCnt)) { // Unexpected wakeup (unusual case)
selectCnt = 0;
}
} catch (CancelledKeyException e) {
// Harmless exception - log anyway
if (logger.isDebugEnabled()) {
logger.debug(CancelledKeyException.class.getSimpleName() + " raised by a Selector {} - JDK bug?",
selector, e);
}
} catch (Error e) {
throw (Error) e;
} catch (Throwable t) {
handleLoopException(t);
} finally {
// Always handle shutdown even if the loop processing threw an exception.
try {
if (isShuttingDown()) {
closeAll();
if (confirmShutdown()) {
return;
}
}
} catch (Error e) {
throw (Error) e;
} catch (Throwable t) {
handleLoopException(t);
}
}
}
}
NioEventLoop的执行流程
NioEventLoop中的run方法是一个无限循环的线程,在该循环中主要做三件事情,如图9-1所示。
- 轮询处理I/O事件(select),轮询Selector选择器中已经注册的所有Channel的I/O就绪事件
- 处理I/O事件,如果存在已经就绪的Channel的I/O事件,则调用
processSelectedKeys
进行处理 - 处理异步任务(runAllTasks),Reactor线程有一个非常重要的职责,就是处理任务队列中的非I/O任务,Netty提供了ioRadio参数用来调整I/O时间和任务处理的时间比例。
轮询I/O就绪事件
我们先来看I/O时间相关的代码片段:
- 通过
selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())
获取当前的执行策略 - 根据不同的策略,用来控制每次轮询时的执行策略。
protected void run() {
int selectCnt = 0;
for (;;) {
try {
int strategy;
try {
strategy = selectStrategy.calculateStrategy(selectNowSupplier, hasTasks());
switch (strategy) {
case SelectStrategy.CONTINUE:
continue;
case SelectStrategy.BUSY_WAIT:
// fall-through to SELECT since the busy-wait is not supported with NIO
case SelectStrategy.SELECT:
long curDeadlineNanos = nextScheduledTaskDeadlineNanos();
if (curDeadlineNanos == -1L) {
curDeadlineNanos = NONE; // nothing on the calendar
}
nextWakeupNanos.set(curDeadlineNanos);
try {
if (!hasTasks()) {
strategy = select(curDeadlineNanos);
}
} finally {
// This update is just to help block unnecessary selector wakeups
// so use of lazySet is ok (no race condition)
nextWakeupNanos.lazySet(AWAKE);
}
// fall through
default:
}
}
//省略....
}
}
}
selectStrategy处理逻辑
@Override
public int calculateStrategy(IntSupplier selectSupplier, boolean hasTasks) throws Exception {
return hasTasks ? selectSupplier.get() : SelectStrategy.SELECT;
}
如果hasTasks
为true,表示当前NioEventLoop线程存在异步任务的情况下,则调用selectSupplier.get()
,否则直接返回SELECT
。
其中selectSupplier.get()
的定义如下:
private final IntSupplier selectNowSupplier = new IntSupplier() {
@Override
public int get() throws Exception {
return selectNow();
}
};
该方法中调用的是selectNow()
方法,这个方法是Selector选择器中的提供的非阻塞方法,执行后会立刻返回。
- 如果当前已经有就绪的Channel,则会返回对应就绪Channel的数量
- 否则,返回0.
分支处理
在上面一个步骤中获得了strategy之后,会根据不同的结果进行分支处理。
- CONTINUE,表示需要重试。
- BUSY_WAIT,由于在NIO中并不支持BUSY_WAIT,所以BUSY_WAIT和SELECT的执行逻辑是一样的
- SELECT,表示需要通过select方法获取就绪的Channel列表,当NioEventLoop中不存在异步任务时,也就是任务队列为空,则返回该策略。
switch (strategy) {
case SelectStrategy.CONTINUE:
continue;
case SelectStrategy.BUSY_WAIT:
// fall-through to SELECT since the busy-wait is not supported with NIO
case SelectStrategy.SELECT:
long curDeadlineNanos = nextScheduledTaskDeadlineNanos();
if (curDeadlineNanos == -1L) {
curDeadlineNanos = NONE; // nothing on the calendar
}
nextWakeupNanos.set(curDeadlineNanos);
try {
if (!hasTasks()) {
strategy = select(curDeadlineNanos);
}
} finally {
// This update is just to help block unnecessary selector wakeups
// so use of lazySet is ok (no race condition)
nextWakeupNanos.lazySet(AWAKE);
}
// fall through
default:
}
SelectStrategy.SELECT
当NioEventLoop线程中不存在异步任务时,则开始执行SELECT策略
//下一次定时任务触发截至时间,默认不是定时任务,返回 -1L
long curDeadlineNanos = nextScheduledTaskDeadlineNanos();
if (curDeadlineNanos == -1L) {
curDeadlineNanos = NONE; // nothing on the calendar
}
nextWakeupNanos.set(curDeadlineNanos);
try {
if (!hasTasks()) {
//2. taskQueue中任务执行完,开始执行select进行阻塞
strategy = select(curDeadlineNanos);
}
} finally {
// This update is just to help block unnecessary selector wakeups
// so use of lazySet is ok (no race condition)
nextWakeupNanos.lazySet(AWAKE);
}
select方法定义如下,默认情况下deadlineNanos=NONE
,所以会调用select()
方法阻塞。
private int select(long deadlineNanos) throws IOException {
if (deadlineNanos == NONE) {
return selector.select();
}
//计算select()方法的阻塞超时时间
long timeoutMillis = deadlineToDelayNanos(deadlineNanos + 995000L) / 1000000L;
return timeoutMillis <= 0 ? selector.selectNow() : selector.select(timeoutMillis);
}
最终返回就绪的channel个数,后续的逻辑中会根据返回的就绪channel个数来决定执行逻辑。
NioEventLoop.run中的业务处理
业务处理的逻辑相对来说比较容易理解
- 如果有就绪的channel,则处理就绪channel的IO事件
- 处理完成后同步执行异步队列中的任务。
- 另外,这里为了解决Java NIO中的空转问题,通过selectCnt记录了空转次数,一次循环发生了空转(既没有IO需要处理、也没有执行任何任务),那么记录下来(selectCnt); ,如果连续发生空转(selectCnt达到一定值),netty认为触发了NIO的BUG(unexpectedSelectorWakeup处理);
Java Nio中有一个bug,Java nio在Linux系统下的epoll空轮询问题。也就是在
select()
方法中,及时就绪的channel为0,也会从本来应该阻塞的操作中被唤醒,从而导致CPU 使用率达到100%。
@Override
protected void run() {
int selectCnt = 0;
for (;;) {
//省略....
selectCnt++;//selectCnt记录的是无功而返的select次数,即eventLoop空转的次数,为解决NIO BUG
cancelledKeys = 0;
needsToSelectAgain = false;
final int ioRatio = this.ioRatio;
boolean ranTasks;
if (ioRatio == 100) { //ioRadio执行时间占比是100%,默认是50%
try {
if (strategy > 0) { //strategy>0表示存在就绪的SocketChannel
processSelectedKeys(); //执行就绪SocketChannel的任务
}
} finally {
//注意,将ioRatio设置为100,并不代表任务不执行,反而是每次将任务队列执行完
ranTasks = runAllTasks(); //确保总是执行队列中的任务
}
} else if (strategy > 0) { //strategy>0表示存在就绪的SocketChannel
final long ioStartTime = System.nanoTime(); //io时间处理开始时间
try {
processSelectedKeys(); //开始处理IO就绪事件
} finally {
// io事件执行结束时间
final long ioTime = System.nanoTime() - ioStartTime;
//基于本次循环处理IO的时间,ioRatio,计算出执行任务耗时的上限,也就是只允许处理多长时间异步任务
ranTasks = runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
}
} else {
//这个分支代表:strategy=0,ioRatio<100,此时任务限时=0,意为:尽量少地执行异步任务
//这个分支和strategy>0实际是一码事,代码简化了一下而已
ranTasks = runAllTasks(0); // This will run the minimum number of tasks
}
if (ranTasks || strategy > 0) { //ranTasks=true,或strategy>0,说明eventLoop干活了,没有空转,清空selectCnt
if (selectCnt > MIN_PREMATURE_SELECTOR_RETURNS && logger.isDebugEnabled()) {
logger.debug("Selector.select() returned prematurely {} times in a row for Selector {}.",
selectCnt - 1, selector);
}
selectCnt = 0;
}
//unexpectedSelectorWakeup处理NIO BUG
else if (unexpectedSelectorWakeup(selectCnt)) { // Unexpected wakeup (unusual case)
selectCnt = 0;
}
}
}
processSelectedKeys
通过在select
方法中,我们可以获得就绪的I/O事件数量,从而触发执行processSelectedKeys
方法。
private void processSelectedKeys() {
if (selectedKeys != null) {
processSelectedKeysOptimized();
} else {
processSelectedKeysPlain(selector.selectedKeys());
}
}
处理I/O事件时,有两个逻辑分支处理:
- 一种是处理Netty优化过的selectedKeys,
- 另一种是正常的处理逻辑
processSelectedKeys方法中根据是否设置了selectedKeys
来判断使用哪种策略,默认使用的是Netty优化过的selectedKeys,它返回的对象是SelectedSelectionKeySet
。
processSelectedKeysOptimized
private void processSelectedKeysOptimized() {
for (int i = 0; i < selectedKeys.size; ++i) {
//1. 取出IO事件以及对应的channel
final SelectionKey k = selectedKeys.keys[i];
selectedKeys.keys[i] = null;//k的引用置null,便于gc回收,也表示该channel的事件处理完成避免重复处理
final Object a = k.attachment(); //获取保存在当前channel中的attachment,此时应该是NioServerSocketChannel
//处理当前的channel
if (a instanceof AbstractNioChannel) {
//对于boss NioEventLoop,轮询到的基本是连接事件,后续的事情就是通过他的pipeline将连接扔给一个worker NioEventLoop处理
//对于worker NioEventLoop来说,轮循道的基本商是IO读写事件,后续的事情就是通过他的pipeline将读取到的字节流传递给每个channelHandler来处理
processSelectedKey(k, (AbstractNioChannel) a);
} else {
@SuppressWarnings("unchecked")
NioTask task = (NioTask) a;
processSelectedKey(k, task);
}
if (needsToSelectAgain) {
// null out entries in the array to allow to have it GC'ed once the Channel close
// See https://github.com/netty/netty/issues/2363
selectedKeys.reset(i + 1);
selectAgain();
i = -1;
}
}
}
processSelectedKey
private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
if (!k.isValid()) {
final EventLoop eventLoop;
try {
eventLoop = ch.eventLoop();
} catch (Throwable ignored) {
}
if (eventLoop == this) {
// close the channel if the key is not valid anymore
unsafe.close(unsafe.voidPromise());
}
return;
}
try {
int readyOps = k.readyOps(); //获取当前key所属的操作类型
if ((readyOps & SelectionKey.OP_CONNECT) != 0) {//如果是连接类型
int ops = k.interestOps();
ops &= ~SelectionKey.OP_CONNECT;
k.interestOps(ops);
unsafe.finishConnect();
}
if ((readyOps & SelectionKey.OP_WRITE) != 0) { //如果是写类型
ch.unsafe().forceFlush();
}
//如果是读类型或者ACCEPT类型。则执行unsafe.read()方法,unsafe的实例对象为 NioMessageUnsafe
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
unsafe.read();
}
} catch (CancelledKeyException ignored) {
unsafe.close(unsafe.voidPromise());
}
}
NioMessageUnsafe.read()
假设此时是一个读操作,或者是客户端建立连接,那么代码执行逻辑如下,
@Override
public void read() {
assert eventLoop().inEventLoop();
final ChannelConfig config = config();
final ChannelPipeline pipeline = pipeline(); //如果是第一次建立连接,此时的pipeline是ServerBootstrapAcceptor
final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
allocHandle.reset(config);
boolean closed = false;
Throwable exception = null;
try {
try {
do {
int localRead = doReadMessages(readBuf);
if (localRead == 0) {
break;
}
if (localRead < 0) {
closed = true;
break;
}
allocHandle.incMessagesRead(localRead);
} while (continueReading(allocHandle));
} catch (Throwable t) {
exception = t;
}
int size = readBuf.size();
for (int i = 0; i < size; i ++) {
readPending = false;
pipeline.fireChannelRead(readBuf.get(i)); //调用pipeline中的channelRead方法
}
readBuf.clear();
allocHandle.readComplete();
pipeline.fireChannelReadComplete();
if (exception != null) {
closed = closeOnReadError(exception);
pipeline.fireExceptionCaught(exception); //调用pipeline中的ExceptionCaught方法
}
if (closed) {
inputShutdown = true;
if (isOpen()) {
close(voidPromise());
}
}
} finally {
if (!readPending && !config.isAutoRead()) {
removeReadOp();
}
}
}
SelectedSelectionKeySet的优化
Netty中自己封装实现了一个SelectedSelectionKeySet,用来优化原本SelectorKeys的结构,它是怎么进行优化的呢?先来看它的代码定义
final class SelectedSelectionKeySet extends AbstractSet {
SelectionKey[] keys;
int size;
SelectedSelectionKeySet() {
keys = new SelectionKey[1024];
}
@Override
public boolean add(SelectionKey o) {
if (o == null) {
return false;
}
keys[size++] = o;
if (size == keys.length) {
increaseCapacity();
}
return true;
}
}
SelectedSelectionKeySet内部使用的是SelectionKey数组,所有在processSelectedKeysOptimized方法中可以直接通过遍历数组来取出就绪的I/O事件。
而原来的Set
返回的是HashSet类型,两者相比,SelectionKey[]不需要考虑哈希冲突的问题,所以可以实现O(1)时间复杂度的add操作。
SelectedSelectionKeySet的初始化
netty通过反射的方式,把Selector对象内部的selectedKeys和publicSelectedKeys替换为SelectedSelectionKeySet。
原本的selectedKeys和publicSelectedKeys这两个字段都是HashSet类型,替换之后变成了SelectedSelectionKeySet。当有就绪的key时,会直接填充到SelectedSelectionKeySet的数组中。后续只需要遍历即可。
private SelectorTuple openSelector() {
final Class<?> selectorImplClass = (Class<?>) maybeSelectorImplClass;
final SelectedSelectionKeySet selectedKeySet = new SelectedSelectionKeySet();
//使用反射
Object maybeException = AccessController.doPrivileged(new PrivilegedAction