Netty 是一个异步的、基于事件驱动的高性能网络应用框架,本文档维护 Netty 的相关内容。

一些概念

Netty 的基础构件块:Channel、回调、Future、事件及 ChannelHandler

  • Channel :数据的通道,一个到实体的开放连接
  • Future :一种在操作完成时通知应用程序的方式。
    • ChannelFuture用于在执行异步操作的时候使用,可以注册一个或多个监听器,提供回调方法,这些方法将会在之后某个时间点执行
  • 事件:用来通知状态的改变或者是操作的状态
    • 入站相关
      • 连接已被激活或者连接失活
      • 数据读取
      • 用户事件
      • 错误事件
    • 出站相关
      • 打开或关闭到远程节点的连接
      • 将数据写到或冲刷到套接字
    • 每个事件都可以被分发给 ChannelHandler 类中的某个用户实现的方法

Netty 通过触发事件将 Selector 从应用程序中抽象出来。在内部,将会为每个 Channel分配一个 EventLoop,用以处理所有事件

  • msg : 流动的数据,最开始输入是 ByteBuf,但经过 pipeline 的加工,会变成其它类型对象,后输出又变成 ByteBuf
  • handler :数据的处理工序
    • 多个工序合在一起就是 pipeline,pipeline 负责将事件传播给每个 handler, handler 对自己感兴趣的事件进行处理
    • handler 分 Inbound(入站)和 Outbound(出站) 两类
  • eventLoop :处理数据的工人
    • 工人可以管理多个 channel 的 IO 操作,工人和 channel 是绑定在一起的
    • 工人既可以执行 IO 操作,也可以进行任务处理,每位工人有任务队列,队列里可以堆放多个 channel 的待处理任务,任务分为普通任务、定时任务
    • 工人按照 pipeline 顺序,依次按照 handler 的规划处理数据,可以为每道工序指定不同的工人

基础示例

Server 端

要实现一个服务器,主要有以下步骤:

  • 创建一个 ServerBootstrap 的实例以引导和绑定服务器
  • 创建并分配一个 NioEventLoopGroup 实例以进行事件的处理,如接受新连接以及读/ 写数据
  • 指定服务器绑定的本地的 InetSocketAddress
  • 使用一个 EchoServerHandler 的实例初始化每一个新的 Channel
  • 调用 ServerBootstrap.bind() 方法以绑定服务器。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
private void start() throws InterruptedException {  
final EchoServerHandler serverHandler = new EchoServerHandler();
NioEventLoopGroup group = new NioEventLoopGroup();
try {
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(group)
.channel(NioServerSocketChannel.class)
.localAddress(new InetSocketAddress(port))
.childHandler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel ch) throws Exception {
ch.pipeline().addLast(serverHandler);
}
});
ChannelFuture future = serverBootstrap.bind().sync();
future.channel().closeFuture().sync();
} catch (Exception e) {
throw new RuntimeException(e);
} finally {
group.shutdownGracefully().sync();
}

Client 端

客户端实现步骤:

  • 创建一个 Bootstrap 实例
  • 为进行事件处理分配了一个 NioEventLoopGroup 实例,其中事件处理包括创建新的连接以及处理入站和出站数据
  • 为服务器连接创建了一个 InetSocketAddress 实例
  • 当连接被建立时,一个 EchoClientHandler 实例会被安装到该 ChannelChannelPipeline
  • 在一切都设置完成后,调用 Bootstrap.connect() 方法连接到远程节点
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
private void start() throws InterruptedException {  
NioEventLoopGroup group = new NioEventLoopGroup();
try {
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(group)
.channel(NioSocketChannel.class)
.remoteAddress(new InetSocketAddress(host, port))
.handler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel ch) throws Exception {
ch.pipeline().addLast(new EchoClientHandler());
}
});
ChannelFuture f = bootstrap.connect().sync();
f.channel().closeFuture().sync();
} catch (Exception e) {
throw new RuntimeException(e);
} finally {
group.shutdownGracefully().sync();
}
}

组件与设计

Channel、EventLoop 和 ChannelFuture

  • Channel – Socket

Channel 的生命周期状态如下表:

状态 描述
ChannelUnregistered Channel 已经被创建,但还未注册到 EventLoop
ChannelRegistered Channel 已经被注册到了 EventLoop
ChannelActive Channel 处于活动状态,可以接收和发送数据
ChannelInactive Channel 没有连接到远程节点
  • EventLoop – 控制流、多线程、并发
  • ChannelFuture – 回调, 异步通知

image.png

  • 一个 EventLoopGroup 包含一个或者多个 EventLoop
  • 一个 EventLoop 在它的生命周期内只和一个 Thread 绑定
  • 所有由 EventLoop 处理的 I/O 事件都将在它专有的 Thread 上被处理
  • 一个 Channel 在它的生命周期内只注册于一个 EventLoop
  • 一个 EventLoop 可能会被分配给一个或多个 Channel

在 Netty 的设计中,一个 Channel 的所有操作都由相同的 Thread 执行,消除了同步的需要。是线程安全的。

Netty 提供了 ChannelFuture 接口,其 addListener() 方法注册了一个 ChannelFutureListener,以便在某个操作完成时(无论是否成功)得到通知

ChannelHandler 和 ChannelPipeline

ChannelHandler 是 Netty 中最主要的组件,用于处理入站和出站数据,由网络事件触发。常用的是 ChannelInboundHandler 接口。

ChannelPipelineChannelHandler 的实例链。每一个新创建的 Channel 都将会被分配一个新的 ChannelPipeline。某个 ChannelHandler 在处理完某个事件后,会将数据按顺序传递给链中的下一个 ChannelHandler

出站和入站?
客户端的角度来讲,事件运动方向从客户端到服务器端,则为出站,反之为入站。

ChannelHandler 被添加到 ChannelPipeline 时 ,它将会被分配一个 ChannelHandlerContext,其代表了 ChannelHandlerChannelPipeline 之间的绑定。这个对象主要被用于写出站数据,是不同 ChannelHandler 交互的媒介。

  • ChannelPipeline 可以根据需要,通过添加或者删除 ChannelHandler 来动态地修改
  • ChannelPipeline 有着丰富的 API 用以被调用,以响应入站和出站事件。

ChannelHandlerContext 有很多的方法,其中一些方法也存在于 ChannelChannelPipeline 本身上,但是有一点重要的不同。如果调用 Channel 或者 ChannelPipeline 上的这些方法,它们将沿着整个 ChannelPipeline 进行传播。

调用位于 ChannelHandlerContext 上的相同方法,则将从当前所关联的 ChannelHandler 开始,并且只会传播给位于该 ChannelPipeline 中的下一个能够处理该事件的 ChannelHandler

如果需要在多个 ChannelPipeline 上共享同一个 ChannelHandler,需要加上 @Sharable 注解,常用于需要跨越多个 Channel 收集信息的场景,注意共享 ChannelHandler 需要处理同步问题。

ByteBuf

ByteBuf 是 Netty 的数据处理容器

优点:

  • 可以被用户自定义的缓冲区类型扩展
  • 通过内置的复合缓冲区类型实现了透明的零拷贝
  • 容量可以按需增长
  • 在读和写这两种模式之间切换不需要调用 ByteBufferflip() 方法
  • 读和写使用了不同的索引
  • 支持方法的链式调用
  • 支持引用计数
  • 支持池化

工作原理

ByteBuf 维护两个索引,一个用于读取 readerIndex,一个用于写入 writerIndex。如果两个索引值相同,说明达到了“可读数据”的末尾。

名称以 read 或者 write 开头的 ByteBuf 方法,将会推进其对应的索引,而名称以 set 或者 get 开头的操作则不会。后面的这些方法将在作为一个参数传入的一个相对索引上执行操作。

使用模式

1. 堆缓冲区

将数据存储在 JVM 的堆中,能在没有使用池化的情况下提供快速的分配和释放。

2. 直接缓冲区

直接缓冲区的内容在堆外内存,对于网络数据传输是理想的选择。如果需要处理这部分的数据,需要将其复制到工作内存中。

3. 复合缓冲区

为多个 ByteBuf 提供一个聚合视图。CompositeByteBuf 实现该模式,它提供了一个将多个缓冲区表示为单个合并缓冲区的虚拟表示。

分配 ByteBuf

1. 按需分配:ByteBufAllocator 接口

ByteBufAllocator 实现了 ByteBuf 的池化。

image.png

Netty 提供了两种 ByteBufAllocator 的实现:PooledByteBufAllocatorUnpooledByteBufAllocator前者池化了 ByteBuf 的实例以提高性能并最大限度地减少内存碎片。后者的实现不池化 ByteBuf 实例,并且在每次它被调用时都会返回一个新的实例

2. Unpooled 缓冲区

Unpooled 工具类提供了静态的辅助方法来创建未池化的 ByteBuf 实例。

image.png

3. ByteBufUtil 类

ByteBufUtil 提供了用于操作ByteBuf的静态的辅助方法。

异常处理

入站异常处理

每个 Channel 都拥有一个与之相关联的 ChannelPipeline,其持有一个 ChannelHandler 的实例链。在默认的情况下,ChannelHandler 会把对它的方法的调用转发给链中的下一个 ChannelHandler。因此,如果 exceptionCaught() 方法没有被该链中的某处实现,那么所接收的异常将会被传递到 ChannelPipeline 的尾端并被记录。为此,你的应用程序应该提供至少有一个实现了 exceptionCaught() 方法的ChannelHandler

  • ChannelHandler.exceptionCaught() 默认将当前异常转发给 ChannelPipeline 中的下一个 ChannelHandler
  • 如果异常到达了 ChannelPipeline 的尾端,它将会被记录为未被处理
  • 可以重写 exceptionCaught() 方法来自定义处理逻辑

出站异常处理

  • 每个出站操作都将返回一个 ChannelFuture。注册到 ChannelFutureChannelFutureListener 将在操作完成时被通知该操作是成功了还是出错了
  • 几乎所有的 ChannelOutboundHandler 上的方法都会传入一个 ChannelPromise 的实例。作为 ChannelFuture 的子类,ChannelPromise 也可以被分配用于异步通知的监听器。
    • ChannelPromise 还具有提供立即通知的可写方法
      • ChannelPromise setSuccess();
      • ChannelPromise setFailure(Throwable cause);

EventLoop 和线程模型

1. 异步传输

image.png

EventLoopGroup 负责为每个新创建的 Channel 分配一个 EventLoop。在当前实现中, 使用轮询的方式进行分配以获取一个均衡的分布,并且相同的 EventLoop 可能会被分配给多个 Channel

一旦一个 Channel 被分配给一个 EventLoop,它将在它的整个生命周期中都使用这个 EventLoop(以及相关联的Thread)。

另外,需要注意的是,EventLoop 的分配方式对 ThreadLocal 的使用的影响。因为一个 EventLoop 通常会被用于支撑多个 Channel,所以对于所有相关联的 Channel 来说, ThreadLocal 都将是一样的。这使得它对于实现状态追踪等功能来说是个糟糕的选择。然而, 在一些无状态的上下文中,它仍然可以被用于在多个 Channel 之间共享一些重度的或者代价昂贵的对象,甚至是事件。

2. 阻塞传输

image.png

每一个 Channel 都将被分配给一个 EventLoop(以及它的 Thread)。每个 Channel 的I/O 事件都将只会被一个 Thread (用于支撑该 ChannelEventLoop 的那个 Thread)处理。

消息的资源管理

write():不会释放消息

writeAndFlash():消息会在该方法被调用时释放