同步获取响应数据

This commit is contained in:
Guoqs
2024-08-01 16:24:52 +08:00
parent 201137bb9f
commit 901310f061
9 changed files with 61 additions and 307 deletions

View File

@@ -252,14 +252,14 @@ public class UploadRealTimeMonitorHandler extends AbstractHandler {
if (StringUtils.equals("01", isChargerPluggedIn)) {
// 插枪状态
if (redisCache.setnx(plugRedisKey, pileConnectorCode, CacheConstants.cache_expire_time_30d)) {
log.info("时间:{}, 枪口号:{}, 首次插入充电枪", DateUtils.getDateTime(), pileConnectorCode);
// log.info("时间:{}, 枪口号:{}, 首次插入充电枪", DateUtils.getDateTime(), pileConnectorCode);
// 设置成功说明 第一次插枪
// pileBasicInfoService.firstPlugInCharger(pileConnectorCode);
}
} else {
// 未插枪状态
if (redisCache.hasKey(plugRedisKey) && redisCache.deleteObject(plugRedisKey)) {
log.info("时间:{}, 枪口号:{}, 首次拔出充电枪", DateUtils.getDateTime(), pileConnectorCode);
// log.info("时间:{}, 枪口号:{}, 首次拔出充电枪", DateUtils.getDateTime(), pileConnectorCode);
// redis有值并且删除成功说明首次拔枪
// pileBasicInfoService.firstUnplugCharger(pileConnectorCode);
}

View File

@@ -1,5 +1,6 @@
package com.jsowell.netty.rpc;
import com.jsowell.pile.rpc.RpcUtil;
import com.jsowell.pile.rpc.SyncPromise;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;

View File

@@ -1,239 +0,0 @@
package com.jsowell.netty.rpc;
import com.google.common.collect.Lists;
import com.google.common.primitives.Bytes;
import com.jsowell.common.core.domain.ykc.YKCFrameTypeCode;
import com.jsowell.common.enums.ykc.PileChannelEntity;
import com.jsowell.common.util.BytesUtil;
import com.jsowell.common.util.CRC16Util;
import com.jsowell.common.util.YKCUtils;
import com.jsowell.pile.rpc.SyncPromise;
import com.jsowell.pile.service.PileMsgRecordService;
import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
@Slf4j
public class RpcUtil {
private final static Map<String, SyncPromise> syncPromiseMap = new ConcurrentHashMap<>();
private final static Channel channel;
static{
channel = new RpcClient().connect("127.0.0.1", 8888);
}
@Autowired
private PileMsgRecordService pileMsgRecordService;
// 需要记录报文的数据帧类型
private final static List<String> frameTypeList = Lists.newArrayList(
YKCUtils.frameType2Str(YKCFrameTypeCode.REMOTE_RESTART_CODE.getBytes()),
YKCUtils.frameType2Str(YKCFrameTypeCode.REMOTE_CONTROL_START_CHARGING_CODE.getBytes()),
YKCUtils.frameType2Str(YKCFrameTypeCode.REMOTE_CONTROL_STOP_CHARGING_CODE.getBytes()),
YKCUtils.frameType2Str(YKCFrameTypeCode.RESERVATION_CHARGING_SETUP_CODE.getBytes())
);
public static RpcResponse send(RpcRequest rpcRequest, long timeout, TimeUnit unit) throws Exception{
// if(channel == null) {
// throw new NullPointerException("channel");
// }
//
// if(rpcRequest == null) {
// throw new NullPointerException("rpcRequest");
// }
//
// if(timeout <= 0) {
// throw new IllegalArgumentException("timeout must greater than 0");
// }
//
// // 创造一个容器用于存放当前线程与rpcClient中的线程交互
// SyncPromise syncPromise = new SyncPromise();
// syncPromiseMap.put(rpcRequest.getId(), syncPromise);
//
// // 发送消息此处如果发送玩消息并且在get之前返回了结果下一行的get将不会进入阻塞也可以顺利拿到结果
// channel.writeAndFlush(rpcRequest);
//
// // 等待获取结果
// RpcResponse rpcResponse = syncPromise.get2(timeout, unit);
//
// if(rpcResponse == null) {
// if(syncPromise.isTimeout()) {
// throw new TimeoutException("等待响应结果超时");
// } else{
// throw new Exception("其他异常");
// }
// }
//
// // 移除容器
// syncPromiseMap.remove(rpcRequest.getId());
return null;
}
public static byte[] send2(byte[] msg, String pileSn, Enum<YKCFrameTypeCode> frameTypeCode, long timeout, TimeUnit unit) throws Exception{
// 通过桩编号获取channel
ChannelHandlerContext ctx = PileChannelEntity.getChannelByPileSn(pileSn);
String value = ((YKCFrameTypeCode) frameTypeCode).getValue(); // 帧类型名称
if (Objects.isNull(ctx)) {
log.error("push命令[{}]失败, 桩号:{}无法获取到长连接, 请检查充电桩连接状态!", value, pileSn);
throw new NullPointerException("channel");
}
if(msg == null) {
throw new NullPointerException("msg");
}
if(timeout <= 0) {
throw new IllegalArgumentException("timeout must greater than 0");
}
// 创造一个容器用于存放当前线程与rpcClient中的线程交互
SyncPromise syncPromise = new SyncPromise();
// 消息id = channelId + 帧类型(例如: "0x34")
String msgId = ctx.channel().id().toString() + "_" + YKCUtils.frameType2Str(((YKCFrameTypeCode) frameTypeCode).getBytes());
syncPromiseMap.put(msgId, syncPromise);
// 发送消息此处如果发送玩消息并且在get之前返回了结果下一行的get将不会进入阻塞也可以顺利拿到结果
/*
拼接报文
*/
// 起始标志
byte[] head = new byte[]{0x68};
// 序列号域
byte[] serialNumber = new byte[]{0x00, 0x00};
// 加密标志
byte[] encryptFlag = new byte[]{0x00};
// 帧类型标志
byte[] frameType = new byte[]{(byte) ((YKCFrameTypeCode) frameTypeCode).getCode()};
// 序列号域+加密标志+帧类型标志+消息体
byte[] temp = Bytes.concat(serialNumber, encryptFlag, frameType, msg);
// 数据长度
byte[] length = BytesUtil.intToBytes(temp.length, 1);
// 帧校验域
byte[] crc = BytesUtil.intToBytes(CRC16Util.calcCrc16(temp));
// 返回报文
byte[] writeMsg = Bytes.concat(head, length, temp, crc);
// 返回完整的报文 string类型
String wholeMsg = BytesUtil.binary(writeMsg, 16);
ByteBuf byteBuf = ctx.channel().alloc().buffer().writeBytes(writeMsg);
ChannelFuture channelFuture = ctx.channel().writeAndFlush(byteBuf);
channelFuture.addListener((ChannelFutureListener) channelFutureListener -> {
// 检查操作的状态
if (channelFutureListener.isSuccess()) {
log.info("【push结果===>成功】, pileSn:{}, remoteAddress:{}, channelId:{}, 帧类型:{}, 报文:{}",
pileSn, ctx.channel().remoteAddress(), ctx.channel().id(), value, wholeMsg);
} else {
// 如果发生错误则访问描述原因的Throwable
Throwable cause = channelFutureListener.cause();
log.info("【push结果===>失败】, pileSn:{}, remoteAddress:{}, channelId:{}, 帧类型:{}, 报文:{}",
pileSn, ctx.channel().remoteAddress(), ctx.channel().id(), value, wholeMsg);
log.error("push发送命令失败, pileSn:{}", pileSn, cause);
}
});
// 等待获取结果
byte[] rpcResponse = syncPromise.get2(timeout, unit);
if(rpcResponse == null) {
if(syncPromise.isTimeout()) {
throw new TimeoutException("等待响应结果超时");
} else{
throw new Exception("其他异常");
}
}
// 移除容器
syncPromiseMap.remove(msgId);
return rpcResponse;
}
public boolean push(byte[] msg, String pileSn, Enum<YKCFrameTypeCode> frameTypeCode) {
// 通过桩编号获取channel
ChannelHandlerContext ctx = PileChannelEntity.getChannelByPileSn(pileSn);
String value = ((YKCFrameTypeCode) frameTypeCode).getValue();
if (Objects.isNull(ctx)) {
log.error("push命令[{}]失败, 桩号:{}无法获取到长连接, 请检查充电桩连接状态!", value, pileSn);
return false;
}
/*
拼接报文
*/
// 起始标志
byte[] head = new byte[]{0x68};
// 序列号域
byte[] serialNumber = new byte[]{0x00, 0x00};
// 加密标志
byte[] encryptFlag = new byte[]{0x00};
// 帧类型标志
byte[] frameType = new byte[]{(byte) ((YKCFrameTypeCode) frameTypeCode).getCode()};
// 序列号域+加密标志+帧类型标志+消息体
byte[] temp = Bytes.concat(serialNumber, encryptFlag, frameType, msg);
// 数据长度
byte[] length = BytesUtil.intToBytes(temp.length, 1);
// 帧校验域
byte[] crc = BytesUtil.intToBytes(CRC16Util.calcCrc16(temp));
// 返回报文
byte[] writeMsg = Bytes.concat(head, length, temp, crc);
// 返回完整的报文 string类型
String wholeMsg = BytesUtil.binary(writeMsg, 16);
ByteBuf byteBuf = ctx.channel().alloc().buffer().writeBytes(writeMsg);
ChannelFuture channelFuture = ctx.channel().writeAndFlush(byteBuf);
channelFuture.addListener((ChannelFutureListener) channelFutureListener -> {
// 检查操作的状态
if (channelFutureListener.isSuccess()) {
log.info("【push结果===>成功】, pileSn:{}, remoteAddress:{}, channelId:{}, 帧类型:{}, 报文:{}",
pileSn, ctx.channel().remoteAddress(), ctx.channel().id(), value, wholeMsg);
} else {
// 如果发生错误则访问描述原因的Throwable
Throwable cause = channelFutureListener.cause();
log.info("【push结果===>失败】, pileSn:{}, remoteAddress:{}, channelId:{}, 帧类型:{}, 报文:{}",
pileSn, ctx.channel().remoteAddress(), ctx.channel().id(), value, wholeMsg);
log.error("push发送命令失败, pileSn:{}", pileSn, cause);
}
});
// 保存报文
String frameTypeStr = YKCUtils.frameType2Str(((YKCFrameTypeCode) frameTypeCode).getBytes());
if (frameTypeList.contains(frameTypeStr)) {
pileMsgRecordService.save(pileSn, null, frameTypeStr, null, wholeMsg);
}
return true;
}
public static Map<String, SyncPromise> getSyncPromiseMap(){
return syncPromiseMap;
}
}

View File

@@ -1,5 +1,7 @@
package com.jsowell.netty.rpc;
import com.jsowell.pile.rpc.RpcUtil;
import java.util.concurrent.TimeUnit;
public class TestRpcClient {
@@ -13,9 +15,9 @@ public class TestRpcClient {
rpcRequest.setParam("参数1");
try {
System.out.println("thread1发送请求");
RpcResponse rpcResponse = RpcUtil.send(rpcRequest, 5, TimeUnit.SECONDS);
System.out.println("thread1处理结果" + rpcResponse);
// System.out.println("thread1发送请求");
// RpcResponse rpcResponse = RpcUtil.send(rpcRequest, 5, TimeUnit.SECONDS);
// System.out.println("thread1处理结果" + rpcResponse);
} catch (Exception e) {
throw new RuntimeException(e);
}
@@ -29,9 +31,9 @@ public class TestRpcClient {
rpcRequest2.setParam("参数2");
try {
System.out.println("thread2发送请求");
RpcResponse rpcResponse = RpcUtil.send(rpcRequest2, 2, TimeUnit.SECONDS);
System.out.println("thread2处理结果" + rpcResponse);
// System.out.println("thread2发送请求");
// RpcResponse rpcResponse = RpcUtil.send(rpcRequest2, 2, TimeUnit.SECONDS);
// System.out.println("thread2处理结果" + rpcResponse);
} catch (Exception e) {
throw new RuntimeException(e);
}

View File

@@ -1,12 +1,13 @@
package com.jsowell.netty.server.yunkuaichong;
import com.alibaba.fastjson2.JSON;
import com.google.common.collect.Lists;
import com.jsowell.common.core.domain.ykc.YKCFrameTypeCode;
import com.jsowell.common.enums.ykc.PileChannelEntity;
import com.jsowell.common.util.BytesUtil;
import com.jsowell.common.util.StringUtils;
import com.jsowell.common.util.YKCUtils;
import com.jsowell.netty.rpc.RpcUtil;
import com.jsowell.pile.rpc.RpcUtil;
import com.jsowell.pile.rpc.SyncPromise;
import com.jsowell.netty.service.yunkuaichong.YKCBusinessService;
import io.netty.buffer.ByteBuf;
@@ -78,14 +79,17 @@ public class NettyServerHandler extends ChannelInboundHandlerAdapter {
// 判断该帧类型是否为某请求帧的应答帧
String requestFrameType = YKCFrameTypeCode.PileAnswersRelation.getRequestFrameType(frameType);
log.info("同步获取响应数据-判断该帧类型是否为某请求帧的应答帧, frameType:{}, requestFrameType:{}", frameType, requestFrameType);
if (StringUtils.isNotBlank(requestFrameType)) {
// 根据请求id在集合中找到与外部线程通信的SyncPromise对象
String msgId = ctx.channel().id().toString() + "_" + requestFrameType;
log.info("同步获取响应数据-收到消息, msgId:{}", msgId);
SyncPromise syncPromise = RpcUtil.getSyncPromiseMap().get(msgId);
if(syncPromise != null) {
// 设置响应结果
syncPromise.setRpcResult(msg);
// 唤醒外部线程
log.info("同步获取响应数据-唤醒外部线程, SyncPromise:{}", JSON.toJSONString(syncPromise));
syncPromise.wake();
}
}