NETTY组件-CHANNEL
Channel接口
Netty中的Channel与Java NIO的概念一样,都是对一个实体或连接的抽象,但Netty提供了一套更加通用的API。就以网络套接字为例,在Java中OIO与NIO是截然不同的两套API,假设你之前使用的是OIO而又想更改为NIO实现,那么几乎需要重写所有代码。而在Netty中,只需要更改短短几行代码(更改Channel与EventLoop的实现类,如把OioServerSocketChannel替换为NioServerSocketChannel),就可以完成OIO与NIO(或其他)之间的转换。
每个Channel最终都会被分配一个ChannelPipeline和ChannelConfig,前者持有所有负责处理入站与出站数据以及事件的ChannelHandler,后者包含了该Channel的所有配置设置,并且支持热更新,由于不同的传输类型可能具有其特别的配置,所以该类可能会实现为ChannelConfig的不同子类。
常用实现
Channel的产生是为了降低网络传输变成的复杂性,它是传入传出数据的载体,可以打开或者关闭,连接或断开。可以当做它是Socket的升级,大大降低了直接与 Socket 进行操作的复杂性。
Nmae | Package | Description |
---|---|---|
NIO | io.netty.channel.socket.nio | 以Java NIO为基础实现 |
OIO | io.netty.channel.socket.oio | 以java.net为基础实现,使用阻塞I/O模型 |
Epoll | io.netty.channel.epoll | 由JNI驱动epoll()实现的更高性能的非阻塞I/O,它只能使用在Linux |
Local | io.netty.channel.local | 本地传输,在JVM内部通过管道进行通信 |
Embedded | io.netty.channel.embedded | 允许在不需要真实网络传输的环境下使用ChannelHandler,主要用于对ChannelHandler进行测试 |
选择合适的内置通信传输模式
NIO
io.netty.channel.socket.nio 使用java.nio.channels 包作为基础——基于选择器的方式
Epoll
io.netty.channel.epoll 由 JNI 驱动的 epoll()和非阻塞 IO。这个传输支持只有在Linux 上可用的多种特性,如SO_REUSEPORT,比NIO 传输更快,而且是完全非阻塞的。将NioEventLoopGroup替换为EpollEventLoopGroup , 并且将NioServerSocketChannel.class 替换为EpollServerSocketChannel.class 即可。
OIO
io.netty.channel.socket.oio 使用java.net 包作为基础——使用阻塞流
Local
io.netty.channel.local 可以在VM 内部通过管道进行通信的本地传输Local传输用于在同一个JVM中运行的客户端和服务器程序之间的异步通信,与服务器Channel相关联的SocketAddress并没有绑定真正的物理网络地址,它会被存储在注册表中,并在Channel关闭时注销。因此Local传输不会接受真正的网络流量,也就是说它不能与其他传输实现进行互操作。
Embedded
io.netty.channel.embedded Embedded 传输,允许使用ChannelHandler 而又不需要一个真正的基于网络的传输。在测试ChannelHandler 实现时非常有用
Embedded传输主要用于对ChannelHandler进行单元测试,ChannelHandler是用于处理消息的逻辑组件,Netty通过将入站消息与出站消息都写入到EmbeddedChannel中的方式(提供了write/readInbound()与write/readOutbound()来读写入站与出站消息)来实现对ChannelHandler的单元测试。
常用操作
基本的I/O 操作(bind()、connect()、read()和write())依赖于底层网络传输所提供的原语。在基于Java 的网络编程中,其基本的构造是类Socket。Netty 的Channel 接口所提供的API,被用于所有的I/O 操作。大大地降低了直接使用Socket 类的复杂性。此外,Channel 也是拥有许多预定义的、专门化实现的广泛类层次结构的根。
由于Channel 是独一无二的,所以为了保证顺序将Channel 声明为java.lang.Comparable 的一个子接口。因此,如果两个不同的Channel 实例都返回了相同的散列码,那么AbstractChannel 中的compareTo()方法的实现将会抛出一个Error。
生命周期状态
ChannelUnregistered :Channel 已经被创建,但还未注册到EventLoop
ChannelRegistered :Channel 已经被注册到了EventLoop
ChannelActive :Channel 处于活动状态(已经连接到它的远程节点)。它现在可以接收和发送数据了
ChannelInactive :Channel 没有连接到远程节点
当这些状态发生改变时,将会生成对应的事件。这些事件将会被转发给ChannelPipeline 中的ChannelHandler,其可以随后对它们做出响应。
最重要的方法
eventLoop: 返回分配给Channel 的EventLoop
pipeline: 返回分配给Channel 的ChannelPipeline
isActive: 如果Channel 是活动的,则返回true。活动的意义可能依赖于底层的传输。例如,一个Socket 传输一旦连接到了远程节点便是活动的,而一个Datagram 传输一旦被打开便是活动的。
localAddress: 返回本地的SokcetAddress
remoteAddress: 返回远程的SocketAddress
write: 将数据写到远程节点。这个数据将被传递给ChannelPipeline,并且排队直到它被冲刷
flush: 将之前已写的数据冲刷到底层传输,如一个Socket
writeAndFlush: 一个简便的方法,等同于调用write()并接着调用flush()
ChannelFuture 接口
Netty 中所有的I/O 操作都是异步的。因为一个操作可能不会立即返回,所以我们需要一种用于在之后的某个时间点确定其结果的方法。为此,Netty 提供了ChannelFuture 接口,其addListener()方法注册了一个ChannelFutureListener,以便在某个操作完成时(无论是否成功)得到通知。
- 可以将ChannelFuture看作是将来要执行的操作的结果占位符,什么时候被执行,不知道。但肯定会被执行
- 属于同一个Channel的操作(回调函数)都被保证将按照注册的顺序执行。
比如有直接阻塞获取的方式:
1 | /*绑定到端口,阻塞等待直到连接完成*/ |
异步监听的方式
1 | serverBootstrap.bind(port).addListener(future -> { |
ChannelHandler接口
从应用程序开发人员的角度来看,Netty 的主要组件是ChannelHandler,它充当了所有处理入站和出站数据的应用程序逻辑的容器。ChannelHandler 的方法是由网络事件触发的。事实上,ChannelHandler 可专门用于几乎任何类型的动作,例如将数据从一种格式转换为另外一种格式,例如各种编解码,或者处理转换过程中所抛出的异常。
举例来说,ChannelInboundHandler 是一个你将会经常实现的子接口。这种类型的ChannelHandler 接收入站事件和数据,这些数据随后将会被你的应用程序的业务逻辑所处理。当你要给连接的客户端发送响应时,也可以从ChannelInboundHandler 直接冲刷数据然后输出到对端。应用程序的业务逻辑通常实现在一个或者多个ChannelInboundHandler 中。
这种类型的ChannelHandler 接收入站事件和数据,这些数据随后将会被应用程序的业务逻辑所处理。
ChannelHandler充当了处理入站和出站数据的应用程序逻辑的容器,该类是基于事件驱动的,它会响应相关的事件然后去调用其关联的回调函数,例如当一个新的连接被建立时,ChannelHandler的channelActive()方法将会被调用。
关于入站消息和出站消息的数据流向定义,如果以客户端为主视角来说的话,那么从客户端流向服务器的数据被称为出站,反之为入站。
入站事件是可能被入站数据或者相关的状态更改而触发的事件,包括:连接已被激活、连接失活、读取入站数据、用户事件、发生异常等。
出站事件是未来将会触发的某个动作的结果的事件,这些动作包括:打开或关闭远程节点的连接、将数据写(或冲刷)到套接字。
ChannelHandler的作用
ChannelHandler的主要用途包括:
- 对入站与出站数据的业务逻辑处理
- 记录日志
- 将数据从一种格式转换为另一种格式,实现编解码器。以一次HTTP协议(或者其他应用层协议)的流程为例,数据在网络传输时的单位为字节,当客户端发送请求到服务器时,服务器需要通过解码器(处理入站消息)将字节解码为协议的消息内容,服务器在发送响应的时候(处理出站消息),还需要通过编码器将消息内容编码为字节。
- 捕获异常
- 提供Channel生命周期内的通知,如Channel活动时与非活动时
Netty中到处都充满了异步与事件驱动,而回调函数正是用于响应事件之后的操作。由于异步会直接返回一个结果,所以Netty提供了ChannelFuture(实现了java.util.concurrent.Future)来作为异步调用返回的占位符,真正的结果会在未来的某个时刻完成,到时候就可以通过ChannelFuture对其进行访问,每个Netty的出站I/O操作都将会返回一个ChannelFuture。
ChannelFutureListener接口
Netty还提供了ChannelFutureListener接口来监听ChannelFuture是否成功,并采取对应的操作。
1 | Channel channel = ... |
ChannelFutureListener实现
ChannelFutureListener接口中还提供了几个简单的默认实现,方便我们使用。
1 | public interface ChannelFutureListener extends GenericFutureListener<ChannelFuture> { |
ChannelHandler接口定义了对它生命周期进行监听的回调函数,在ChannelHandler被添加到ChannelPipeline或者被移除时都会调用这些函数。
1 | public interface ChannelHandler { |
入站消息与出站消息由其对应的接口ChannelInboundHandler与ChannelOutboundHandler负责,这两个接口定义了监听Channel的生命周期的状态改变事件的回调函数。
1 | public interface ChannelInboundHandler extends ChannelHandler { |
通过实现ChannelInboundHandler或者ChannelOutboundHandler就可以完成用户自定义的应用逻辑处理程序,不过Netty已经帮你实现了一些基本操作,用户只需要继承并扩展ChannelInboundHandlerAdapter或ChannelOutboundHandlerAdapter来作为自定义实现的起始点。
ChannelInboundHandlerAdapter与ChannelOutboundHandlerAdapter都继承于ChannelHandlerAdapter,该抽象类简单实现了ChannelHandler接口。
1 | public ChannelHandlerAdapter(){ |
ChannelInboundHandlerAdapter与ChannelOutboundHandlerAdapter默认只是简单地将请求传递给ChannelPipeline中的下一个ChannelHandler,源码如下:
1 | public class ChannelInboundHandlerAdapter extends ChannelHandlerAdapter implements ChannelInboundHandler { |
对于处理入站消息,另外一种选择是继承SimpleChannelInboundHandler,它是Netty的一个继承于ChannelInboundHandlerAdapter的抽象类,并在其之上实现了自动释放资源的功能。
我们在了解ByteBuf时就已经知道了Netty使用了一套自己实现的引用计数算法来主动释放资源,假设你的ChannelHandler继承于ChannelInboundHandlerAdapter或ChannelOutboundHandlerAdapter,那么你就有责任去管理你所分配的ByteBuf,一般来说,一个消息对象(ByteBuf)已经被消费(或丢弃)了,并且不会传递给ChannelHandler链中的下一个处理器(如果该消息到达了实际的传输层,那么当它被写入或Channel关闭时,都会被自动释放),那么你就需要去手动释放它。通过一个简单的工具类ReferenceCountUtil的release方法,就可以做到这一点。
1 | // 这个泛型为消息对象的类型 |
ChannelHandler 的生命周期
接口 ChannelHandler 定义的生命周期操作,在ChannelHandler被添加到ChannelPipeline 中或者被从ChannelPipeline 中移除时会调用这些操作。这些方法中的每一个都接受一个ChannelHandlerContext 参数。
handlerAdded :当把ChannelHandler 添加到ChannelPipeline 中时被调用
handlerRemoved : 当从ChannelPipeline 中移除ChannelHandler 时被调用
exceptionCaught : 当处理过程中在ChannelPipeline 中有错误产生时被调用
Netty 定义了下面两个重要的ChannelHandler 子接口:
ChannelInboundHandler——处理入站数据以及各种状态变化;
ChannelOutboundHandler——处理出站数据并且允许拦截所有的操作。
ChannelInboundHandler
下面列出了接口 ChannelInboundHandler 的生命周期方法。这些方法将会在数据被接收时或者与其对应的Channel 状态发生改变时被调用。正如我们前面所提到的,这些方法和Channel 的生命周期密切相关。
channelRegistered
当Channel 已经注册到它的EventLoop 并且能够处理I/O 时被调用
channelUnregistered
当Channel 从它的EventLoop 注销并且无法处理任何I/O 时被调用
channelActive
当Channel 处于活动状态时被调用;Channel 已经连接/绑定并且已经就绪
channelInactive
当Channel 离开活动状态并且不再连接它的远程节点时被调用
channelReadComplete
当Channel上的一个读操作完成时被调用
channelRead
当从Channel 读取数据时被调用
ChannelWritabilityChanged
当Channel 的可写状态发生改变时被调用。可以通过调用Channel 的isWritable()方法来检测Channel 的可写性。与可写性相关的阈值可以通过Channel.config().setWriteHighWaterMark()和Channel.config().setWriteLowWaterMark()方法来设置
userEventTriggered
当ChannelnboundHandler.fireUserEventTriggered()方法被调用时被调用。
ChannelOutboundHandler接口
出站操作和数据将由ChannelOutboundHandler 处理。它的方法将被Channel、Channel-
Pipeline 以及ChannelHandlerContext 调用。
所有由ChannelOutboundHandler 本身所定义的方法:
bind(ChannelHandlerContext,SocketAddress,ChannelPromise)
当请求将Channel 绑定到本地地址时被调用
connect(ChannelHandlerContext,SocketAddress,SocketAddress,ChannelPromise)
当请求将Channel 连接到远程节点时被调用
disconnect(ChannelHandlerContext,ChannelPromise)
当请求将Channel 从远程节点断开时被调用
close(ChannelHandlerContext,ChannelPromise)
当请求关闭Channel 时被调用
deregister(ChannelHandlerContext,ChannelPromise)
当请求将Channel 从它的EventLoop 注销时被调用
read(ChannelHandlerContext)
当请求从Channel 读取更多的数据时被调用
flush(ChannelHandlerContext)
当请求通过Channel 将入队数据冲刷到远程节点时被调用
write(ChannelHandlerContext,Object,ChannelPromise)
当请求通过Channel 将数据写到远程节点时被调用
ChannelHandler的适配器
有一些适配器类可以将编写自定义的ChannelHandler 所需要的工作降到最低限度,因为它们提供了定义在对应接口中的所有方法的默认实现。因为你有时会忽略那些不感兴趣的事件,所以Netty提供了抽象基类ChannelInboundHandlerAdapter 和ChannelOutboundHandlerAdapter。
你可以使用ChannelInboundHandlerAdapter 和ChannelOutboundHandlerAdapter类作为自己的ChannelHandler 的起始点。这两个适配器分别提供了ChannelInboundHandler和ChannelOutboundHandler 的基本实现。通过扩展抽象类ChannelHandlerAdapter,它们获得了它们共同的超接口ChannelHandler 的方法。
ChannelHandlerAdapter 还提供了实用方法isSharable()。如果其对应的实现被标注为Sharable,那么这个方法将返回true,表示它可以被添加到多个ChannelPipeline。
ChannelPipeline 接口
为了模块化与解耦合,不可能由一个ChannelHandler来完成所有应用逻辑,所以Netty采用了拦截器链的设计。ChannelPipeline就是用来管理ChannelHandler实例链的容器,它的职责就是保证实例链的流动。
每一个新创建的Channel都将会被分配一个新的ChannelPipeline,这种关联关系是永久性的,一个Channel一生只能对应一个ChannelPipeline。
当Channel 被创建时,它将会被自动地分配一个新的ChannelPipeline。这项关联是永久性的;Channel 既不能附加另外一个ChannelPipeline,也不能分离其当前的。在Netty 组件的生命周期中,这是一项固定的操作,不需要开发人员的任何干预。
使得事件流经ChannelPipeline 是ChannelHandler 的工作,它们是在应用程序的初始化或者引导阶段被安装的。这些对象接收事件、执行它们所实现的处理逻辑,并将数据传递给链中的下一个ChannelHandler。它们的执行顺序是由它们被添加的顺序所决定的。
入站和出站ChannelHandler 可以被安装到同一个ChannelPipeline中。如果一个消息或者任何其他的入站事件被读取,那么它会从ChannelPipeline 的头部开始流动,最终,数据将会到达ChannelPipeline 的尾端,届时,所有处理就都结束了。
数据的出站运动(即正在被写的数据)在概念上也是一样的。在这种情况下,数据将从ChannelOutboundHandler 链的尾端开始流动,直到它到达链的头部为止。在这之后,出站数据将会到达网络传输层,这里显示为Socket。通常情况下,这将触发一个写操作。
如果将两个类别的ChannelHandler都混合添加到同一个ChannelPipeline 中会发生什么。虽然ChannelInboundHandle 和ChannelOutboundHandle 都扩展自ChannelHandler,但是Netty 能区分ChannelInboundHandler实现和ChannelOutboundHandler 实现,并确保数据只会在具有相同定向类型的两个ChannelHandler 之间传递。
一个入站事件被触发时,它会先从ChannelPipeline的最左端(头部)开始一直传播到ChannelPipeline的最右端(尾部),而出站事件正好与入站事件顺序相反(从最右端一直传播到最左端)。这个顺序是定死的,Netty总是将ChannelPipeline的入站口作为头部,而将出站口作为尾部。在事件传播的过程中,ChannelPipeline会判断下一个ChannelHandler的类型是否和事件的运动方向相匹配,如果不匹配,就跳过该ChannelHandler并继续检查下一个(保证入站事件只会被ChannelInboundHandler处理),一个ChannelHandler也可以同时实现ChannelInboundHandler与ChannelOutboundHandler,它在入站事件与出站事件中都会被调用。
在阅读ChannelHandler的源码时,发现很多方法需要一个ChannelHandlerContext类型的参数,该接口是ChannelPipeline与ChannelHandler之间相关联的关键。ChannelHandlerContext可以通知ChannelPipeline中的当前ChannelHandler的下一个ChannelHandler,还可以动态地改变当前ChannelHandler在ChannelPipeline中的位置(通过调用ChannelPipeline中的各种方法来修改)。
ChannelHandlerContext负责了在同一个ChannelPipeline中的ChannelHandler与其他ChannelHandler之间的交互,每个ChannelHandlerContext都对应了一个ChannelHandler。在DefaultChannelPipeline的源码中,已经表现的很明显了。
1 | public class DefaultChannelPipeline implements ChannelPipeline { |
ChannelHandlerContext还定义了许多与Channel和ChannelPipeline重合的方法(像read()、write()、connect()这些用于出站的方法或者如fireChannelXXXX()这样用于入站的方法),不同之处在于调用Channel或者ChannelPipeline上的这些方法,它们将会从头沿着整个ChannelHandler实例链进行传播,而调用位于ChannelHandlerContext上的相同方法,则会从当前所关联的ChannelHandler开始,且只会传播给实例链中的下一个ChannelHandler。而且,事件之间的移动(从一个ChannelHandler到下一个ChannelHandler)也是通过ChannelHandlerContext中的方法调用完成的。
1 | public class DefaultChannelPipeline implements ChannelPipeline { |
ChannelPipeline上的方法
addFirst、addBefore、addAfter、addLast
将一个ChannelHandler 添加到ChannelPipeline 中
remove
将一个ChannelHandler 从ChannelPipeline 中移除
replace
将ChannelPipeline 中的一个ChannelHandler 替换为另一个ChannelHandler
get
通过类型或者名称返回ChannelHandler
context
返回和ChannelHandler 绑定的ChannelHandlerContext
names
返回ChannelPipeline 中所有ChannelHandler 的名称
ChannelPipeline 的API 公开了用于调用入站和出站操作的附加方法。
ChannelHandlerContext
通过使用作为参数传递到每个方法的ChannelHandlerContext,事件可以被传递给当前ChannelHandler 链中的下一个ChannelHandler。虽然这个对象可以被用于获取底层的Channel,但是它主要还是被用于写出站数据。
ChannelHandlerContext 代表了ChannelHandler 和ChannelPipeline 之间的关联,每当有ChannelHandler 添加到ChannelPipeline 中时,都会创建ChannelHandler-Context。ChannelHandlerContext 的主要功能是管理它所关联的ChannelHandler 和在同一个ChannelPipeline 中的其他ChannelHandler 之间的交互。
ChannelHandlerContext 有很多的方法,其中一些方法也存在于Channel 和Channel-Pipeline 本身上,但是有一点重要的不同。如果调用Channel 或者ChannelPipeline 上的这些方法,它们将沿着整个ChannelPipeline 进行传播。而调用位于ChannelHandlerContext上的相同方法,则将从当前所关联的ChannelHandler 开始,并且只会传播给位于该ChannelPipeline 中的下一个(入站下一个,出站上一个)能够处理该事件的ChannelHandler。
ChannelHandlerContext的API
alloc
返回和这个实例相关联的Channel 所配置的ByteBufAllocator
bind
绑定到给定的SocketAddress,并返回ChannelFuture
channel
返回绑定到这个实例的Channel
close
关闭Channel,并返回ChannelFuture
connect
连接给定的SocketAddress,并返回ChannelFuture
deregister
从之前分配的EventExecutor 注销,并返回ChannelFuture
disconnect
从远程节点断开,并返回ChannelFuture
executor
返回调度事件的EventExecutor
fireChannelActive
触发对下一个ChannelInboundHandler 上的channelActive()方法(已连接)的调用
fireChannelInactive
触发对下一个ChannelInboundHandler 上的channelInactive()方法(已关闭)的调用
fireChannelRead
触发对下一个ChannelInboundHandler 上的channelRead()方法(已接收的消息)的调用
fireChannelReadComplete
触发对下一个ChannelInboundHandler 上的channelReadComplete()方法的调用
fireChannelRegistered
触发对下一个ChannelInboundHandler 上的fireChannelRegistered()方法的调用
fireChannelUnregistered
触发对下一个ChannelInboundHandler 上的fireChannelUnregistered()方法的调用
fireChannelWritabilityChanged
触发对下一个ChannelInboundHandler 上的fireChannelWritabilityChanged()方法的调用
fireExceptionCaught
触发对下一个ChannelInboundHandler 上的fireExceptionCaught(Throwable)方法的调用
fireUserEventTriggered
触发对下一个ChannelInboundHandler 上的fireUserEventTriggered(Object evt)方法的调用
handler
返回绑定到这个实例的ChannelHandler
isRemoved
如果所关联的ChannelHandler 已经被从ChannelPipeline中移除则返回true
name
返回这个实例的唯一名称
pipeline
返回这个实例所关联的ChannelPipeline
read
将数据从Channel读取到第一个入站缓冲区;如果读取成功则触发一个channelRead事件,并(在最后一个消息被读取完成后)通知ChannelInboundHandler 的channelReadComplete(ChannelHandlerContext)方法
当使用ChannelHandlerContext 的API 的时候,有以下两点:
ChannelHandlerContext 和ChannelHandler 之间的关联(绑定)是永远不会改变的,所以缓存对它的引用是安全的;
如同我们在本节开头所解释的一样,相对于其他类的同名方法,ChannelHandler Context的方法将产生更短的事件流,应该尽可能地利用这个特性来获得最大的性能。