抱歉,您的浏览器无法访问本站
本页面需要浏览器支持(启用)JavaScript
了解详情 >

NETTY组件-Bootstrap

Bootstrap

​ 网络编程里,“服务器”和“客户端”实际上表示了不同的网络行为;换句话说,是监听传入的连接还是建立到一个或者多个进程的连接。

​ 因此,有两种类型的引导:一种用于客户端(简单地称为Bootstrap),而另一种(ServerBootstrap)用于服务器。无论你的应用程序使用哪种协议或者处理哪种类型的数据,唯一决定它使用哪种引导类的是它是作为一个客户端还是作为一个服务器。

比较Bootstrap 类

Bootstrap ServerBootstrap
网络编程中的作用 连接到远程主机和端口 绑定到一个本地端口
EventLoopGroup 的数目 1 2

​ ServerBootstrap 将绑定到一个端口,因为服务器必须要监听连接,而Bootstrap 则是由想要连接到远程节点的客户端应用程序所使用的。

​ 第二个区别可能更加明显。引导一个客户端只需要一个EventLoopGroup,但是一个ServerBootstrap 则需要两个(也可以是同一个实例)。

​ 因为服务器需要两组不同的Channel。第一组将只包含一个ServerChannel,代表服务器自身的已绑定到某个本地端口的正在监听的套接字。而第二组将包含所有已创建的用来处理传入客户端连接(对于每个服务器已经接受的连接都有一个)的Channel。

​ 与ServerChannel 相关联的EventLoopGroup 将分配一个负责为传入连接请求创建Channel 的EventLoop。一旦连接被接受,第二个EventLoopGroup 就会给它的Channel分配一个EventLoop。

详细分析

​ 在深入了解地Netty的核心组件之后,发现它们的设计都很模块化,如果想要实现你自己的应用程序,就需要将这些组件组装到一起。Netty通过Bootstrap类,以对一个Netty应用程序进行配置(组装各个组件),并最终使它运行起来。对于客户端程序和服务器程序所使用到的Bootstrap类是不同的,后者需要使用ServerBootstrap,这样设计是因为,在如TCP这样有连接的协议中,服务器程序往往需要一个以上的Channel,通过父Channel来接受来自客户端的连接,然后创建子Channel用于它们之间的通信,而像UDP这样无连接的协议,它不需要每个连接都创建子Channel,只需要一个Channel即可。

一个比较明显的差异就是Bootstrap与ServerBootstrap的group()方法,后者提供了一个接收2个EventLoopGroup的版本。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
// 该方法在Bootstrap的父类AbstractBootstrap中,泛型B为它当前子类的类型(为了链式调用)
public B group(EventLoopGroup group) {
if (group == null) {
throw new NullPointerException("group");
} else if (this.group != null) {
throw new IllegalStateException("group set already");
} else {
this.group = group;
return this;
}
}

// ServerBootstrap中的实现,它也支持只用一个EventLoopGroup
public ServerBootstrap group(EventLoopGroup group) {
return this.group(group, group);
}

public ServerBootstrap group(EventLoopGroup parentGroup, EventLoopGroup childGroup) {
super.group(parentGroup);
if (childGroup == null) {
throw new NullPointerException("childGroup");
} else if (this.childGroup != null) {
throw new IllegalStateException("childGroup set already");
} else {
this.childGroup = childGroup;
return this;
}
}

Bootstrap其实没有什么可以好说的,它就只是一个装配工,将各个组件拼装组合到一起,然后进行一些配置,有关它的详细API请参考Netty JavaDoc。下面我们将通过一个经典的Echo客户端与服务器的例子,来梳理一遍创建Netty应用的流程。

例子

首先实现的是服务器,我们先实现一个EchoServerInboundHandler,处理入站消息。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
public class EchoServerInboundHandler extends ChannelInboundHandlerAdapter {

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf in = (ByteBuf) msg;
System.out.printf("Server received: %s \n", in.toString(CharsetUtil.UTF_8));
// 由于读事件不是一次性就能把完整消息发送过来的,这里并没有调用writeAndFlush
ctx.write(in); // 直接把消息写回给客户端(会被出站消息处理器处理,不过我们的应用没有实现任何出站消息处理器)
}

@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
// 等读事件已经完成时,冲刷之前写数据的缓冲区
// 然后添加了一个监听器,它会在Future完成时进行关闭该Channel.
ctx.writeAndFlush(Unpooled.EMPTY_BUFFER)
.addListener(ChannelFutureListener.CLOSE);
}

// 处理异常,输出异常信息,然后关闭Channel
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}

}

服务器的应用逻辑只有这么多,剩下就是用ServerBootstrap进行配置了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
public class EchoServer {

private final int port;

public EchoServer(int port) {
this.port = port;
}

public void start() throws Exception {
final EchoServerInboundHandler serverHandler = new EchoServerInboundHandler();
EventLoopGroup group = new NioEventLoopGroup(); // 传输类型使用NIO
try {
ServerBootstrap b = new ServerBootstrap();
b.group(group) // 配置EventLoopGroup
.channel(NioServerSocketChannel.class) // 配置Channel的类型
.localAddress(new InetSocketAddress(port)) // 配置端口号
.childHandler(new ChannelInitializer<SocketChannel>() {
// 实现一个ChannelInitializer,它可以方便地添加多个ChannelHandler
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
socketChannel.pipeline().addLast(serverHandler);
}
});
// i绑定地址,同步等待它完成
ChannelFuture f = b.bind().sync();
// 关闭这个Future
f.channel().closeFuture().sync();
} finally {
// 关闭应用程序,一般来说Netty应用只需要调用这个方法就够了
group.shutdownGracefully().sync();
}
}

public static void main(String[] args) throws Exception {
if (args.length != 1) {
System.err.printf(
"Usage: %s <port> \n",
EchoServer.class.getSimpleName()
);
return;
}
int port = Integer.parseInt(args[0]);
new EchoServer(port).start();
}

}

接下来实现客户端,同样需要先实现一个入站消息处理器。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public class EchoClientInboundHandler extends SimpleChannelInboundHandler<ByteBuf> {

/**
* 我们在Channel连接到远程节点直接发送一条消息给服务器
*/
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
ctx.writeAndFlush(Unpooled.copiedBuffer("Hello, Netty!", CharsetUtil.UTF_8));
}

@Override
protected void channelRead0(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf) throws Exception {
// 输出从服务器Echo的消息
System.out.printf("Client received: %s \n", byteBuf.toString(CharsetUtil.UTF_8));
}

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}

}

然后配置客户端。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
public class EchoClient {

private final String host;
private final int port;

public EchoClient(String host, int port) {
this.host = host;
this.port = port;
}

public void start() throws Exception {
EventLoopGroup group = new NioEventLoopGroup();
try {
Bootstrap b = new Bootstrap();
b.group(group)
.channel(NioSocketChannel.class)
.remoteAddress(new InetSocketAddress(host, port)) // 服务器的地址
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
socketChannel.pipeline().addLast(new EchoClientInboundHandler());
}
});
ChannelFuture f = b.connect().sync(); // 连接到服务器
f.channel().closeFuture().sync();
} finally {
group.shutdownGracefully().sync();
}
}

public static void main(String[] args) throws Exception {
if (args.length != 2) {
System.err.printf("Usage: %s <host> <port> \n", EchoClient.class.getSimpleName());
return;
}

String host = args[0];
int port = Integer.parseInt(args[1]);
new EchoClient(host, port).start();
}

}

​ 实现一个Netty应用程序就是如此简单,用户大多数都是在编写各种应用逻辑的ChannelHandler(或者使用Netty内置的各种实用ChannelHandler),然后只需要将它们全部添加到ChannelPipeline即可。

Bootstrap执行流程

首先,创建了一个引导器 ServerBootstrap 实例,这个专门用于引导服务端的启动工作,直接new 创建即可。(客户端的引导器差不多,不过是创建Bootstrap 实例)

1
2
// 启动引导器
private static ServerBootstrap b = new ServerBootstrap();

启动一个Bootstrap,大致有8步,如下图:

代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
try {   //1 设置reactor 线程
b.group(bossLoopGroup, workerLoopGroup);
//2 设置nio类型的channel
b.channel(NioServerSocketChannel.class);
//3 设置监听端口
b.localAddress(new InetSocketAddress(port));
//4 设置通道选项
b.option(ChannelOption.SO_KEEPALIVE, true);
b.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);

//5 装配流水线
b.childHandler(new ChannelInitializer<SocketChannel>()
{
//有连接到达时会创建一个channel
protected void initChannel(SocketChannel ch) throws Exception
{
ch.pipeline().addLast(new ProtobufDecoder());
ch.pipeline().addLast(new ProtobufEncoder());
// pipeline管理channel中的Handler
// 在channel队列中添加一个handler来处理业务
ch.pipeline().addLast("serverHandler", serverHandler);
}
});
// 6 开始绑定server
// 通过调用sync同步方法阻塞直到绑定成功

ChannelFuture channelFuture = b.bind().sync();
LOGGER.info(ChatServer.class.getName() +
" started and listen on " +
channelFuture.channel().localAddress());

// 7 监听通道关闭事件
// 应用程序会一直等待,直到channel关闭
ChannelFuture closeFuture= channelFuture.channel().closeFuture();
closeFuture.sync();
} catch (Exception e){
e.printStackTrace();
} finally{
// 8 优雅关闭EventLoopGroup,
// 释放掉所有资源包括创建的线程
workerLoopGroup.shutdownGracefully();
bossLoopGroup.shutdownGracefully();
}

设置reactor 线程组

在设置 reactor 反应器线程组之前,创建了两个 NioEventLoopGroup 线程组:

  • bossLoopGroup

    表示服务器连接监听线程组,专门接受 accept 新的客户端client 连接

  • workerGroup

    表示处理每一条连接的数据收发的线程组

在线程组和启动器都创建完成后,就可以开始设置线程组:通过 b.group(bossGroup, workerGroup) 方法,给引导器配置两大线程组。

配置完成之后,整个引导类的 reactor 线程正式确定。这里确定的工作模式,为父子线程的模型。

也可以不设置两个线程组,只设置一个线程组。

如果只设置一个线程组,具体的方法为 —— b.group( workerGroup) 。

配置完成一个线程组,则所有的 channel ,包括服务监听通道父亲channel 和所有的子channel ,都工作在同一个线程组中。

说明一下,一个线程组,可不止一条线程哈。

设置通道的IO类型

Netty 不止支持 Java NIO ,也支持阻塞式的 BIO (在Netty 中 叫做OIO)。

这里配置的是NIO,方法如下。

1
2
//2 设置nio类型的channel
b.channel(NioServerSocketChannel.class);

如果想指定 IO 模型为 BIO,那么这里配置上Netty的 OioServerSocketChannel.class 类型即可。由于NIO 的优势巨大,通常不会在Netty中使用BIO。

设置监听端口

1
2
//3 设置监听端口
b.localAddress(new InetSocketAddress(port));

这是最为简单的一步操作。

设置通道参数

  • childOption() 方法

    给每条child channel 连接设置一些TCP底层相关的属性,比如上面,我们设置了两种TCP属性,其中 ChannelOption.SO_KEEPALIVE表示是否开启TCP底层心跳机制,true为开

  • option() 方法

    对于server bootstrap而言,这个方法,是给parent channel 连接设置一些TCP底层相关的属性。

TCP连接的参数详细介绍如下。

ChannelOption设置的参数

ChannelOption的各种属性在套接字选项中都有对应。

ChannelOption.SO_BACKLOG

​ ChannelOption.SO_BACKLOG对应的是tcp/ip协议listen函数中的backlog参数,函数listen(int socketfd,int backlog)用来初始化服务端可连接队列,服务端处理客户端连接请求是顺序处理的,所以同一时间只能处理一个客户端连接,多个客户端来的时候,服务端将不能处理的客户端连接请求放在队列中等待处理,backlog参数指定了队列的大小

ChannelOption.SO_REUSEADDR

​ ChanneOption.SO_REUSEADDR对应于套接字选项中的SO_REUSEADDR,这个参数表示允许重复使用本地地址和端口,比如,某个服务器进程占用了TCP的80端口进行监听,此时再次监听该端口就会返回错误,使用该参数就可以解决问题,该参数允许共用该端口,这个在服务器程序中比较常使用,比如某个进程非正常退出,该程序占用的端口可能要被占用一段时间才能允许其他进程使用,而且程序死掉以后,内核一需要一定的时间才能够释放此端口,不设置SO_REUSEADDR就无法正常使用该端口。

ChannelOption.SO_KEEPALIVE

​ Channeloption.SO_KEEPALIVE参数对应于套接字选项中的SO_KEEPALIVE,该参数用于设置TCP连接,当设置该选项以后,连接会测试链接的状态,这个选项用于可能长时间没有数据交流的连接。当设置该选项以后,如果在两小时内没有数据的通信时,TCP会自动发送一个活动探测数据报文。

ChannelOption.SO_SNDBUF和ChannelOption.SO_RCVBUF

​ ChannelOption.SO_SNDBUF参数对应于套接字选项中的SO_SNDBUF,ChannelOption.SO_RCVBUF参数对应于套接字选项中的SO_RCVBUF这两个参数用于操作接收缓冲区和发送缓冲区的大小,接收缓冲区用于保存网络协议站内收到的数据,直到应用程序读取成功,发送缓冲区用于保存发送数据,直到发送成功。

ChannelOption.SO_LINGER

​ ChannelOption.SO_LINGER参数对应于套接字选项中的SO_LINGER,Linux内核默认的处理方式是当用户调用close()方法的时候,函数返回,在可能的情况下,尽量发送数据,不一定保证会发生剩余的数据,造成了数据的不确定性,使用SO_LINGER可以阻塞close()的调用时间,直到数据完全发送

ChannelOption.TCP_NODELAY

​ ChannelOption.TCP_NODELAY参数对应于套接字选项中的TCP_NODELAY,该参数的使用与Nagle算法有关,Nagle算法是将小的数据包组装为更大的帧然后进行发送,而不是输入一次发送一次,因此在数据包不足的时候会等待其他数据的到了,组装成大的数据包进行发送,虽然该方式有效提高网络的有效负载,但是却造成了延时,而该参数的作用就是禁止使用Nagle算法,使用于小数据即时传输,于TCP_NODELAY相对应的是TCP_CORK,该选项是需要等到发送的数据量最大的时候,一次性发送数据,适用于文件传输。

装配流水线

ChannelPipeline 这是Netty处理请求的责任链,这是一个ChannelHandler的链表,而ChannelHandler就是用来处理网络请求的内容的。

每一个channel ,都有一个处理器流水线。

装配 child channel 流水线,调用 childHandler()方法,传递一个ChannelInitializer 的实例。

在 child channel 创建成功,开始通道初始化的时候,在bootstrap启动器中配置的 ChannelInitializer 实例就会被调用。

这个时候,才真正的执行去执行 initChannel 初始化方法,开始通道流水线装配。

流水线装配,主要是在流水线pipeline 的后面,增加负责数据读写、处理业务逻辑的handler。

1
2
3
4
5
6
7
8
9
10
11
12
b.childHandler(new ChannelInitializer<SocketChannel>()
{
//有连接到达时会创建一个channel
protected void initChannel(SocketChannel ch) throws Exception
{
ch.pipeline().addLast(new ProtobufDecoder());
ch.pipeline().addLast(new ProtobufEncoder());
// pipeline管理channel中的Handler
// 在channel队列中添加一个handler来处理业务
ch.pipeline().addLast("serverHandler", serverHandler);
}
});

说明一下,ChannelInitializer这个类中,有一个泛型参数 SocketChannel,这里的类型,需要和前面的Channel类型对应上。

顺便说一下处理器。

​ 处理器 ChannelHandler 用来处理网络请求内容,有ChannelInboundHandler和ChannelOutboundHandler两种,ChannlPipeline会从头到尾顺序调用ChannelInboundHandler处理网络请求内容,从尾到头调用ChannelOutboundHandler 处理网络请求内容。

pipeline 流水线的图,大致如下:

如何装配parent 通道呢?

使用serverBootstrap.handler() 方法 。 handler()方法,可以和前面分析的childHandler()方法对应起来。childHandler()用于指定处理新连接数据的读写处理逻辑。 handler()方法装配parent 通道。

比方说:

1
2
3
4
5
6
7
serverBootstrap.handler(new ChannelInitializer() 
{
protected void initChannel(NioServerSocketChannel ch)
{
System.out.println("服务端启动中");
}
}

handler()用于指定在服务端启动过程中的一些逻辑,通常情况下呢,我们用不着这个方法。

开始绑定server

1
2
3
4
5
// 通过调用sync同步方法阻塞直到绑定成功
ChannelFuture channelFuture = b.bind().sync();
LOGGER.info(ChatServer.class.getName() +
" started and listen on " +
channelFuture.channel().localAddress());

ChannelFuture

ChannelFuture 在Netty中的所有的I/O操作都是异步执行的,这就意味着任何一个I/O操作会立刻返回,不保证在调用结束的时候操作会执行完成。因此,会返回一个ChannelFuture的实例,通过这个实例可以获取当前I/O操作的状态。

1
2
3
4
// 7 监听通道关闭事件
// 应用程序会一直等待,直到channel关闭
ChannelFuture closeFuture= channelFuture.channel().closeFuture();
closeFuture.sync();

对于客户端来说,Bootstrap是开发netty客户端的基础,通过Bootstrap的connect方法来连接服务器端。该方法返回的也是ChannelFuture。

优雅关闭EventLoopGroup

1
2
3
4
// 8 优雅关闭EventLoopGroup,
// 释放掉所有资源包括创建的线程
workerLoopGroup.shutdownGracefully();
bossLoopGroup.shutdownGracefully();

这个,会关闭所有的child channel,这是非常重要的。

关闭之后,会释放掉底层的资源,如TCP Socket 文件描述符,等等。

评论