抱歉,您的浏览器无法访问本站
本页面需要浏览器支持(启用)JavaScript
了解详情 >

NETTY通讯协议

按行分割协议

LineBasedFrameDecoderLineEncoder采用的通信协议非常简单,即按照行进行分割,遇到一个换行符,则认为是一个完整的报文。在发送方,使用LineEncoder为数据添加换行符;在接受方,使用LineBasedFrameDecoder对换行符进行解码。

LineBasedFrameDecoder

LineBasedFrameDecoder采用的通信协议格式非常简单:使用换行符\n或者\r\n作为依据,遇到\n或者\r\n都认为是一条完整的消息。

LineBasedFrameDecoder提供了2个构造方法,如下:

1
2
3
4
5
6
7
8
public LineBasedFrameDecoder(final int maxLength) {
this(maxLength, true, false);
}
public LineBasedFrameDecoder(final int maxLength, final boolean stripDelimiter, final boolean failFast) {
this.maxLength = maxLength;
this.failFast = failFast;
this.stripDelimiter = stripDelimiter;
}
参数
maxLength

​ 表示一行最大的长度,如果超过这个长度依然没有检测到\n或者\r\n,将会抛出TooLongFrameException

failFast

​ 与maxLength联合使用,表示超过maxLength后,抛出TooLongFrameException的时机。如果为true,则超出maxLength后立即抛出TooLongFrameException,不继续进行解码;如果为false,则等到完整的消息被解码后,再抛出TooLongFrameException异常。

stripDelimiter

​ 解码后的消息是否去除\n,\r\n分隔符,例如对于以下二进制字节流:

1
2
3
+--------------+
| ABC\nDEF\r\n |
+--------------+

如果stripDelimiter为true,则解码后的结果为:

1
2
3
+-----+-----+
| ABC | DEF |
+-----+-----+

如果stripDelimiter为false,则解码后的结果为:

1
2
3
+-------+---------+
| ABC\n | DEF\r\n |
+-------+---------+

LineEncoder

按行编码,给定一个CharSequence(如String),在其之后添加换行符\n或者\r\n,并封装到ByteBuf进行输出,与LineBasedFrameDecoder相对应。LineEncoder提供了多个构造方法,最终调用的都是:

1
2
public LineEncoder(LineSeparator lineSeparator, //换行符号
Charset charset) //换行符编码,默认为CharsetUtil.UTF_8

Netty提供了LineSeparator来指定换行符,其定义了3个常量, 一般使用DEFAULT即可。

1
2
3
4
5
6
7
8
9
public final class LineSeparator {
//读取系统属性line.separator,如果读取不到,默认为\n
public static final LineSeparator DEFAULT = new LineSeparator(StringUtil.NEWLINE);
//unix操作系统换行符
public static final LineSeparator UNIX = new LineSeparator("\n”);
//windows操作系统换行度
public static final LineSeparator WINDOWS = new LineSeparator("\r\n”);
//...
}

使用案例

LineBasedFrameDecoder / LineEncoder使用案例

服务端
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
public class LineBasedFrameDecoderServer {
public static void main(String[] args) throws Exception {
EventLoopGroup bossGroup = new NioEventLoopGroup(); // (1)
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap b = new ServerBootstrap(); // (2)
b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class) // (3)
.childHandler(new ChannelInitializer<SocketChannel>() { // (4)
@Override
public void initChannel(SocketChannel ch) throws Exception {
// 使用LineBasedFrameDecoder解决粘包问题,其会根据"\n"或"\r\n"对二进制数据进行拆分,封装到不同的ByteBuf实例中
ch.pipeline().addLast(new LineBasedFrameDecoder(1024, true, true));
// 自定义这个ChannelInboundHandler打印拆包后的结果
ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
if (msg instanceof ByteBuf) {
ByteBuf packet = (ByteBuf) msg;
System.out.println(
new Date().toLocaleString() + ":" + packet.toString(Charset.defaultCharset()));
}
}
});
}
});
// Bind and start to accept incoming connections.
ChannelFuture f = b.bind(8080).sync(); // (7)
System.out.println("LineBasedFrameDecoderServer Started on 8080...");
f.channel().closeFuture().sync();
} finally {
workerGroup.shutdownGracefully();
bossGroup.shutdownGracefully();
}
}
}

​ LineBasedFrameDecoder要解决粘包问题,根据”\n”或”\r\n”对二进制数据进行解码,可能会解析出多个完整的请求报文,其会将每个有效报文封装在不同的ByteBuf实例中,然后针对每个ByteBuf实例都会调用一次其他的ChannelInboundHandler的channelRead方法。

​ 因此LineBasedFrameDecoder接受到一次数据,其之后的ChannelInboundHandler的channelRead方法可能会被调用多次,且之后的ChannelInboundHandler的channelRead方法接受到的ByteBuf实例参数,包含的都是都是一个完整报文的二进制数据。因此无需再处理粘包问题,只需要将ByteBuf中包含的请求信息解析出来即可,然后进行相应的处理。本例中,我们仅仅是打印。

客户端
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
public class LineBasedFrameDecoderClient {
public static void main(String[] args) throws Exception {
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
Bootstrap b = new Bootstrap(); // (1)
b.group(workerGroup); // (2)
b.channel(NioSocketChannel.class); // (3)
b.option(ChannelOption.SO_KEEPALIVE, true); // (4)
b.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
//ch.pipeline().addLast(new LineEncoder());自己添加换行符,不使用LineEncoder
ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
//在于server建立连接后,即发送请求报文
public void channelActive(ChannelHandlerContext ctx) {
byte[] req1 = ("hello1" + System.getProperty("line.separator")).getBytes();
byte[] req2 = ("hello2" + System.getProperty("line.separator")).getBytes();
byte[] req3_1 = ("hello3").getBytes();
byte[] req3_2 = (System.getProperty("line.separator")).getBytes();
ByteBuf buffer = Unpooled.buffer();
buffer.writeBytes(req1);
buffer.writeBytes(req2);
buffer.writeBytes(req3_1);
ctx.writeAndFlush(buffer);
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
buffer = Unpooled.buffer();
buffer.writeBytes(req3_2);
ctx.writeAndFlush(buffer);
}
});
}
});
// Start the client.
ChannelFuture f = b.connect("127.0.0.1",8080).sync(); // (5)
// Wait until the connection is closed.
f.channel().closeFuture().sync();
} finally {
workerGroup.shutdownGracefully();
}
}
}

​ 需要注意的是,在client端,我们并没有使用LineEncoder进行编码,原因在于我们要模拟粘包、拆包。如果使用LineEncoder,那么每次调用ctx.write或者ctx.writeAndflush,LineEncoder都会自动添加换行符,无法模拟拆包问题。

​ 我们通过自定义了一个ChannelInboundHandler,用于在连接建立后,发送3个请求报文req1、req2、req3。其中req1和req2都是一个完整的报文,因为二者都包含一个换行符;req3分两次发送,第一次发送req3_1,第二次发送req3_2。

​ 首先我们将req1、req2和req3_1一起发送:

1
2
3
+------------------------+
| hello1\nhello2\nhello3 |
+------------------------+

而服务端经过解码后,得到两个完整的请求req1、req2、以及req3的部分数据:

1
2
3
+----------+----------+--------
| hello1\n | hello2\n | hello3
+----------+----------+--------

​ 由于req1、req2都是一个完整的请求,因此可以直接处理。而req3由于只接收到了一部分(半包),需要等到2秒后,接收到另一部分才能处理。

​ 因此当我们先后启动server端和client之后,在server端的控制台将会有类似以下输出:

1
2
3
4
LineBasedFrameDecoderServer Started on 8080...
2018-9-8 12:49:02:hello1
2018-9-8 12:49:02:hello2
2018-9-8 12:49:04:hello3

​ 可以看到hello1和hello2是同一时间打印出来的,而hello3是2秒之后才打印。说明LineBasedFrameDecoder成功帮我们处理了粘包和半包问题。

小结

  • 部分同学可能认为调用一个writeAndFlush方法就是发送了一个请求,这是对协议的理解不够深刻。一个完整的请求是由协议规定的,例如我们在这里使用了LineBasedFrameDecoder,潜在的含义就是:一行数据才算一个完整的报文。因此当你调用writeAndFlush方法,如果发送的数据有多个换行符,意味着相当于发送了多次有效请求;而如果发送的数据不包含换行符,意味着你的数据还不足以构成一个有效请求。
  • 对于粘包问题,例如是两个有效报文粘在一起,那么服务端解码后,可以立即处理这两个报文。
  • 对于拆包问题,例如一个报文是完整的,另一个只是半包,netty会对半包的数据进行缓存,等到可以构成一个完整的有效报文后,才会进行处理。这意味着么netty需要缓存每个client的半包数据,如果很多client都发送半包,缓存的数据就会占用大量内存空间。因此我们在实际开发中,不要像上面案例那样,有意将报文拆开来发送。
  • 此外,如果client发送了半包,而剩余部分内容没有发送就关闭了,对于这种情况,netty服务端在销毁连接时,会自动清空之前缓存的数据,不会一直缓存。

分隔符与Base64编解码

DelimiterBasedFrameDecoder

​ 上一节我们介绍了LineBasedFrameDecoder,其以换行符\n或者\r\n作为依据,遇到\n或者\r\n都认为是一条完整的消息。

​ 而DelimiterBasedFrameDecoder与LineBasedFrameDecoder类似,只不过更加通用,允许我们指定任意特殊字符作为分隔符。我们还可以同时指定多个分隔符,如果在请求中发的确有多个分隔符,将会选择内容最短的一个分隔符作为依据。

例如:

1
2
3
+--------------+
| ABC\nDEF\r\n |
+--------------+

如果我们指定分隔符为\n,那么将会解码出来2个消息

1
2
3
+-----+-----+
| ABC | DEF |
+-----+-----+

如果我们指定\r\n作为分隔符,那么只会解码出来一条消息

1
2
3
+----------+
| ABC\nDEF |
+----------+

DelimiterBasedFrameDecoder提供了多个构造方法,最终调用的都是以下构造方法:

1
2
public DelimiterBasedFrameDecoder(
int maxFrameLength, boolean stripDelimiter, boolean failFast, ByteBuf... delimiters)
参数
maxLength

​ 表示一行最大的长度,如果超过这个长度依然没有检测到\n或者\r\n,将会抛出TooLongFrameException

failFast

​ 与maxLength联合使用,表示超过maxLength后,抛出TooLongFrameException的时机。如果为true,则超出maxLength后立即抛出TooLongFrameException,不继续进行解码;如果为false,则等到完整的消息被解码后,再抛出TooLongFrameException异常。

stripDelimiter

​ 解码后的消息是否去除分隔符。

delimiters

​ 分隔符,我们需要先将分割符,写入到ByteBuf中,然后当做参数传入。

​ 需要注意的是,netty并没有提供一个DelimiterBasedFrameDecoder对应的编码器实现(笔者没有找到),因此在发送端需要自行编码,添加分隔符。

Base64编解码

对于以特殊字符作为报文分割条件的协议的解码器,如:LineBasedFrameDecoder、DelimiterBasedFrameDecoder。都存在一个典型的问题,如果发送数据当中本身就包含了分隔符,怎么办?如:我们要发送的内容为:

1
hello1\nhello2\nhello3\n

​ 我们需要把这个内容整体当做一个有效报文来处理,而不是拆分成hello1、hello2、hello3。一些同学可能想到那可以换其他的特殊字符,但是如果内容中又包含你想指定的其他特殊字符怎么办呢?

​ 因此我们通常需要发送的内容进行base64编码,base64中总共只包含了64个字符。

索引 对应字符 索引 对应字符 索引 对应字符 索引 对应字符
0 A 17 R 34 i 51 z
1 B 18 S 35 j 52 0
2 C 19 T 36 k 53 1
3 D 20 U 37 l 54 2
4 E 21 V 38 m 55 3
5 F 22 W 39 n 56 4
6 G 23 X 40 o 57 5
7 H 24 Y 41 p 58 6
8 I 25 Z 42 q 59 7
9 J 26 a 43 r 60 8
10 K 27 b 44 s 61 9
11 L 28 c 45 t 62 +
12 M 29 d 46 u 63 /
13 N 30 e 47 v
14 O 31 f 48 w
15 P 32 g 49 x
16 Q 33 h 50 y

​ 我们可以指定这64个字符之外的其他字符作为特殊分割字符;而接收端对应的进行base64解码,得到对应的原始的二进制流,然后进行处理。Netty提供了Base64Encoder/Base64Decoder来帮我们处理这个问题。需要注意的是,只需要对内容进行base64编码,分隔符不需要编码。

使用案例

DelimiterBasedFrameDecoder结合Base64编解码案例

服务端
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
public class DelimiterBasedFrameDecoderServer {
public static void main(String[] args) throws Exception {
EventLoopGroup bossGroup = new NioEventLoopGroup(); // (1)
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap b = new ServerBootstrap(); // (2)
b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class) // (3)
.childHandler(new ChannelInitializer<SocketChannel>() { // (4)
@Override
public void initChannel(SocketChannel ch) throws Exception {
ByteBuf delemiter= Unpooled.buffer();
delemiter.writeBytes("&".getBytes());
//先使用DelimiterBasedFrameDecoder解码,以&作为分割符
ch.pipeline().addLast(new DelimiterBasedFrameDecoder(1024, true, true,delemiter));
//之后使用Base64Decoder对数据进行解码,得到报文的原始的二进制流
ch.pipeline().addLast(new Base64Decoder());
//对请求报文进行处理
ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
if (msg instanceof ByteBuf) {
ByteBuf packet = (ByteBuf) msg;
System.out.println(
new Date().toLocaleString() + ":" + packet.toString(Charset.defaultCharset()));
}
}
});
}
});
// Bind and start to accept incoming connections.
ChannelFuture f = b.bind(8080).sync(); // (7)
System.out.println("DelimiterBasedFrameDecoderServer Started on 8080...");
f.channel().closeFuture().sync();
} finally {
workerGroup.shutdownGracefully();
bossGroup.shutdownGracefully();
}
}
}

​ Server接受到的数据,首先我们要去除分隔符,然后才能进行base64解码。因此首选我们添加了DelimiterBasedFrameDecoder根据&处理粘包办包问题,之后使用Base64Decoder进行解码,最后通过一个自定义ChannelInboundHandler打印请求的数据。

客户端
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
public class DelimiterBasedFrameDecoderClient {
public static void main(String[] args) throws Exception {
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
Bootstrap b = new Bootstrap(); // (1)
b.group(workerGroup); // (2)
b.channel(NioSocketChannel.class); // (3)
b.option(ChannelOption.SO_KEEPALIVE, true); // (4)
b.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
//在于server建立连接后,即发送请求报文
public void channelActive(ChannelHandlerContext ctx) {
//先对要发送的原始内容进行base64编码
ByteBuf content = Base64.encode(Unpooled.buffer().writeBytes("hello&tianshouzhi&".getBytes
()));
//之后添加分隔符
ByteBuf req = Unpooled.copiedBuffer(content);
req.writeBytes("&".getBytes());
ctx.writeAndFlush(req);
}
});
}
});
// Start the client.
ChannelFuture f = b.connect("127.0.0.1",8080).sync(); // (5)
// Wait until the connection is closed.
f.channel().closeFuture().sync();
} finally {
workerGroup.shutdownGracefully();
}
}
}

​ 在编写client端代码时我们先对原始内容进行base64编码,然后添加分割符之后进行输出。

​ 需要注意,虽然Netty提供了Base64Encoder进行编码,这里并没有直接使用,如果直接使用Base64Encoder,那么会对我们输出的所有内容进行编码,意味着分隔符也会被编码,这显然不符合我们的预期,所以这里直接使用了Netty提供了Base64工具类来处理。

​ 如果一定要使用Base64Encoder,那么代码需要进行相应的修改,自定义的ChannelInboundHandler只输出原始内容,之后通过Base64Encoder进行编码,然后需要额外再定义一个ChannelOutboundHandler添加分隔符,如:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
ch.pipeline().addLast(new ChannelOutboundHandlerAdapter() {
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
if(msg instanceof ByteBuf){
((ByteBuf) msg).writeBytes("&".getBytes());
}
}
});
ch.pipeline().addLast(new Base64Encoder());
ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
//在于server建立连接后,即发送请求报文
public void channelActive(ChannelHandlerContext ctx) {
ByteBuf req = Unpooled.buffer().writeBytes("hello&tianshouzhi&".getBytes());
ctx.writeAndFlush(req);
}
});

上述两种方案效果是等价的。

当我们先后启动server和client后,会看到server端控制台输出:

1
2
DelimiterBasedFrameDecoderServer Started on 8080...
2018-9-8 13:56:32:hello&tianshouzhi&

说明经过base64编码后,我们的请求中可以包含分隔符作为内容。

如果我们将base64编解码相关逻辑去掉,你将会看到的输出是:

1
2
3
DelimiterBasedFrameDecoderServer Started on 8080...
2018-9-8 14:13:31:hello
2018-9-8 14:13:31:tianshouzhi

也就是说,原始内容也被误分割了,解码失败,读者可以自行验证。

定长协议

FixedLengthFrameDecoder

FixedLengthFrameDecoder采用的是定长协议:即把固定的长度的字节数当做一个完整的消息

例如,我们规定每3个字节,表示一个有效报文,如果我们分4次总共发送以下9个字节:

1
2
3
+---+----+------+----+
| A | BC | DEFG | HI |
+---+----+------+----+

那么通过FixedLengthFrameDecoder解码后,实际上只会解析出来3个有效报文

1
2
3
+-----+-----+-----+
| ABC | DEF | GHI |
+-----+-----+-----+

FixedLengthFrameDecodert提供了以下构造方法

1
2
3
4
5
6
7
public FixedLengthFrameDecoder(int frameLength) {
if (frameLength <= 0) {
throw new IllegalArgumentException(
"frameLength must be a positive integer: " + frameLength);
}
this.frameLength = frameLength;
}

​ 其中:frameLength就是我们指定的长度。

​ 需要注意的是FixedLengthFrameDecoder并没有提供一个对应的编码器,因为接收方只需要根据字节数进行判断即可,发送方无需编码

使用案例

FixedLengthFrameDecoder使用案例

服务端
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
public class FixedLengthFrameDecoderServer {
public static void main(String[] args) throws Exception {
EventLoopGroup bossGroup = new NioEventLoopGroup(); // (1)
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap b = new ServerBootstrap(); // (2)
b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class) // (3)
.childHandler(new ChannelInitializer<SocketChannel>() { // (4)
@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new FixedLengthFrameDecoder(3));
// 自定义这个ChannelInboundHandler打印拆包后的结果
ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
if (msg instanceof ByteBuf) {
ByteBuf packet = (ByteBuf) msg;
System.out.println(
new Date().toLocaleString() + ":" + packet.toString(Charset.defaultCharset()));
}
}
});
}
});
// Bind and start to accept incoming connections.
ChannelFuture f = b.bind(8080).sync(); // (7)
System.out.println("FixedLengthFrameDecoderServer Started on 8080...");
f.channel().closeFuture().sync();
} finally {
workerGroup.shutdownGracefully();
bossGroup.shutdownGracefully();
}
}
}
客户端
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
public class FixedLengthFrameDecoderClient {
public static void main(String[] args) throws Exception {
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
Bootstrap b = new Bootstrap(); // (1)
b.group(workerGroup); // (2)
b.channel(NioSocketChannel.class); // (3)
b.option(ChannelOption.SO_KEEPALIVE, true); // (4)
b.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
//在于server建立连接后,即发送请求报文
public void channelActive(ChannelHandlerContext ctx) {
ByteBuf A = Unpooled.buffer().writeBytes("A".getBytes());
ByteBuf BC = Unpooled.buffer().writeBytes("BC".getBytes());
ByteBuf DEFG = Unpooled.buffer().writeBytes("DEFG".getBytes());
ByteBuf HI = Unpooled.buffer().writeBytes("HI".getBytes());
ctx.writeAndFlush(A);
ctx.writeAndFlush(BC);
ctx.writeAndFlush(DEFG);
ctx.writeAndFlush(HI);
}
});
}
});
// Start the client.
ChannelFuture f = b.connect("127.0.0.1",8080).sync(); // (5)
// Wait until the connection is closed.
f.channel().closeFuture().sync();
} finally {
workerGroup.shutdownGracefully();
}
}
}

​ 先后运行server端与client端后,server端控制台输出

1
2
3
4
LineBasedFrameDecoderServer Started on 8080...
2018-9-8 15:20:20:ABC
2018-9-8 15:20:20:DEF
2018-9-8 15:20:20:GHI

​ 可以看到FixedLengthFrameDecoder的确将请求的数据,按照每3个字节当做一个完整的请求报文。

​ 通常情况下,很少有client与server交互时,直接使用定长协议,可能会造成浪费。例如你实际要发送的实际只有3个字节,但是定长协议设置的1024,那么可能你就要为这3个字节基础上,在加1021个空格,以便server端可以解析这个请求。在下一节我们将要介绍的LengthFieldBasedFrameDecoder,支持动态指定报文的长度。

变长协议

协议简介

大多数的协议(私有或者公有),协议头中会携带长度字段,用于标识消息体或者整包消息的长度,例如SMPP、HTTP协议等。由于基于长度解码需求 的通用性,Netty提供了LengthFieldBasedFrameDecoder/LengthFieldPrepender,自动屏蔽TCP底层的拆包和粘 包问题,只需要传入正确的参数,即可轻松解决“读半包“问题。

​ 发送方使用LengthFieldPrepender给实际内容Content进行编码添加报文头Length字段,接受方使用LengthFieldBasedFrameDecoder进行解码。协议格式如下所示:

1
2
3
+--------+----------+
| Length | Content |
+--------+----------+
Length字段

表示Conent部分的字节数,例如Length值为100,那么意味着Conent部分占用的字节数就是100。

​ Length字段本身是个整数,也要占用字节,一般会使用固定的字节数表示。例如我们指定使用2个字节(有符号)表示length,那么可以表示的最大值为32767(约等于32K),也就是说,Content部分占用的字节数,最大不能超过32767。当然,Length字段存储的是Content字段的真实长度。

Content字段

是我们要处理的真实二进制数据。 在发送Content内容之前,首先需要获取其真实长度,添加在内容二进制流之前,然后再发送。Length占用的字节数+Content占用的字节数,就是我们总共要发送的字节。

​ 事实上,我们可以把Length部分看做报文头,报文头包含了解析报文体(Content字段)的相关元数据,例如Length报文头表示的元数据就是Content部分占用的字节数。当然,LengthFieldBasedFrameDecoder并没有限制我们只能添加Length报文头,我们可以在Length字段前或后,加上一些其他的报文头,此时协议格式如下所示:

1
2
3
+---------+--------+----------+----------+
|........ | Length | ....... | Content |
+---------+--------+----------+----------+

​ 不过对于LengthFieldBasedFrameDecoder而言,其关心的只是Length字段。因此当我们在构造一个LengthFieldBasedFrameDecoder时,最主要的就是告诉其如何处理Length字段。

LengthFieldPrepender参数详解

LengthFieldPrepender提供了多个构造方法,最终调用的都是:

1
2
3
public LengthFieldPrepender(
ByteOrder byteOrder, int lengthFieldLength,
int lengthAdjustment, boolean lengthIncludesLengthFieldLength)
参数
  • byteOrder:表示Length字段本身占用的字节数使用的是大端还是小端编码
  • lengthFieldLength:表示Length字段本身占用的字节数,只可以指定 1, 2, 3, 4, 或 8
  • lengthAdjustment:表示Length字段调整值
  • lengthIncludesLengthFieldLength:表示Length字段本身占用的字节数是否包含在Length字段表示的值中。

例如:对于以下包含12个字节的报文

1
2
3
+----------------+
| "HELLO, WORLD" |
+----------------+

​ 假设我们指定Length字段占用2个字节,lengthIncludesLengthFieldLength指定为false,即不包含本身占用的字节,那么Length字段的值为0x000C(即12)。

1
2
3
+--------+----------------+
+ 0x000C | "HELLO, WORLD" |
+--------+----------------+

​ 如果我们指定lengthIncludesLengthFieldLength指定为true,那么Length字段的值为:0x000E(即14)=Length(2)+Content字段(12)

1
2
3
+--------+----------------+
+ 0x000E | "HELLO, WORLD" |
+--------+----------------+

关于lengthAdjustment字段的含义,参见下面的LengthFieldBasedFrameDecoder。

​ LengthFieldPrepender尤其值得说明的一点是,其提供了实现零拷贝的另一种思路(实际上编码过程,是零拷贝的一个重要应用场景)。

  • 在Netty中我们可以使用ByteBufAllocator.directBuffer()创建直接缓冲区实例,从而避免数据从堆内存(用户空间)向直接内存(内核空间)的拷贝,这是系统层面的零拷贝;
  • 也可以使用CompositeByteBuf把两个ByteBuf合并在一起,例如一个存放报文头,另一个存放报文体。而不是创建一个更大的ByteBuf,把两个小ByteBuf合并在一起,这是应用层面的零拷贝。

​ 而LengthFieldPrepender,由于需要在原来的二进制数据之前添加一个Length字段,因此就需要对二者进行合并发送。但是LengthFieldPrepender并没有采用CompositeByteBuf,其编码过程如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
protected void encode(ChannelHandlerContext ctx, ByteBuf msg, List<Object> out) throws Exception {
//1 获得Length字段的值:真实数据可读字节数+Length字段调整值
int length = msg.readableBytes() + lengthAdjustment;
if (lengthIncludesLengthFieldLength) {
length += lengthFieldLength;
}
...

//2 根据lengthFieldLength指定的值(1、2、3、4、8),创建一个ByteBuffer实例,写入length的值,
//并添加到List类型的out变量中
switch (lengthFieldLength) {
case 1:
if (length >= 256) {
throw new IllegalArgumentException(
"length does not fit into a byte: " + length);
}
out.add(ctx.alloc().buffer(1).order(byteOrder).writeByte((byte) length));
break;
...
case 8:
out.add(ctx.alloc().buffer(8).order(byteOrder).writeLong(length));
break;
default:
throw new Error("should not reach here");
}
//3 最后,再将msg本身添加到List中(msg.retain是增加一次引用,返回的还是msg本身)
out.add(msg.retain());
}
}

LengthFieldBasedFrameDecoder参数详解

LengthFieldBasedFrameDecoder提供了多个构造方法,最终调用的都是:

1
2
3
public LengthFieldBasedFrameDecoder(
ByteOrder byteOrder, int maxFrameLength, int lengthFieldOffset, int lengthFieldLength,
int lengthAdjustment, int initialBytesToStrip, boolean failFast)
参数
byteOrder

表示协议中Length字段的字节是大端还是小端

maxFrameLength

表示协议中Content字段的最大长度,如果超出,则抛出TooLongFrameException异常。

lengthFieldOffset

表示Length字段的偏移量,即在读取一个二进制流时,跳过指定长度个字节之后的才是Length字段。如果Length字段之前没有其他报文头,指定为0即可。如果Length字段之前还有其他报文头,则需要跳过之前的报文头的字节数。

lengthFieldLength

表示Length字段占用的字节数。指定为多少,需要看实际要求,不同的字节数,限制了Content字段的最大长度。

  • 如果lengthFieldLength是1个字节,那么限制为128bytes;
  • 如果lengthFieldLength是2个字节,那么限制为32767(约等于32K);
  • 如果lengthFieldLength是3个字节,那么限制为8388608(约等于8M);
  • 如果lengthFieldLength是4个字节,那么限制为2147483648(约等于2G)。

​ lengthFieldLength与maxFrameLength并不冲突。例如我们现在希望限制报文Content字段的最大长度为32M。显然,我们看到了上面的四种情况,没有任何一个值,能刚好限制Content字段最大值刚好为32M。那么我们只能指定lengthFieldLength为4个字节,其最大限制2G是大于32M的,因此肯定能支持。但是如果Content字段长度真的是2G,server端接收到这么大的数据,如果都放在内存中,很容易造成内存溢出。

​ 为了避免这种情况,我们就可以指定maxFrameLength字段,来精确的指定Content部分最大字节数,显然,其值应该小于lengthFieldLength指定的字节数最大可以表示的值。

lengthAdjustment

​ Length字段补偿值。对于绝大部分协议来说,Length字段的值表示的都是Content字段占用的字节数。但是也有一些协议,Length字段表示的是Length字段本身占用的字节数+Content字段占用的字节数。由于Netty中在解析Length字段的值是,默认是认为其只表示Content字段的长度,因此解析可能会失败,所以要进行补偿。在后面的案例3中进行了演示。

主要用于处理Length字段前后还有其他报文头的情况。具体作用请看后面的案例分析。

initialBytesToStrip

​ 解码后跳过的初始字节数,表示获取完一个完整的数据报文之后,忽略前面指定个数的字节。例如报文头只有Length字段,占用2个字节,在解码后,我们可以指定跳过2个字节。这样封装到ByteBuf中的内容,就只包含Content字段的字节内容不包含Length字段占用的字节。

failFast

​ 如果为true,则表示读取到Length字段时,如果其值超过maxFrameLength,就立马抛出一个 TooLongFrameException,而为false表示只有当真正读取完长度域的值表示的字节之后,才会抛出 TooLongFrameException,默认情况下设置为true,建议不要修改,否则可能会造成内存溢出。

​ 下面通过再几个案例,来说明这些参数是如何控制LengthFieldBasedFrameDecoder解码行为的,首先我们讨论报文只包含Length字段和Content字段的情况;接着讨论报文头除了包含Length字段,还有其他报文头字段的情况。

报文只包含Length字段和Content字段

报文只包含Length字段和Content字段时,协议格式如下:

1
2
3
+--------+----------+
| Length | Content |
+--------+----------+

​ 假设Length字段占用2个字节,其值为0x000C,意味着Content字段长度为12个字节,假设其内容为”HELLO, WORLD”。下面演示指定不同解析参数时,解码后的效果。

案例1

lengthFieldOffset = 0 //因为报文以Length字段开始,不需要跳过任何字节,所以offset为0

lengthFieldLength = 2 //因为我们规定Length字段占用字节数为2,所以这个字段值传入的是2

lengthAdjustment = 0 //这里Length字段值不需要补偿,因此设置为0

initialBytesToStrip = 0 //不跳过初始字节,意味着解码后的ByteBuf中,包含Length+Content所有内容

1
2
3
4
5
解码前 (14 bytes)                 解码后 (14 bytes)
+--------+----------------+ +--------+----------------+
| Length | Actual Content |----->| Length | Actual Content |
| 0x000C | "HELLO, WORLD" | | 0x000C | "HELLO, WORLD" |
+--------+----------------+ +--------+----------------+
案例2

lengthFieldOffset = 0 //参见案例1

lengthFieldLength = 2 //参见案例1

lengthAdjustment = 0 //参见案例1

initialBytesToStrip = 2 //这里跳过2个初始字节,也就是Length字段占用的字节数,意味着解码后的ByteBuf中,只包含Content字段

1
2
3
4
5
BEFORE DECODE (14 bytes)         AFTER DECODE (12 bytes)
+--------+----------------+ +----------------+
| Length | Actual Content |----->| Actual Content |
| 0x000C | "HELLO, WORLD" | | "HELLO, WORLD" |
+--------+----------------+ +----------------+
案例3

lengthFieldOffset = 0 // 参见案例1

lengthFieldLength = 2 // 参见案例1

lengthAdjustment = -2 // Length字段补偿值指定为-2

initialBytesToStrip = 0 // 参见案例1

1
2
3
4
5
BEFORE DECODE (14 bytes)         AFTER DECODE (14 bytes)
+--------+----------------+ +--------+----------------+
| Length | Actual Content |----->| Length | Actual Content |
| 0x000E | "HELLO, WORLD" | | 0x000E | "HELLO, WORLD" |
+--------+----------------+ +--------+----------------+

​ 这个案例需要进行一下特殊说明,其Length字段值表示:Length字段本身占用的字节数+Content字节数。所以我们看到解码前,其值为0x000E(14),而不是0x000C(12)。而真实Content字段内容只有2个字节,因此我们需要用:Length字段值0x000E(14),减去lengthAdjustment指定的值(-2),表示的才是Content字段真实长度。

报文头包含Length字段以外的其他字段,同时包含Content字段

​ 通常情况下,一个协议的报文头除了Length字段,还会包含一些其他字段,例如协议的版本号,采用的序列化协议,是否进行了压缩,甚至还会包含一些预留的头字段,以便未来扩展。这些字段可能位于Length之前,也可能位于Length之后,此时的报文协议格式如下所示:

1
2
3
+---------+--------+----------+----------+
|........ | Length | ....... | Content |
+---------+--------+----------+----------+

​ 当然,对于LengthFieldBasedFrameDecoder来说,其只关心Length字段。按照Length字段的值解析出一个完整的报文放入ByteBuf中,也就是说,LengthFieldBasedFrameDecoder只负责粘包、半包的处理,而ByteBuf中的实际内容解析,则交由后续的解码器进行处理。

下面依然通过案例进行说明:

案例4

这个案例中,在Length字段之前,还包含了一个Header字段,其占用2个字节,Length字段占用3个字节。

lengthFieldOffset = 2 // 需要跳过Header字段占用的2个字节,才是Length字段

lengthFieldLength = 3 //Length字段占用3个字节

lengthAdjustment = 0 //由于Length字段的值为12,表示的是Content字段长度,因此不需要调整

initialBytesToStrip = 0 //解码后,不裁剪字节

1
2
3
4
5
BEFORE DECODE (17 bytes)                      AFTER DECODE (17 bytes)
+----------+----------+----------------+ +----------+----------+----------------+
| Header | Length | Actual Content |----->| Header | Length | Actual Content |
| 0xCAFE | 0x00000C | "HELLO, WORLD" | | 0xCAFE | 0x00000C | "HELLO, WORLD" |
+----------+----------+----------------+ +----------+----------+----------------+
案例5

在这个案例中,Header字段位于Length字段之后

lengthFieldOffset = 0 // 由于一开始就是Length字段,因此不需要跳过

lengthFieldLength = 3 // Length字段占用3个字节,其值为0x000C,表示Content字段长度

lengthAdjustment = 2 // 由于Length字段之后,还有Header字段,因此需要+2个字节,读取Header+Content的内容

initialBytesToStrip = 0 //解码后,不裁剪字节

1
2
3
4
5
BEFORE DECODE (17 bytes)                      AFTER DECODE (17 bytes)
+----------+----------+----------------+ +----------+----------+----------------+
| Length | Header | Actual Content |----->| Length | Header | Actual Content |
| 0x00000C | 0xCAFE | "HELLO, WORLD" | | 0x00000C | 0xCAFE | "HELLO, WORLD" |
+----------+----------+----------------+ +----------+----------+----------------+
案例6

这个案例中,Length字段前后各有一个报文头字段HDR1、HDR2,各占1个字节

lengthFieldOffset = 1 //跳过HDR1占用的1个字节读取Length

lengthFieldLength = 2 //Length字段占用2个字段,其值为0x000C(12),表示Content字段长度

lengthAdjustment = 1 //由于Length字段之后,还有HDR2字段,因此需要+1个字节,读取HDR2+Content的内容

initialBytesToStrip = 3 //解码后,跳过前3个字节

1
2
3
4
5
BEFORE DECODE (16 bytes)                       AFTER DECODE (13 bytes)
+------+--------+------+----------------+ +------+----------------+
| HDR1 | Length | HDR2 | Actual Content |----->| HDR2 | Actual Content |
| 0xCA | 0x000C | 0xFE | "HELLO, WORLD" | | 0xFE | "HELLO, WORLD" |
+------+--------+------+----------------+ +------+----------------+
案例7

这个案例中,Length字段前后各有一个报文头字段HDR1、HDR2,各占1个字节。Length占用2个字节,表示的是整个报文的总长度。

lengthFieldOffset = 1 //跳过HDR1占用的1个字节读取Length

lengthFieldLength = 2 //Length字段占用2个字段,其值为0x0010(16),表示HDR1+Length+HDR2+Content长度

lengthAdjustment = -3 //由于Length表示的是整个报文的长度,减去HDR1+Length占用的3个字节后,读取HDR2+Content长度

initialBytesToStrip = 3 //解码后,跳过前3个字节

1
2
3
4
5
BEFORE DECODE (16 bytes)                       AFTER DECODE (13 bytes)
+------+--------+------+----------------+ +------+----------------+
| HDR1 | Length | HDR2 | Actual Content |----->| HDR2 | Actual Content |
| 0xCA | 0x0010 | 0xFE | "HELLO, WORLD" | | 0xFE | "HELLO, WORLD" |
+------+--------+------+----------------+ +------+----------------+

代码案例

了解了LengthFieldPrepender和LengthFieldBasedFrameDecoder的作用后,我们编写一个实际的案例。client发送一个请求,使用LengthFieldPrepender进行编码,server接受请求,使用LengthFieldBasedFrameDecoder进行解码。

我们采用最简单的的通信协议格式,不指定其他报文头:

1
2
3
+--------+-----------+
| Length | Content |
+--------+-----------+
客户端
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
public class LengthFieldBasedFrameDecoderClient {
public static void main(String[] args) throws Exception {
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
Bootstrap b = new Bootstrap(); // (1)
b.group(workerGroup); // (2)
b.channel(NioSocketChannel.class); // (3)
b.option(ChannelOption.SO_KEEPALIVE, true); // (4)
b.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new LengthFieldPrepender(2,0,false));
ch.pipeline().addLast(new StringEncoder());
ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
// 在于server建立连接后,即发送请求报文
public void channelActive(ChannelHandlerContext ctx) {
ctx.writeAndFlush("i am request!");
ctx.writeAndFlush("i am a anther request!");
}
});
}
});
// Start the client.
ChannelFuture f = b.connect("127.0.0.1", 8080).sync(); // (5)
// Wait until the connection is closed.
f.channel().closeFuture().sync();
} finally {
workerGroup.shutdownGracefully();
}
}
}

​ 在Client端我们自定义了一个ChannelInboundHandler,在连接被激活时,立即发送两个请求:**”i am request!”、”i am a anther request!”** 。另外注意,我们是分两次调用ctx.writeAndFlush,每次调用都会导致当前请求数据经过StringEncoder进行编码,得到包含这个请求内容ByteBuf实例, 然后再到LengthFieldPrepender进行编码添加Length字段。

​ 因此我们发送的实际上是以下2个报文

1
2
3
4
5
6
7
8
Length(13)    Content
+--------+-------------------+
+ 0x000D | "i am request!" |
+--------+-------------------+
Length(23) Content
+--------+-----------------------------+
+ 0x0017 | "i am a anther request!" |
+--------+-----------------------------+
服务端
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
public class LengthFieldBasedFrameDecoderServer {
public static void main(String[] args) throws Exception {
EventLoopGroup bossGroup = new NioEventLoopGroup(); // (1)
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap b = new ServerBootstrap(); // (2)
b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class) // (3)
.childHandler(new ChannelInitializer<SocketChannel>() { // (4)
@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new LengthFieldBasedFrameDecoder(16384, 0, 2, 0, 2));
ch.pipeline().addLast(new StringDecoder());
ch.pipeline().addLast(new ChannelInboundHandlerAdapter(){
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println("receive req:"+msg);
}
});
}
});
// Bind and start to accept incoming connections.
ChannelFuture f = b.bind(8080).sync(); // (7)
System.out.println("LengthFieldBasedFrameDecoderServer Started on 8080...");
f.channel().closeFuture().sync();
} finally {
workerGroup.shutdownGracefully();
bossGroup.shutdownGracefully();
}
}
}

​ 在Server端,我们通过LengthFieldBasedFrameDecoder进行解码,并删除Length字段的2个字节,交给之后StringDecoder转换为字符串,最后在我们的自定义的ChannelInboundHandler进行打印。

分别启动server端与client端,在server端,我们将看到输出:

1
2
3
LengthFieldBasedFrameDecoderServer Started on 8080...
receive req:i am request!
receive req:i am a anther request!

​ 注意这里打印了2次,表示LengthFieldBasedFrameDecoder的确解码成功。

​ 部分读者可能不信任这个结果,那么可以尝试将Server端的LengthFieldBasedFrameDecoder和Client端的LengthFieldPrepender注释掉,再次运行的话,大概率你在server端控制台将看到:

1
2
LengthFieldBasedFrameDecoderServer Started on 8080...
receive req:i am request!i am a anther request!

也就是说,在没有使用LengthFieldBasedFrameDecoder和LengthFieldPrepender的情况下,发生了粘包,而服务端无法区分。

总结

​ LengthFieldBasedFrameDecoder作用实际上只是帮我们处理粘包和半包的问题,其只负责将可以构成一个完整有效的请求报文封装到ByteBuf中,之后还要依靠其他的解码器对报文的内容进行解析,例如上面编写的String将其解析为字符串,只不过在后续的解码器中,不需要处理粘包半包问题了,认为ByteBuf中包含的内容肯定是一个完整的报文即可。

​ 对于请求和响应都是字符串的情况下,LengthFieldBasedFrameDecoder/LengthFieldPrepender的威力还没有完全展示出来。我们甚至可以将自定义的POJO类作为请求/响应,在发送数据前对其序列化字节数组,然后通过LengthFieldPrepender为其制定Length;服务端根据Length解析得到二进制字节流,然后反序列化再得到POJO类实例。在下一节我们将会详细的进行介绍。

评论