From 0e890d9946cfa4492070f0fcf86b955a46c4d539 Mon Sep 17 00:00:00 2001 From: Guoqs <123@jsowell.com> Date: Thu, 28 Nov 2024 08:57:58 +0800 Subject: [PATCH] =?UTF-8?q?=E9=A2=84=E7=BA=A6=E6=8C=87=E4=BB=A4=E8=8E=B7?= =?UTF-8?q?=E5=8F=96=E8=BF=94=E5=9B=9E=E5=80=BC?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../ReservationChargingResponseHandler.java | 18 +++++++++-- .../yunkuaichong/NettyServerHandler.java | 30 +++++++++---------- 2 files changed, 30 insertions(+), 18 deletions(-) diff --git a/jsowell-netty/src/main/java/com/jsowell/netty/handler/yunkuaichong/ReservationChargingResponseHandler.java b/jsowell-netty/src/main/java/com/jsowell/netty/handler/yunkuaichong/ReservationChargingResponseHandler.java index 53ff0db12..0efc676d7 100644 --- a/jsowell-netty/src/main/java/com/jsowell/netty/handler/yunkuaichong/ReservationChargingResponseHandler.java +++ b/jsowell-netty/src/main/java/com/jsowell/netty/handler/yunkuaichong/ReservationChargingResponseHandler.java @@ -5,7 +5,9 @@ import com.jsowell.common.constant.CacheConstants; import com.jsowell.common.core.domain.ykc.YKCDataProtocol; import com.jsowell.common.core.domain.ykc.YKCFrameTypeCode; import com.jsowell.common.core.redis.RedisCache; +import com.jsowell.common.protocol.SyncPromise; import com.jsowell.common.util.BytesUtil; +import com.jsowell.common.util.RpcUtil; import com.jsowell.common.util.YKCUtils; import com.jsowell.netty.factory.YKCOperateFactory; import com.jsowell.pile.domain.PileReservationInfo; @@ -36,7 +38,7 @@ public class ReservationChargingResponseHandler extends AbstractYkcHandler { } @Override - public byte[] supplyProcess(YKCDataProtocol ykcDataProtocol, ChannelHandlerContext channel) { + public byte[] supplyProcess(YKCDataProtocol ykcDataProtocol, ChannelHandlerContext ctx) { // log.info("[====远程更新应答====] param:{}, channel:{}", JSON.toJSONString(ykcDataProtocol), channel.toString()); // 消息体 byte[] msgBody = ykcDataProtocol.getMsgBody(); @@ -55,7 +57,7 @@ public class ReservationChargingResponseHandler extends AbstractYkcHandler { String pileSn = BytesUtil.bcd2Str(pileSnByteArr); // 保存时间 - saveLastTimeAndCheckChannel(pileSn, channel); + saveLastTimeAndCheckChannel(pileSn, ctx); // 枪口号 startIndex += length; @@ -78,6 +80,18 @@ public class ReservationChargingResponseHandler extends AbstractYkcHandler { log.info("0x59预约充电响应, 交易流水号:{}, 桩SN:{}, 枪口号:{}, 结果:{}, 失败原因:{}", transactionCode, pileSn, connectorCode, resultCode, failedReason); + // 根据请求id,在集合中找到与外部线程通信的SyncPromise对象 + String msgId = ctx.channel().id().toString() + "_" + YKCFrameTypeCode.RESERVATION_CHARGING_SETUP_CODE.getCode(); + log.info("同步获取响应数据-收到消息, msgId:{}", msgId); + SyncPromise syncPromise = RpcUtil.getSyncPromiseMap().get(msgId); + if(syncPromise != null) { + // 设置响应结果 + syncPromise.setRpcResult(ykcDataProtocol.getBytes()); + // 唤醒外部线程 + log.info("同步获取响应数据-唤醒外部线程, SyncPromise:{}", JSON.toJSONString(syncPromise)); + syncPromise.wake(); + } + // 如果收到成功, 从redis取值, 保存到数据库 if ("01".equals(resultCode)) { // 预约成功, 删除redis中的预约信息 diff --git a/jsowell-netty/src/main/java/com/jsowell/netty/server/yunkuaichong/NettyServerHandler.java b/jsowell-netty/src/main/java/com/jsowell/netty/server/yunkuaichong/NettyServerHandler.java index 565f231ce..35e0279a3 100644 --- a/jsowell-netty/src/main/java/com/jsowell/netty/server/yunkuaichong/NettyServerHandler.java +++ b/jsowell-netty/src/main/java/com/jsowell/netty/server/yunkuaichong/NettyServerHandler.java @@ -7,8 +7,6 @@ import com.jsowell.common.enums.ykc.PileChannelEntity; import com.jsowell.common.util.BytesUtil; import com.jsowell.common.util.StringUtils; import com.jsowell.common.util.YKCUtils; -import com.jsowell.common.util.RpcUtil; -import com.jsowell.common.protocol.SyncPromise; import com.jsowell.netty.service.yunkuaichong.YKCBusinessService; import io.netty.buffer.ByteBuf; import io.netty.channel.*; @@ -136,21 +134,21 @@ public class NettyServerHandler extends ChannelInboundHandlerAdapter { String frameType = YKCUtils.frameType2Str(frameTypeBytes); // 判断该帧类型是否为某请求帧的应答帧 - String requestFrameType = YKCFrameTypeCode.PileAnswersRelation.getRequestFrameType(frameType); + // String requestFrameType = YKCFrameTypeCode.PileAnswersRelation.getRequestFrameType(frameType); // log.info("同步获取响应数据-判断该帧类型是否为某请求帧的应答帧, frameType:{}, requestFrameType:{}", frameType, requestFrameType); - if (StringUtils.isNotBlank(requestFrameType)) { - // 根据请求id,在集合中找到与外部线程通信的SyncPromise对象 - String msgId = ctx.channel().id().toString() + "_" + requestFrameType; - // log.info("同步获取响应数据-收到消息, msgId:{}", msgId); - SyncPromise syncPromise = RpcUtil.getSyncPromiseMap().get(msgId); - if(syncPromise != null) { - // 设置响应结果 - syncPromise.setRpcResult(ykcDataProtocol.getBytes()); - // 唤醒外部线程 - // log.info("同步获取响应数据-唤醒外部线程, SyncPromise:{}", JSON.toJSONString(syncPromise)); - syncPromise.wake(); - } - } + // if (StringUtils.isNotBlank(requestFrameType)) { + // // 根据请求id,在集合中找到与外部线程通信的SyncPromise对象 + // String msgId = ctx.channel().id().toString() + "_" + requestFrameType; + // // log.info("同步获取响应数据-收到消息, msgId:{}", msgId); + // SyncPromise syncPromise = RpcUtil.getSyncPromiseMap().get(msgId); + // if(syncPromise != null) { + // // 设置响应结果 + // syncPromise.setRpcResult(ykcDataProtocol.getBytes()); + // // 唤醒外部线程 + // // log.info("同步获取响应数据-唤醒外部线程, SyncPromise:{}", JSON.toJSONString(syncPromise)); + // syncPromise.wake(); + // } + // } // 获取序列号域 int serialNumber = BytesUtil.bytesToIntLittle(ykcDataProtocol.getSerialNumber());