diff --git a/jsowell-admin/src/main/java/com/jsowell/web/controller/pile/PileRemoteController.java b/jsowell-admin/src/main/java/com/jsowell/web/controller/pile/PileRemoteController.java index ce69017b0..3a4c5d446 100644 --- a/jsowell-admin/src/main/java/com/jsowell/web/controller/pile/PileRemoteController.java +++ b/jsowell-admin/src/main/java/com/jsowell/web/controller/pile/PileRemoteController.java @@ -123,6 +123,16 @@ public class PileRemoteController extends BaseController { return AjaxResult.success(); } + /** + * 对时 + * http://localhost:8080/pile/remote/proofreadTimeTest + */ + @PostMapping("/proofreadTimeTest") + public AjaxResult proofreadTimeTest(@RequestBody QueryPileDTO queryPileDTO) { + pileRemoteService.proofreadTimeTest(queryPileDTO.getPileSn()); + return AjaxResult.success(); + } + /** * 远程升级 * http://localhost:8080/pile/remote/updateFirmware diff --git a/jsowell-common/src/main/java/com/jsowell/common/core/domain/ykc/YKCFrameTypeCode.java b/jsowell-common/src/main/java/com/jsowell/common/core/domain/ykc/YKCFrameTypeCode.java index b9e2e1920..a1f990264 100644 --- a/jsowell-common/src/main/java/com/jsowell/common/core/domain/ykc/YKCFrameTypeCode.java +++ b/jsowell-common/src/main/java/com/jsowell/common/core/domain/ykc/YKCFrameTypeCode.java @@ -114,8 +114,8 @@ public enum YKCFrameTypeCode { this.value = value; } - private int code; - private String value; + private int code; // 帧类型code + private String value; // 帧类型名称 public int getCode() { return code; @@ -129,15 +129,6 @@ public enum YKCFrameTypeCode { return BytesUtil.intToBytesLittle(code, 1); } - public static YKCFrameTypeCode fromCode(byte code) { - for (YKCFrameTypeCode item : YKCFrameTypeCode.values()) { - if (item.getCode() == code) { - return item; - } - } - return null; - } - public static String getFrameTypeStr(String frameType) { for (YKCFrameTypeCode item : YKCFrameTypeCode.values()) { String str = YKCUtils.frameType2Str(item.getBytes()); @@ -281,6 +272,7 @@ public enum YKCFrameTypeCode { REMOTE_ACCOUNT_BALANCE_UPDATE(REMOTE_ACCOUNT_BALANCE_UPDATE_CODE.getCode(), REMOTE_ACCOUNT_BALANCE_UPDATE_ANSWER_CODE.getCode()), ; + // 请求帧类型 private int requestFrameType; @@ -307,36 +299,81 @@ public enum YKCFrameTypeCode { return BytesUtil.intToBytesLittle(requestFrameType, 1); } + public byte[] getResponseFrameBytes() { + return BytesUtil.intToBytesLittle(responseFrameType, 1); + } + PileAnswersRelation(int requestFrameType, int responseFrameType) { this.requestFrameType = requestFrameType; this.responseFrameType = responseFrameType; } // 根据请求帧类型 获取应答帧类型 int类型 - public static int getResponseFrameTypeByRequestFrameType(int requestFrameType) { - for (PileAnswersRelation relation : PileAnswersRelation.values()) { - if (relation.getRequestFrameType() == requestFrameType) { - return relation.getResponseFrameType(); - } - } - return 0; - } + // public static int getResponseFrameTypeByRequestFrameType(int requestFrameType) { + // for (PileAnswersRelation relation : PileAnswersRelation.values()) { + // if (relation.getRequestFrameType() == requestFrameType) { + // return relation.getResponseFrameType(); + // } + // } + // return 0; + // } // 根据请求帧类型 获取应答帧类型 byte[]类型 - public static byte[] getResponseFrameTypeBytes(byte[] requestFrameType) { - int frameType = BytesUtil.bytesToInt(requestFrameType); - return BytesUtil.intToBytes(getResponseFrameTypeByRequestFrameType(frameType), 1); + // public static byte[] getResponseFrameTypeBytes(byte[] requestFrameType) { + // int frameType = BytesUtil.bytesToInt(requestFrameType); + // return BytesUtil.intToBytes(getResponseFrameTypeByRequestFrameType(frameType), 1); + // } + + /** + * 根据响应帧类型 查找对应的 请求帧类型 + */ + public static String getRequestFrameType(String responseFrameType) { + for (PileAnswersRelation relation : PileAnswersRelation.values()) { + if (StringUtils.equals(responseFrameType, YKCUtils.frameType2Str(relation.getResponseFrameBytes()))) { + return YKCUtils.frameType2Str(relation.getRequestFrameBytes()); + } + } + return null; } - // 需要获取应答的帧类型 - public static List getRequestFrameTypeList() { - List resultList = Lists.newArrayList(); - for (PileAnswersRelation relation : PileAnswersRelation.values()) { - resultList.add(YKCUtils.frameType2Str(relation.getRequestFrameBytes())); - } - return resultList; + // 请求帧类型集合 需要获取应答的帧类型 + // public static List getRequestFrameTypeList() { + // List requestList = Lists.newArrayList(); + // for (PileAnswersRelation relation : PileAnswersRelation.values()) { + // requestList.add(YKCUtils.frameType2Str(relation.getRequestFrameBytes())); + // } + // return requestList; + // } + + // 根据应答帧类型获取请求帧类型 byte[]类型 + // public static byte[] getRequestFrameTypeBytes(byte[] responseFrameType) { + // int frameType = BytesUtil.bytesToInt(responseFrameType); + // return BytesUtil.intToBytes(getRequestFrameTypeByResponseFrameType(frameType), 1); + // } + + public static void main(String[] args) { + System.out.println(BytesUtil.intToBytes(0)); } + // 根据应答帧类型获取请求帧类型 int类型 + // public static int getRequestFrameTypeByResponseFrameType(int requestFrameType) { + // for (PileAnswersRelation relation : PileAnswersRelation.values()) { + // if (relation.getRequestFrameType() == requestFrameType) { + // return relation.getResponseFrameType(); + // } + // } + // return 0; + // } + + // 应答帧类型集合 + // public static List getResponseFrameTypeList() { + // List responseList = Lists.newArrayList(); + // for (PileAnswersRelation relation : PileAnswersRelation.values()) { + // responseList.add(YKCUtils.frameType2Str(relation.getResponseFrameBytes())); + // } + // return responseList; + // } + } } diff --git a/jsowell-netty/src/main/java/com/jsowell/netty/rpc/RpcResponseHandler.java b/jsowell-netty/src/main/java/com/jsowell/netty/rpc/RpcResponseHandler.java index d98837b8a..22cbd4361 100644 --- a/jsowell-netty/src/main/java/com/jsowell/netty/rpc/RpcResponseHandler.java +++ b/jsowell-netty/src/main/java/com/jsowell/netty/rpc/RpcResponseHandler.java @@ -1,5 +1,6 @@ package com.jsowell.netty.rpc; +import com.jsowell.pile.rpc.SyncPromise; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; diff --git a/jsowell-netty/src/main/java/com/jsowell/netty/rpc/RpcUtil.java b/jsowell-netty/src/main/java/com/jsowell/netty/rpc/RpcUtil.java index cc85f8581..9a2d15af7 100644 --- a/jsowell-netty/src/main/java/com/jsowell/netty/rpc/RpcUtil.java +++ b/jsowell-netty/src/main/java/com/jsowell/netty/rpc/RpcUtil.java @@ -1,12 +1,30 @@ package com.jsowell.netty.rpc; +import com.google.common.collect.Lists; +import com.google.common.primitives.Bytes; +import com.jsowell.common.core.domain.ykc.YKCFrameTypeCode; +import com.jsowell.common.enums.ykc.PileChannelEntity; +import com.jsowell.common.util.BytesUtil; +import com.jsowell.common.util.CRC16Util; +import com.jsowell.common.util.YKCUtils; +import com.jsowell.pile.rpc.SyncPromise; +import com.jsowell.pile.service.PileMsgRecordService; +import io.netty.buffer.ByteBuf; import io.netty.channel.Channel; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelFutureListener; +import io.netty.channel.ChannelHandlerContext; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; +import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +@Slf4j public class RpcUtil { private final static Map syncPromiseMap = new ConcurrentHashMap<>(); @@ -16,31 +34,128 @@ public class RpcUtil { static{ channel = new RpcClient().connect("127.0.0.1", 8888); } + + @Autowired + private PileMsgRecordService pileMsgRecordService; + + // 需要记录报文的数据帧类型 + private final static List frameTypeList = Lists.newArrayList( + YKCUtils.frameType2Str(YKCFrameTypeCode.REMOTE_RESTART_CODE.getBytes()), + YKCUtils.frameType2Str(YKCFrameTypeCode.REMOTE_CONTROL_START_CHARGING_CODE.getBytes()), + YKCUtils.frameType2Str(YKCFrameTypeCode.REMOTE_CONTROL_STOP_CHARGING_CODE.getBytes()), + YKCUtils.frameType2Str(YKCFrameTypeCode.RESERVATION_CHARGING_SETUP_CODE.getBytes()) + ); public static RpcResponse send(RpcRequest rpcRequest, long timeout, TimeUnit unit) throws Exception{ - if(channel == null) { + // if(channel == null) { + // throw new NullPointerException("channel"); + // } + // + // if(rpcRequest == null) { + // throw new NullPointerException("rpcRequest"); + // } + // + // if(timeout <= 0) { + // throw new IllegalArgumentException("timeout must greater than 0"); + // } + // + // // 创造一个容器,用于存放当前线程与rpcClient中的线程交互 + // SyncPromise syncPromise = new SyncPromise(); + // syncPromiseMap.put(rpcRequest.getId(), syncPromise); + // + // // 发送消息,此处如果发送玩消息并且在get之前返回了结果,下一行的get将不会进入阻塞,也可以顺利拿到结果 + // channel.writeAndFlush(rpcRequest); + // + // // 等待获取结果 + // RpcResponse rpcResponse = syncPromise.get2(timeout, unit); + // + // if(rpcResponse == null) { + // if(syncPromise.isTimeout()) { + // throw new TimeoutException("等待响应结果超时"); + // } else{ + // throw new Exception("其他异常"); + // } + // } + // + // // 移除容器 + // syncPromiseMap.remove(rpcRequest.getId()); + + return null; + } + + public static byte[] send2(byte[] msg, String pileSn, Enum frameTypeCode, long timeout, TimeUnit unit) throws Exception{ + // 通过桩编号获取channel + ChannelHandlerContext ctx = PileChannelEntity.getChannelByPileSn(pileSn); + String value = ((YKCFrameTypeCode) frameTypeCode).getValue(); // 帧类型名称 + if (Objects.isNull(ctx)) { + log.error("push命令[{}]失败, 桩号:{}无法获取到长连接, 请检查充电桩连接状态!", value, pileSn); throw new NullPointerException("channel"); } - - if(rpcRequest == null) { - throw new NullPointerException("rpcRequest"); + if(msg == null) { + throw new NullPointerException("msg"); } - if(timeout <= 0) { throw new IllegalArgumentException("timeout must greater than 0"); } - + // 创造一个容器,用于存放当前线程与rpcClient中的线程交互 SyncPromise syncPromise = new SyncPromise(); - syncPromiseMap.put(rpcRequest.getId(), syncPromise); - + + // 消息id = channelId + 帧类型(例如: "0x34") + String msgId = ctx.channel().id().toString() + "_" + YKCUtils.frameType2Str(((YKCFrameTypeCode) frameTypeCode).getBytes()); + + syncPromiseMap.put(msgId, syncPromise); + // 发送消息,此处如果发送玩消息并且在get之前返回了结果,下一行的get将不会进入阻塞,也可以顺利拿到结果 - channel.writeAndFlush(rpcRequest); - + /* + 拼接报文 + */ + // 起始标志 + byte[] head = new byte[]{0x68}; + + // 序列号域 + byte[] serialNumber = new byte[]{0x00, 0x00}; + + // 加密标志 + byte[] encryptFlag = new byte[]{0x00}; + + // 帧类型标志 + byte[] frameType = new byte[]{(byte) ((YKCFrameTypeCode) frameTypeCode).getCode()}; + + // 序列号域+加密标志+帧类型标志+消息体 + byte[] temp = Bytes.concat(serialNumber, encryptFlag, frameType, msg); + + // 数据长度 + byte[] length = BytesUtil.intToBytes(temp.length, 1); + + // 帧校验域 + byte[] crc = BytesUtil.intToBytes(CRC16Util.calcCrc16(temp)); + + // 返回报文 + byte[] writeMsg = Bytes.concat(head, length, temp, crc); + + // 返回完整的报文 string类型 + String wholeMsg = BytesUtil.binary(writeMsg, 16); + ByteBuf byteBuf = ctx.channel().alloc().buffer().writeBytes(writeMsg); + ChannelFuture channelFuture = ctx.channel().writeAndFlush(byteBuf); + channelFuture.addListener((ChannelFutureListener) channelFutureListener -> { + // 检查操作的状态 + if (channelFutureListener.isSuccess()) { + log.info("【push结果===>成功】, pileSn:{}, remoteAddress:{}, channelId:{}, 帧类型:{}, 报文:{}", + pileSn, ctx.channel().remoteAddress(), ctx.channel().id(), value, wholeMsg); + } else { + // 如果发生错误,则访问描述原因的Throwable + Throwable cause = channelFutureListener.cause(); + log.info("【push结果===>失败】, pileSn:{}, remoteAddress:{}, channelId:{}, 帧类型:{}, 报文:{}", + pileSn, ctx.channel().remoteAddress(), ctx.channel().id(), value, wholeMsg); + log.error("push发送命令失败, pileSn:{}", pileSn, cause); + } + }); + // 等待获取结果 - RpcResponse rpcResponse = syncPromise.get(timeout, unit); - + byte[] rpcResponse = syncPromise.get2(timeout, unit); + if(rpcResponse == null) { if(syncPromise.isTimeout()) { throw new TimeoutException("等待响应结果超时"); @@ -48,12 +163,73 @@ public class RpcUtil { throw new Exception("其他异常"); } } - + // 移除容器 - syncPromiseMap.remove(rpcRequest.getId()); - + syncPromiseMap.remove(msgId); + return rpcResponse; } + + public boolean push(byte[] msg, String pileSn, Enum frameTypeCode) { + // 通过桩编号获取channel + ChannelHandlerContext ctx = PileChannelEntity.getChannelByPileSn(pileSn); + String value = ((YKCFrameTypeCode) frameTypeCode).getValue(); + if (Objects.isNull(ctx)) { + log.error("push命令[{}]失败, 桩号:{}无法获取到长连接, 请检查充电桩连接状态!", value, pileSn); + return false; + } + /* + 拼接报文 + */ + // 起始标志 + byte[] head = new byte[]{0x68}; + + // 序列号域 + byte[] serialNumber = new byte[]{0x00, 0x00}; + + // 加密标志 + byte[] encryptFlag = new byte[]{0x00}; + + // 帧类型标志 + byte[] frameType = new byte[]{(byte) ((YKCFrameTypeCode) frameTypeCode).getCode()}; + + // 序列号域+加密标志+帧类型标志+消息体 + byte[] temp = Bytes.concat(serialNumber, encryptFlag, frameType, msg); + + // 数据长度 + byte[] length = BytesUtil.intToBytes(temp.length, 1); + + // 帧校验域 + byte[] crc = BytesUtil.intToBytes(CRC16Util.calcCrc16(temp)); + + // 返回报文 + byte[] writeMsg = Bytes.concat(head, length, temp, crc); + + // 返回完整的报文 string类型 + String wholeMsg = BytesUtil.binary(writeMsg, 16); + ByteBuf byteBuf = ctx.channel().alloc().buffer().writeBytes(writeMsg); + ChannelFuture channelFuture = ctx.channel().writeAndFlush(byteBuf); + channelFuture.addListener((ChannelFutureListener) channelFutureListener -> { + // 检查操作的状态 + if (channelFutureListener.isSuccess()) { + log.info("【push结果===>成功】, pileSn:{}, remoteAddress:{}, channelId:{}, 帧类型:{}, 报文:{}", + pileSn, ctx.channel().remoteAddress(), ctx.channel().id(), value, wholeMsg); + } else { + // 如果发生错误,则访问描述原因的Throwable + Throwable cause = channelFutureListener.cause(); + log.info("【push结果===>失败】, pileSn:{}, remoteAddress:{}, channelId:{}, 帧类型:{}, 报文:{}", + pileSn, ctx.channel().remoteAddress(), ctx.channel().id(), value, wholeMsg); + log.error("push发送命令失败, pileSn:{}", pileSn, cause); + } + }); + + // 保存报文 + String frameTypeStr = YKCUtils.frameType2Str(((YKCFrameTypeCode) frameTypeCode).getBytes()); + if (frameTypeList.contains(frameTypeStr)) { + pileMsgRecordService.save(pileSn, null, frameTypeStr, null, wholeMsg); + } + return true; + } public static Map getSyncPromiseMap(){ return syncPromiseMap; diff --git a/jsowell-netty/src/main/java/com/jsowell/netty/rpc/SyncPromise.java b/jsowell-netty/src/main/java/com/jsowell/netty/rpc/SyncPromise.java deleted file mode 100644 index 41ef7b322..000000000 --- a/jsowell-netty/src/main/java/com/jsowell/netty/rpc/SyncPromise.java +++ /dev/null @@ -1,49 +0,0 @@ -package com.jsowell.netty.rpc; - -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; - -public class SyncPromise { - - // 用于接收结果 - private RpcResponse rpcResponse; - - private final CountDownLatch countDownLatch = new CountDownLatch(1); - - // 用于判断是否超时 - private boolean isTimeout = false; - - /** - * 同步等待返回结果 - */ - public RpcResponse get(long timeout, TimeUnit unit) throws InterruptedException { - // 等待阻塞,超时时间内countDownLatch减到0,将提前唤醒,以此作为是否超时判断 - boolean earlyWakeUp = countDownLatch.await(timeout, unit); - - if(earlyWakeUp) { - // 超时时间内countDownLatch减到0,提前唤醒,说明已有结果 - return rpcResponse; - } else { - // 超时时间内countDownLatch没有减到0,自动唤醒,说明超时时间内没有等到结果 - isTimeout = true; - return null; - } - } - - public void wake() { - countDownLatch.countDown(); - } - - public RpcResponse getRpcResponse() { - return rpcResponse; - } - - public void setRpcResponse(RpcResponse rpcResponse) { - this.rpcResponse = rpcResponse; - } - - public boolean isTimeout() { - return isTimeout; - } - -} \ No newline at end of file diff --git a/jsowell-netty/src/main/java/com/jsowell/netty/server/yunkuaichong/NettyServerHandler.java b/jsowell-netty/src/main/java/com/jsowell/netty/server/yunkuaichong/NettyServerHandler.java index 1ef059718..50bb4a328 100644 --- a/jsowell-netty/src/main/java/com/jsowell/netty/server/yunkuaichong/NettyServerHandler.java +++ b/jsowell-netty/src/main/java/com/jsowell/netty/server/yunkuaichong/NettyServerHandler.java @@ -6,6 +6,8 @@ import com.jsowell.common.enums.ykc.PileChannelEntity; import com.jsowell.common.util.BytesUtil; import com.jsowell.common.util.StringUtils; import com.jsowell.common.util.YKCUtils; +import com.jsowell.netty.rpc.RpcUtil; +import com.jsowell.pile.rpc.SyncPromise; import com.jsowell.netty.service.yunkuaichong.YKCBusinessService; import io.netty.buffer.ByteBuf; import io.netty.channel.*; @@ -28,7 +30,7 @@ import java.util.concurrent.ConcurrentHashMap; @ChannelHandler.Sharable @Slf4j @Component -public class NettyServerHandler extends SimpleChannelInboundHandler { +public class NettyServerHandler extends ChannelInboundHandlerAdapter { @Autowired private YKCBusinessService ykcService; @@ -73,19 +75,29 @@ public class NettyServerHandler extends SimpleChannelInboundHandler { // 获取帧类型 byte[] frameTypeBytes = BytesUtil.copyBytes(msg, 5, 1); String frameType = YKCUtils.frameType2Str(frameTypeBytes); + + // 判断该帧类型是否为某请求帧的应答帧 + String requestFrameType = YKCFrameTypeCode.PileAnswersRelation.getRequestFrameType(frameType); + if (StringUtils.isNotBlank(requestFrameType)) { + // 根据请求id,在集合中找到与外部线程通信的SyncPromise对象 + String msgId = ctx.channel().id().toString() + "_" + requestFrameType; + SyncPromise syncPromise = RpcUtil.getSyncPromiseMap().get(msgId); + if(syncPromise != null) { + // 设置响应结果 + syncPromise.setRpcResult(msg); + // 唤醒外部线程 + syncPromise.wake(); + } + } + // 获取序列号域 int serialNumber = BytesUtil.bytesToIntLittle(BytesUtil.copyBytes(msg, 2, 2)); + // 获取channel Channel channel = ctx.channel(); - // new - // String hexString = DatatypeConverter.printHexBinary(msg); - // 心跳包0x03日志太多,造成日志文件过大,改为不打印 if (!CollectionUtils.containsAny(notPrintFrameTypeList, frameType)) { - // log.info("【<<<<<平台收到消息<<<<<】channel:{}, 帧类型:{}, 帧名称:{}, 序列号域:{}, 报文:{}, new报文:{}", - // channel.id(), frameType, YKCFrameTypeCode.getFrameTypeStr(frameType), serialNumber, - // BytesUtil.binary(msg, 16), hexString); log.info("【<<<<<平台收到消息<<<<<】channel:{}, 帧类型:{}, 帧名称:{}, 序列号域:{}, 报文:{}", channel.id(), frameType, YKCFrameTypeCode.getFrameTypeStr(frameType), serialNumber, BytesUtil.binary(msg, 16)); @@ -109,10 +121,10 @@ public class NettyServerHandler extends SimpleChannelInboundHandler { } } - @Override - protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception { - log.info("channelRead0=== channelId:" + ctx.channel().id() + ", msg:" + msg); - } + // @Override + // protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception { + // log.info("channelRead0=== channelId:" + ctx.channel().id() + ", msg:" + msg); + // } /** * 有客户端终止连接服务器会触发此函数 diff --git a/jsowell-pile/src/main/java/com/jsowell/pile/rpc/SyncPromise.java b/jsowell-pile/src/main/java/com/jsowell/pile/rpc/SyncPromise.java new file mode 100644 index 000000000..36f41af7f --- /dev/null +++ b/jsowell-pile/src/main/java/com/jsowell/pile/rpc/SyncPromise.java @@ -0,0 +1,72 @@ +package com.jsowell.pile.rpc; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +public class SyncPromise { + + // 用于接收结果 + // private RpcResponse rpcResponse; + + private byte[] rpcResult; + + private final CountDownLatch countDownLatch = new CountDownLatch(1); + + // 用于判断是否超时 + private boolean isTimeout = false; + + public boolean isTimeout() { + return isTimeout; + } + + public byte[] getRpcResult() { + return rpcResult; + } + + public void setRpcResult(byte[] rpcResult) { + this.rpcResult = rpcResult; + } + + /** + * 同步等待返回结果 + */ + // public RpcResponse get(long timeout, TimeUnit unit) throws InterruptedException { + // // 等待阻塞,超时时间内countDownLatch减到0,将提前唤醒,以此作为是否超时判断 + // boolean earlyWakeUp = countDownLatch.await(timeout, unit); + // + // if(earlyWakeUp) { + // // 超时时间内countDownLatch减到0,提前唤醒,说明已有结果 + // return rpcResponse; + // } else { + // // 超时时间内countDownLatch没有减到0,自动唤醒,说明超时时间内没有等到结果 + // isTimeout = true; + // return null; + // } + // } + + public byte[] get2(long timeout, TimeUnit unit) throws InterruptedException { + // 等待阻塞,超时时间内countDownLatch减到0,将提前唤醒,以此作为是否超时判断 + boolean earlyWakeUp = countDownLatch.await(timeout, unit); + + if(earlyWakeUp) { + // 超时时间内countDownLatch减到0,提前唤醒,说明已有结果 + return rpcResult; + } else { + // 超时时间内countDownLatch没有减到0,自动唤醒,说明超时时间内没有等到结果 + isTimeout = true; + return null; + } + } + + public void wake() { + countDownLatch.countDown(); + } + + // public RpcResponse getRpcResponse() { + // return rpcResponse; + // } + // public void setRpcResponse(RpcResponse rpcResponse) { + // this.rpcResponse = rpcResponse; + // } + +} \ No newline at end of file diff --git a/jsowell-pile/src/main/java/com/jsowell/pile/service/PileRemoteService.java b/jsowell-pile/src/main/java/com/jsowell/pile/service/PileRemoteService.java index 60cd18d25..e54ad3e59 100644 --- a/jsowell-pile/src/main/java/com/jsowell/pile/service/PileRemoteService.java +++ b/jsowell-pile/src/main/java/com/jsowell/pile/service/PileRemoteService.java @@ -151,6 +151,16 @@ public class PileRemoteService { ykcPushCommandService.pushProofreadTimeCommand(command); } + + public void proofreadTimeTest(String pileSn) { + ProofreadTimeCommand command = ProofreadTimeCommand.builder().pileSn(pileSn).build(); + try { + ykcPushCommandService.pushProofreadTimeCommandTest(command); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + /** * 下发充电桩计费模型 */ @@ -291,4 +301,5 @@ public class PileRemoteService { public void reservationCharging(ReservationChargingCommand command) { ykcPushCommandService.pushReservationChargingCommand(command); } + } diff --git a/jsowell-pile/src/main/java/com/jsowell/pile/service/YKCPushCommandService.java b/jsowell-pile/src/main/java/com/jsowell/pile/service/YKCPushCommandService.java index c727f047c..eba71fbfd 100644 --- a/jsowell-pile/src/main/java/com/jsowell/pile/service/YKCPushCommandService.java +++ b/jsowell-pile/src/main/java/com/jsowell/pile/service/YKCPushCommandService.java @@ -83,4 +83,6 @@ public interface YKCPushCommandService { * @param command */ void pushReservationChargingCommand(ReservationChargingCommand command); + + void pushProofreadTimeCommandTest(ProofreadTimeCommand command) throws Exception; } diff --git a/jsowell-pile/src/main/java/com/jsowell/pile/service/impl/YKCPushCommandServiceImpl.java b/jsowell-pile/src/main/java/com/jsowell/pile/service/impl/YKCPushCommandServiceImpl.java index 01100616c..e5da4c74c 100644 --- a/jsowell-pile/src/main/java/com/jsowell/pile/service/impl/YKCPushCommandServiceImpl.java +++ b/jsowell-pile/src/main/java/com/jsowell/pile/service/impl/YKCPushCommandServiceImpl.java @@ -10,6 +10,7 @@ import com.jsowell.common.exception.BusinessException; import com.jsowell.common.util.*; import com.jsowell.common.util.Cp56Time2a.Cp56Time2aUtil; import com.jsowell.pile.domain.ykcCommond.*; +import com.jsowell.pile.rpc.SyncPromise; import com.jsowell.pile.service.*; import com.jsowell.pile.vo.web.BillingTemplateVO; import com.jsowell.pile.vo.web.PileModelInfoVO; @@ -28,7 +29,11 @@ import java.math.BigDecimal; import java.time.LocalTime; import java.util.Date; import java.util.List; +import java.util.Map; import java.util.Objects; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; /** * 向充电桩发送指令service @@ -36,6 +41,8 @@ import java.util.Objects; @Slf4j @Service public class YKCPushCommandServiceImpl implements YKCPushCommandService { + private final static Map syncPromiseMap = new ConcurrentHashMap<>(); + @Autowired private PileBillingTemplateService pileBillingTemplateService; @@ -131,6 +138,96 @@ public class YKCPushCommandServiceImpl implements YKCPushCommandService { return true; } + public byte[] pushTest(byte[] msg, String pileSn, Enum frameTypeCode) throws Exception { + return this.pushTest(msg, pileSn, frameTypeCode, 5, TimeUnit.SECONDS); + } + + public byte[] pushTest(byte[] msg, String pileSn, Enum frameTypeCode, long timeout, TimeUnit unit) throws Exception { + // 通过桩编号获取channel + ChannelHandlerContext ctx = PileChannelEntity.getChannelByPileSn(pileSn); + String value = ((YKCFrameTypeCode) frameTypeCode).getValue(); // 帧类型名称 + if (Objects.isNull(ctx)) { + log.error("push命令[{}]失败, 桩号:{}无法获取到长连接, 请检查充电桩连接状态!", value, pileSn); + throw new NullPointerException("channel"); + } + if(msg == null) { + throw new NullPointerException("msg"); + } + if(timeout <= 0) { + throw new IllegalArgumentException("timeout must greater than 0"); + } + + // 创造一个容器,用于存放当前线程与rpcClient中的线程交互 + SyncPromise syncPromise = new SyncPromise(); + + // 消息id = channelId + 帧类型(例如: "0x34") + String msgId = ctx.channel().id().toString() + "_" + YKCUtils.frameType2Str(((YKCFrameTypeCode) frameTypeCode).getBytes()); + + syncPromiseMap.put(msgId, syncPromise); + + // 发送消息,此处如果发送玩消息并且在get之前返回了结果,下一行的get将不会进入阻塞,也可以顺利拿到结果 + /* + 拼接报文 + */ + // 起始标志 + byte[] head = new byte[]{0x68}; + + // 序列号域 + byte[] serialNumber = new byte[]{0x00, 0x00}; + + // 加密标志 + byte[] encryptFlag = new byte[]{0x00}; + + // 帧类型标志 + byte[] frameType = new byte[]{(byte) ((YKCFrameTypeCode) frameTypeCode).getCode()}; + + // 序列号域+加密标志+帧类型标志+消息体 + byte[] temp = Bytes.concat(serialNumber, encryptFlag, frameType, msg); + + // 数据长度 + byte[] length = BytesUtil.intToBytes(temp.length, 1); + + // 帧校验域 + byte[] crc = BytesUtil.intToBytes(CRC16Util.calcCrc16(temp)); + + // 返回报文 + byte[] writeMsg = Bytes.concat(head, length, temp, crc); + + // 返回完整的报文 string类型 + String wholeMsg = BytesUtil.binary(writeMsg, 16); + ByteBuf byteBuf = ctx.channel().alloc().buffer().writeBytes(writeMsg); + ChannelFuture channelFuture = ctx.channel().writeAndFlush(byteBuf); + channelFuture.addListener((ChannelFutureListener) channelFutureListener -> { + // 检查操作的状态 + if (channelFutureListener.isSuccess()) { + log.info("【push结果===>成功】, pileSn:{}, remoteAddress:{}, channelId:{}, 帧类型:{}, 报文:{}", + pileSn, ctx.channel().remoteAddress(), ctx.channel().id(), value, wholeMsg); + } else { + // 如果发生错误,则访问描述原因的Throwable + Throwable cause = channelFutureListener.cause(); + log.info("【push结果===>失败】, pileSn:{}, remoteAddress:{}, channelId:{}, 帧类型:{}, 报文:{}", + pileSn, ctx.channel().remoteAddress(), ctx.channel().id(), value, wholeMsg); + log.error("push发送命令失败, pileSn:{}", pileSn, cause); + } + }); + + // 等待获取结果 + byte[] rpcResponse = syncPromise.get2(timeout, unit); + + if(rpcResponse == null) { + if(syncPromise.isTimeout()) { + throw new TimeoutException("等待响应结果超时"); + } else{ + throw new Exception("其他异常"); + } + } + + // 移除容器 + syncPromiseMap.remove(msgId); + + return rpcResponse; + } + /** * 发送启动充电指令 */ @@ -280,6 +377,24 @@ public class YKCPushCommandServiceImpl implements YKCPushCommandService { log.info("[充电桩:{}对时, 时间:{}, CP56Time2a:{}]", pileSn, DateUtils.parseDateToStr(DateUtils.YYYY_MM_DD_HH_MM_SS, date), BytesUtil.binary(dateBytes, 16)); } + + @Override + public void pushProofreadTimeCommandTest(ProofreadTimeCommand command) throws Exception { + String pileSn = command.getPileSn(); + byte[] pileSnByteArr = BytesUtil.str2Bcd(pileSn); + + // 时间 + Date date = DateUtils.parseDate(DateUtils.getDateTime()); + byte[] dateBytes = Cp56Time2aUtil.date2Hbyte(date); + + // 拼装msg + byte[] msg = Bytes.concat(pileSnByteArr, dateBytes); + + byte[] bytes = this.pushTest(msg, pileSn, YKCFrameTypeCode.TIME_CHECK_SETTING_CODE); + log.info("[充电桩:{}对时, 时间:{}, CP56Time2a:{}], 响应:{}", + pileSn, DateUtils.parseDateToStr(DateUtils.YYYY_MM_DD_HH_MM_SS, date), BytesUtil.binary(dateBytes, 16), BytesUtil.binary(bytes, 16)); + } + /** * 向充电桩发送计费模板 * @@ -546,4 +661,5 @@ public class YKCPushCommandServiceImpl implements YKCPushCommandService { log.info("【=====平台下发指令=====】: 预约充电指令, 交易流水号:{}, 桩编号:{}, 枪口号:{}, 操作:{}, 身份验证:{}, 开始时间:{}, 结束时间:{}, 启动金额:{}", transactionCode, pileSn, connectorCode, operation, verifyIdentity, DateUtils.formatDateTime(reservedStartTime), DateUtils.formatDateTime(reservedEndTime), amount); } + }