基于Netty实现TCP私有协议(详细)

756次阅读
没有评论
 

什么是协议

从生活角度去理解:协议大部分情况下是指两个或两个以上实体为了开展某项活动,经过协商后双方达成的一致意见。例如租房合同协议、劳动合同协议等。
 
从互联网角度去理解:协议是指网络通信的参与方必须遵循相同的规则,这套规则称为协议,它最终体现为在网络上传输的数据包的格式。例如TCP、FTP、AMQP、HTTP协议等。
 
目前互联网上通信协议基本底层都是基于TCP协议作为网络通信的基础进行再次封装的。

什么是TCP私有协议

TCP是目前整个互联网的传输层通信协议,它的中文全称是:传输控制协议(TCP,Transmission Control Protocol),它是一种面向连接的、可靠的、基于字节流的传输层通信协议,由IETF的RFC 793 [1]  定义。
 
目前主流的公有协议或私有协议基本上都是基于TCP/IP协议基础上进行实现的,例如大名鼎鼎的万维网HTTP协议、消息队列AMQP协议、SMTP协议等都是工作在 TCP/IP协议族上的公有协议。
 
而TCP私有协议通常是指公司内部基于TCP协议基础上定制的一套内部协议,私有协议通常具有灵活性高,按实际场景的需要进行定制,同时由于私有协议的封闭性,从而使私有协议天生具备安全性。

私有协议使用场景

定制一套成熟的、完善的私有协议通常需要具备较高软件开发水平的人员参与实现,所以是需要投入一定成本的,不到必要场景时,并不会优先去考虑自研内部私有协议。下面说一下什么场景下可以考虑定制私有协议:
1、公司高层战略上有计划走自研路线,这种情况下一般会投入一定的成本来支持自研需要的成本。
2、在分布式环境下,对网络通信数据传输过程有较高的安全性时,就有必要考虑制定私有协议,通过制定私有协议可以对数据按照一定的算法进行加解密。
3、在分布式环境下,对网络通信性能方面有极致的要求时,可以考虑定制TPC私有协议来替代常用HTTP协议,由于TCP工作在OSI第四层传输层,相比HTTP工作在OSI第七层应用层来说,性能消耗要低。
4、其他涉及全双工(客户端可以发送数据给服务端,服务端也可以主动发数据给客户端)网络编程的需求,例如在线聊天等场景。

小T协议总体设计

在开始设计协议前,这里先给本次协议取个名称:小T协议,方面后面称呼。

设计目标

Ø 支持高并发场景。
Ø 支持高性能要求。
Ø 支持自定义Java-Bean序列化和反序列。
Ø 支持自定义安全准入机制。
Ø 支持连接心跳检测机制。
Ø 支持断连重试重发机制。
 

设计原则

Ø 可靠性原则
Ø 安全性原则
Ø 可扩展性原则
 

技术选项

Ø 开发语言:JAVA
Ø 网络I/O模型:NIO
Ø 网络编程框架:Netty
Ø 项目构建:Maven
Ø 编码工具:社区版IDEA
Ø 日志框架:log4j

通信模型

 

通信流程如下:
 
1、底层TCP协议三次握手成功之后,客户端开始发起私有协议的握手请求到服务端,消息头command指令为1。
2、服务端接收到客户端握手请求时,进行安全认证,认证通过后返回握手应答,消息头command指令为2。
3、客户端接收到服务端握手应答后,创建定时发送心跳任务,并间隔5s发送一次心跳请求给服务端,消息头command指令为5。
4、服务端接收到心跳请求后,立即应答心跳,消息头command指令为6。
5、客户端可以发送业务消息给服务端,服务端接收到客户端的业务消息后进行处理,处理结束后可以根据实际情况是否需要应答消息给客户端。
 

协议定义

协议的定义主要是定义消息格式,小T协议整体上分为两部分:
 
Ø 消息头(header)
Ø 消息体(body)
 
每个部分具体定义如下表格所示:
 
表 1-1 小T协议数据格式定义表(Message):

字段

类型

长度(bit)

说明

header

Header

不定长

消息头定义

body

Object

不定长

消息体定义

 
表 1-2 小T协议数据格式消息头定义表(Header):

字段

类型

长度(bit)

说明

magic

int

32

魔数标识。

固定值:0xACAFDCBA,4个字节

mainVersion

char

16

主版本。

1-9,2个字节

minorVersion

char

16

次版本。

1-9,2个字节

length

long

64

消息长度,包括header+body

sid

long

64

集群节点全局唯一

command

char

16

1、握手请求消息

2、握手应答消息

3、业务请求消息

4、业务应答消息

5、心跳请求消息

6、心跳应答消息

attribute

Map

不定长

消息头扩展属性

 
 

协议编码

小T协议编码通过继承Netty的消息转字节数组类,该类在消息出站时(write)会判断消息泛型类型是否匹配,如果匹配则调用子类实现的encode方法将对象编码为字节数组进行网络传输,这里先通过JSON将对象解析为字符串,然后得到字符串的字节数组。
 
注意,编码时我们将整个对象的字节数组长度通过Netty缓冲对象ByteBuf的writeInt方法写在最开始位置,这样做方便后面解码时先读出这个长度,通过这个长度从ByteBuf种读取对象字节内容进行解密。

协议解码

通过继承Netty的LengthFieldBasedFrameDecoder解码类在消息入站时进行解码。解码方式先通过ByteBuf的readInt方法读出小子字节的长度,然后在通过这个长度值从ByteBuf读取消息对象字节内容进行解码,这里均为通过JSON反解析为消息对象类。

TCP粘包和拆包

 
粘包和拆包是面向应用层而言的,我们通常把应用层如HTTP每次请求或响应/或SMTP每次发送的邮件看作是一个消息包,而TCP是基于字节流进行传输的,它把应用层发送的所有消息包看作是一连串没有边界的字节流进行传输,TCP为了方便网络优化,在发送端和接收端都设有缓冲区,以及TCP更底层的数据链路层的MSS(MTU-40通常为1460)大小限制,导致TCP在发送端和接收端都可能造成粘包和拆包现象。下面简单分析产生粘包和拆包的情况:
 
1、当发送/接收的消息包字节流大小超过TCP发送/接收端缓冲区剩余空间时,将会发生拆。
2、当发送/接收的消息包字节流大小小于TCP发送/接收端缓冲区剩余空间时,将会发生粘包。
3、当发送的消息包字节流大小超过MSS限制时,将会发生拆包。
4、当发送端启用Nagle算法时,会加大粘包和拆包的概率。
5、当TCP时间滑动窗口的变化也会加大粘包和拆包的发生概率。
 
 
由于TCP不关心业务,所以粘包和拆包需要在应用层去处理,大致有3种解决方式:
 
1、固定消息包长度:限制发送端发送的单条消息固定长度,不足的空位补齐,接收端每次读取固定长度的字节作为一条完整消息。
2、协定消息结束符:发送端在每个消息包末尾增加一个结束符(例如换行符)来表示一条完整消息的结束位置,这样接收端可以根据这个结束符来判断消息包是否接收完成。如果没有接收完成需要接收端本地缓存当前的部分内容,等待剩余部分的字节流到来。
3、消息头长度标识:通过把每个消息包的长度放到消息头上,接收端获取消息头的时候解析出消息长度,然后向后读取该长度的内容,不够的仍然需要本地缓存当前的部分内容,等待剩余部分的字节流到来。
 
 
Netty 框架中对 TCP 粘包拆包有现成的解决方案,只要在其 pipline 中加入对应的解码器就可实现(通常需要自己实现编解码器,解码器通过继承的方式即可具备解决能力)。
 
Ø LineBasedFrameDecoder:协定消息结束符方式(默认固定换行符)。
Ø DelimiterBasedFrameDecoder:协定消息结束符方式(可自定义结束符)。
Ø FixedLengthFrameDecoder:固定消息包长度方式。
Ø LengthFieldBasedFrameDecoder:消息头长度标识方式(小T协议采用这种方式)。
 
注意,Netty自带的TCP粘包和拆包解决方案是通过解码阶段去处理的,所以为了解码阶段处理正确,实现自定义编码时必须严格按照解码阶段使用和配置的粘包和拆包处理方案来编码。
 

建立连接

正常情况下先启动小T协议服务端,然后客户端通过异步线程方式启动小T协议客户端进行连接请求,如果连接建立成功将由客户端发起小T协议设计的握手请求到服务端,如果连接建立失败将由客户端重试机制进行定时重连。

断连重试

断开重试流程如下:
1、客户端启动时设置20s读超时事件监听器。
2、当服务端进程异常退出时,由于Netty的write或writeAndFlush都是往本地内存缓冲写,缓冲满了批量发送出去,所以客户端启动时设置了20s读超时事件监听器,所以客户端超过20s没有接收到读请求时,会触发超时监听事件而端口连接。
3、当客户端连接断开时,每间隔5s尝试重连一次,直到连接成功为止。
 

关闭连接

当发生网络异常、握手失败、长时间没接收到服务端心跳应答时(客户端读超时),客户端将关闭连接资源,并定义重连。
 

握手机制

协议连接建立成功后由客户端发起握手请求,服务器接收握手请求后首先检查客户端消息头的主次版本是否和服务端一致,魔数值是否正常,以及进行安全检查,如果检查不通过则返回握手失败标记位:0返回客户端,客户端接收到握手失败标记位时会打印日志并断开连接进行重试任务。如果检查通过服务端会返回握手应答指令command=2,和成功标记位:1给客户端,客户端接收握手成功后会进行下一步心跳任务的建立。

心跳检查

当客户端和服务端握手成功后,由客户端启动定时发送心跳检查任务,每间隔5s发送一次心跳包给服务端,服务端接收到心跳时将回给客户端应答,如果服务端发送异常或网络问题没有应答心跳包给客户端,将触发客户端读超时而异常方式断开连接进行重连任务。

安全机制

基于JDK自带的SPI技术支持自定义安全机制扩展。在客户端发起握手请求时由服务端执行安全检查,检查通过应答握手成功标记位:1,否则应答握手失败标记位:0。客户端接收到握手失败标记位时将打印日志并断开连接进行重连任务。
 

业务处理

基于JDK自带的SPI技术实现协议业务处理的自定义扩展开发,默认将接收到的业务请求回调XiaotBizRespCallbackProvide接口execute方法。协议使用者可以基于JDK自带的SPI机制在项目resources目录下创建META-INF/services目录,创建对应接口的实现类方式进行扩展,示例截图如下:
 

小T协议源码开发

下面给出小T协议完整源码内容。

整体项目结构

 

pom.xml配置

  1. project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  2. xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
  3. modelVersion>4.0.0modelVersion>
  4. parent>
  5. groupId>org.springframework.bootgroupId>
  6. artifactId>spring-boot-starter-parentartifactId>
  7. version>2.5.0version>
  8. relativePath/>
  9. parent>
  10. groupId>com.xiaot.protocolgroupId>
  11. artifactId>xiaot-protocolartifactId>
  12. version>0.0.1-SNAPSHOTversion>
  13. name>xiaot-protocolname>
  14. description>XiaoT-Protocol project for Spring Bootdescription>
  15. properties>
  16. java.version>1.8java.version>
  17. properties>
  18. dependencies>
  19. dependency>
  20. groupId>org.projectlombokgroupId>
  21. artifactId>lombokartifactId>
  22. version>1.18.4version>
  23. dependency>
  24. dependency>
  25. groupId>org.slf4jgroupId>
  26. artifactId>slf4j-apiartifactId>
  27. version>1.7.25version>
  28. dependency>
  29. dependency>
  30. groupId>org.slf4jgroupId>
  31. artifactId>slf4j-log4j12artifactId>
  32. version>1.7.25version>
  33. dependency>
  34. dependency>
  35. groupId>io.nettygroupId>
  36. artifactId>netty-allartifactId>
  37. version>4.1.45.Finalversion>
  38. dependency>
  39. dependency>
  40. groupId>com.alibabagroupId>
  41. artifactId>fastjsonartifactId>
  42. version>1.2.76version>
  43. dependency>
  44. dependencies>
  45. build>
  46. finalName>xiaot-protocolfinalName>
  47. plugins>
  48. plugin>
  49. groupId>org.apache.maven.pluginsgroupId>
  50. artifactId>maven-compiler-pluginartifactId>
  51. configuration>
  52. source>1.8source>
  53. target>1.8target>
  54. encoding>${project.build.sourceEncoding}encoding>
  55. configuration>
  56. plugin>
  57. plugin>
  58. groupId>org.apache.maven.pluginsgroupId>
  59. artifactId>maven-surefire-pluginartifactId>
  60. configuration>
  61. skipTests>trueskipTests>
  62. configuration>
  63. plugin>
  64. plugins>
  65. build>
  66. project>

log4j.properties配置文件

  1. log4j.rootLogger = debug,stdout,debug,info,error
  2. log4j.pattern=%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %p %l -%m%n
  3. log4j.appender.stdout = org.apache.log4j.ConsoleAppender
  4. log4j.appender.stdout.Target = System.out
  5. log4j.appender.stdout.layout = org.apache.log4j.PatternLayout
  6. log4j.appender.stdout.layout.ConversionPattern = ${log4j.pattern}
  7. log4j.appender.debug = org.apache.log4j.DailyRollingFileAppender
  8. log4j.appender.debug.File = ./logs/xiaot-info.log
  9. log4j.appender.debug.Append = true
  10. log4j.appender.debug.Threshold = DEBUG
  11. log4j.appender.debug.layout = org.apache.log4j.PatternLayout
  12. log4j.appender.debug.layout.ConversionPattern =${log4j.pattern}
  13. log4j.appender.info = org.apache.log4j.DailyRollingFileAppender
  14. log4j.appender.info.File = ./logs/xiaot-info.log
  15. log4j.appender.info.Append = true
  16. log4j.appender.info.Threshold = INFO
  17. log4j.appender.info.layout = org.apache.log4j.PatternLayout
  18. log4j.appender.info.layout.ConversionPattern = ${log4j.pattern}
  19. log4j.appender.error = org.apache.log4j.DailyRollingFileAppender
  20. log4j.appender.error.File = ./logs/xiaot-error.log
  21. log4j.appender.error.Append = true
  22. log4j.appender.error.Threshold = ERROR
  23. log4j.appender.error.layout = org.apache.log4j.PatternLayout
  24. log4j.appender.error.layout.ConversionPattern = ${log4j.pattern}

协议定义源码开发

消息头XiaotHeader.java源码:

  1. @Data
  2. public class XiaotHeader {
  3. /**
  4. * 协议魔数,固定值 0xACAFDCBA
  5. */
  6. private int major = Const.MAJOR;
  7. /**
  8. * 协议主版本:
  9. */
  10. private char mainVersion = Const.MAIN_VERSION;
  11. /**
  12. * 协议次版本:
  13. */
  14. private char minorVersion = Const.MINOR_VERSION;
  15. /**
  16. * 消息总长度
  17. */
  18. private int length;
  19. /**
  20. * 会话id
  21. */
  22. private long sid;
  23. /**
  24. * 指令
  25. */
  26. private char command;
  27. /**
  28. * 是否成功 1:成功,0:失败
  29. */
  30. private int success;
  31. /**
  32. * 消息头扩展属性
  33. */
  34. private Map attribute = new HashMap();
  35. }

消息体XiaotMessage.java源码:

  1. @Data
  2. public class XiaotMessage {
  3. private XiaotHeader header;
  4. private Object body;
  5. }

协议指令枚举Command.java源码:

  1. public enum Command {
  2. HANDSHAKE_REQ('1', "握手请求消息"),
  3. HANDSHAKE_RESP('2', "握手应答消息"),
  4. BIZ_REQ('3', "业务请求消息"),
  5. BIZ_RESP('4', "业务应答消息"),
  6. HEARTBEAT_REQ('5', "心跳请求消息"),
  7. HEARTBEAT_RESP('6', "心跳应答消息"),
  8. ;
  9. private char val;
  10. private String desc;
  11. Command(char val, String desc) {
  12. this.val = val;
  13. this.desc = desc;
  14. }
  15. public static Command of(char val){
  16. for (Command value : Command.values()) {
  17. if (value.getVal() == val){
  18. return value;
  19. }
  20. }
  21. return null;
  22. }
  23. public char getVal() {
  24. return val;
  25. }
  26. public void setVal(char val) {
  27. this.val = val;
  28. }
  29. public String getDesc() {
  30. return desc;
  31. }
  32. public void setDesc(String desc) {
  33. this.desc = desc;
  34. }
  35. }

协议编解码源码开发

消息编码器XiaotMessageEncoder.java源码:

  1. @Slf4j
  2. public class XiaotMessageEncoder extends MessageToByteEncoder {
  3. @Override
  4. protected void encode(ChannelHandlerContext channelHandlerContext, XiaotMessage message, ByteBuf byteBuf) throws Exception {
  5. if (message == null || message.getHeader() == null) {
  6. throw new Exception("encode message be do not null");
  7. }
  8. //Serialize
  9. byte[] raw = JSONObject.toJSONString(message).getBytes(StandardCharsets.UTF_8);
  10. message.getHeader().setLength(raw.length);
  11. // 这里严格按照解码器TCP粘包和拆包解决方案要求格式进行编码
  12. // writeInt表示往消息头写内容长度,占用空间是4个字节
  13. byteBuf.writeInt(message.getHeader().getLength());
  14. byteBuf.writeBytes(raw);
  15. }
  16. }

消息解码器XiaotMessageDecoder.java源码:

  1. public class XiaotMessageDecoder extends LengthFieldBasedFrameDecoder {
  2. public XiaotMessageDecoder(int maxFrameLength, int lengthFieldOffset, int lengthFieldLength) {
  3. super(maxFrameLength, lengthFieldOffset, lengthFieldLength);
  4. }
  5. @Override
  6. protected Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception {
  7. //Netty处理粘包和拆包得到完整的消息包之后,会调用这里逻辑
  8. ByteBuf byteBuf = (ByteBuf) super.decode(ctx, in);
  9. if (byteBuf == null) {
  10. return null;
  11. }
  12. try {
  13. int length = byteBuf.readInt();
  14. //初始化byte数组
  15. byte[] array = new byte[length];
  16. //从byteBuf上次读位置开始读取字节数据到byte数组0,length位置
  17. byteBuf.getBytes(byteBuf.readerIndex(), array, 0, length);
  18. //实例化解码工具类
  19. //
  20. String json = new String(array, StandardCharsets.UTF_8);
  21. XiaotMessage message = JSONObject.parseObject(json, XiaotMessage.class);
  22. return message;
  23. } finally {
  24. byteBuf.release();
  25. }
  26. }
  27. }

握手源码开发

客户端发起握手请求处理器HandshakeReqHandler.java源码:

  1. @Slf4j
  2. public class HandshakeReqHandler extends ChannelInboundHandlerAdapter {
  3. /**
  4. * 在连接通道channel激活时立即发送握手请求
  5. *
  6. * @param ctx
  7. * @throws Exception
  8. */
  9. @Override
  10. public void channelActive(ChannelHandlerContext ctx) throws Exception {
  11. XiaotHeader header = new XiaotHeader();
  12. header.setCommand(Command.HANDSHAKE_REQ.getVal());
  13. header.setSid(SidUtil.INSTANCE.nextId());
  14. XiaotMessage sendMsg = new XiaotMessage();
  15. sendMsg.setHeader(header);
  16. ChannelWriteUtil.write(ctx.channel(), sendMsg, future -> {
  17. if (future.cause() != null) {
  18. throw new IOException(future.cause());
  19. } else if (!future.isSuccess()) {
  20. throw new IOException("client send handshake request fail...");
  21. } else {
  22. log.debug("client send handshake request");
  23. }
  24. });
  25. }
  26. @Override
  27. public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
  28. //类型转换
  29. XiaotMessage receiveMsg = (XiaotMessage) msg;
  30. //处理握手应答指令
  31. if (receiveMsg != null && receiveMsg.getHeader() != null && Command.HANDSHAKE_RESP.getVal() == receiveMsg.getHeader().getCommand()) {
  32. if (Const.SUCCESS == receiveMsg.getHeader().getSuccess()) {
  33. log.debug("client receive handshake response");
  34. } else {
  35. log.error("client receive handshake response fail " + (receiveMsg.getBody() == null ? "" : (String) receiveMsg.getBody()));
  36. ctx.close();
  37. }
  38. }
  39. //只有成功才继续往下流转
  40. ctx.fireChannelRead(msg);
  41. }
  42. @Override
  43. public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
  44. ctx.fireExceptionCaught(cause);
  45. }
  46. }

服务端应答握手处理器HandshakeRespHandler.java源码:

  1. @Slf4j
  2. public class HandshakeRespHandler extends ChannelInboundHandlerAdapter {
  3. @Override
  4. public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
  5. //类型转换
  6. XiaotMessage receiveMsg = (XiaotMessage) msg;
  7. //参数检查
  8. if (receiveMsg != null && receiveMsg.getHeader() != null && Command.HANDSHAKE_REQ.getVal() == receiveMsg.getHeader().getCommand()) {
  9. //检查协议魔数
  10. if (Const.MAJOR != receiveMsg.getHeader().getMajor()) {
  11. ChannelWriteUtil.write(ctx.channel(), buildFailMessage("xiaot protocol major check fail," + receiveMsg.getHeader().getMajor()));
  12. return;
  13. }
  14. //检查主版本
  15. else if (Const.MAIN_VERSION != receiveMsg.getHeader().getMainVersion()) {
  16. ChannelWriteUtil.write(ctx.channel(), buildFailMessage("xiaot protocol main version check fail," + receiveMsg.getHeader().getMainVersion()));
  17. }
  18. //检查次版本
  19. else if (Const.MINOR_VERSION != receiveMsg.getHeader().getMinorVersion()) {
  20. ChannelWriteUtil.write(ctx.channel(), buildFailMessage("xiaot protocol minor version check fail," + receiveMsg.getHeader().getMinorVersion()));
  21. }
  22. //安全认证 || 握手应答
  23. else {
  24. XiaotSecurity security = new XiaotSecurity();
  25. security.setHeader(receiveMsg.getHeader());
  26. security.setRemoteAddress((InetSocketAddress) ctx.channel().remoteAddress());
  27. //基于JDK自带SPI技术实现安全认证
  28. ServiceLoader loader = ServiceLoader.load(XiaotSecurityAuthProvide.class);
  29. boolean isOk = true;
  30. for (XiaotSecurityAuthProvide service : loader) {
  31. String errorMsg = service.isAllow(security);
  32. if (errorMsg != null) {
  33. isOk = false;
  34. ChannelWriteUtil.write(ctx.channel(), buildFailMessage("xiaot protocol security check fail, " + errorMsg));
  35. break;
  36. }
  37. }
  38. if (isOk) {
  39. log.debug("server receive handshake request...");
  40. //应答握手
  41. XiaotMessage respMsg = new XiaotMessage();
  42. XiaotHeader respHeader = new XiaotHeader();
  43. respHeader.setSuccess(Const.SUCCESS);
  44. respHeader.setCommand(Command.HANDSHAKE_RESP.getVal());
  45. respMsg.setHeader(respHeader);
  46. ChannelWriteUtil.write(ctx.channel(), respMsg, new GenericFutureListenersuper Void>>() {
  47. @Override
  48. public void operationComplete(Future super Void> future) throws Exception {
  49. if (future.isSuccess()) {
  50. log.debug("server send handshake response");
  51. } else {
  52. log.error("server send handshake response fail...", future.cause());
  53. }
  54. }
  55. });
  56. }
  57. }
  58. }
  59. ctx.fireChannelRead(msg);
  60. }
  61. private XiaotMessage buildFailMessage(String body) {
  62. XiaotMessage respMsg = new XiaotMessage();
  63. XiaotHeader respHeader = new XiaotHeader();
  64. respHeader.setCommand(Command.HANDSHAKE_RESP.getVal());
  65. respHeader.setSuccess(Const.FAIL);
  66. respMsg.setHeader(respHeader);
  67. respMsg.setBody(body);
  68. log.error(body);
  69. return respMsg;
  70. }
  71. @Override
  72. public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
  73. ctx.fireExceptionCaught(cause);
  74. }
  75. }

心跳检查源码开发

客户端发起心跳请求HeartBeatReqHandler.java源码:

  1. @Slf4j
  2. public class HeartBeatReqHandler extends ChannelInboundHandlerAdapter {
  3. private volatile ScheduledFuture> scheduledFuture;
  4. @Override
  5. public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
  6. XiaotMessage receiveMsg = (XiaotMessage) msg;
  7. //参数检查
  8. if (receiveMsg != null && receiveMsg.getHeader() != null) {
  9. if (Command.HANDSHAKE_RESP.getVal() == receiveMsg.getHeader().getCommand()) {
  10. //接收到握手成功应答,初始化心跳检查后台任务
  11. if (Const.SUCCESS == receiveMsg.getHeader().getSuccess()) {
  12. log.debug("client init heartbeat task");
  13. scheduledFuture = ctx.executor().scheduleAtFixedRate(
  14. new HeartBeatTask(ctx), 0, 5, TimeUnit.SECONDS
  15. );
  16. }
  17. }
  18. //接收到心跳应答,
  19. else if (Command.HEARTBEAT_RESP.getVal() == receiveMsg.getHeader().getCommand()) {
  20. log.debug("client receive heartbeat response");
  21. }
  22. }
  23. //放过
  24. ctx.fireChannelRead(msg);
  25. }
  26. @Override
  27. public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
  28. if (scheduledFuture != null) {
  29. scheduledFuture.cancel(false);
  30. ctx.executor().terminationFuture();
  31. }
  32. ctx.fireExceptionCaught(cause);
  33. }
  34. private static class HeartBeatTask implements Runnable {
  35. private final ChannelHandlerContext ctx;
  36. public HeartBeatTask(ChannelHandlerContext ctx) {
  37. this.ctx = ctx;
  38. }
  39. @Override
  40. public void run() {
  41. XiaotMessage sendMsg = new XiaotMessage();
  42. XiaotHeader header = new XiaotHeader();
  43. header.setCommand(Command.HEARTBEAT_REQ.getVal());
  44. sendMsg.setHeader(header);
  45. ChannelWriteUtil.write(ctx.channel(), sendMsg, future -> {
  46. if (future.isSuccess()) {
  47. log.debug("client send heartbeat request");
  48. } else {
  49. log.error("client send heartbeat request fail...", future.cause());
  50. }
  51. });
  52. }
  53. }
  54. }

服务端心跳应答HeartBeatRespHandler.java源码:

  1. @Slf4j
  2. public class HeartBeatRespHandler extends ChannelInboundHandlerAdapter {
  3. @Override
  4. public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
  5. XiaotMessage receiveMsg = (XiaotMessage) msg;
  6. //参数检查
  7. if (receiveMsg != null && receiveMsg.getHeader() != null) {
  8. //接收到心跳请求,
  9. if (Command.HEARTBEAT_REQ.getVal() == receiveMsg.getHeader().getCommand()) {
  10. log.debug("server receive heartbeat request");
  11. //应答心跳请求
  12. XiaotMessage sendMsg = new XiaotMessage();
  13. XiaotHeader header = new XiaotHeader();
  14. header.setCommand(Command.HEARTBEAT_RESP.getVal());
  15. header.setSuccess(Const.SUCCESS);
  16. sendMsg.setHeader(header);
  17. ChannelWriteUtil.write(ctx.channel(), sendMsg, future -> {
  18. if (future.isSuccess()) {
  19. log.debug("server send heartbeat response");
  20. } else {
  21. log.error("server send heartbeat response fail..", future.cause());
  22. }
  23. });
  24. }
  25. }
  26. ctx.fireChannelRead(msg);
  27. }
  28. }

业务应答源码开发

  1. @Slf4j
  2. public class BizRespHandler extends ChannelInboundHandlerAdapter {
  3. @Override
  4. public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
  5. //类型转换
  6. XiaotMessage receiveMsg = (XiaotMessage) msg;
  7. //业务应答指令
  8. if (receiveMsg != null && receiveMsg.getHeader() != null && Command.BIZ_REQ.getVal() == receiveMsg.getHeader().getCommand()) {
  9. log.debug("server receive biz data : {}", JSONObject.toJSONString(receiveMsg));
  10. //基于JDK自带的SPI技术实现协议业务处理的自定义扩展开发
  11. //默认将接收到的业务请求回调XiaotBizRespCallbackProvide接口execute方法
  12. ServiceLoader loader = ServiceLoader.load(XiaotBizRespCallbackProvide.class);
  13. for (XiaotBizRespCallbackProvide service : loader) {
  14. service.execute(receiveMsg.getBody(), receiveMsg.getHeader().getAttribute(), ctx);
  15. }
  16. }
  17. ctx.fireChannelRead(msg);
  18. }
  19. }

末尾处理器源码开发

  1. @Slf4j
  2. public class TailHandler extends ChannelInboundHandlerAdapter {
  3. @Override
  4. public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
  5. //
  6. try {
  7. if (msg != null) {
  8. XiaotMessage message = (XiaotMessage) msg;
  9. if (message.getHeader() != null) {
  10. if (Command.of(message.getHeader().getCommand()) == null) {
  11. log.error("receive be not illegal command message: " + JSONObject.toJSONString(message));
  12. }
  13. }
  14. }
  15. } finally {
  16. ReferenceCountUtil.release(msg);
  17. }
  18. }
  19. @Override
  20. public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
  21. log.error(cause.getMessage(), cause);
  22. }
  23. }

建立连接源码开发

小T协议服务端源码:

  1. @Slf4j
  2. public class XiaotServer {
  3. /**
  4. * 绑定端口
  5. *
  6. * @throws Exception
  7. */
  8. public void bind(int port) throws Exception {
  9. //ReactSelect主线程组
  10. EventLoopGroup bossGroup = new NioEventLoopGroup(1);
  11. //工作线程组
  12. EventLoopGroup workGroup = new NioEventLoopGroup();
  13. ServerBootstrap bootstrap = new ServerBootstrap();
  14. bootstrap.group(bossGroup, workGroup)
  15. .channel(NioServerSocketChannel.class)
  16. // 当服务器请求处理线程全满时,用于临时存放已完成三次握手的请求的队列的最大长度
  17. // 如果未设置或所设置的值小于1,Java将使用默认值50。
  18. // 当长度大于Backlog时,新的连接就会被TCP内核拒绝掉
  19. .option(ChannelOption.SO_BACKLOG, 100)
  20. .childHandler(new ChannelInitializer() {
  21. @Override
  22. protected void initChannel(SocketChannel socketChannel) throws Exception {
  23. socketChannel.pipeline()
  24. //消息解码器,继承LengthFieldBasedFrameDecoder可以处理TCP粘包/拆包问题
  25. .addLast("XiaotMessageDecoder", new XiaotMessageDecoder(1024 * 1024, 0, 4))
  26. //消息编码器
  27. .addLast("XiaotMessageEncoder", new XiaotMessageEncoder())
  28. //握手应答处理器
  29. .addLast("HandshakeRespHandler", new HandshakeRespHandler())
  30. //心跳应答处理器
  31. .addLast("HeartBeatRespHandler", new HeartBeatRespHandler())
  32. //业务应答处理器
  33. .addLast("BizRespHandler", new BizRespHandler())
  34. //末尾处理器
  35. .addLast("TailHandler", new TailHandler())
  36. ;
  37. }
  38. });
  39. ResourceLeakDetector.setLevel(ResourceLeakDetector.Level.ADVANCED);
  40. bootstrap.bind(port).sync();
  41. log.info("Xiaot Server Start on port: " + port);
  42. }
  43. public static void main(String[] args) throws Exception {
  44. new XiaotServer().bind(9000);
  45. }
  46. }

小T协议客户端源码:

  1. @Slf4j
  2. public class XiaotClient {
  3. /**
  4. * channel
  5. */
  6. private volatile Channel channel;
  7. private volatile boolean init = false;
  8. public XiaotClient(String host, int port) {
  9. //实例化客户端时启动异步线程向服务端请求连接
  10. new Thread(() -> connection(host, port)).start();
  11. }
  12. /**
  13. * 建立连接
  14. *
  15. * @param host 远程主机
  16. * @param port 远程端口
  17. */
  18. public synchronized void connection(String host, int port) {
  19. if (init) {
  20. return;
  21. }
  22. init = true;
  23. //
  24. EventLoopGroup group = new NioEventLoopGroup();
  25. Bootstrap bootstrap = new Bootstrap();
  26. try {
  27. bootstrap.group(group).channel(NioSocketChannel.class)
  28. //禁用nagle算法。
  29. .option(ChannelOption.TCP_NODELAY, Boolean.TRUE)
  30. //配置buffer水位线 1m
  31. // .option(ChannelOption.WRITE_BUFFER_WATER_MARK, new WriteBufferWaterMark(1024 * 1024, 1024 * 1024))
  32. .handler(new ChannelInitializer() {
  33. @Override
  34. protected void initChannel(SocketChannel socketChannel) throws Exception {
  35. socketChannel.pipeline()
  36. //读超时,当超过30s未接收到服务端任何应答(包括心跳应答)时,将断开连接进行重试任务。
  37. .addLast(new ReadTimeoutHandler(30, TimeUnit.SECONDS))
  38. //消息解码器,继承自LengthFieldBasedFrameDecoder解决了TCP粘包和拆包问题
  39. //第一个参数表示消息最大大小,这里设置较大一个值。
  40. //第二个参数表示消息头的偏移位置,这里设置为0表示不偏移。
  41. //第三个参数表示消息长度占用的字节大小,这边设置为4,对应编码器的writeInt(xxx)
  42. .addLast("XiaotMessageDecoder", new XiaotMessageDecoder(1024 * 1024, 0, 4))
  43. //消息编码器
  44. .addLast("XiaotMessageEncoder", new XiaotMessageEncoder())
  45. //握手请求处理器
  46. .addLast("HandshakeReqHandler", new HandshakeReqHandler())
  47. //心跳请求处理器
  48. .addLast("HeartBeatReqHandler", new HeartBeatReqHandler())
  49. //业务应答处理器
  50. .addLast("BizRespHandler", new BizRespHandler())
  51. //末尾处理器
  52. .addLast("TailHandler", new TailHandler())
  53. ;
  54. }
  55. });
  56. //打印资源泄露日志
  57. ResourceLeakDetector.setLevel(ResourceLeakDetector.Level.ADVANCED);
  58. ChannelFuture future = bootstrap.connect(new InetSocketAddress(host, port)).sync();
  59. log.info("Xiaot Protocol Client Connection Server Success.");
  60. channel = future.channel();
  61. channel.closeFuture().sync();
  62. } catch (Exception e) {
  63. log.error(e.getMessage(), e);
  64. } finally {
  65. // 下面是客户端断开连接后的重连任务
  66. try {
  67. if (channel != null) {
  68. channel.close();
  69. channel = null;
  70. }
  71. TimeUnit.SECONDS.sleep(5);
  72. log.error("client connection time out, retry connection...");
  73. init = false;
  74. connection(host, port);
  75. } catch (InterruptedException interruptedException) {
  76. log.error("client connection time out, retry fail...", interruptedException);
  77. }
  78. }
  79. }
  80. /**
  81. * 发送消息
  82. *
  83. * @param body 消息体
  84. */
  85. public void sendMessage(Object body) throws Exception {
  86. this.sendMessage(body, null, null);
  87. }
  88. /**
  89. * 发送消息
  90. *
  91. * @param body 消息体
  92. */
  93. public void sendMessage(Object body, Map headerMap) throws Exception {
  94. this.sendMessage(body, headerMap, null);
  95. }
  96. /**
  97. * 发送消息
  98. *
  99. * @param body 消息体
  100. */
  101. public void sendMessage(Object body,
  102. Map headerMap,
  103. GenericFutureListener extends Future super Void>> futureListener) throws Exception {
  104. XiaotMessage message = new XiaotMessage();
  105. XiaotHeader header = new XiaotHeader();
  106. header.setCommand(Command.BIZ_REQ.getVal());
  107. if (headerMap != null) {
  108. header.setAttribute(headerMap);
  109. }
  110. message.setBody(body);
  111. message.setHeader(header);
  112. ChannelWriteUtil.write(channel, message, futureListener);
  113. }
  114. }

其他源码开发
ChannelWriteUtil.java源码:

  1. public class ChannelWriteUtil {
  2. public static ChannelFuture write(Channel channel,
  3. XiaotMessage message) {
  4. if (channel == null) {
  5. throw new NullPointerException("channel is null");
  6. }
  7. if (!channel.isWritable()) {
  8. throw new ChannelException("channel not writable");
  9. }
  10. ChannelFuture future = channel.writeAndFlush(message);
  11. if (!future.isSuccess()) {
  12. throw new ChannelException("channel write fail", future.cause());
  13. }
  14. return future;
  15. }
  16. public static ChannelFuture write(Channel channel,
  17. XiaotMessage message,
  18. GenericFutureListener extends Future super Void>> futureListener) {
  19. if (channel == null) {
  20. throw new NullPointerException("channel is null");
  21. }
  22. if (!channel.isWritable()) {
  23. throw new ChannelException("channel not writable");
  24. }
  25. ChannelFuture future = channel.writeAndFlush(message);
  26. if (future.cause() != null) {
  27. throw new ChannelException("channel write fail", future.cause());
  28. }
  29. if (futureListener != null) {
  30. future.addListener(futureListener);
  31. }
  32. return future;
  33. }
  34. }

Const.java源码:

  1. public class ChannelWriteUtil {
  2. public static ChannelFuture write(Channel channel,
  3. XiaotMessage message) {
  4. if (channel == null) {
  5. throw new NullPointerException("channel is null");
  6. }
  7. if (!channel.isWritable()) {
  8. throw new ChannelException("channel not writable");
  9. }
  10. ChannelFuture future = channel.writeAndFlush(message);
  11. if (!future.isSuccess()) {
  12. throw new ChannelException("channel write fail", future.cause());
  13. }
  14. return future;
  15. }
  16. public static ChannelFuture write(Channel channel,
  17. XiaotMessage message,
  18. GenericFutureListener extends Future super Void>> futureListener) {
  19. if (channel == null) {
  20. throw new NullPointerException("channel is null");
  21. }
  22. if (!channel.isWritable()) {
  23. throw new ChannelException("channel not writable");
  24. }
  25. ChannelFuture future = channel.writeAndFlush(message);
  26. if (future.cause() != null) {
  27. throw new ChannelException("channel write fail", future.cause());
  28. }
  29. if (futureListener != null) {
  30. future.addListener(futureListener);
  31. }
  32. return future;
  33. }
  34. }

小T协议试运行

启动客户端

模拟客户端启动源码如下:

  1. @Slf4j
  2. public class XiaotClientExample {
  3. public static void main(String[] args) throws InterruptedException {
  4. //实例化小T协议客户端
  5. XiaotClient client = new XiaotClient("localhost", 9000);
  6. //客户端启动三条线程同时通过小T协议客户端向服务端发送数据
  7. new Thread(new Runnable() {
  8. @Override
  9. public void run() {
  10. while (true) {
  11. try {
  12. TimeUnit.MILLISECONDS.sleep(1000);
  13. client.sendMessage("zhangsan sayHello33333 .................................", null, new GenericFutureListenersuper Void>>() {
  14. @Override
  15. public void operationComplete(Future super Void> future) throws Exception {
  16. if (future.isSuccess()) {
  17. // log.info("send success...");
  18. } else {
  19. log.error("send fail...", future.cause());
  20. }
  21. }
  22. });
  23. } catch (Exception e) {
  24. log.error(e.getMessage(), e);
  25. }
  26. }
  27. }
  28. }).start();
  29. new Thread(new Runnable() {
  30. @Override
  31. public void run() {
  32. while (true){
  33. try {
  34. TimeUnit.MILLISECONDS.sleep(7);
  35. client.sendMessage("wangwu sayHello11111 .................................", null);
  36. }catch (Exception e){
  37. log.error(e.getMessage(), e);
  38. }
  39. }
  40. }
  41. }).start();
  42. new Thread(new Runnable() {
  43. @Override
  44. public void run() {
  45. while (true){
  46. try {
  47. TimeUnit.MILLISECONDS.sleep(2);
  48. client.sendMessage("zhaoliu sayHello222222 .................................", null);
  49. }catch (Exception e){
  50. log.error(e.getMessage(), e);
  51. }
  52. }
  53. }
  54. }).start();
  55. //阻塞主线程
  56. TimeUnit.HOURS.sleep(10000);
  57. }
  58. }

客户端控制台日志输出如下:
 

 
由于服务端没启动,客户端请求连接失败,可以看到客户端channel为null导致不可写以及客户端定时发起重试的日志。

启动服务端

模拟启动服务端源码:

  1. public class XiaotServerExample {
  2. public static void main(String[] args) throws Exception {
  3. new XiaotServer().bind(9000);
  4. }
  5. }

服务端控制台日志输入如下:
 
 

 
可以看到服务端不断接收到客户端三条线程同时发送的业务请求,以及握手和心跳的请求。现在我们看到客户端控制台输出如下:
 
 

 
可以看到客户端定时发送和接收的心跳请求。此时如果关闭服务端,客户端将会断开连接,执行重连任务,截图如下:
 
 

 

 

arison
版权声明:本站原创文章,由arison2022-03-11发表,共计251976字。
转载提示:除特殊说明外本站文章皆由CC-4.0协议发布,转载请注明出处。
评论(没有评论)