From 9d61fa5dba4235d8ecfb31416e16013fc6284679 Mon Sep 17 00:00:00 2001 From: Guoqs <123@jsowell.com> Date: Wed, 27 Nov 2024 14:44:51 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BD=BF=E7=94=A8YKCDataProtocol=E5=A4=84?= =?UTF-8?q?=E7=90=86?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../com/jsowell/common/util/YKCUtils.java | 54 ++++++++++++++ .../yunkuaichong/NettyServerHandler.java | 74 ++++++++++++++++--- .../yunkuaichong/YKCBusinessService.java | 4 +- .../impl/YKCBusinessServiceImpl.java | 14 ++++ 4 files changed, 136 insertions(+), 10 deletions(-) diff --git a/jsowell-common/src/main/java/com/jsowell/common/util/YKCUtils.java b/jsowell-common/src/main/java/com/jsowell/common/util/YKCUtils.java index 529010614..f96ee1580 100644 --- a/jsowell-common/src/main/java/com/jsowell/common/util/YKCUtils.java +++ b/jsowell-common/src/main/java/com/jsowell/common/util/YKCUtils.java @@ -84,6 +84,60 @@ public class YKCUtils { return false; } + public static boolean checkMsg(YKCDataProtocol ykcDataProtocol) { + // log.info("checkMsg:{}", BytesUtil.binary(msg, 16)); + // 起始标志 + byte[] head = ykcDataProtocol.getHead(); + // 数据长度 + byte[] length = ykcDataProtocol.getLength(); + // 序列号域 + byte[] serialNumber = ykcDataProtocol.getSerialNumber(); + // 加密标志 + byte[] encryptFlag = ykcDataProtocol.getEncryptFlag(); + // 帧类型标志 + byte[] frameType = ykcDataProtocol.getFrameType(); + // 消息体 + byte[] msgBody = ykcDataProtocol.getMsgBody(); + // 帧校验域 + byte[] crcByte = ykcDataProtocol.getCrcByte(); + + //起始位必须是0x68 + if (0x68 != head[0]) { + log.error("起始位必须是0x68"); + return false; + } + + // 如果是0x03帧,则不进行crc校验 + if (0x03 == frameType[0]) { + log.info("0x03帧,则不进行crc校验"); + return true; + } + + // 序列号域+加密标志+帧类型标志+消息体 + byte[] data = Bytes.concat(serialNumber, encryptFlag, frameType, msgBody); + // 校验长度 + if (data.length != BytesUtil.bytesToIntLittle(length)) { + log.error("数据长度不正确, 数据长度:{}, 实际长度:{}", BytesUtil.bytesToIntLittle(length), data.length); + return false; + } + // CRC校验 source target + String sourceCRC = String.format("%04x", BytesUtil.bytesToInt(crcByte, 0)); + String targetCRC = String.format("%04x", CRC16Util.calcCrc16(data)); + String oldTargetCRC = String.format("%04x", CRC16Util.calcCrc16Old(data)); + // 将高低位位置反转,得出新的crc + String lowString = StringUtils.substring(targetCRC, 0, 2); + String highString = StringUtils.substring(targetCRC, 2, 4); + String crc = highString + lowString; + // 若目标crc和高低位反转前/后的crc都不一致,则校验不通过 + if (sourceCRC.equalsIgnoreCase(targetCRC) || sourceCRC.equalsIgnoreCase(crc)) { + return true; + } + log.error("CRC校验不通过, 源crc:{}, 计算得出crc:{}, 老crc计算:{}, 高低位反转后crc:{}, 帧类型:{}, 帧名称:{}, 报文:{}" + , sourceCRC, targetCRC, oldTargetCRC, crc, YKCUtils.frameType2Str(frameType), + YKCFrameTypeCode.getFrameTypeStr(YKCUtils.frameType2Str(frameType)), BytesUtil.binary(ykcDataProtocol.getBytes(), 16)); + return false; + } + /** * 获取结果报文 * @param ykcDataProtocol diff --git a/jsowell-netty/src/main/java/com/jsowell/netty/server/yunkuaichong/NettyServerHandler.java b/jsowell-netty/src/main/java/com/jsowell/netty/server/yunkuaichong/NettyServerHandler.java index 10383c7ab..565f231ce 100644 --- a/jsowell-netty/src/main/java/com/jsowell/netty/server/yunkuaichong/NettyServerHandler.java +++ b/jsowell-netty/src/main/java/com/jsowell/netty/server/yunkuaichong/NettyServerHandler.java @@ -67,16 +67,72 @@ public class NettyServerHandler extends ChannelInboundHandlerAdapter { /** * 有客户端发消息会触发此函数 */ + // @Override + // public void channelRead(ChannelHandlerContext ctx, Object message) throws Exception { + // // log.info("channelRead-aClass:{}", message.getClass()); + // // log.info("加载客户端报文channelRead=== channelId:" + ctx.channel().id() + ", msg:" + message); + // // 下面可以解析数据,保存数据,生成返回报文,将需要返回报文写入write函数 + // YKCDataProtocol ykcDataProtocol = (YKCDataProtocol) message; + // byte[] msg = ykcDataProtocol.getBytes(); + // + // // 获取帧类型 + // byte[] frameTypeBytes = BytesUtil.copyBytes(msg, 5, 1); + // String frameType = YKCUtils.frameType2Str(frameTypeBytes); + // + // // 判断该帧类型是否为某请求帧的应答帧 + // String requestFrameType = YKCFrameTypeCode.PileAnswersRelation.getRequestFrameType(frameType); + // // log.info("同步获取响应数据-判断该帧类型是否为某请求帧的应答帧, frameType:{}, requestFrameType:{}", frameType, requestFrameType); + // if (StringUtils.isNotBlank(requestFrameType)) { + // // 根据请求id,在集合中找到与外部线程通信的SyncPromise对象 + // String msgId = ctx.channel().id().toString() + "_" + requestFrameType; + // // log.info("同步获取响应数据-收到消息, msgId:{}", msgId); + // SyncPromise syncPromise = RpcUtil.getSyncPromiseMap().get(msgId); + // if(syncPromise != null) { + // // 设置响应结果 + // syncPromise.setRpcResult(msg); + // // 唤醒外部线程 + // // log.info("同步获取响应数据-唤醒外部线程, SyncPromise:{}", JSON.toJSONString(syncPromise)); + // syncPromise.wake(); + // } + // } + // + // // 获取序列号域 + // int serialNumber = BytesUtil.bytesToIntLittle(BytesUtil.copyBytes(msg, 2, 2)); + // + // // 获取channel + // Channel channel = ctx.channel(); + // + // // 心跳包0x03日志太多,造成日志文件过大,改为不打印 + // if (!CollectionUtils.containsAny(notPrintFrameTypeList, frameType)) { + // log.info("【<<<<<平台收到消息<<<<<】channel:{}, 帧类型:{}, 帧名称:{}, 序列号域:{}, 报文:{}", + // channel.id(), frameType, YKCFrameTypeCode.getFrameTypeStr(frameType), serialNumber, + // BytesUtil.binary(msg, 16)); + // } + // + // // 处理数据 + // byte[] response = ykcService.process(msg, ctx); + // if (Objects.nonNull(response)) { + // // 响应客户端 + // ByteBuf buffer = ctx.alloc().buffer().writeBytes(response); + // this.channelWrite(channel.id(), buffer); + // if (!CollectionUtils.containsAny(notPrintFrameTypeList, frameType)) { + // // 应答帧类型 + // byte[] responseFrameTypeBytes = YKCFrameTypeCode.PlatformAnswersRelation.getResponseFrameTypeBytes(frameTypeBytes); + // String responseFrameType = YKCUtils.frameType2Str(responseFrameTypeBytes); + // log.info("【>>>>>平台响应消息>>>>>】channel:{}, 响应帧类型:{}, 响应帧名称:{}, 原帧类型:{}, 原帧名称:{}, 序列号域:{}, response:{}", + // channel.id(), responseFrameType, YKCFrameTypeCode.getFrameTypeStr(responseFrameType), + // frameType, YKCFrameTypeCode.getFrameTypeStr(frameType), serialNumber, + // BytesUtil.binary(response, 16)); + // } + // } + // } + @Override public void channelRead(ChannelHandlerContext ctx, Object message) throws Exception { - // log.info("channelRead-aClass:{}", message.getClass()); - // log.info("加载客户端报文channelRead=== channelId:" + ctx.channel().id() + ", msg:" + message); - // 下面可以解析数据,保存数据,生成返回报文,将需要返回报文写入write函数 YKCDataProtocol ykcDataProtocol = (YKCDataProtocol) message; - byte[] msg = ykcDataProtocol.getBytes(); // 获取帧类型 - byte[] frameTypeBytes = BytesUtil.copyBytes(msg, 5, 1); + byte[] frameTypeBytes = ykcDataProtocol.getFrameType(); String frameType = YKCUtils.frameType2Str(frameTypeBytes); // 判断该帧类型是否为某请求帧的应答帧 @@ -89,7 +145,7 @@ public class NettyServerHandler extends ChannelInboundHandlerAdapter { SyncPromise syncPromise = RpcUtil.getSyncPromiseMap().get(msgId); if(syncPromise != null) { // 设置响应结果 - syncPromise.setRpcResult(msg); + syncPromise.setRpcResult(ykcDataProtocol.getBytes()); // 唤醒外部线程 // log.info("同步获取响应数据-唤醒外部线程, SyncPromise:{}", JSON.toJSONString(syncPromise)); syncPromise.wake(); @@ -97,7 +153,7 @@ public class NettyServerHandler extends ChannelInboundHandlerAdapter { } // 获取序列号域 - int serialNumber = BytesUtil.bytesToIntLittle(BytesUtil.copyBytes(msg, 2, 2)); + int serialNumber = BytesUtil.bytesToIntLittle(ykcDataProtocol.getSerialNumber()); // 获取channel Channel channel = ctx.channel(); @@ -106,11 +162,11 @@ public class NettyServerHandler extends ChannelInboundHandlerAdapter { if (!CollectionUtils.containsAny(notPrintFrameTypeList, frameType)) { log.info("【<<<<<平台收到消息<<<<<】channel:{}, 帧类型:{}, 帧名称:{}, 序列号域:{}, 报文:{}", channel.id(), frameType, YKCFrameTypeCode.getFrameTypeStr(frameType), serialNumber, - BytesUtil.binary(msg, 16)); + BytesUtil.binary(ykcDataProtocol.getBytes(), 16)); } // 处理数据 - byte[] response = ykcService.process(msg, ctx); + byte[] response = ykcService.process(ykcDataProtocol, ctx); if (Objects.nonNull(response)) { // 响应客户端 ByteBuf buffer = ctx.alloc().buffer().writeBytes(response); diff --git a/jsowell-netty/src/main/java/com/jsowell/netty/service/yunkuaichong/YKCBusinessService.java b/jsowell-netty/src/main/java/com/jsowell/netty/service/yunkuaichong/YKCBusinessService.java index 102ac12fc..1bf85a02b 100644 --- a/jsowell-netty/src/main/java/com/jsowell/netty/service/yunkuaichong/YKCBusinessService.java +++ b/jsowell-netty/src/main/java/com/jsowell/netty/service/yunkuaichong/YKCBusinessService.java @@ -1,6 +1,6 @@ package com.jsowell.netty.service.yunkuaichong; -import io.netty.channel.Channel; +import com.jsowell.common.core.domain.ykc.YKCDataProtocol; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelId; @@ -18,6 +18,8 @@ public interface YKCBusinessService { */ byte[] process(byte[] msg, ChannelHandlerContext ctx); + byte[] process(YKCDataProtocol ykcDataProtocol, ChannelHandlerContext ctx); + /** * 桩退出 * @param channelId channelId diff --git a/jsowell-netty/src/main/java/com/jsowell/netty/service/yunkuaichong/impl/YKCBusinessServiceImpl.java b/jsowell-netty/src/main/java/com/jsowell/netty/service/yunkuaichong/impl/YKCBusinessServiceImpl.java index 641edecae..5d088c382 100644 --- a/jsowell-netty/src/main/java/com/jsowell/netty/service/yunkuaichong/impl/YKCBusinessServiceImpl.java +++ b/jsowell-netty/src/main/java/com/jsowell/netty/service/yunkuaichong/impl/YKCBusinessServiceImpl.java @@ -50,6 +50,20 @@ public class YKCBusinessServiceImpl implements YKCBusinessService { return invokeStrategy.supplyProcess(ykcDataProtocol, ctx); } + @Override + public byte[] process(YKCDataProtocol ykcDataProtocol, ChannelHandlerContext ctx) { + if (!YKCUtils.checkMsg(ykcDataProtocol)) { + // 校验不通过,丢弃消息 + return null; + } + // 获取帧类型 + String frameType = YKCUtils.frameType2Str(ykcDataProtocol.getFrameType()); + // 获取业务处理handler + AbstractYkcHandler invokeStrategy = YKCOperateFactory.getInvokeStrategy(frameType); + // AbstractYkcHandlerV2 invokeStrategy = ykcOperateFactoryV2.getInvokeStrategy(frameType); + return invokeStrategy.supplyProcess(ykcDataProtocol, ctx); + } + @Override public void exit(ChannelId channelId) { // 获取桩编号