Netty的体系结构及使用
《Netty权威指南》
一、异步和事件驱动
1.Java网络编程
二、第一款Netty应用程序 1.编写 Echo 服务器"); } int port = Integer.parseInt(args[0]); new EchoServer(port).start(); } public void start() throws Exception { final EchoServerHandler serverHandler = new EchoServerHandler(); // 使用的是 NIO 传输,所以指定 了NioEventLoopGroup来接受和处理新的连接 EventLoopGroup group = new NioEventLoopGroup(); try { // 创建 ServerBootstrap ServerBootstrap b = new ServerBootstrap(); b.group(group) // 指定所使用的 NIO 传输 Channel .channel(NioServerSocketChannel.class) //使用指定的 端口设置套 接字地址 .localAddress(new InetSocketAddress(port)) //使用了一个特殊的类——ChannelInitializer。这是关键。当一个新的连接被接收时, // 一个新的子 Channel 将会被创建,而 ChannelInitializer 将会把一个你的 // EchoServerHandler 的实例添加到该 Channel 的 ChannelPipeline 中。 .childHandler(new ChannelInitializer() { @Override public void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast(serverHandler); } }); //异步地绑定服务器; 调用 sync()方法阻塞 等待直到绑定完成 ChannelFuture f = b.bind().sync(); //获取 Channel 的 CloseFuture,并 且阻塞当前线 程直到它完成 f.channel().closeFuture().sync(); } finally { // 关闭 EventLoopGroup, 释放所有的资源 group.shutdownGracefully().sync(); } } }
在这个时候,服务器已经初始化,并且已经就绪能被使用了。这个示例使用了 NIO,因为得益于它的可扩展性和彻底的异步性,它是目前使用最广泛的传 输。但是也可以使用一个不同的传输实现。如果你想要在自己的服务器中使用 OIO 传输,将需 要指定 OioServerSocketChannel 和 OioEventLoopGroup。
让我们回顾一下你刚完成的服务器实现中的重要步骤。下面这些是服务器的主要代码组件:
() { @Override public void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast(new EchoClientHandler()); } }); ChannelFuture f = b.connect().sync(); f.channel().closeFuture().sync(); } finally { group.shutdownGracefully().sync(); } } public static void main(String[] args) throws Exception { if (args.length != 2) { System.err.println( "Usage: " + EchoClient.class.getSimpleName() + " "); return; } String host = args[0]; int port = Integer.parseInt(args[1]); new EchoClient(host, port).start(); } }
让我们回顾一下这一节中所介绍的要点:
??为初始化客户端,创建了一个 Bootstrap 实例;
??为进行事件处理分配了一个 NioEventLoopGroup 实例,其中事件处理包括创建新的 连接以及处理入站和出站数据;
??为服务器连接创建了一个 InetSocketAddress 实例;
??当连接被建立时,一个 EchoClientHandler 实例会被安装到(该 Channel 的)
ChannelPipeline 中;
??在一切都设置完成后,调用 Bootstrap.connect()方法连接到远程节点; 完成了客户端,你便可以着手构建并测试该系统了。
三、Netty3组件和设计 本章主要内容 ? Netty 的技术和体系结构方面的内容 ? Channel、EventLoop 和 ChannelFuture ? ChannelHandler 和 ChannelPipeline ? 引导
??一个 EventLoopGroup 包含一个或者多个 EventLoop;
??一个 EventLoop 在它的生命周期内只和一个 Thread 绑定;
??所有由 EventLoop 处理的 I/O 事件都将在它专有的 Thread 上被处理;
??一个 Channel 在它的生命周期内只注册于一个 EventLoop;
??一个 EventLoop 可能会被分配给一个或多个 Channel。
注意,在这种设计中,一个给定Channel 的 I/O 操作都是由相同的 Thread 执行的,实际
上消除了对于同步的需要。
引导一个客户端只需要一个 EventLoopGroup,但是一个 ServerBootstrap 则需要两个(也可以是同一个实例)。为什么呢?
因为服务器需要两组不同的 Channel。
与 ServerChannel 相关联的 EventLoopGroup 将负责分配一个为连接请求创建 Channel 的 EventLoop。一旦连接被接受,第二个 EventLoopGroup 就会给它的 Channel 分配一个 EventLoop。
四、传输 在本章中,我们将研究: 1.Netty传输、它们的实现和使用,以及 Netty 是如何将它们呈现给开发者的。 2.深入探讨了 Netty 预置的传输,并且解释了它们的行为。 3.如何匹配不同的传输和特定用例的需求。 本章主要内容 ? OIO——阻塞传输 ? NIO——异步传输 ? Local——JVM 内部的异步通信 ? Embedded——测试你的ChannelHandler 流经网络的数据总是具有相同的类型:字节。这些字节是如何流动的主要取决于我们所说的 网络传输— 一个帮助我们抽象底层数据传输机制的概念。用户并不关心这些细节;他们只想确 保他们的字节被可靠地发送和接收。 如果你有 Java 网络编程的经验,那么你可能已经发现,在某些时候,你需要支撑比预期多 很多的并发连接。如果你随后尝试从阻塞传输切换到非阻塞传输,那么你可能会因为这两种网络 API 的截然不同而遇到问题。 然而,Netty 为它所有的传输实现提供了一个通用 API,这使得这种转换比你直接使用 JDK 所能够达到的简单得多。所产生的代码不会被实现的细节所污染,而你也不需要在你的整个代码 库上进行广泛的重构。简而言之,你可以将时间花在其他更有成效的事情上。 4.1 案例研究:传输迁移 1.不通过 Netty 使用 OIO 和 NIO /** * 代码清单 4-1 未使用 Netty 的阻塞网络编程 * * @author xuxh * @date 2021/03/07 11:27 */ public class PlainOioServer { public void serve(int port) throws IOException { final ServerSocket socket = new ServerSocket(port); try { for (; ; ) { final Socket clientSocket = socket.accept(); System.out.println( "Accepted connection from " + clientSocket); new Thread(new Runnable() { @Override public void run() { OutputStream out; try { out = clientSocket.getOutputStream(); out.write("Hi!\r\n".getBytes( Charset.forName("UTF-8"))); out.flush(); clientSocket.close(); } catch (IOException e) { e.printStackTrace(); } finally { try { clientSocket.close(); } catch (IOException ex) { // ignore on close } } } }).start(); } } catch ( IOException e) { e.printStackTrace(); } } } /** * 代码清单 4-2 未使用 Netty 的异步网络编程 * * @author xuxh * @date 2021/03/07 11:32 */ public class PlainNioServer { public void serve(int port) throws IOException { ServerSocketChannel serverChannel = ServerSocketChannel.open(); serverChannel.configureBlocking(false); ServerSocket ssocket = serverChannel.socket(); InetSocketAddress address = new InetSocketAddress(port); ssocket.bind(address); Selector selector = Selector.open(); serverChannel.register(selector, SelectionKey.OP_ACCEPT); final ByteBuffer msg = ByteBuffer.wrap("Hi!\r\n".getBytes()); for (; ; ) { try { // 等待需要处理的新事 件;阻塞 将一直持续到 下一个传入事件 selector.select(); } catch (IOException ex) { ex.printStackTrace(); // handle exception break; } // 获取所有接 收事件的 Selection- Key 实例 Set readyKeys = selector.selectedKeys(); Iterator iterator = readyKeys.iterator(); while (iterator.hasNext()) { SelectionKey key = iterator.next(); iterator.remove(); try { // 检查事件是否是一 个新的已经就绪可 以被接受的连接 if (key.isAcceptable()) { ServerSocketChannel server = (ServerSocketChannel) key.channel(); SocketChannel client = server.accept(); client.configureBlocking(false); // 接受客户端,并将它注册到选择器 client.register(selector, SelectionKey.OP_WRITE | SelectionKey.OP_READ, msg.duplicate()); System.out.println("Accepted connection from " + client); } // 检查套接字是否已经准备好写数据 if (key.isWritable()) { SocketChannel client = (SocketChannel) key.channel(); ByteBuffer buffer = (ByteBuffer) key.attachment(); while (buffer.hasRemaining()) { // 将数据写到已连接的客户端 if (client.write(buffer) == 0) { break; } } client.close(); } } catch (IOException ex) { key.cancel(); try { key.channel().close(); } catch (IOException cex) { // ignore on close } } } } } }
如同你所看到的,虽然这段代码所做的事情与之前的版本完全相同,但是代码却截然不同。 如果为了用于非阻塞 I/O 而重新实现这个简单的应用程序,都需要一次完全的重写的话,那么不 难想象,移植真正复杂的应用程序需要付出什么样的努力。
鉴于此,让我们来看看使用 Netty 实现该应用程序将会是什么样子吧。
4.1.2 通过 Netty 使用 OIO 和 NIO
/** * 代码清单 4-3 使用 Netty 的阻塞网络处理 * * @author xuxh * @date 2021/03/07 21:15 */ public class NettyOioServer { public void server(int port) throws Exception { final ByteBuf buf = Unpooled.copiedBuffer("Hi!\r\n", CharsetUtil.UTF_8); EventLoopGroup group = new OioEventLoopGroup(); try { // 创建 ServerBootstrap ServerBootstrap b = new ServerBootstrap(); b.group(group) // 使用 OioEventLoopGroup 以允许阻塞模式 .channel(OioServerSocketChannel.class) .localAddress(new InetSocketAddress(port)) // 指定 Channel- Initializer,对于 每个已接受的 连接都调用它 .childHandler(new ChannelInitializer() { @Override public void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast( // 添加一个 Channel- InboundHandler- Adapter 以拦截和 处理事件 new ChannelInboundHandlerAdapter() { @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { // 将消息写到客户端,并添 加 ChannelFutureListener, 以便消息一被写完就关闭 连接 ctx.writeAndFlush(buf.duplicate()) .addListener(ChannelFutureListener.CLOSE); } }); } }); //绑定服务器 以接受连接 ChannelFuture f = b.bind().sync(); f.channel().closeFuture().sync(); } finally { // 释放所有的资源 group.shutdownGracefully().sync(); } } }
/** * 代码清单 4-4 使用 Netty 的异步网络处理 * * @author xuxh * @date 2021/03/07 21:40 */ public class NettyNioServer { public void server(int port) throws Exception { final ByteBuf buf = Unpooled.copiedBuffer("Hi!\r\n", CharsetUtil.UTF_8); // 使用的是 NIO 传输,所以指定 了NioEventLoopGroup来接受和处理新的连接 EventLoopGroup group = new NioEventLoopGroup(); try { // 创建 ServerBootstrap ServerBootstrap b = new ServerBootstrap(); b.group(group) // 使用 NioEventLoopGroup 非阻塞模式 .channel(NioServerSocketChannel.class) .localAddress(new InetSocketAddress(port)) // 指定 Channel- Initializer,对于 每个已接受的 连接都调用它 .childHandler(new ChannelInitializer() { @Override public void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast( // 添加一个 Channel- InboundHandler- Adapter 以拦截和 处理事件 new ChannelInboundHandlerAdapter() { @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { // 将消息写到客户端,并添 加 ChannelFutureListener, 以便消息一被写完就关闭 连接 ctx.writeAndFlush(buf.duplicate()) .addListener(ChannelFutureListener.CLOSE); } }); } }); //绑定服务器 以接受连接 ChannelFuture f = b.bind().sync(); f.channel().closeFuture().sync(); } finally { // 释放所有的资源 group.shutdownGracefully().sync(); } } }
因为 Netty 为每种传输的实现都暴露了相同的 API,所以无论选用哪一种传输的实现,你的 代码都仍然几乎不受影响。在所有的情况下,传
4.2 传输 API
传输 API 的核心是 interface Channel,它被用于所有的 I/O 操作。Channel 类的层次结构如图 4-1 所示。
如图所示,每个 Channel 都将会被分配一个 ChannelPipeline 和 ChannelConfig。
稍后我们将进一步深入地讨论所有这些特性的应用。目前,请记住,Netty 所提供的广泛功 能只依赖于少量的接口。这意味着,你可以对你的应用程序逻辑进行重大的修改,而又无需大规 模地重构你的代码库。
Netty 的 Channel 实现是线程安全的,因此你可以存储一个到 Channel 的引用,并且每当 你需要向远程节点写数据时,都可以使用它,即使当时许多线程都在使用它。
4.3 内置的传输
Netty 内置了一些可开箱即用的传输。因为并不是它们所有的传输都支持每一种协议,所以 你必须选择一个和你的应用程序所使用的协议相容的传输。
在本节中我们将讨论这些关系。表 4-2 显示了所有 Netty 提供的传输。
4.3.1 NIO——非阻塞 I/O
NIO 提供了一个所有 I/O 操作的全异步的实现。它利用了自 NIO 子系统被引入 JDK 1.4 时便 可用的基于选择器的 API。
选择器背后的基本概念是充当一个注册表,,当 Channel 的状态发生变化时, 在选择器可以得到通知。
Channel可能的状态变化有:
??新的 Channel 已被接受并且就绪;
??Channel 连接已经完成;
??Channel 有已经就绪的可供读取的数据;
??Channel 可用于写数据。
选择器运行在一个检查状态变化并对其做出相应响应的线程上,在应用程序对状态的变化做出响应之后,选择器将会被重置,并将重复这个过程。
表 4-3 中的常量值代表了由 class java.nio.channels.SelectionKey 定义的位模式。这些位模式可以组合起来定义一组应用程序正在请求通知的状态变化集。
对于所有传输都共有的用户级别 API ,Netty完全地隐藏了这些 NIO 的内部细节。 图 4-2 展示了该处理流程。
零拷贝
零拷贝(zero-copy)是一种目前只有在使用 NIO 和 Epoll 传输时才可使用的特性。它使你可以快速 高效地将数据从文件系统移动到网络接口,而不需要将其从内核空间复制到用户空间,在像 FTP 或者 HTTP 这样的协议中可以显著地提升性能。但是,并不是所有的操作系统都支持这一特性。特别地,它对于实现了数据加密或者压缩的文件系统是不可用的——只能传输文件的原始内容。
4.3.2 Epoll — 用于 Linux 的本地非阻塞传输4.3.2 Epoll— 用于 Linux 的本地非阻塞传输
正如我们之前所说的,Netty 的 NIO 传输基于 Java 提供的异步/非阻塞网络编程的通用抽象。 虽然这保证了 Netty 的非阻塞 API 可以在任何平台上使用,但它也包含了相应的限制,因为 JDK 为了在所有系统上提供相同的功能,必须做出妥协。
Linux作为高性能网络编程的平台,其重要性与日俱增,这催生了大量先进特性的开发,其中包括epoll——一个高度可扩展的I/O事件通知特性。这个API自Linux内核版本 2.5.44(2002)被 引入,提供了比旧的POSIX select和poll系统调用 1更好的性能,同时现在也是Linux上非阻 塞网络编程的事实标准。Linux JDK NIO API使用了这些epoll调用。
Netty为Linux提供了一组NIO API,它以一种和它本身的设计更加一致的方式使用epoll,并且以一种更加轻量的方式使用中断。如果你的应用程序只运行于Linux系统,那么请考虑利用 这个版本的传输; 你将发现在高负载下它的性能要优于JDK的NIO实现。
这个传输的语义与在图 4-2 所示的完全相同,而且它的用法也是简单直接的。相关示例参照 代码清单 4-4。如果要在那个代码清单中使用 epoll 替代 NIO,只需要将 NioEventLoopGroup 替换为 EpollEventLoopGroup,并且将 NioServerSocketChannel.class 替换为 EpollServerSocketChannel.class 即可。
4.3.3 OIO — 旧的阻塞 I/O
Netty 的 OIO 传输实现代表了一种折中: 它可以通过常规的传输 API 使用,但是由于它是建立在 java.net 包的阻塞实现之上的,所以它不是异步的。但是,它仍然非常适合于某些用途。
有了这个背景,你可能会想,Netty是如何能够使用和用于异步传输相同的API来支持OIO的呢。 答案就是,Netty利用了SO_TIMEOUT这个Socket标志,它指定了等待一个I/O操作完成的最大毫秒数。如果操作在指定的时间间隔内没有完成,则将会抛出一个SocketTimeout Exception。Netty 将捕获这个异常并继续处理循环。在EventLoop下一次运行时,它将再次尝试。这实际上也是类似于Netty这样的异步框架能够支持OIO的唯一方式。
这种方式的一个问题是,当一个SocketTimeoutException被抛出时填充栈跟踪所需要的时间,其对于性能来说代价很大。
图 4-3 说明了这个逻辑。
4.3.4 用于 JVM 内部通信的 Local 传输
Netty 提供了一个 Local 传输,用于在同一个 JVM 中运行的客户端和服务器程序之间的异步 通信。同样,这个传输也支持其他传输共同的API。
在这个传输中,和服务器 Channel 相关联的 SocketAddress 并没有绑定物理网络地址; 相反,只要服务器还在运行,它就会被存储在注册表里,并在 Channel 关闭时注销。因为这个 传输并不接受真正的网络流量,所以它并不能够和其他传输实现进行互操作。因此,在同一个 JVM 中,客户端希望 连接到使用了这个传输的服务器端时必须使用它。除了这个限制,它的使用方式和其他的传输一模一样。
4.3.5 Embedded 传输
Netty 提供了一种额外的传输,使得你可以将一组 ChannelHandler 作为帮助器类嵌入到 其他的 ChannelHandler 内部。通过这种方式,你将可以扩展一个 ChannelHandler 的功能, 而又不需要修改其内部代码。
不足为奇的是,Embedded 传输的关键是一个具体的 Channel 的实现:EmbeddedChannel。在第 9 章中,我们将详细地讨论如何使用这个类来为 ChannelHandler 的实现创建单元 测试用例。
4.4 传输的用例
既然我们已经详细地了解了所有的传输,那么让我们考虑一下选用一个适用于特定用途的协议的因素吧。正如前面所提到的,并不是所有的传输都支持所有的核心协议。
表 4-4 展示了截止出版时的传输和其所支持的协议。
虽然只有 SCTP 传输有这些特殊要求,但是其他传输可能也有它们自己的配置选项需要考虑。 此外,如果只是为了支持更高的并发连接数,服务器平台可能需要配置得和客户端不一样。
这里是一些你很可能会遇到的用例。
??非阻塞代码库——如果你的代码库中没有阻塞调用(或者你能够限制它们的范围),那么在 Linux 上使用 NIO 或者 epoll 始终是个好主意。虽然 NIO/epoll 旨在处理大量的并发连 接,但是在处理较小数目的并发连接时,它也能很好地工作,尤其是考虑到它在连接之 间共享线程的方式。
??阻塞代码库——正如我们已经指出的,如果你的代码库严重地依赖于阻塞 I/O,而且你的应 用程序也有一个相应的设计,那么在你尝试将其直接转换为 Netty 的 NIO 传输时,你将可 能会遇到和阻塞操作相关的问题。不要为此而重写你的代码,可以考虑分阶段迁移:先从 OIO 开始,等你的代码修改好之后,再迁移到 NIO(或者使用 epoll,如果你在使用 Linux)。
??在同一个 JVM 内部的通信——在同一个 JVM 内部的通信,不需要通过网络暴露服务,是 Local 传输的完美用例。这将消除所有真实网络操作的开销,同时仍然使用你的 Netty 代码 库。如果随后需要通过网络暴露服务,那么你将只需要把传输改为 NIO 或者 OIO 即可。
??测试你的 ChannelHandler 实现——如果你想要为自己的 ChannelHandler 实现编 写单元测试,那么请考虑使用 Embedded 传输。这既便于测试你的代码,而又不需要创建大 量的模拟(mock)对象。你的类将仍然符合常规的 API 事件流,保证该 ChannelHandler 在和真实的传输一起使用时能够正确地工作。
表 4-5 总结了我们探讨过的用例。
5. ByteBuf 本章主要内容 本章专门探讨了 Netty 的基于 ByteBuf 的数据容器。我们讨论过的要点有: ??使用不同的读索引和写索引来控制数据访问; ??使用内存的不同方式——基于字节数组和直接缓冲区; ??通过 CompositeByteBuf 生成多个 ByteBuf 的聚合视图; ??数据访问方法——搜索、切片以及复制; ??读、写、获取和设置 API; ??内存分配:ByteBufAllocator 池化和引用计数。 网络数据的基本单位总是字节。Java NIO 提供了 ByteBuffer 作为它 的字节容器,但是这个类使用起来过于复杂,而且也有些繁琐。 Netty 的 ByteBuffer 替代品是 ByteBuf,一个强大的实现,既解决了 JDK API 的局限性, 又为网络应用程序的开发者提供了更好的 API。 5.1 ByteBuf 的 API Netty 的数据处理 API 通过两个组件暴露——abstract class ByteBuf 和 interface ByteBufHolder。 下面是一些 ByteBuf API 的优点: ??它可以被用户自定义的缓冲区类型扩展; ??通过内置的复合缓冲区类型实现了透明的零拷贝; ??容量可以按需增长(类似于 JDK 的 StringBuilder); ??在读和写这两种模式之间切换不需要调用 ByteBuffer 的 flip()方法; ??读和写使用了不同的索引; ??支持方法的链式调用; ??支持引用计数; ??支持池化。 5.2 ByteBuf 类——Netty 的数据容器 5.2.1 它是如何工作的 ByteBuf 维护了两个不同的索引:一个用于读取,一个用于写入。当你从 ByteBuf 读取时, 它的 readerIndex 将会被递增已经被读取的字节数。同样地,当你写入 ByteBuf 时,它的 writerIndex 也会被递增。图 5-1 展示了一个空 ByteBuf 的布局结构和状态。
要了解这些索引两两之间的关系,请考虑一下,如当 readerIndex 达到 和 writerIndex 同样的值时会发生什么。在那时,你将会到达“可以读取的”数据的末尾。就 如同试图读取超出数组末尾的数据一样,试图读取超出该点的数据将会触发一个 IndexOutOf- BoundsException。
名称以 read 或者 write 开头的 ByteBuf 方法,将会推进其对应的索引,而名称以 set 或 者 get 开头的操作则不会。可以指定 ByteBuf 的最大容量。试图移动写索引(即 writerIndex)超过这个值将会触发一个异常1。(默认的限制是 Integer.MAX_VALUE。)
5.2.2 ByteBuf 的使用模式
1.堆缓冲区
最常用的 ByteBuf 模式是将数据存储在 JVM 的堆空间中。这种模式被称为支撑数组 (backing array),它能在没有使用池化的情况下提供快速的分配和释放。这种方式非常适合于有遗留的数据需要处理的情况,如代码清单5-1 所示。
2.直接缓冲区
直接缓冲区是另外一种 ByteBuf 模式。NIO 在 JDK 1.4 中引入的 ByteBuffer 类允许 JVM 实现通过本地调用来分配内存。
这主要是为了避免在每次调用本地 I/O 操作之前(或者之后)将缓冲区的内容复制到一个中间缓冲区,或者从中间缓冲区把内容复制到缓冲区。
ByteBuffer的Javadoc1明确指出:“直接缓冲区的内容将驻留在常规的会被垃圾回收的堆之外。”这也就解释了为何直接缓冲区对于网络数据传输是理想的选择。
如果你的数据包含在一 个在堆上分配的缓冲区中,事实上,在通过套接字发送它之前,JVM将会在内部把你的缓冲 区复制到一个直接缓冲区中。
直接缓冲区的主要缺点是,相对于基于堆的缓冲区,它们的分配和释放都较为昂贵。
如果你 正在处理遗留代码,你也可能会遇到另外一个缺点:因为数据不是在堆上,所以你不得不进行一 次复制,如代码清单 5-2 所示。
显然,与使用支撑数组相比,这涉及的工作更多。因此,如果事先知道容器中的数据将会被作为数组来访问,使用堆内存更合适。
3.复合缓冲区
第三种也是最后一种模式使用的是复合缓冲区,它为多个 ByteBuf 提供一个聚合视图。在 这里可以根据需要添加或者删除 ByteBuf 实例,这是一个 JDK 的 ByteBuffer 实现完全缺失的特性。
Netty 通过一个 ByteBuf 子类——CompositeByteBuf——实现了这个模式,它提供了一个将多个缓冲区表示为单个合并缓冲区的虚拟表示。
为了举例说明,让我们考虑一下一个由两部分——头部和主体——组成的将通过 HTTP 协议 传输的消息。这两部分由应用程序的不同模块产生,将会在消息被发送的时候组装。该应用程序 可以选择为多个消息重用相同的消息主体。当这种情况发生时,对于每个消息都将会创建一个新 的头部。
因为我们不想为每个消息都重新分配这两个缓冲区,所以使用 CompositeByteBuf 是一个 完美的选择。它在消除了没必要的复制的同时,暴露了通用的 ByteBuf API。
图 5-2 展示了生成 的消息布局。
警告
CompositeByteBuf中的ByteBuf实例可能同时包含直接内存分配和非直接内存分配。 如果其中只有一个实例,那么对 CompositeByteBuf 上的 hasArray()方法的调用将返回该组 件上的 hasArray()方法的值;否则它将返回 false。
需要注意的是,Netty使用了CompositeByteBuf来优化套接字的I/O操作,尽可能地消除了 由JDK的缓冲区实现所导致的性能以及内存使用率的惩罚。1这种优化发生在Netty的核心代码中, 因此不会被暴露出来,但是你应该知道它所带来的影响。
5.3 字节级操作
ByteBuf 提供了许多超出基本读、写操作的方法用于修改它的数据。在接下来我们将会讨论这些中最重要的部分。
5.3.1 随机访问索引
如同在普通的 Java 字节数组中一样,ByteBuf 的索引是从零开始的:第一个字节的索引是 0,最后一个字节的索引总是 capacity() - 1。
代码清单 5-6 表明,对存储机制的封装使得遍 历 ByteBuf 的内容非常简单。
代码清单 5-6 访问数据
ByteBuf buffer = ...; for (int i = 0; i < buffer.capacity(); i++) { byte b = buffer.getByte(i); System.out.println((char)b); }
需要注意的是,使用那些需要一个索引值参数的方法(的其中)之一来访问数据既不会改变 readerIndex 也不会改变 writerIndex。
5.3.2 顺序访问索引
虽然 ByteBuf 同时具有读索引和写索引,但是 JDK 的 ByteBuffer 却只有一个索引,这 也就是为什么必须调用 flip()方法来在读模式和写模式之间进行切换的原因。图 5-3 展示了 ByteBuf 是如何被它的两个索引划分成 3 个区域的。
5.3.3 可丢弃字节
在图 5-3 中标记为可丢弃字节的分段包含了已经被读过的字节。通过调用 discardRead- Bytes()方法,可以丢弃它们并回收空间。这个分段的初始大小为 0,存储在 readerIndex 中, 会随着 read 操作的执行而增加(get*操作不会移动 readerIndex)。
图 5-4 展示了图 5-3 中所展示的缓冲区上调用discardReadBytes()方法后的结果。可以看 到,可丢弃字节分段中的空间已经变为可写的了。注意,在调用discardReadBytes()之后,对 可写分段的内容并没有任何的保证。因为只是移动了可以读取的字节以及writerIndex,而没有对所有可写入的字节进行擦除写。
虽然你可能会倾向于频繁地调用 discardReadBytes()方法以确保可写分段的最大化,但是 请注意,这将极有可能会导致内存复制,因为可读字节(图中标记为 CONTENT 的部分)必须被移 动到缓冲区的开始位置。我们建议只在有真正需要的时候才这样做,例如,当内存非常宝贵的时候。
5.3.4 可读字节
和之前一样,ByteBuf 包含 3 个分段。图 5-6 展示了在 clear()方法被调用之后 ByteBuf 的状态。
5.3.7 查找操作
在ByteBuf中有多种可以用来确定指定值的索引的方法。最简单的是使用indexOf()方法。 较复杂的查找可以通过那些需要一个ByteBufProcessor1作为参数的方法达成。这个接口只定 义了一个方法:
boolean process(byte value)
它将检查输入值是否是正在查找的值。 ByteBufProcessor针对一些常见的值定义了许多便利的方法。假设你的应用程序需要和
所谓的包含有以NULL结尾的内容的Flash套接字 2集成。调用 forEachByte(ByteBufProcessor.FIND_NUL)
将简单高效地消费该 Flash 数据,因为在处理期间只会执行较少的边界检查。
5.3.8 派生缓冲区
派生缓冲区为 ByteBuf 提供了以专门的方式来呈现其内容的视图。这类视图是通过以下方 法被创建的:
??duplicate();
??slice();
??slice(int, int);
??Unpooled.unmodifiableBuffer(...);
??order(ByteOrder);
??readSlice(int)。
每个这些方法都将返回一个新的 ByteBuf 实例,它具有自己的读索引、写索引和标记
索引。其内部存储和 JDK 的 ByteBuffer 一样也是共享的。这使得派生缓冲区的创建成本 是很低廉的,但是这也意味着,如果你修改了它的内容,也同时修改了其对应的源实例,所 以要小心。
ByteBuf复制
如果需要一个现有缓冲区的真实副本,请使用 copy()或者 copy(int, int)方 法。不同于派生缓冲区,由这个调用所返回的 ByteBuf 拥有独立的数据副本。
5.3.9 读/写操作
正如我们所提到过的,有两种类别的读/写操作:
??get()和 set()操作,从给定的索引开始,并且保持索引不变;
??read()和 write()操作,从给定的索引开始,并且会根据已经访问过的字节数对索引进行调整。
表 5-1 列举了最常用的 get()方法。完整列表请参考对应的 API 文档。
大多数的这些操作都有一个对应的 set()方法。这些方法在表 5-2 中列出。
代码清单 5-12 说明了 get()和 set()方法的用法,表明了它们不会改变读索引和写索引。
现在,让我们研究一下 read()操作,其作用于当前的 readerIndex 或 writerIndex。 这些方法将用于从 ByteBuf 中读取数据,如同它是一个流。表 5-3 展示了最常用的方法。
几乎每个 read()方法都有对应的 write()方法,用于将数据追加到 ByteBuf 中。注意,表 5-4 中所列出的这些方法的参数是需要写入的值,而不是索引值。
5.3.10 更多的操作
表5-5 列举了由ByteBuf提供的其他有用操作。
表 5-5 其他有用的操作
名称 描述
isReadable() 如果至少有一个字节可供读取,则返回 true
isWritable() 如果至少有一个字节可被写入,则返回 true
readableBytes() 返回可被读取的字节数
writableBytes() 返回可被写入的字节数
capacity() 返回 ByteBuf 可容纳的字节数。在此之后,它会尝试再次扩展直 到达到 maxCapacity()
maxCapacity() 返回 ByteBuf 可以容纳的最大字节数
hasArray() 如果 ByteBuf 由一个字节数组支撑,则返回 true
array() 如果 ByteBuf 由一个字节数组支撑则返回该数组;否则,它将抛出一个 UnsupportedOperationException 异常
5.4 ByteBufHolder 接口
我们经常发现,除了实际的数据负载之外,我们还需要存储各种属性值。HTTP 响应便是一 个很好的例子,除了表示为字节的内容,还包括状态码、cookie 等。
为了处理这种常见的用例,Netty 提供了 ByteBufHolder。ByteBufHolder 也为 Netty 的 高级特性提供了支持,如缓冲区池化,其中可以从池中借用 ByteBuf,并且在需要时自动释放。
ByteBufHolder 只有几种用于访问底层数据和引用计数的方法 。表 5-6 列出了它们(这里不包括它继承自 ReferenceCounted 的那些方法)。
表5-6 ByteBufHolder的操作
名称 描述
content() 返回由这个 ByteBufHolder 所持有的 ByteBuf
copy() 返回这个 ByteBufHolder 的一个深拷贝,包括一个其所包含的 ByteBuf 的非共享拷贝
duplicate() 返回这个ByteBufHolder的一个浅拷贝,包括一个其所包含的ByteBuf的共享拷贝
5.5 ByteBuf 分配
在这一节中,我们将描述管理 ByteBuf 实例的不同方式。
5.5.1 按需分配:ByteBufAllocator 接口
为了降低分配和释放内存的开销,Netty 通过 interface ByteBufAllocator 实现了 (ByteBuf 的)池化,它可以用来分配我们所描述过的任意类型的 ByteBuf 实例。
使用池化是特定于应用程序的决定,其并不会以任何方式改变 ByteBuf API(的语义)。
表5-7 列出了ByteBufAllocator提供的一些操作。
可以通过 Channel(每个都可以有一个不同的 ByteBufAllocator 实例)或者绑定到 ChannelHandler 的 ChannelHandlerContext 获取一个到 ByteBufAllocator 的引用。
代码清单 5-14 说明了这两种方法。
Channel channel = ...; ByteBufAllocator allocator = channel.alloc(); .... ChannelHandlerContext ctx = ...; ByteBufAllocator allocator2 = ctx.alloc();
Netty提供了两种ByteBufAllocator的实现: PooledByteBufAllocator和UnpooledByteBufAllocator。
前者池化了ByteBuf的实例以提高性能并最大限度地减少内存碎片。
后者的实现不池化ByteBuf实例,并且在每次它被调用时都会返回一个新的实例。
虽然Netty4.1.x默认使用了PooledByteBufAllocator,但这可以通过ChannelConfig API或者在引导应用程序时指定不同的分配器来更改。
5.5.2 Unpooled 缓冲区
可能某些情况下,你未能获取一个到 ByteBufAllocator 的引用。对于这种情况,Netty 提 供了一个简单的称为 Unpooled 的工具类,它提供了静态的辅助方法来创建未池化的 ByteBuf 实例。表 5-8 列举了这些中最重要的方法。
5.5.3 ByteBufUtil 类
ByteBufUtil 提供了用于操作 ByteBuf 的静态的辅助方法。
这些静态方法中最有价值的可能就是 hexdump()方法,它以十六进制的表示形式打印 ByteBuf 的内容。这在各种情况下都很有用,例如,打印调试ByteBuf 的内容。
另一个有用的方法是 boolean equals(ByteBuf, ByteBuf),它被用来判断两个 ByteBuf 实例的相等性。
5.6 引用计数
引用计数是一种通过在某个对象所持有的资源不再被其他对象引用时释放该对象所持有的资源来优化内存使用和性能的技术。
引用计数对于池化实现(如 PooledByteBufAllocator)来说是至关重要的,它降低了 内存分配的开销。
试图访问一个已经被释放的引用计数的对象,将会导致一个 IllegalReferenceCount- Exception。
注意,一个特定的(ReferenceCounted 的实现)类,可以用它自己的独特方式来定义它 的引用计数规则。例如,我们可以设想一个类,其 release()方法的实现总是将引用计数设为 零,而不用关心它的当前值,从而一次性地使所有的活动引用都失效。
谁负责释放
一般来说,是由最后访问(引用计数)对象的那一方来负责将它释放。在第 6 章中, 我们将会解释这个概念和 ChannelHandler 以及 ChannelPipeline 的相关性。
第 6 章 ChannelHandler 和ChannelPipeline 本章中,我们将专注于 ChannelHandler,它为你的数据处理逻辑提供了载体。 因为ChannelHandler 大量地使用了 ByteBuf,你将开始看到 Netty 的整体架构的各个重要部分最 终走到了一起。 本章主要内容 ? ChannelHandler API 和 ChannelPipeline API ? 检测资源泄漏 ? 异常处理 6.1 ChannelHandler 家族 6.1.1 Channel 的生命周期 Interface Channel 定义了一组和 ChannelInboundHandler API 密切相关的简单但 功能强大的状态模型,表 6-1 列出了 Channel 的这 4 个状态。 表6-1 Channel的生命周期状态 状态 描述 ChannelUnregistered Channel 已经被创建,但还未注册到 EventLoop ChannelRegistered Channel 已经被注册到了 EventLoop ChannelActive Channel 处于活动状态(已经连接到它的远程节点)。它现在可以接收和发送数据了 ChannelInactive Channel 没有连接到远程节点 Channel 的正常生命周期如图 6-1 所示。当这些状态发生改变时,将会生成对应的事件。 这些事件将会被转发给 ChannelPipeline 中的 ChannelHandler,其可以随后对它们做出响应。
6.1.2 ChannelHandler 的生命周期
表 6-2 中列出了 interface ChannelHandler 定义的生命周期操作,在 ChannelHandler 被添加到 ChannelPipeline 中或者被从 ChannelPipeline 中移除时会调用这些操作。这些 方法中的每一个都接受一个 ChannelHandlerContext 参数。表6-2 ChannelHandler的生命周期方法
类型 描述
handlerAdded 当把 ChannelHandler 添加到 ChannelPipeline 中时被调用
handlerRemoved 当从 ChannelPipeline 中移除 ChannelHandler 时被调用
exceptionCaught 当处理过程中在 ChannelPipeline 中有错误产生时被调用
Netty 定义了下面两个重要的 ChannelHandler 子接口:
??ChannelInboundHandler——处理入站数据以及各种状态变化;
??ChannelOutboundHandler——处理出站数据并且允许拦截所有的操作。
6.1.3 ChannelInboundHandler 接口
表 6-3 列出了 interface ChannelInboundHandler 的生命周期方法。这些方法将会在数据被接收时或者与其对应的 Channel 状态发生改变时被调用。正如我们前面所提到的,这些 方法和 Channel 的生命周期密切相关。
表6-3 ChannelInboundHandler的方法
类型 描述
channelRegistered 当 Channel已经注册到它的 EventLoop 并且能够处理 I/O 时被调用
channelUnregistered 当 Channel从它的 EventLoop 注销并且无法处理任何 I/O 时被调用
channelActive 当 Channel处于活动状态时被调用;Channel 已经连接/绑定并且已经就绪
channelInactive 当 Channel离开活动状态并且不再连接它的远程节点时被调用
channelReadComplete 当Channel上的一个读操作完成时被调用 1
channelRead 当从 Channel 读取数据时被调用
ChannelWritability- Changed 当Channel的可写状态发生改变时被调用。用户可以确保写操作不会完成得太快(以避免发生 OutOfMemoryError)或者可以在 Channel 变为再次可写时恢复写入。可以通过调用Channel的isWritable()方法来检测 Channel 的可写性。与可写性相关的阈值可以通过 Channel.config(). setWriteHighWaterMark()和 Channel.config().setWriteLowWater- Mark()方法来设置
userEventTriggered 当 ChannelnboundHandler.fireUserEventTriggered()方法被调 用时被调用,因为一个 POJO 被传经了 ChannelPipeline
当某个 ChannelInboundHandler 的实现重写 channelRead()方法时,它将负责显式地 释放与池化的 ByteBuf 实例相关的内存。Netty 为此提供了一个实用方法 ReferenceCount- Util.release(),如代码清单 6-1 所示。
/** * 代码清单 6-1 释放消息资源 **/ @Sharable // 扩展了 Channel-InboundHandler- Adapter public class DiscardHandler extends ChannelInboundHandlerAdapter { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) { // 丢弃已接 收的消息 ReferenceCountUtil.release(msg); } }
Netty 将使用 WARN 级别的日志消息记录未释放的资源,使得可以非常简单地在代码中发现违规的实例。但是以这种方式管理资源可能很繁琐。一个更加简单的方式是使用 SimpleChannelInboundHandler。代码清单 6-2 是代码清单 6-1 的一个变体,说明了这一点。
代码清单 6-2 使用 SimpleChannelInboundHandler
@Sharable public class SimpleDiscardHandler extends SimpleChannelInboundHandler
- 阻塞I/O -- socket
- 非阻塞I/O -- NIO
- Channel;
- Channel 是 Java NIO 的一个基本构造。
- 它代表一个到实体(如一个硬件设备、一个文件、一个网络套接字或者一个能够执行一个或者多个不同的I/O操作的程序组件)的开放连接,如读操作和写操作 1。
- 目前,可以把 Channel 看作是传入(入站)或者传出(出站)数据的载体。因此,它可以被打开或者被关闭,连接或者断开连接。
- 回调;
- Future;
- Future 提供了另一种在操作完成时通知应用程序的方式。这个对象可以看作是一个异步操 作的结果的占位符;它将在未来的某个时刻完成,并提供对其结果的访问。
- JDK 预置了 interface java.util.concurrent.Future,但是其所提供的实现,只 允许手动检查对应的操作是否已经完成,或者一直阻塞直到它完成。这是非常繁琐的,所以 Netty 提供了它自己的实现——ChannelFuture,用于在执行异步操作的时候使用。
- ChannelFuture提供了几种额外的方法,这些方法使得我们能够注册一个或者多个 ChannelFutureListener实例。监听器的回调方法operationComplete(),将会在对应的 操作完成时被调用 1。然后监听器可以判断该操作是成功地完成了还是出错了。如果是后者,我 们可以检索产生的Throwable。简而 言之 ,由ChannelFutureListener提供的通知机制消除 了手动检查对应的操作是否完成的必要。
- 每个 Netty 的出站 I/O 操作都将返回一个 ChannelFuture;也就是说,它们都不会阻塞。 正如我们前面所提到过的一样,Netty 完全是异步和事件驱动的。
- 如果你把 ChannelFutureListener 看作是回调的一个更加精细的版本,那么你是对的。 事实上,回调和 Future 是相互补充的机制;它们相互结合,构成了 Netty 本身的关键构件块之一。
- 事件和 ChannelHandler。
- Netty 使用不同的事件来通知我们状态的改变或者是操作的状态。这使得我们能够基于已经 发生的事件来触发适当的动作。这些动作可能是:
- 记录日志;
- 数据转换;
- 流控制;
- 应用程序逻辑。
- Netty 是一个网络编程框架,所以事件是按照它们与入站或出站数据流的相关性进行分类的。
- 连接已被激活或者连接失活;
- 数据读取;
- 用户事件;
- 错误事件。
- 出站事件是未来将会触发的某个动作的操作结果,这些动作包括:
- 打开或者关闭到远程节点的连接;
- 将数据写到或者冲刷到套接字。
- 每个事件都可以被分发给 ChannelHandler 类中的某个用户实现的方法。这是一个很好的 将事件驱动范式直接转换为应用程序构件块的例子。图 1-3 展示了一个事件是如何被一个这样的 ChannelHandler 链处理的。
- Netty 的 ChannelHandler 为处理器提供了基本的抽象,如图 1-3 所示的那些。我们会 在适当的时候对 ChannelHandler 进行更多的说明,但是目前你可以认为每个 Channel- Handler 的实例都类似于一种为了响应特定事件而被执行的回调。
- Netty 提供了大量预定义的可以开箱即用的 ChannelHandler 实现,包括用于各种协议 (如 HTTP 和 SSL/TLS)的 ChannelHandler。在内部,ChannelHandler 自己也使用了事件
- Netty 的异步编程模型是建立在 Future 和回调的概念之上的,而将事件派发到 ChannelHandler 的方法则发生在更深的层次上。结合在一起,这些元素就提供了一个处理环境,使你的应用程序逻 辑可以独立于任何网络操作相关的顾虑而独立地演变。这也是 Netty 的设计方式的一个关键目标。
- 拦截操作以及高速地转换入站数据和出站数据,都只需要你提供回调或者利用操作所返回的 Future。这使得链接操作变得既简单又高效,并且促进了可重用的通用代码的编写。
- Netty 通过触发事件将 Selector 从应用程序中抽象出来,消除了所有本来将需要手动编写 的派发代码。在内部,将会为每个 Channel 分配一个 EventLoop,用以处理所有事件,包括:
- 注册感兴趣的事件;
- 将事件派发给 ChannelHandler;
- 安排进一步的动作。
- EventLoop 本身只由一个线程驱动,其处理了一个 Channel 的所有 I/O 事件,并且在该
- 这个简单而强大的设计消除了你可能有的在ChannelHandler 实现中需要进行同步的任何顾虑,
二、第一款Netty应用程序 1.编写 Echo 服务器
- 所有的 Netty 服务器都需要以下两部分。
- 至少一个ChannelHandler — 服务器对从客户端接收的数据的处理,即它的业务逻辑。
- 引导 — 这是配置服务器的启动代码。eg: 将服务器绑定到它要监听连接请求的端口上。
- 在第 1 章中,我们介绍了 Future 和回调,并且阐述了它们在事件驱动设计中的应用。我们 还讨论了 ChannelHandler,它是一个接口族的父接口,它的实现负责接收并响应事件通知。 在 Netty 应用程序中,所有的数据处理逻辑都包含在这些核心抽象的实现中。
- 因为你的 Echo 服务器会响应传入的消息,所以它需要实现 ChannelInboundHandler 接口,用 来定义响应入站事件的方法。这个简单的应用程序只需要用到少量的这些方法,所以继承 Channel- InboundHandlerAdapter 类也就足够了,它提供了 ChannelInboundHandler 的默认实现。
- 绑定到服务器端口, 监听并接受传入连接请求;
- 配置 Channel,以将有关的入站消息通知给 EchoServerHandler 实例。
- EchoServerHandler 实现了业务逻辑;
- main()方法引导了服务器;
- 创建一个 ServerBootstrap 的实例以引导和绑定服务器;
- 创建并分配一个 NioEventLoopGroup 实例以进行事件的处理,如接受新连接以及读/写数据;
- 指定服务器绑定的本地的 InetSocketAddress;
- 使用一个 EchoServerHandler 的实例初始化每一个新的 Channel;
- 调用 ServerBootstrap.bind()方法以绑定服务器。
三、Netty3组件和设计 本章主要内容 ? Netty 的技术和体系结构方面的内容 ? Channel、EventLoop 和 ChannelFuture ? ChannelHandler 和 ChannelPipeline ? 引导
- 我们将从两个不同的但却又密切相 关的视角来探讨 Netty: 类库的视角以及框架的视角。对于使用 Netty 编写高效的、可重用的和 可维护的代码来说,两者缺一不可。
- 从高层次的角度来看,Netty 解决了两个相应的关注领域,我们可将其大致标记为技术的和 体系结构的。首先,它的基于 Java NIO 的异步的和事件驱动的实现,保证了高负载下应用程序 性能的最大化和可伸缩性。其次,Netty 也包含了一组设计模式,将应用程序逻辑从网络层解耦, 简化了开发过程,同时也最大限度地提高了可测试性、模块化以及代码的可重用性。
- 在我们更加详细地研究 Netty 的各个组件时,我们将密切关注它们是如何协作来支撑这 些体系结构上的最佳实践的。通过遵循同样的原则,我们便可获得 Netty 所提供的所有益处。
- Channel
- 基本的 I/O 操作(bind()、connect()、read()和 write())依赖于底层网络传输所提 供的原语。
- 在基于 Java 的网络编程中,其基本的构造是 class Socket。Netty 的 Channel 接 口所提供的 API,大大地降低了直接使用 Socket 类的复杂性。
- 此外,Channel 也是拥有许多 预定义的、专门化实现的广泛类层次结构的根
- EventLoop
- EventLoop 定义了 Netty 的核心抽象,用于处理连接的生命周期中所发生的事件。
- ChannelFuture
- ChannelHandler 接口
- 从应用程序开发人员的角度来看,Netty 的主要组件是 ChannelHandler,它充当了所有处理入站和出站数据的应用程序逻辑的容器。
- 因为 ChannelHandler 的方法是 由网络事件(其中术语“事件”的使用非常广泛)触发的。
- 事实上,ChannelHandler 可专 门用于几乎任何类型的动作,例如将数据从一种格式转换为另外一种格式,或者处理转换过程 中所抛出的异常。
- 举例来说,ChannelInboundHandler 是一个你将会经常实现的子接口。这种类型的 ChannelHandler 接收入站事件和数据,这些数据随后将会被你的应用程序的业务逻辑所处 理。当你要给连接的客户端发送响应时,也可以从 ChannelInboundHandler 冲刷数据。你 的应用程序的业务逻辑通常驻留在一个或者多个 ChannelInboundHandler 中。
- ChannelPipeline 接口
- 使得事件流经 ChannelPipeline 是 ChannelHandler 的工作,它们是在应用程序的初 始化或者引导阶段被安装的。
- 这些对象接收事件、执行它们所实现的处理逻辑,并将数据传递给 链中的下一个 ChannelHandler。
- 它们的执行顺序是由它们被添加的顺序所决定的。实际上, 被我们称为 ChannelPipeline 的是这些 ChannelHandler 的编排顺序。
- 图 3-3 说明了一个 Netty 应用程序中入站和出站数据流之间的区别。从一个客户端应用程序 的角度来看,如果事件的运动方向是从客户端到服务器端,那么我们称这些事件为出站的,反之 则称为入站的。
- 图 3-3 也显示了入站和出站 ChannelHandler 可以被安装到同一个 ChannelPipeline 中。
- 如果一个消息或者任何其他的入站事件被读取,那么它会从 ChannelPipeline 的头部 开始流动,并被传递给第一个 ChannelInboundHandler。这个 ChannelHandler 不一定 会实际地修改数据,具体取决于它的具体功能,在这之后,数据将会被传递给链中的下一个 ChannelInboundHandler。最终,数据将会到达 ChannelPipeline 的尾端,届时,所有 处理就都结束了。
- 数据的出站运动(即正在被写的数据)在概念上也是一样的。
- 你可以直接写到Channel中,
- 也可以 写到和Channel- Handler 相关联的 ChannelHandlerContext 对象中。
- Netty 的引导类为应用程序的网络层配置提供了容器,这涉及将一个进程绑定到某个指定的 端口,或者将一个进程连接到另一个运行在某个指定主机的指定端口上的进程。
- 通常来说,我们把前面的用例称作引导一个服务器,后面的用例称作引导一个客户端。虽然 这个术语简单方便,但是它略微掩盖了一个重要的事实,即“服务器”和“客户端”实际上表示 了不同的网络行为; 换句话说,是监听传入的连接还是建立到一个或者多个进程的连接。
- 因此,有两种类型的引导: 一种用于客户端(简单地称为 Bootstrap),而另一种 (ServerBootstrap)用于服务器。
- 第一组将只包含一个 ServerChannel,代表服务 器自身的已绑定到某个本地端口的正在监听的套接字。
- 而第二组将包含所有已创建的用来处理传 入客户端连接(对于每个服务器已经接受的连接都有一个)的 Channel。
- 图 3-4 说明了这个模 型,并且展示了为何需要两个不同的 EventLoopGroup。
四、传输 在本章中,我们将研究: 1.Netty传输、它们的实现和使用,以及 Netty 是如何将它们呈现给开发者的。 2.深入探讨了 Netty 预置的传输,并且解释了它们的行为。 3.如何匹配不同的传输和特定用例的需求。 本章主要内容 ? OIO——阻塞传输 ? NIO——异步传输 ? Local——JVM 内部的异步通信 ? Embedded——测试你的ChannelHandler 流经网络的数据总是具有相同的类型:字节。这些字节是如何流动的主要取决于我们所说的 网络传输— 一个帮助我们抽象底层数据传输机制的概念。用户并不关心这些细节;他们只想确 保他们的字节被可靠地发送和接收。 如果你有 Java 网络编程的经验,那么你可能已经发现,在某些时候,你需要支撑比预期多 很多的并发连接。如果你随后尝试从阻塞传输切换到非阻塞传输,那么你可能会因为这两种网络 API 的截然不同而遇到问题。 然而,Netty 为它所有的传输实现提供了一个通用 API,这使得这种转换比你直接使用 JDK 所能够达到的简单得多。所产生的代码不会被实现的细节所污染,而你也不需要在你的整个代码 库上进行广泛的重构。简而言之,你可以将时间花在其他更有成效的事情上。 4.1 案例研究:传输迁移 1.不通过 Netty 使用 OIO 和 NIO /** * 代码清单 4-1 未使用 Netty 的阻塞网络编程 * * @author xuxh * @date 2021/03/07 11:27 */ public class PlainOioServer { public void serve(int port) throws IOException { final ServerSocket socket = new ServerSocket(port); try { for (; ; ) { final Socket clientSocket = socket.accept(); System.out.println( "Accepted connection from " + clientSocket); new Thread(new Runnable() { @Override public void run() { OutputStream out; try { out = clientSocket.getOutputStream(); out.write("Hi!\r\n".getBytes( Charset.forName("UTF-8"))); out.flush(); clientSocket.close(); } catch (IOException e) { e.printStackTrace(); } finally { try { clientSocket.close(); } catch (IOException ex) { // ignore on close } } } }).start(); } } catch ( IOException e) { e.printStackTrace(); } } } /** * 代码清单 4-2 未使用 Netty 的异步网络编程 * * @author xuxh * @date 2021/03/07 11:32 */ public class PlainNioServer { public void serve(int port) throws IOException { ServerSocketChannel serverChannel = ServerSocketChannel.open(); serverChannel.configureBlocking(false); ServerSocket ssocket = serverChannel.socket(); InetSocketAddress address = new InetSocketAddress(port); ssocket.bind(address); Selector selector = Selector.open(); serverChannel.register(selector, SelectionKey.OP_ACCEPT); final ByteBuffer msg = ByteBuffer.wrap("Hi!\r\n".getBytes()); for (; ; ) { try { // 等待需要处理的新事 件;阻塞 将一直持续到 下一个传入事件 selector.select(); } catch (IOException ex) { ex.printStackTrace(); // handle exception break; } // 获取所有接 收事件的 Selection- Key 实例 Set
- ChannelConfig 包含了该 Channel 的所有配置设置,并且支持热更新。由于特定的传输可能具有独特的设置,所以它可能会实现一个 ChannelConfig 的子类型。(请参考 ChannelConfig 实现对应的 Javadoc。)
- ChannelPipeline 持有所有应用于入站数据和出站数据以及事件的 ChannelHandler 实 例,这些 ChannelHandler 实现了应用程序处理状态变化以及数据处理的逻辑。
- 由于 Channel 是独一无二的,所以为了保证顺序将 Channel 声明为 java.lang. Comparable 的一个子接口。因此,如果两个不同的 Channel 实例都返回了相同的散列码,那 么 AbstractChannel 中的 compareTo()方法的实现将会抛出一个 Error。
5. ByteBuf 本章主要内容 本章专门探讨了 Netty 的基于 ByteBuf 的数据容器。我们讨论过的要点有: ??使用不同的读索引和写索引来控制数据访问; ??使用内存的不同方式——基于字节数组和直接缓冲区; ??通过 CompositeByteBuf 生成多个 ByteBuf 的聚合视图; ??数据访问方法——搜索、切片以及复制; ??读、写、获取和设置 API; ??内存分配:ByteBufAllocator 池化和引用计数。 网络数据的基本单位总是字节。Java NIO 提供了 ByteBuffer 作为它 的字节容器,但是这个类使用起来过于复杂,而且也有些繁琐。 Netty 的 ByteBuffer 替代品是 ByteBuf,一个强大的实现,既解决了 JDK API 的局限性, 又为网络应用程序的开发者提供了更好的 API。 5.1 ByteBuf 的 API Netty 的数据处理 API 通过两个组件暴露——abstract class ByteBuf 和 interface ByteBufHolder。 下面是一些 ByteBuf API 的优点: ??它可以被用户自定义的缓冲区类型扩展; ??通过内置的复合缓冲区类型实现了透明的零拷贝; ??容量可以按需增长(类似于 JDK 的 StringBuilder); ??在读和写这两种模式之间切换不需要调用 ByteBuffer 的 flip()方法; ??读和写使用了不同的索引; ??支持方法的链式调用; ??支持引用计数; ??支持池化。 5.2 ByteBuf 类——Netty 的数据容器 5.2.1 它是如何工作的 ByteBuf 维护了两个不同的索引:一个用于读取,一个用于写入。当你从 ByteBuf 读取时, 它的 readerIndex 将会被递增已经被读取的字节数。同样地,当你写入 ByteBuf 时,它的 writerIndex 也会被递增。图 5-1 展示了一个空 ByteBuf 的布局结构和状态。
- ByteBuf 的可读字节分段存储了实际数据。
- 新分配的、包装的或者复制的缓冲区的默认的 readerIndex 值为 0。
- 任何名称以 read 或者 skip 开头的操作都将检索或者跳过位于当前 readerIndex 的数据,并且将它增加已读字节数。
- 如果被调用的方法需要一个 ByteBuf 参数作为写入的目标,并且没有指定目标索引参数, 那么该目标缓冲区的 writerIndex 也将被增加,例如:
- 如果尝试在缓冲区的可读字节数已经耗尽时从中读取数据,那么将会引发一个 IndexOutOf- BoundsException。
- 可写字节分段是指一个拥有未定义内容的、写入就绪的内存区域。
- 新分配的缓冲区的 writerIndex 的默认值为 0。
- 任何名称以 write 开头的操作都将从当前的 writerIndex 处 开始写数据,并将它增加已经写入的字节数。
- 如果写操作的目标也是 ByteBuf,并且没有指定 源索引的值,则源缓冲区的 readerIndex 也同样会被增加相同的大小。这个调用如下所示:
- 如果尝试往目标写入超过目标容量的数据,将会引发一个IndexOutOfBoundException1。
第 6 章 ChannelHandler 和ChannelPipeline 本章中,我们将专注于 ChannelHandler,它为你的数据处理逻辑提供了载体。 因为ChannelHandler 大量地使用了 ByteBuf,你将开始看到 Netty 的整体架构的各个重要部分最 终走到了一起。 本章主要内容 ? ChannelHandler API 和 ChannelPipeline API ? 检测资源泄漏 ? 异常处理 6.1 ChannelHandler 家族 6.1.1 Channel 的生命周期 Interface Channel 定义了一组和 ChannelInboundHandler API 密切相关的简单但 功能强大的状态模型,表 6-1 列出了 Channel 的这 4 个状态。 表6-1 Channel的生命周期状态 状态 描述 ChannelUnregistered Channel 已经被创建,但还未注册到 EventLoop ChannelRegistered Channel 已经被注册到了 EventLoop ChannelActive Channel 处于活动状态(已经连接到它的远程节点)。它现在可以接收和发送数据了 ChannelInactive Channel 没有连接到远程节点 Channel 的正常生命周期如图 6-1 所示。当这些状态发生改变时,将会生成对应的事件。 这些事件将会被转发给 ChannelPipeline 中的 ChannelHandler,其可以随后对它们做出响应。