抱歉,您的浏览器无法访问本站
本页面需要浏览器支持(启用)JavaScript
了解详情 >

NETTY粘包半包

什么是TCP粘包半包

定义

TCP 传输中,客户端发送数据,实际是把数据写入到了 TCP 的缓存中,粘包和半包也就会在此时产生。

​ 客户端给服务端发送了两条消息ABCDEF,服务端这边的接收会有多少种情况呢?有可能是一次性收到了所有的消息ABCDEF,有可能是收到了三条消息ABCDEF

​ 上面所说的一次性收到了所有的消息ABCDEF,类似于粘包。如果客户端发送的包的大小比 TCP 的缓存容量小,并且 TCP 缓存可以存放多个包,那么客户端和服务端的一次通信就可能传递了多个包,这时候服务端从 TCP 缓存就可能一下读取了多个包,这种现象就叫粘包

​ 上面说的后面那种收到了三条消息ABCDEF,类似于半包。如果客户端发送的包的大小比 TCP 的缓存容量大,那么这个数据包就会被分成多个包,通过 Socket 多次发送到服务端,服务端第一次从接受缓存里面获取的数据,实际是整个包的一部分,这时候就产生了半包(半包不是说只收到了全包的一半,是说收到了全包的一部分)。

TCP以流方式传输,是没有界限的一串数据,并没有消息边界。

  • TCP传输数据时,会根据底层的TCP缓存区实际情况进行数据包划分:
  • 1.业务上定义的完整数据(比方说一个完整的json串),可能会被TCP拆分成多个数据包进行发送(拆包)。
  • 2.业务上特殊含义的独立数据,也有可能因为大小或者缓冲区原因,被TCP封装成一个大数据包发送(粘包)。

通过一个图来理解

通过图我们可以发现,数据包接收有很多情况:

  1. 没有粘包拆包,终端2收到了完整的数据包A和数据包B。
  2. 终端2一次性读取到数据包A和数据包B,这就是粘包。
  3. 终端2读取到完整的数据包A和部分数据包B1,第二次才读取到数据包B剩余部分(数据包B2),这就是拆包。
  4. 类似第三点,数据包A也有可能分成两部分(A1、A2), 被前后读取。
  5. 假设数据包很大,那么可能产生多次拆包,如数据包A分N次被读取。

粘包、拆包

socket缓冲区与滑动窗口

​ 先明确一个概念:每个TCP socket在内核中都有一个发送缓冲区(SO_SNDBUF )和一个接收缓冲区(SO_RCVBUF),TCP的全双工的工作模式以及TCP的滑动窗口便是依赖于这两个独立的buffer以及此buffer的填充状态。SO_SNDBUF和SO_RCVBUF 在windows操作系统中默认情况下都是8K。

SO_SNDBUF

​ 进程发送的数据的时候(假设调用了一个send方法),最简单情况(也是一般情况),将数据拷贝进入socket的内核发送缓冲区之中,然后send便会在上层返回。换句话说,send返回之时,数据不一定会发送到对端去(和write写文件有点类似),send仅仅是把应用层buffer的数据拷贝进socket的内核发送buffer中。

SO_RCVBUF

​ 把接受到的数据缓存入内核,应用进程一直没有调用read进行读取的话,此数据会一直缓存在相应socket的接收缓冲区内。再啰嗦一点,不管进程是否读取socket,对端发来的数据都会经由内核接收并且缓存到socket的内核接收缓冲区之中。read所做的工作,就是把内核缓冲区中的数据拷贝到应用层用户的buffer里面,仅此而已。

滑动窗口

​ TCP链接在三次握手的时候,会将自己的窗口大小(window size)发送给对方,其实就是SO_RCVBUF指定的值。之后在发送数据的时,发送方必须要先确认接收方的窗口没有被填充满,如果没有填满,则可以发送。

​ 每次发送数据后,发送方将自己维护的对方的window size减小,表示对方的SO_RCVBUF可用空间变小。

​ 当接收方处理开始处理SO_RCVBUF 中的数据时,会将数据从socket 在内核中的接受缓冲区读出,此时接收方的SO_RCVBUF可用空间变大,即window size变大,接受方会以ack消息的方式将自己最新的window size返回给发送方,此时发送方将自己的维护的接受的方的window size设置为ack消息返回的window size。

​ 此外,发送方可以连续的给接受方发送消息,只要保证对方的SO_RCVBUF空间可以缓存数据即可,即window size>0。当接收方的SO_RCVBUF被填充满时,此时window size=0,发送方不能再继续发送数据,要等待接收方ack消息,以获得最新可用的window size。

​ 现在来看一下SO_RCVBUF和滑动窗口是如何造成粘包、拆包的?

粘包:假设发送方的每256 bytes表示一个完整的报文,接收方由于数据处理不及时,这256个字节的数据都会被缓存到SO_RCVBUF中。如果接收方的SO_RCVBUF中缓存了多个报文,那么对于接收方而言,这就是粘包。

拆包:考虑另外一种情况,假设接收方的window size只剩了128,意味着发送方最多还可以发送128字节,而由于发送方的数据大小是256字节,因此只能发送前128字节,等到接收方ack后,才能发送剩余字节。这就造成了拆包。

MSS和MTU分片

​ MSS是MSS是Maximum Segement Size的缩写,表示TCP报文中data部分的最大长度,是TCP协议在OSI五层网络模型中传输层(transport layer)对一次可以发送的最大数据的限制。

​ MTU最大传输单元是Maxitum Transmission Unit的简写,是OSI五层网络模型中链路层(datalink layer)对一次可以发送的最大数据的限制。

​ 当需要传输的数据大于MSS或者MTU时,数据会被拆分成多个包进行传输。由于MSS是根据MTU计算出来的,因此当发送的数据满足MSS时,必然满足MTU。归根结底:限制一次可发送数据大小的是MTU,MSS只是TCP协议在MTU基础限制的传输层一次可传输的数据的大小。

​ 为了更好的理解,我们先介绍一下在5层网络模型中应用通过TCP发送数据的流程:

  • 对于应用层来说,只关心发送的数据DATA,将数据写入socket在内核中的缓冲区SO_SNDBUF即返回,操作系统会将SO_SNDBUF中的数据取出来进行发送。

  • 传输层会在DATA前面加上TCP Header,构成一个完整的TCP报文。

  • 当数据到达网络层(network layer)时,网络层会在TCP报文的基础上再添加一个IP Header,也就是将自己的网络地址加入到报文中。

  • 到数据链路层时,还会加上Datalink Header和CRC。

  • 当到达物理层时,会将SMAC(Source Machine,数据发送方的MAC地址),DMAC(Destination Machine,数据接受方的MAC地址 )和Type域加入。

    可以发现数据在发送前,每一层都会在上一层的基础上增加一些内容,下图演示了MSS、MTU在这个过程中的作用。

​ MTU是以太网传输数据方面的限制,每个以太网帧都有最小的大小64bytes最大不能超过1518bytes。刨去以太网帧的帧头 (DMAC目的MAC地址48bit=6Bytes+SMAC源MAC地址48bit=6Bytes+Type域2bytes)14Bytes和帧尾 CRC校验部分4Bytes(这个部分有时候大家也把它叫做FCS),那么剩下承载上层协议的地方也就是Data域最大就只能有1500Bytes这个值 我们就把它称之为MTU。

​ 由于MTU限制了一次最多可以发送1500个字节,而TCP协议在发送DATA时,还会加上额外的TCP Header和Ip Header,因此刨去这两个部分,就是TCP协议一次可以发送的实际应用数据的最大大小,也就是MSS。

1
MSS长度=MTU长度-IP Header-TCP Header

​ TCP Header的长度是20字节,IPv4中IP Header长度是20字节,IPV6中IP Header长度是40字节,因此:在IPV4中,以太网MSS可以达到1460byte;在IPV6中,以太网MSS可以达到1440byte。

​ 需要注意的是MSS表示的一次可以发送的DATA的最大长度,而不是DATA的真实长度。发送方发送数据时,当SO_SNDBUF中的数据量大于MSS时,操作系统会将数据进行拆分,使得每一部分都小于MSS,这就是拆包,然后每一部分都加上TCP Header,构成多个完整的TCP报文进行发送,当然经过网络层和数据链路层的时候,还会分别加上相应的内容。

​ 细心的读者会发现,通过wireshark抓包工具的抓取的记录中,TCP在三次握手中的前两条报文中都包含了MSS=65495的字样。这是因为我们的抓包案例的client和server都运行在本地,不需要走以太网,所以不受到以太网MTU=1500的限制。MSS(65495)=MTU(65535)-IP Header(20)-TCP Header(20)。

​ linux服务器上输入ifconfig命令,可以查看不同网卡的MTU大小,如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
[root@www xxx]# ifconfig
eth0 Link encap:Ethernet HWaddr 00:16:3E:02:0E:EA
inet addr:10.144.211.78 Bcast:10.144.223.255 Mask:255.255.240.0
UP BROADCAST RUNNING MULTICAST MTU:1500 Metric:1
RX packets:266023788 errors:0 dropped:0 overruns:0 frame:0
TX packets:1768555 errors:0 dropped:0 overruns:0 carrier:0
collisions:0 txqueuelen:1000
RX bytes:12103832054 (11.2 GiB) TX bytes:138231258 (131.8 MiB)
Interrupt:164


lo Link encap:Local Loopback
inet addr:127.0.0.1 Mask:255.0.0.0
UP LOOPBACK RUNNING MTU:65535 Metric:1
RX packets:499956845 errors:0 dropped:0 overruns:0 frame:0
TX packets:499956845 errors:0 dropped:0 overruns:0 carrier:0
collisions:0 txqueuelen:0
RX bytes:86145804231 (80.2 GiB) TX bytes:86145804231 (80.2 GiB)

​ 可以看到,默认情况下,与外部通信的网卡eth0的MTU大小是1500个字节。而本地回环地址的MTU大小为65535,这是因为本地测试时数据不需要走网卡,所以不受到1500的限制。

​ MTU的大小可以通过类似以下命令修改:

1
ip link set eth0 mtu 65535

其中eth0是网卡的名字。

Nagle算法

​ TCP/IP协议中,无论发送多少数据,总是要在数据(DATA)前面加上协议头(TCP Header+IP Header),同时,对方接收到数据,也需要发送ACK表示确认。

​ 即使从键盘输入的一个字符,占用一个字节,可能在传输上造成41字节的包,其中包括1字节的有用信息和40字节的首部数据。这种情况转变成了4000%的消耗,这样的情况对于重负载的网络来是无法接受的。

​ 为了尽可能的利用网络带宽,TCP总是希望尽可能的发送足够大的数据。(一个连接会设置MSS参数,因此,TCP/IP希望每次都能够以MSS尺寸的数据块来发送数据)。Nagle算法就是为了尽可能发送大块数据,避免网络中充斥着许多小数据块。

​ Nagle算法的基本定义是任意时刻,最多只能有一个未被确认的小段。 所谓“小段”,指的是小于MSS尺寸的数据块,所谓“未被确认”,是指一个数据块发送出去后,没有收到对方发送的ACK确认该数据已收到。

Nagle算法的规则:

  1. 如果SO_SNDBUF中的数据长度达到MSS,则允许发送;
  2. 如果该SO_SNDBUF中含有FIN,表示请求关闭连接,则先将SO_SNDBUF中的剩余数据发送,再关闭;
  3. 设置了TCP_NODELAY=true选项,则允许发送。TCP_NODELAY是取消TCP的确认延迟机制,相当于禁用了Negale 算法。正常情况下,当Server端收到数据之后,它并不会马上向client端发送ACK,而是会将ACK的发送延迟一段时间(假一般是40ms),它希望在t时间内server端会向client端发送应答数据,这样ACK就能够和应答数据一起发送,就像是应答数据捎带着ACK过去。当然,TCP确认延迟40ms并不是一直不变的,TCP连接的延迟确认时间一般初始化为最小值40ms,随后根据连接的重传超时时间(RTO)、上次收到数据包与本次接收数据包的时间间隔等参数进行不断调整。另外可以通过设置TCP_QUICKACK选项来取消确认延迟。
  4. 未设置TCP_CORK选项时,若所有发出去的小数据包(包长度小于MSS)均被确认,则允许发送;
  5. 上述条件都未满足,但发生了超时(一般为200ms),则立即发送。

TCP粘包/半包发生的原因

其实从上面的定义,我们就可以大概知道产生的原因了。

粘包的主要原因

  1. 发送方每次写入数据 < 套接字(Socket)缓冲区大小
  2. 接收方读取套接字(Socket)缓冲区数据不够及时

半包的主要原因

  1. 发送方每次写入数据 > 套接字(Socket)缓冲区大小
  2. 发送的数据大于协议的 MTU (Maximum Transmission Unit,最大传输单元),因此必须拆包

其他角度

其实我们可以换个角度看待问题:

  1. 收发的角度看,便是一个发送可能被多次接收,多个发送可能被一次接收。
  2. 传输的角度看,便是一个发送可能占用多个传输包,多个发送可能共用一个传输包。

根本原因

​ 由于TCP协议本身的机制(面向连接的可靠地协议-三次握手机制)客户端与服务器会维持一个连接(Channel),数据在连接不断开的情况下,可以持续不断地将多个数据包发往服务器,但是如果发送的网络数据包太小,那么他本身会启用Nagle算法(可配置是否启用)对较小的数据包进行合并(基于此,TCP的网络延迟要UDP的高些)然后再发送(超时或者包大小足够)。那么这样的话,服务器在接收到消息(数据流)的时候就无法区分哪些数据包是客户端自己分开发送的,这样产生了粘包;服务器在接收到数据库后,放到缓冲区中,如果消息没有被及时从缓存区取走,下次在取数据的时候可能就会出现一次取出多个数据包的情况,造成粘包现象

UDP

​ 本身作为无连接的不可靠的传输协议(适合频繁发送较小的数据包),他不会对数据包进行合并发送(也就没有Nagle算法之说了),他直接是一端发送什么数据,直接就发出去了,既然他不会对数据合并,每一个数据包都是完整的(数据+UDP头+IP头等等发一次数据封装一次)也就没有粘包一说了。

分包产生的原因

​ 分包产生的原因就简单的多:可能是IP分片传输导致的,也可能是传输过程中丢失部分包导致出现的半包,还有可能就是一个包可能被分成了两次传输,在取数据的时候,先取到了一部分(还可能与接收的缓冲区大小有关系),总之就是一个数据包被分成了多次接收。

更具体的原因有三个,分别如下:

  1. 应用程序写入数据的字节大小大于套接字发送缓冲区的大小

  2. 进行MSS大小的TCP分段。MSS是最大报文段长度的缩写。MSS是TCP报文段中的数据字段的最大长度。数据字段加上TCP首部才等于整个的TCP报文段。所以MSS并不是TCP报文段的最大长度,而是:MSS=TCP报文段长度-TCP首部长度

  3. 以太网的payload大于MTU进行IP分片。MTU指:一种通信协议的某一层上面所能通过的最大数据包大小。如果IP层有一个数据包要传,而且数据的长度比链路层的MTU大,那么IP层就会进行分片,把数据包分成托干片,让每一片都不超过MTU。注意,IP分片可以发生在原始发送端主机上,也可以发生在中间路由器上。

解决粘包半包问题

由于底层的TCP无法理解上层的业务数据,所以在底层是无法保证数据包不被拆分和重组的,这个问题只能通过上层的应用协议栈设计来解决,根据业界的主流协议的解决方案,可以归纳如下。

改成短连接

将 TCP 连接改成短连接,一个请求一个短连接。这样的话,建立连接到释放连接之间的消息即为传输的信息,消息也就产生了边界。

​ 这样的方法就是十分简单,不需要在我们的应用中做过多修改。但缺点也就很明显了,效率低下,TCP 连接和断开都会涉及三次握手以及四次握手,每个消息都会涉及这些过程,十分浪费性能。

​ 因此,并不推介这种方式。

定义通信协议

​ 粘包、拆包问题给接收方的数据解析带来了麻烦。例如SO_RCVBUF中存在了多个连续的完整包(粘包),因为每个包可能都是一个完整的请求或者响应,那么接收方需要能对此进行区分。如果存在不完整的数据(拆包),则需要继续等待数据,直至可以构成一条完整的请求或者响应。

​ 这个问题可以通过定义应用的协议(protocol)来解决。协议的作用就定义传输数据的格式。这样在接受到的数据的时候,如果粘包了,就可以根据这个格式来区分不同的包,如果拆包了,就等待数据可以构成一个完整的消息来处理。目前业界主流的协议(protocol)方案可以归纳如下:

固定长度

假设我们规定每3个字节,表示一个有效报文,如果我们分4次总共发送以下9个字节:

1
2
3
+---+----+------+----+
| A | BC | DEFG | HI |
+---+----+------+----+

那么根据协议,我们可以判断出来,这里包含了3个有效的请求报文

1
2
3
+-----+-----+-----+
| ABC | DEF | GHI |
+-----+-----+-----+

这种方式下,消息边界也就是固定长度即可。

​ 优点就是实现很简单,缺点就是空间有极大的浪费,如果传递的消息中大部分都比较短,这样就会有很多空间是浪费的。

​ 因此,这种方式一般也是不推介的。

特殊字符分隔符协议

在包尾部增加回车或者空格符等特殊字符进行分割 。

例如,按行解析,遇到字符\n、\r\n的时候,就认为是一个完整的数据包。对于以下二进制字节流:

1
2
3
+--------------+
| ABC\nDEF\r\n |
+--------------+

那么根据协议,我们可以判断出来,这里包含了2个有效的请求报文

1
2
3
+-----+-----+
| ABC | DEF |
+-----+-----+

这种方式下,消息边界也就是分隔符本身。

优点是空间不再浪费,实现也比较简单。缺点是当内容本身出现分割符时需要转义,所以无论是发送还是接受,都需要进行整个内容的扫描。

因此,这种方式效率也不是很高,但可以尝试使用。

长度编码

将消息分为消息头和消息体,消息头中用一个int型数据(4字节),表示消息体长度的字段。在解析时,先读取内容长度Length,其值为实际消息体内容(Content)占用的字节数,之后必须读取到这么多字节的内容,才认为是一个完整的数据报文。

1
2
3
4
  header    body
+--------+----------+
| Length | Content |
+--------+----------+

​ 总的来说,通信协议就是通信双方约定好的数据格式,发送方按照这个数据格式来发送,接受方按照这个格式来解析。因此发送方和接收方要完成的工作不同,发送方要将发送的数据转换成协议规定的格式,称之为编码(encode);接收方需要根据协议的格式,对二进制数据进行解析,称之为解码(decode)。

这种方式,就有点类似 Http 请求中的 Content-Length,有一个专门的字段存储消息的长度。作为服务端,接受消息时,先解析固定长度的字段(length字段)获取消息总长度,然后读取后续内容。

​ 优点是精确定位用户数据,内容也不用转义。缺点是长度理论上有限制,需要提前限制可能的最大长度从而定义长度占用字节数。

​ 因此,十分推介用这种方式。

其他方式

其他方式就各不相同了,比如 JSON 可以看成是使用{}是否成对。这些优缺点就需要大家在各自的场景中进行衡量了。

Netty 中的实现

Netty 支持上文所讲的封装成帧(Framing)中的前三种方式,简单介绍下:

方式 解码 编码
固定长度 FixedLengthFrameDecoder 简单
分割符 DelimiterBasedFrameDecoder 简单
专门的 length 字段 LengthFieldBasedFrameDecoder LengthFieldPrepender

代码实现

分割符

在包尾增加分割符,比如回车换行符进行分割,例如FTP协议;

LineBaseEchoServer
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
public class LineBaseEchoServer {

public static final int PORT = 9998;

public static void main(String[] args) throws InterruptedException {
LineBaseEchoServer lineBaseEchoServer = new LineBaseEchoServer();
System.out.println("服务器即将启动");
lineBaseEchoServer.start();
}

public void start() throws InterruptedException {
final LineBaseServerHandler serverHandler = new LineBaseServerHandler();
EventLoopGroup group = new NioEventLoopGroup();/*线程组*/
try {
ServerBootstrap b = new ServerBootstrap();/*服务端启动必须*/
b.group(group)/*将线程组传入*/
.channel(NioServerSocketChannel.class)/*指定使用NIO进行网络传输*/
.localAddress(new InetSocketAddress(PORT))/*指定服务器监听端口*/
/*服务端每接收到一个连接请求,就会新启一个socket通信,也就是channel,
所以下面这段代码的作用就是为这个子channel增加handle*/
.childHandler(new ChannelInitializerImp());
ChannelFuture f = b.bind().sync();/*异步绑定到服务器,sync()会阻塞直到完成*/
System.out.println("服务器启动完成,等待客户端的连接和数据.....");
f.channel().closeFuture().sync();/*阻塞直到服务器的channel关闭*/
} finally {
group.shutdownGracefully().sync();/*优雅关闭线程组*/
}
}

private static class ChannelInitializerImp extends ChannelInitializer<Channel> {

@Override
protected void initChannel(Channel ch) throws Exception {
//添加换行解码器
ch.pipeline().addLast(new LineBasedFrameDecoder(1024));
//添加自己的handel处理器
ch.pipeline().addLast(new LineBaseServerHandler());
}
}
}
LineBaseServerHandler
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
@ChannelHandler.Sharable
public class LineBaseServerHandler extends ChannelInboundHandlerAdapter {
//channelRead读取次数
private AtomicInteger channelReadCount = new AtomicInteger(0);
//channelReadComplete读取次数
private AtomicInteger channelReadCompleteCount = new AtomicInteger(0);

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
int count = channelReadCount.incrementAndGet();
ByteBuf byteBuf = (ByteBuf) msg;
String str = byteBuf.toString(CharsetUtil.UTF_8);
String copMessage = "服务器接收到消息channelRead:" + str + ";----" + count + System.getProperty("line.separator");
System.out.println(copMessage);
ByteBuf copByteBuf = Unpooled.buffer(copMessage.length());
copByteBuf.writeBytes(copMessage.getBytes());
ctx.writeAndFlush(copByteBuf);
}

@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
int count = channelReadCompleteCount.incrementAndGet();
System.out.println("服务器接收到消息ReadComplete:" + ";----" + count);
}

@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println("客户端连接到服务器--------");
}

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
System.out.println("发生异常:" + cause.getMessage());
ctx.close();
}

@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
System.out.println("客户端即将端口连接...");
}
}
LineBaseEchoClient
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
public class LineBaseEchoClient {

private final String host;

public LineBaseEchoClient(String host) {
this.host = host;
}

public void start() throws InterruptedException {
EventLoopGroup group = new NioEventLoopGroup();/*线程组*/
try {
final Bootstrap b = new Bootstrap();
b.group(group)/*将线程组传入*/
.channel(NioSocketChannel.class)/*指定使用NIO进行网络传输*/
.remoteAddress(new InetSocketAddress(host, LineBaseEchoServer.PORT))/*配置要连接服务器的ip地址和端口*/
.handler(new ChannelInitializerImp());
ChannelFuture f = b.connect().sync();
System.out.println("已连接到服务器.....");
f.channel().closeFuture().sync();
} finally {
group.shutdownGracefully().sync();
}
}

private static class ChannelInitializerImp extends ChannelInitializer<Channel> {

@Override
protected void initChannel(Channel ch) throws Exception {
//回车符做了分割
ch.pipeline().addLast(new LineBasedFrameDecoder(1024));
//添加自己的handel处理器
ch.pipeline().addLast(new LineBaseClientHandler());
}
}

public static void main(String[] args) throws InterruptedException {
new LineBaseEchoClient("127.0.0.1").start();
}
}
LineBaseClientHandler
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
public class LineBaseClientHandler extends SimpleChannelInboundHandler<ByteBuf> {

private AtomicInteger counter = new AtomicInteger(0);

/*** 客户端读取到网络数据后的处理*/
protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
System.out.println("client Accept[" + msg.toString(CharsetUtil.UTF_8)
+ "] and the counter is:" + counter.incrementAndGet());
ctx.close();
}

/*** 客户端被通知channel活跃后,做事*/
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
ByteBuf msg = null;
String request = "粘包半包消息测试,换行符;" + System.getProperty("line.separator");
for (int i = 0; i < 10; i++) {
//控制发送频次 如果关闭就会发生粘包半包问题
Thread.sleep(500);
System.out.println(System.currentTimeMillis() + ":即将发送数据:" + request);
msg = Unpooled.buffer(request.length());
msg.writeBytes(request.getBytes());
ctx.writeAndFlush(msg);
}
}

/*** 发生异常后的处理*/
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
}

消息定长

例如每个报文的大小为固定长度200字节,如果不够,空位补空格;

服务端只需将服务端的ChannelInitializerImp 解码器new LineBasedFrameDecoder(1024)替换为new FixedLengthFrameDecoder( FixedLengthEchoClient.REQUEST.length())即可。

1
2
3
4
5
6
7
8
9
10
11
12

private static class ChannelInitializerImp extends ChannelInitializer<Channel> {

@Override
protected void initChannel(Channel ch) throws Exception {
//添加定长报文长度解码器,长度问请求的长度
ch.pipeline().addLast(
new FixedLengthFrameDecoder(
FixedLengthEchoClient.REQUEST.length()));
ch.pipeline().addLast(new FixedLengthServerHandler());
}

评论