From 2ff36a6ed7610eeb6b5f83c255c8c625ebab27f2 Mon Sep 17 00:00:00 2001 From: Guoqs <123@jsowell.com> Date: Sat, 28 Dec 2024 11:35:14 +0800 Subject: [PATCH] =?UTF-8?q?=E6=89=8B=E5=8A=A8=E9=87=8A=E6=94=BE=E5=AF=B9?= =?UTF-8?q?=E8=B1=A1?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../yunkuaichong/NettyServerHandler.java | 81 ++++++++----------- 1 file changed, 32 insertions(+), 49 deletions(-) diff --git a/jsowell-netty/src/main/java/com/jsowell/netty/server/yunkuaichong/NettyServerHandler.java b/jsowell-netty/src/main/java/com/jsowell/netty/server/yunkuaichong/NettyServerHandler.java index f6637a282..18689933a 100644 --- a/jsowell-netty/src/main/java/com/jsowell/netty/server/yunkuaichong/NettyServerHandler.java +++ b/jsowell-netty/src/main/java/com/jsowell/netty/server/yunkuaichong/NettyServerHandler.java @@ -13,6 +13,7 @@ import io.netty.channel.*; import io.netty.handler.timeout.IdleState; import io.netty.handler.timeout.IdleStateEvent; import io.netty.handler.timeout.ReadTimeoutException; +import io.netty.util.ReferenceCountUtil; import lombok.extern.slf4j.Slf4j; import org.apache.commons.collections4.CollectionUtils; import org.springframework.beans.factory.annotation.Autowired; @@ -127,57 +128,39 @@ public class NettyServerHandler extends ChannelInboundHandlerAdapter { @Override public void channelRead(ChannelHandlerContext ctx, Object message) throws Exception { - YKCDataProtocol ykcDataProtocol = (YKCDataProtocol) message; - - // 获取帧类型 - byte[] frameTypeBytes = ykcDataProtocol.getFrameType(); - String frameType = YKCUtils.frameType2Str(frameTypeBytes); - - // 判断该帧类型是否为某请求帧的应答帧 - // String requestFrameType = YKCFrameTypeCode.PileAnswersRelation.getRequestFrameType(frameType); - // log.info("同步获取响应数据-判断该帧类型是否为某请求帧的应答帧, frameType:{}, requestFrameType:{}", frameType, requestFrameType); - // if (StringUtils.isNotBlank(requestFrameType)) { - // // 根据请求id,在集合中找到与外部线程通信的SyncPromise对象 - // String msgId = ctx.channel().id().toString() + "_" + requestFrameType; - // // log.info("同步获取响应数据-收到消息, msgId:{}", msgId); - // SyncPromise syncPromise = RpcUtil.getSyncPromiseMap().get(msgId); - // if(syncPromise != null) { - // // 设置响应结果 - // syncPromise.setRpcResult(ykcDataProtocol.getBytes()); - // // 唤醒外部线程 - // // log.info("同步获取响应数据-唤醒外部线程, SyncPromise:{}", JSON.toJSONString(syncPromise)); - // syncPromise.wake(); - // } - // } - - // 获取序列号域 - int serialNumber = BytesUtil.bytesToIntLittle(ykcDataProtocol.getSerialNumber()); - - // 获取channel - Channel channel = ctx.channel(); - - // 心跳包0x03日志太多,造成日志文件过大,改为不打印 - if (!CollectionUtils.containsAny(notPrintFrameTypeList, frameType)) { - log.info("【<<<<<平台收到消息<<<<<】channel:{}, 帧类型:{}, 帧名称:{}, 序列号域:{}, 报文:{}", - channel.id(), frameType, YKCFrameTypeCode.getFrameTypeStr(frameType), serialNumber, - BytesUtil.binary(ykcDataProtocol.getBytes(), 16)); - } - - // 处理数据 - byte[] response = ykcService.process(ykcDataProtocol, ctx); - if (Objects.nonNull(response)) { - // 响应客户端 - ByteBuf buffer = ctx.alloc().buffer().writeBytes(response); - this.channelWrite(channel.id(), buffer); + try { + YKCDataProtocol ykcDataProtocol = (YKCDataProtocol) message; + // 获取帧类型 + byte[] frameTypeBytes = ykcDataProtocol.getFrameType(); + String frameType = YKCUtils.frameType2Str(frameTypeBytes); + // 获取序列号域 + int serialNumber = BytesUtil.bytesToIntLittle(ykcDataProtocol.getSerialNumber()); + // 获取channel + Channel channel = ctx.channel(); + // 心跳包0x03日志太多,造成日志文件过大,改为不打印 if (!CollectionUtils.containsAny(notPrintFrameTypeList, frameType)) { - // 应答帧类型 - byte[] responseFrameTypeBytes = YKCFrameTypeCode.PlatformAnswersRelation.getResponseFrameTypeBytes(frameTypeBytes); - String responseFrameType = YKCUtils.frameType2Str(responseFrameTypeBytes); - log.info("【>>>>>平台响应消息>>>>>】channel:{}, 响应帧类型:{}, 响应帧名称:{}, 原帧类型:{}, 原帧名称:{}, 序列号域:{}, response:{}", - channel.id(), responseFrameType, YKCFrameTypeCode.getFrameTypeStr(responseFrameType), - frameType, YKCFrameTypeCode.getFrameTypeStr(frameType), serialNumber, - BytesUtil.binary(response, 16)); + log.info("【<<<<<平台收到消息<<<<<】channel:{}, 帧类型:{}, 帧名称:{}, 序列号域:{}, 报文:{}", + channel.id(), frameType, YKCFrameTypeCode.getFrameTypeStr(frameType), serialNumber, + BytesUtil.binary(ykcDataProtocol.getBytes(), 16)); } + // 处理数据 + byte[] response = ykcService.process(ykcDataProtocol, ctx); + if (Objects.nonNull(response)) { + // 响应客户端 + ByteBuf buffer = ctx.alloc().buffer().writeBytes(response); + this.channelWrite(channel.id(), buffer); + if (!CollectionUtils.containsAny(notPrintFrameTypeList, frameType)) { + // 应答帧类型 + byte[] responseFrameTypeBytes = YKCFrameTypeCode.PlatformAnswersRelation.getResponseFrameTypeBytes(frameTypeBytes); + String responseFrameType = YKCUtils.frameType2Str(responseFrameTypeBytes); + log.info("【>>>>>平台响应消息>>>>>】channel:{}, 响应帧类型:{}, 响应帧名称:{}, 原帧类型:{}, 原帧名称:{}, 序列号域:{}, response:{}", + channel.id(), responseFrameType, YKCFrameTypeCode.getFrameTypeStr(responseFrameType), + frameType, YKCFrameTypeCode.getFrameTypeStr(frameType), serialNumber, + BytesUtil.binary(response, 16)); + } + } + } finally { + ReferenceCountUtil.release(message); } }