Netty 是一个异步的、基于事件驱动的高性能网络应用框架,本文档维护 Netty 的相关内容。
一些概念
Netty 的基础构件块:Channel
、回调、Future
、事件及 ChannelHandler
Channel
:数据的通道,一个到实体的开放连接Future
:一种在操作完成时通知应用程序的方式。ChannelFuture
用于在执行异步操作的时候使用,可以注册一个或多个监听器,提供回调方法,这些方法将会在之后某个时间点执行
- 事件:用来通知状态的改变或者是操作的状态
- 入站相关
- 连接已被激活或者连接失活
- 数据读取
- 用户事件
- 错误事件
- 出站相关
- 打开或关闭到远程节点的连接
- 将数据写到或冲刷到套接字
- 每个事件都可以被分发给
ChannelHandler
类中的某个用户实现的方法
- 入站相关
Netty 通过触发事件将 Selector 从应用程序中抽象出来。在内部,将会为每个 Channel分配一个 EventLoop,用以处理所有事件
msg
: 流动的数据,最开始输入是 ByteBuf,但经过 pipeline 的加工,会变成其它类型对象,后输出又变成 ByteBufhandler
:数据的处理工序- 多个工序合在一起就是 pipeline,pipeline 负责将事件传播给每个 handler, handler 对自己感兴趣的事件进行处理
- handler 分 Inbound(入站)和 Outbound(出站) 两类
eventLoop
:处理数据的工人- 工人可以管理多个 channel 的 IO 操作,工人和 channel 是绑定在一起的
- 工人既可以执行 IO 操作,也可以进行任务处理,每位工人有任务队列,队列里可以堆放多个 channel 的待处理任务,任务分为普通任务、定时任务
- 工人按照 pipeline 顺序,依次按照 handler 的规划处理数据,可以为每道工序指定不同的工人
基础示例
Server 端
要实现一个服务器,主要有以下步骤:
- 创建一个
ServerBootstrap
的实例以引导和绑定服务器 - 创建并分配一个
NioEventLoopGroup
实例以进行事件的处理,如接受新连接以及读/ 写数据 - 指定服务器绑定的本地的
InetSocketAddress
- 使用一个
EchoServerHandler
的实例初始化每一个新的Channel
- 调用
ServerBootstrap.bind()
方法以绑定服务器。
1 | private void start() throws InterruptedException { |
Client 端
客户端实现步骤:
- 创建一个
Bootstrap
实例 - 为进行事件处理分配了一个
NioEventLoopGroup
实例,其中事件处理包括创建新的连接以及处理入站和出站数据 - 为服务器连接创建了一个
InetSocketAddress
实例 - 当连接被建立时,一个
EchoClientHandler
实例会被安装到该Channel
的ChannelPipeline
中 - 在一切都设置完成后,调用
Bootstrap.connect()
方法连接到远程节点
1 | private void start() throws InterruptedException { |
组件与设计
Channel、EventLoop 和 ChannelFuture
- Channel – Socket
Channel 的生命周期状态如下表:
状态 | 描述 |
---|---|
ChannelUnregistered |
Channel 已经被创建,但还未注册到 EventLoop |
ChannelRegistered |
Channel 已经被注册到了 EventLoop |
ChannelActive |
Channel 处于活动状态,可以接收和发送数据 |
ChannelInactive |
Channel 没有连接到远程节点 |
- EventLoop – 控制流、多线程、并发
- ChannelFuture – 回调, 异步通知
- 一个 EventLoopGroup 包含一个或者多个 EventLoop
- 一个 EventLoop 在它的生命周期内只和一个 Thread 绑定
- 所有由 EventLoop 处理的 I/O 事件都将在它专有的 Thread 上被处理
- 一个 Channel 在它的生命周期内只注册于一个 EventLoop
- 一个 EventLoop 可能会被分配给一个或多个 Channel
在 Netty 的设计中,一个 Channel 的所有操作都由相同的 Thread 执行,消除了同步的需要。是线程安全的。
Netty 提供了 ChannelFuture
接口,其 addListener()
方法注册了一个 ChannelFutureListener
,以便在某个操作完成时(无论是否成功)得到通知
ChannelHandler 和 ChannelPipeline
ChannelHandler
是 Netty 中最主要的组件,用于处理入站和出站数据,由网络事件触发。常用的是 ChannelInboundHandler
接口。
ChannelPipeline
是 ChannelHandler
的实例链。每一个新创建的 Channel 都将会被分配一个新的 ChannelPipeline。某个 ChannelHandler
在处理完某个事件后,会将数据按顺序传递给链中的下一个 ChannelHandler
。
出站和入站?
从客户端的角度来讲,事件运动方向从客户端到服务器端,则为出站,反之为入站。
当 ChannelHandler
被添加到 ChannelPipeline
时 ,它将会被分配一个 ChannelHandlerContext
,其代表了 ChannelHandler
和 ChannelPipeline
之间的绑定。这个对象主要被用于写出站数据,是不同 ChannelHandler
交互的媒介。
- ChannelPipeline 可以根据需要,通过添加或者删除 ChannelHandler 来动态地修改
- ChannelPipeline 有着丰富的 API 用以被调用,以响应入站和出站事件。
ChannelHandlerContext
有很多的方法,其中一些方法也存在于 Channel
和 ChannelPipeline
本身上,但是有一点重要的不同。如果调用 Channel
或者 ChannelPipeline
上的这些方法,它们将沿着整个 ChannelPipeline
进行传播。
调用位于 ChannelHandlerContext
上的相同方法,则将从当前所关联的 ChannelHandler
开始,并且只会传播给位于该 ChannelPipeline
中的下一个能够处理该事件的 ChannelHandler
。
如果需要在多个 ChannelPipeline
上共享同一个 ChannelHandler
,需要加上 @Sharable
注解,常用于需要跨越多个 Channel
收集信息的场景,注意共享 ChannelHandler
需要处理同步问题。
ByteBuf
ByteBuf 是 Netty 的数据处理容器
优点:
- 可以被用户自定义的缓冲区类型扩展
- 通过内置的复合缓冲区类型实现了透明的零拷贝
- 容量可以按需增长
- 在读和写这两种模式之间切换不需要调用
ByteBuffer
的flip()
方法 - 读和写使用了不同的索引
- 支持方法的链式调用
- 支持引用计数
- 支持池化
工作原理
ByteBuf 维护两个索引,一个用于读取 readerIndex
,一个用于写入 writerIndex
。如果两个索引值相同,说明达到了“可读数据”的末尾。
名称以 read
或者 write
开头的 ByteBuf 方法,将会推进其对应的索引,而名称以 set
或者 get
开头的操作则不会。后面的这些方法将在作为一个参数传入的一个相对索引上执行操作。
使用模式
1. 堆缓冲区
将数据存储在 JVM 的堆中,能在没有使用池化的情况下提供快速的分配和释放。
2. 直接缓冲区
直接缓冲区的内容在堆外内存,对于网络数据传输是理想的选择。如果需要处理这部分的数据,需要将其复制到工作内存中。
3. 复合缓冲区
为多个 ByteBuf 提供一个聚合视图。CompositeByteBuf
实现该模式,它提供了一个将多个缓冲区表示为单个合并缓冲区的虚拟表示。
分配 ByteBuf
1. 按需分配:ByteBufAllocator 接口
ByteBufAllocator
实现了 ByteBuf 的池化。
Netty 提供了两种 ByteBufAllocator
的实现:PooledByteBufAllocator
和 UnpooledByteBufAllocator
。前者池化了 ByteBuf 的实例以提高性能并最大限度地减少内存碎片。后者的实现不池化 ByteBuf 实例,并且在每次它被调用时都会返回一个新的实例。
2. Unpooled 缓冲区
Unpooled 工具类提供了静态的辅助方法来创建未池化的 ByteBuf 实例。
3. ByteBufUtil 类
ByteBufUtil 提供了用于操作ByteBuf的静态的辅助方法。
异常处理
入站异常处理
每个 Channel
都拥有一个与之相关联的 ChannelPipeline
,其持有一个 ChannelHandler
的实例链。在默认的情况下,ChannelHandler
会把对它的方法的调用转发给链中的下一个 ChannelHandler
。因此,如果 exceptionCaught()
方法没有被该链中的某处实现,那么所接收的异常将会被传递到 ChannelPipeline
的尾端并被记录。为此,你的应用程序应该提供至少有一个实现了 exceptionCaught()
方法的ChannelHandler
。
ChannelHandler.exceptionCaught()
默认将当前异常转发给ChannelPipeline
中的下一个ChannelHandler
- 如果异常到达了
ChannelPipeline
的尾端,它将会被记录为未被处理 - 可以重写
exceptionCaught()
方法来自定义处理逻辑
出站异常处理
- 每个出站操作都将返回一个
ChannelFuture
。注册到ChannelFuture
的ChannelFutureListener
将在操作完成时被通知该操作是成功了还是出错了 - 几乎所有的
ChannelOutboundHandler
上的方法都会传入一个ChannelPromise
的实例。作为ChannelFuture
的子类,ChannelPromise
也可以被分配用于异步通知的监听器。ChannelPromise
还具有提供立即通知的可写方法ChannelPromise setSuccess();
ChannelPromise setFailure(Throwable cause);
EventLoop 和线程模型
1. 异步传输
EventLoopGroup
负责为每个新创建的 Channel
分配一个 EventLoop
。在当前实现中, 使用轮询的方式进行分配以获取一个均衡的分布,并且相同的 EventLoop
可能会被分配给多个 Channel
。
一旦一个 Channel
被分配给一个 EventLoop
,它将在它的整个生命周期中都使用这个 EventLoop
(以及相关联的Thread)。
另外,需要注意的是,EventLoop
的分配方式对 ThreadLocal
的使用的影响。因为一个 EventLoop
通常会被用于支撑多个 Channel
,所以对于所有相关联的 Channel
来说, ThreadLocal
都将是一样的。这使得它对于实现状态追踪等功能来说是个糟糕的选择。然而, 在一些无状态的上下文中,它仍然可以被用于在多个 Channel
之间共享一些重度的或者代价昂贵的对象,甚至是事件。
2. 阻塞传输
每一个 Channel
都将被分配给一个 EventLoop
(以及它的 Thread
)。每个 Channel
的I/O 事件都将只会被一个 Thread
(用于支撑该 Channel
的 EventLoop
的那个 Thread
)处理。
消息的资源管理
write()
:不会释放消息
writeAndFlash()
:消息会在该方法被调用时释放