From e85e630282d522bfe19ab66951c2a50e519e6dfa Mon Sep 17 00:00:00 2001 From: Guoqs <123@jsowell.com> Date: Sat, 14 Sep 2024 16:07:59 +0800 Subject: [PATCH] =?UTF-8?q?update=20=E7=94=B5=E5=8D=95=E8=BD=A6?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../com/jsowell/common/util/YKCUtils.java | 11 ++- .../impl/EBikeBusinessServiceImpl.java | 15 ++++ .../impl/EBikeSendCommandServiceImpl.java | 17 +++-- .../impl/YKCPushCommandServiceImpl.java | 72 ------------------- 4 files changed, 33 insertions(+), 82 deletions(-) diff --git a/jsowell-common/src/main/java/com/jsowell/common/util/YKCUtils.java b/jsowell-common/src/main/java/com/jsowell/common/util/YKCUtils.java index 28e63161b..4d39c869e 100644 --- a/jsowell-common/src/main/java/com/jsowell/common/util/YKCUtils.java +++ b/jsowell-common/src/main/java/com/jsowell/common/util/YKCUtils.java @@ -124,7 +124,16 @@ public class YKCUtils { */ public static String frameType2Str(byte[] bytes) { String s = BytesUtil.bin2HexStr(bytes); - return Constants.HEX_PREFIX + s; + return frameType2Str(s); + } + + /** + * 如:"01"--> "0x01" + * @param frameType 帧类型 + * @return + */ + public static String frameType2Str(String frameType) { + return Constants.HEX_PREFIX + frameType; } public static byte[] frameTypeStr2Bytes(String frameTypeStr) { diff --git a/jsowell-netty/src/main/java/com/jsowell/netty/service/electricbicycles/impl/EBikeBusinessServiceImpl.java b/jsowell-netty/src/main/java/com/jsowell/netty/service/electricbicycles/impl/EBikeBusinessServiceImpl.java index 690666cd4..2171ac117 100644 --- a/jsowell-netty/src/main/java/com/jsowell/netty/service/electricbicycles/impl/EBikeBusinessServiceImpl.java +++ b/jsowell-netty/src/main/java/com/jsowell/netty/service/electricbicycles/impl/EBikeBusinessServiceImpl.java @@ -4,7 +4,9 @@ import com.jsowell.common.core.domain.ebike.EBikeDataProtocol; import com.jsowell.common.core.domain.ykc.YKCFrameTypeCode; import com.jsowell.common.enums.ykc.PileChannelEntity; import com.jsowell.common.enums.ykc.PileConnectorDataBaseStatusEnum; +import com.jsowell.common.protocol.SyncPromise; import com.jsowell.common.util.BytesUtil; +import com.jsowell.common.util.RpcUtil; import com.jsowell.common.util.StringUtils; import com.jsowell.common.util.YKCUtils; import com.jsowell.netty.factory.EBikeOperateFactory; @@ -40,6 +42,19 @@ public class EBikeBusinessServiceImpl implements EBikeBusinessService { String command = YKCUtils.frameType2Str(eBikeDataProtocol.getCommand()); log.info("电单车===>收到消息, channelId:{}, 指令:{}, 指令名称:{}, msg:{}", ctx.channel().id().toString(), command, EBikeCommandEnum.getDescByCode(BytesUtil.bytesToIntLittle(eBikeDataProtocol.getCommand())), BytesUtil.binary(msg, 16)); + + // 根据请求id,在集合中找到与外部线程通信的SyncPromise对象 + String msgId = ctx.channel().id().toString() + "_" + command; + log.info("同步获取响应数据-收到消息, msgId:{}", msgId); + SyncPromise syncPromise = RpcUtil.getSyncPromiseMap().get(msgId); + if(syncPromise != null) { + // 设置响应结果 + syncPromise.setRpcResult(msg); + // 唤醒外部线程 + // log.info("同步获取响应数据-唤醒外部线程, SyncPromise:{}", JSON.toJSONString(syncPromise)); + syncPromise.wake(); + } + // 获取业务处理handler AbstractEBikeHandler invokeStrategy = EBikeOperateFactory.getInvokeStrategy(command); if (invokeStrategy != null) { diff --git a/jsowell-pile/src/main/java/com/jsowell/pile/service/impl/EBikeSendCommandServiceImpl.java b/jsowell-pile/src/main/java/com/jsowell/pile/service/impl/EBikeSendCommandServiceImpl.java index dfc361f4f..9aa50d5b3 100644 --- a/jsowell-pile/src/main/java/com/jsowell/pile/service/impl/EBikeSendCommandServiceImpl.java +++ b/jsowell-pile/src/main/java/com/jsowell/pile/service/impl/EBikeSendCommandServiceImpl.java @@ -2,10 +2,7 @@ package com.jsowell.pile.service.impl; import com.jsowell.common.enums.ykc.PileChannelEntity; import com.jsowell.common.protocol.SyncPromise; -import com.jsowell.common.util.BytesUtil; -import com.jsowell.common.util.RandomUtil; -import com.jsowell.common.util.RpcUtil; -import com.jsowell.common.util.StringUtils; +import com.jsowell.common.util.*; import com.jsowell.common.util.id.IdUtils; import com.jsowell.pile.domain.ebike.AbsEBikeMessage; import com.jsowell.pile.domain.ebike.serversend.EBikeMessageCmd82; @@ -141,6 +138,8 @@ public class EBikeSendCommandServiceImpl implements EBikeSendCommandService { private byte[] send(AbsEBikeMessage msg, long timeout, TimeUnit unit) throws Exception { String pileSn = msg.getPhysicalId() + ""; byte[] messageBytes = msg.getMessageBytes(); + String command = YKCUtils.frameType2Str(msg.getCommand()); + // PileChannelEntity.output(); log.info("发送电单车send命令, pileSn:{}, messageBytes:{}", pileSn, BytesUtil.binary(messageBytes, 16)); // 获取桩的channel @@ -153,7 +152,7 @@ public class EBikeSendCommandServiceImpl implements EBikeSendCommandService { // 创造一个容器,用于存放当前线程与rpcClient中的线程交互 SyncPromise syncPromise = new SyncPromise(); // 消息id = channelId + 帧类型(例如: "0x34") - String msgId = ctx.channel().id().toString() + "_" + msg.getCommand(); + String msgId = ctx.channel().id().toString() + "_" + command; log.info("同步获取响应数据-发送消息, msgId:{}", msgId); RpcUtil.getSyncPromiseMap().put(msgId, syncPromise); @@ -163,12 +162,12 @@ public class EBikeSendCommandServiceImpl implements EBikeSendCommandService { // 检查操作的状态 if (channelFutureListener.isSuccess()) { log.info("【电单车send结果===>成功】, pileSn:{}, remoteAddress:{}, channelId:{}, 帧类型:{}, 报文:{}", - pileSn, ctx.channel().remoteAddress(), ctx.channel().id(), msg.getCommand(), BytesUtil.binary(messageBytes, 16)); + pileSn, ctx.channel().remoteAddress(), ctx.channel().id(), command, BytesUtil.binary(messageBytes, 16)); } else { // 如果发生错误,则访问描述原因的Throwable Throwable cause = channelFutureListener.cause(); log.info("【电单车send结果===>失败】, pileSn:{}, remoteAddress:{}, channelId:{}, 帧类型:{}, 报文:{}", - pileSn, ctx.channel().remoteAddress(), ctx.channel().id(), msg.getCommand(), BytesUtil.binary(messageBytes, 16)); + pileSn, ctx.channel().remoteAddress(), ctx.channel().id(), command, BytesUtil.binary(messageBytes, 16)); log.error("电单车send发送命令失败, pileSn:{}", pileSn, cause); } }); @@ -178,9 +177,9 @@ public class EBikeSendCommandServiceImpl implements EBikeSendCommandService { if(rpcResponse == null) { if(syncPromise.isTimeout()) { // throw new TimeoutException("等待响应结果超时"); - log.error("发送[{}]指令后, 等待响应结果超时", msg.getCommand()); + log.error("发送[{}]指令后, 等待响应结果超时", command); } else{ - log.error("发送[{}]指令后, 发生其他异常", msg.getCommand()); + log.error("发送[{}]指令后, 发生其他异常", command); } } // 移除容器 diff --git a/jsowell-pile/src/main/java/com/jsowell/pile/service/impl/YKCPushCommandServiceImpl.java b/jsowell-pile/src/main/java/com/jsowell/pile/service/impl/YKCPushCommandServiceImpl.java index 80870266a..f432fd4c1 100644 --- a/jsowell-pile/src/main/java/com/jsowell/pile/service/impl/YKCPushCommandServiceImpl.java +++ b/jsowell-pile/src/main/java/com/jsowell/pile/service/impl/YKCPushCommandServiceImpl.java @@ -44,9 +44,6 @@ public class YKCPushCommandServiceImpl implements YKCPushCommandService { @Autowired private PileModelInfoService pileModelInfoService; - @Autowired - private PileBasicInfoService pileBasicInfoService; - @Autowired private PileStationInfoService pileStationInfoService; @@ -64,75 +61,6 @@ public class YKCPushCommandServiceImpl implements YKCPushCommandService { YKCUtils.frameType2Str(YKCFrameTypeCode.RESERVATION_CHARGING_SETUP_CODE.getBytes()) ); - /** - * 发送push消息 - * - * @param msg - * @param pileSn - * @param frameTypeCode - * @return - */ - // public boolean push(byte[] msg, String pileSn, Enum frameTypeCode) { - // // 通过桩编号获取channel - // ChannelHandlerContext ctx = PileChannelEntity.getChannelByPileSn(pileSn); - // String value = ((YKCFrameTypeCode) frameTypeCode).getValue(); - // if (Objects.isNull(ctx)) { - // log.error("push命令[{}]失败, 桩号:{}无法获取到长连接, 请检查充电桩连接状态!", value, pileSn); - // return false; - // } - // /* - // 拼接报文 - // */ - // // 起始标志 - // byte[] head = new byte[]{0x68}; - // - // // 序列号域 - // byte[] serialNumber = new byte[]{0x00, 0x00}; - // - // // 加密标志 - // byte[] encryptFlag = new byte[]{0x00}; - // - // // 帧类型标志 - // byte[] frameType = new byte[]{(byte) ((YKCFrameTypeCode) frameTypeCode).getCode()}; - // - // // 序列号域+加密标志+帧类型标志+消息体 - // byte[] temp = Bytes.concat(serialNumber, encryptFlag, frameType, msg); - // - // // 数据长度 - // byte[] length = BytesUtil.intToBytes(temp.length, 1); - // - // // 帧校验域 - // byte[] crc = BytesUtil.intToBytes(CRC16Util.calcCrc16(temp)); - // - // // 返回报文 - // byte[] writeMsg = Bytes.concat(head, length, temp, crc); - // - // // 返回完整的报文 string类型 - // String wholeMsg = BytesUtil.binary(writeMsg, 16); - // ByteBuf byteBuf = ctx.channel().alloc().buffer().writeBytes(writeMsg); - // ChannelFuture channelFuture = ctx.channel().writeAndFlush(byteBuf); - // channelFuture.addListener((ChannelFutureListener) channelFutureListener -> { - // // 检查操作的状态 - // if (channelFutureListener.isSuccess()) { - // log.info("【push结果===>成功】, pileSn:{}, remoteAddress:{}, channelId:{}, 帧类型:{}, 报文:{}", - // pileSn, ctx.channel().remoteAddress(), ctx.channel().id(), value, wholeMsg); - // } else { - // // 如果发生错误,则访问描述原因的Throwable - // Throwable cause = channelFutureListener.cause(); - // log.info("【push结果===>失败】, pileSn:{}, remoteAddress:{}, channelId:{}, 帧类型:{}, 报文:{}", - // pileSn, ctx.channel().remoteAddress(), ctx.channel().id(), value, wholeMsg); - // log.error("push发送命令失败, pileSn:{}", pileSn, cause); - // } - // }); - // - // // 保存报文 - // String frameTypeStr = YKCUtils.frameType2Str(((YKCFrameTypeCode) frameTypeCode).getBytes()); - // if (frameTypeList.contains(frameTypeStr)) { - // pileMsgRecordService.save(pileSn, null, frameTypeStr, null, wholeMsg); - // } - // return true; - // } - public byte[] send(byte[] msg, String pileSn, Enum frameTypeCode) throws Exception { return this.send(msg, pileSn, frameTypeCode, 5, TimeUnit.SECONDS); }