本文共 17872 字,大约阅读时间需要 59 分钟。
TCP编程底层都有粘包和拆包机制,因为我们在C/S这种传输模型下,以TCP协议传输的时候,在网络中的byte其实就像是河水,TCP就像一个搬运工,将这流水从一端转送到另一端,这时又分两种情况:
1)如果客户端的每次制造的水比较多,也就是我们常说的客户端给的包比较大,TCP这个搬运工就会分多次去搬运。
2)如果客户端每次制造的水比较少的话,TCP可能会等客户端多次生产之后,把所有的水一起再运输到另一端
上述第一种情况,就是需要我们进行粘包,在另一端接收的时候,需要把多次获取的结果粘在一起,变成我们可以理解的信息,第二种情况,我们在另一端接收的时候,就必须进行拆包处理,因为每次接收的信息,可能是另一个远程端多次发送的包,被TCP粘在一起的
我们进行上述两种情况给出具体的场景:
1)单次发送的包内容过多的情况,拆包的现象:
我们先写客户端的bootstrap:
- package com.lyncc.netty.stickpackage.myself;
-
- import io.netty.bootstrap.Bootstrap;
- import io.netty.channel.ChannelFuture;
- import io.netty.channel.ChannelInitializer;
- import io.netty.channel.ChannelOption;
- import io.netty.channel.ChannelPipeline;
- import io.netty.channel.EventLoopGroup;
- import io.netty.channel.nio.NioEventLoopGroup;
- import io.netty.channel.socket.SocketChannel;
- import io.netty.channel.socket.nio.NioSocketChannel;
- import io.netty.handler.codec.LineBasedFrameDecoder;
- import io.netty.handler.codec.string.StringDecoder;
-
- public class BaseClient {
-
- static final String HOST = System.getProperty("host", "127.0.0.1");
- static final int PORT = Integer.parseInt(System.getProperty("port", "8080"));
- static final int SIZE = Integer.parseInt(System.getProperty("size", "256"));
-
- public static void main(String[] args) throws Exception {
-
- EventLoopGroup group = new NioEventLoopGroup();
- try {
- Bootstrap b = new Bootstrap();
- b.group(group)
- .channel(NioSocketChannel.class)
- .option(ChannelOption.TCP_NODELAY,true)
- .handler(new ChannelInitializer<SocketChannel>() {
- @Override
- public void initChannel(SocketChannel ch) throws Exception {
- ChannelPipeline p = ch.pipeline();
-
- p.addLast(new StringDecoder());
- p.addLast(new BaseClientHandler());
- }
- });
-
- ChannelFuture future = b.connect(HOST, PORT).sync();
- future.channel().writeAndFlush("Hello Netty Server ,I am a common client");
- future.channel().closeFuture().sync();
- } finally {
- group.shutdownGracefully();
- }
- }
-
- }
客户端的handler:
- package com.lyncc.netty.stickpackage.myself;
-
- import io.netty.buffer.ByteBuf;
- import io.netty.buffer.Unpooled;
- import io.netty.channel.ChannelHandlerContext;
- import io.netty.channel.ChannelInboundHandlerAdapter;
-
- public class BaseClientHandler extends ChannelInboundHandlerAdapter{
-
- private byte[] req;
-
- private int counter;
-
- public BaseClientHandler() {
-
-
- req = ("In this chapter you general, we recommend Java Concurrency in Practice by Brian Goetz. His book w"
- + "ill give We’ve reached an exciting point—in the next chapter we’ll discuss bootstrapping, the process "
- + "of configuring and connecting all of Netty’s components to bring your learned about threading models in ge"
- + "neral and Netty’s threading model in particular, whose performance and consistency advantages we discuss"
- + "ed in detail In this chapter you general, we recommend Java Concurrency in Practice by Brian Goetz. Hi"
- + "s book will give We’ve reached an exciting point—in the next chapter we’ll discuss bootstrapping, the"
- + " process of configuring and connecting all of Netty’s components to bring your learned about threading "
- + "models in general and Netty’s threading model in particular, whose performance and consistency advantag"
- + "es we discussed in detailIn this chapter you general, we recommend Java Concurrency in Practice by Bri"
- + "an Goetz. His book will give We’ve reached an exciting point—in the next chapter;the counter is: 1 2222"
- + "sdsa ddasd asdsadas dsadasdas").getBytes();
- }
-
- @Override
- public void channelActive(ChannelHandlerContext ctx) throws Exception {
- ByteBuf message = null;
-
-
-
-
-
- message = Unpooled.buffer(req.length);
- message.writeBytes(req);
- ctx.writeAndFlush(message);
- message = Unpooled.buffer(req.length);
- message.writeBytes(req);
- ctx.writeAndFlush(message);
- }
-
- @Override
- public void channelRead(ChannelHandlerContext ctx, Object msg)
- throws Exception {
- String buf = (String) msg;
- System.out.println("Now is : " + buf + " ; the counter is : "+ ++counter);
- }
-
- @Override
- public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
- ctx.close();
- }
-
-
-
- }
服务端的serverBootstrap:
- package com.lyncc.netty.stickpackage.myself;
-
- import io.netty.bootstrap.ServerBootstrap;
- import io.netty.channel.ChannelFuture;
- import io.netty.channel.ChannelInitializer;
- import io.netty.channel.ChannelOption;
- import io.netty.channel.EventLoopGroup;
- import io.netty.channel.nio.NioEventLoopGroup;
- import io.netty.channel.socket.SocketChannel;
- import io.netty.channel.socket.nio.NioServerSocketChannel;
- import io.netty.handler.codec.string.StringDecoder;
-
- import java.net.InetSocketAddress;
-
- public class BaseServer {
-
- private int port;
-
- public BaseServer(int port) {
- this.port = port;
- }
-
- public void start(){
- EventLoopGroup bossGroup = new NioEventLoopGroup(1);
- EventLoopGroup workerGroup = new NioEventLoopGroup();
- try {
- ServerBootstrap sbs = new ServerBootstrap().group(bossGroup,workerGroup).channel(NioServerSocketChannel.class).localAddress(new InetSocketAddress(port))
- .childHandler(new ChannelInitializer<SocketChannel>() {
-
- protected void initChannel(SocketChannel ch) throws Exception {
-
- ch.pipeline().addLast(new StringDecoder());
- ch.pipeline().addLast(new BaseServerHandler());
- };
-
- }).option(ChannelOption.SO_BACKLOG, 128)
- .childOption(ChannelOption.SO_KEEPALIVE, true);
-
- ChannelFuture future = sbs.bind(port).sync();
-
- System.out.println("Server start listen at " + port );
- future.channel().closeFuture().sync();
- } catch (Exception e) {
- bossGroup.shutdownGracefully();
- workerGroup.shutdownGracefully();
- }
- }
-
- public static void main(String[] args) throws Exception {
- int port;
- if (args.length > 0) {
- port = Integer.parseInt(args[0]);
- } else {
- port = 8080;
- }
- new BaseServer(port).start();
- }
- }
服务端的handler:
- package com.lyncc.netty.stickpackage.myself;
-
- import io.netty.channel.ChannelHandlerContext;
- import io.netty.channel.ChannelInboundHandlerAdapter;
-
- public class BaseServerHandler extends ChannelInboundHandlerAdapter{
-
-
- private int counter;
-
-
- @Override
- public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
-
- String body = (String)msg;
- System.out.println("server receive order : " + body + ";the counter is: " + ++counter);
- }
-
-
- @Override
- public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
- cause.printStackTrace();
- ctx.close();
- }
-
- }
照例,我们先运行服务器端:
我们再运行客户端,客户端启动后,我们再看看服务器端的控制台打印输出:
我们可以看到服务器端分三次接收到了客户端两次发送的那段很长的信息
2)单次发送的包内容过多的情况,粘包的现象:
客户端和服务端的bootstrap不改变,我们修改一下,客户端发送信息的channelActive的代码:
- package com.lyncc.netty.stickpackage.myself;
-
- import io.netty.buffer.ByteBuf;
- import io.netty.buffer.Unpooled;
- import io.netty.channel.ChannelHandlerContext;
- import io.netty.channel.ChannelInboundHandlerAdapter;
-
- public class BaseClientHandler extends ChannelInboundHandlerAdapter{
-
- private byte[] req;
-
- private int counter;
-
- public BaseClientHandler() {
- req = ("BazingaLyncc is learner").getBytes();
-
-
-
-
-
-
-
-
-
-
-
- }
-
- @Override
- public void channelActive(ChannelHandlerContext ctx) throws Exception {
- ByteBuf message = null;
- for (int i = 0; i < 100; i++) {
- message = Unpooled.buffer(req.length);
- message.writeBytes(req);
- ctx.writeAndFlush(message);
- }
-
-
-
-
-
-
- }
-
- @Override
- public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
- ctx.close();
- }
-
- }
我们再次启动服务器端:
启动客户端后,依旧看服务器端的控制台:
可以看出,客户端发送100次的信息,被服务器端分三次就接收了,这就发生了粘包的现象
以上就是典型的粘包和拆包的场景
2.4 粘包问题的解决办法
粘包的解决办法有很多,可以归纳如下。
- 消息定长,例如每个报文的大小为固定长度200字节,如果不够,空位补空格。
- 在包尾增加回车换行符进行分割,例如FTP协议。
- 将消息分为消息头和消息体,消息头中包含消息长度的字段,通常设计思路为消息头的第一个字段使用int32来表示消息的总长度
在本案例中,我使用的是第2个解决办法在包尾增加回车换行符进行分割。
自定义协议实现
1、什么是粘包/拆包
一般所谓的TCP粘包是在一次接收数据不能完全地体现一个完整的消息数据。TCP通讯为何存在粘包呢?主要原因是TCP是以流的方式来处理数据,再加上网络上MTU的往往小于在应用处理的消息数据,所以就会引发一次接收的数据无法满足消息的需要,导致粘包的存在。处理粘包的唯一方法就是制定应用层的数据通讯协议,通过协议来规范现有接收的数据是否满足消息数据的需要。
2、解决办法
2.1、消息定长,报文大小固定长度,不够空格补全,发送和接收方遵循相同的约定,这样即使粘包了通过接收方编程实现获取定长报文也能区分。
2.2、包尾添加特殊分隔符,例如每条报文结束都添加回车换行符(例如FTP协议)或者指定特殊字符作为报文分隔符,接收方通过特殊分隔符切分报文区分。
2.3、将消息分为消息头和消息体,消息头中包含表示信息的总长度(或者消息体长度)的字段
3、自定义协议,来实现TCP的粘包/拆包问题
3.0 自定义协议,开始标记
3.1 自定义协议的介绍
3.2 自定义协议的类的封装
3.3 自定义协议的编码器
3.4 自定义协议的解码器
4、协议相关的实现
4.1 协议的封装
- import java.util.Arrays;
-
-
-
-
-
-
-
-
-
-
-
-
-
- public class SmartCarProtocol {
-
-
-
- private int head_data = ConstantValue.HEAD_DATA;
-
-
-
- private int contentLength;
-
-
-
- private byte[] content;
-
-
-
-
-
-
-
-
-
- public SmartCarProtocol(int contentLength, byte[] content) {
- this.contentLength = contentLength;
- this.content = content;
- }
-
- public int getHead_data() {
- return head_data;
- }
-
- public int getContentLength() {
- return contentLength;
- }
-
- public void setContentLength(int contentLength) {
- this.contentLength = contentLength;
- }
-
- public byte[] getContent() {
- return content;
- }
-
- public void setContent(byte[] content) {
- this.content = content;
- }
-
- @Override
- public String toString() {
- return "SmartCarProtocol [head_data=" + head_data + ", contentLength="
- + contentLength + ", content=" + Arrays.toString(content) + "]";
- }
-
- }
4.2 协议的编码器
-
-
-
-
-
-
-
-
-
-
-
-
- public class SmartCarEncoder extends MessageToByteEncoder<SmartCarProtocol> {
-
- @Override
- protected void encode(ChannelHandlerContext tcx, SmartCarProtocol msg,
- ByteBuf out) throws Exception {
-
-
- out.writeInt(msg.getHead_data());
-
- out.writeInt(msg.getContentLength());
-
- out.writeBytes(msg.getContent());
- }
- }
4.3 协议的解码器
- import java.util.List;
- import io.netty.buffer.ByteBuf;
- import io.netty.channel.ChannelHandlerContext;
- import io.netty.handler.codec.ByteToMessageDecoder;
-
-
-
-
-
-
-
-
-
-
-
-
-
- public class SmartCarDecoder extends ByteToMessageDecoder {
-
-
-
-
-
-
-
- public final int BASE_LENGTH = 4 + 4;
-
- @Override
- protected void decode(ChannelHandlerContext ctx, ByteBuf buffer,
- List<Object> out) throws Exception {
-
- if (buffer.readableBytes() >= BASE_LENGTH) {
-
-
-
- if (buffer.readableBytes() > 2048) {
- buffer.skipBytes(buffer.readableBytes());
- }
-
-
- int beginReader;
-
- while (true) {
-
- beginReader = buffer.readerIndex();
-
- buffer.markReaderIndex();
-
- if (buffer.readInt() == ConstantValue.HEAD_DATA) {
- break;
- }
-
-
-
- buffer.resetReaderIndex();
- buffer.readByte();
-
-
-
-
- if (buffer.readableBytes() < BASE_LENGTH) {
- return;
- }
- }
-
-
-
- int length = buffer.readInt();
-
- if (buffer.readableBytes() < length) {
-
- buffer.readerIndex(beginReader);
- return;
- }
-
-
- byte[] data = new byte[length];
- buffer.readBytes(data);
-
- SmartCarProtocol protocol = new SmartCarProtocol(data.length, data);
- out.add(protocol);
- }
- }
-
- }
4.4 服务端加入协议的编/解码器
4.5 客户端加入协议的编/解码器
5、服务端的实现
- import io.netty.bootstrap.ServerBootstrap;
- import io.netty.channel.ChannelFuture;
- import io.netty.channel.ChannelInitializer;
- import io.netty.channel.ChannelOption;
- import io.netty.channel.EventLoopGroup;
- import io.netty.channel.nio.NioEventLoopGroup;
- import io.netty.channel.socket.SocketChannel;
- import io.netty.channel.socket.nio.NioServerSocketChannel;
- import io.netty.handler.logging.LogLevel;
- import io.netty.handler.logging.LoggingHandler;
-
- public class Server {
-
- public Server() {
- }
-
- public void bind(int port) throws Exception {
-
- EventLoopGroup bossGroup = new NioEventLoopGroup();
- EventLoopGroup workerGroup = new NioEventLoopGroup();
- try {
-
- ServerBootstrap b = new ServerBootstrap();
- b.group(bossGroup, workerGroup)
- .channel(NioServerSocketChannel.class)
- .handler(new LoggingHandler(LogLevel.INFO))
- .childHandler(new ChildChannelHandler())
- .option(ChannelOption.SO_BACKLOG, 1024)
- .childOption(ChannelOption.SO_KEEPALIVE, true);
-
- ChannelFuture f = b.bind(port).sync();
-
- f.channel().closeFuture().sync();
- } finally {
-
- workerGroup.shutdownGracefully();
- bossGroup.shutdownGracefully();
- }
- }
-
-
-
-
- private class ChildChannelHandler extends ChannelInitializer<SocketChannel> {
- @Override
- protected void initChannel(SocketChannel ch) throws Exception {
-
- ch.pipeline().addLast(new SmartCarEncoder());
- ch.pipeline().addLast(new SmartCarDecoder());
-
- ch.pipeline().addLast(new ServerHandler());
- }
- }
-
- public static void main(String[] args) throws Exception {
- new Server().bind(9999);
- }
- }
6、服务端Handler的实现
- import io.netty.channel.ChannelHandlerAdapter;
- import io.netty.channel.ChannelHandlerContext;
-
- public class ServerHandler extends ChannelHandlerAdapter {
-
- @Override
- public void channelRead(ChannelHandlerContext ctx, Object msg)
- throws Exception {
-
- SmartCarProtocol body = (SmartCarProtocol) msg;
- System.out.println("Server接受的客户端的信息 :" + body.toString());
-
-
- String str = "Hi I am Server ...";
- SmartCarProtocol response = new SmartCarProtocol(str.getBytes().length,
- str.getBytes());
-
- ctx.writeAndFlush(response);
-
-
-
-
- }
-
- @Override
- public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
- throws Exception {
-
- ctx.close();
- }
- }
7、客户端的实现
- import io.netty.bootstrap.Bootstrap;
- import io.netty.channel.ChannelFuture;
- import io.netty.channel.ChannelInitializer;
- import io.netty.channel.ChannelOption;
- import io.netty.channel.EventLoopGroup;
- import io.netty.channel.nio.NioEventLoopGroup;
- import io.netty.channel.socket.SocketChannel;
- import io.netty.channel.socket.nio.NioSocketChannel;
-
- public class Client {
-
-
-
-
-
-
-
-
- public void connect(int port, String host) throws Exception {
-
- EventLoopGroup group = new NioEventLoopGroup();
- try {
-
- Bootstrap b = new Bootstrap();
- b.group(group)
- .channel(NioSocketChannel.class)
- .option(ChannelOption.TCP_NODELAY, true)
- .handler(new MyChannelHandler());
-
- ChannelFuture f = b.connect(host, port).sync();
-
-
- f.channel().closeFuture().sync();
-
- } finally {
- group.shutdownGracefully();
- System.out.println("客户端优雅的释放了线程资源...");
- }
-
- }
-
-
-
-
- private class MyChannelHandler extends ChannelInitializer<SocketChannel> {
- @Override
- protected void initChannel(SocketChannel ch) throws Exception {
-
- ch.pipeline().addLast(new SmartCarEncoder());
- ch.pipeline().addLast(new SmartCarDecoder());
-
- ch.pipeline().addLast(new ClientHandler());
- }
-
- }
-
- public static void main(String[] args) throws Exception {
- new Client().connect(9999, "127.0.0.1");
-
- }
-
- }
8、客户端Handler的实现
- import io.netty.channel.ChannelHandlerAdapter;
- import io.netty.channel.ChannelHandlerContext;
- import io.netty.util.ReferenceCountUtil;
-
-
- public class ClientHandler extends ChannelHandlerAdapter {
-
-
- @Override
- public void channelActive(ChannelHandlerContext ctx) throws Exception {
-
-
- String data = "I am client ...";
-
- byte[] content = data.getBytes();
-
- int contentLength = content.length;
-
- SmartCarProtocol protocol = new SmartCarProtocol(contentLength, content);
-
- ctx.writeAndFlush(protocol);
- }
-
-
-
- @Override
- public void channelRead(ChannelHandlerContext ctx, Object msg)
- throws Exception {
- try {
-
- SmartCarProtocol body = (SmartCarProtocol) msg;
- System.out.println("Client接受的客户端的信息 :" + body.toString());
-
- } finally {
- ReferenceCountUtil.release(msg);
- }
- }
-
- @Override
- public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
- throws Exception {
- ctx.close();
- }
-
- }