mirror of
https://codeup.aliyun.com/67c68d4e484ca2f0a13ac3c1/ydc/jsowell-charger-web.git
synced 2026-04-21 11:35:12 +08:00
同步获取响应数据
This commit is contained in:
@@ -1,5 +1,6 @@
|
||||
package com.jsowell.netty.rpc;
|
||||
|
||||
import com.jsowell.pile.rpc.SyncPromise;
|
||||
import io.netty.channel.ChannelHandlerContext;
|
||||
import io.netty.channel.SimpleChannelInboundHandler;
|
||||
|
||||
|
||||
@@ -1,12 +1,30 @@
|
||||
package com.jsowell.netty.rpc;
|
||||
|
||||
import com.google.common.collect.Lists;
|
||||
import com.google.common.primitives.Bytes;
|
||||
import com.jsowell.common.core.domain.ykc.YKCFrameTypeCode;
|
||||
import com.jsowell.common.enums.ykc.PileChannelEntity;
|
||||
import com.jsowell.common.util.BytesUtil;
|
||||
import com.jsowell.common.util.CRC16Util;
|
||||
import com.jsowell.common.util.YKCUtils;
|
||||
import com.jsowell.pile.rpc.SyncPromise;
|
||||
import com.jsowell.pile.service.PileMsgRecordService;
|
||||
import io.netty.buffer.ByteBuf;
|
||||
import io.netty.channel.Channel;
|
||||
import io.netty.channel.ChannelFuture;
|
||||
import io.netty.channel.ChannelFutureListener;
|
||||
import io.netty.channel.ChannelHandlerContext;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Objects;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.TimeoutException;
|
||||
|
||||
@Slf4j
|
||||
public class RpcUtil {
|
||||
|
||||
private final static Map<String, SyncPromise> syncPromiseMap = new ConcurrentHashMap<>();
|
||||
@@ -16,31 +34,128 @@ public class RpcUtil {
|
||||
static{
|
||||
channel = new RpcClient().connect("127.0.0.1", 8888);
|
||||
}
|
||||
|
||||
@Autowired
|
||||
private PileMsgRecordService pileMsgRecordService;
|
||||
|
||||
// 需要记录报文的数据帧类型
|
||||
private final static List<String> frameTypeList = Lists.newArrayList(
|
||||
YKCUtils.frameType2Str(YKCFrameTypeCode.REMOTE_RESTART_CODE.getBytes()),
|
||||
YKCUtils.frameType2Str(YKCFrameTypeCode.REMOTE_CONTROL_START_CHARGING_CODE.getBytes()),
|
||||
YKCUtils.frameType2Str(YKCFrameTypeCode.REMOTE_CONTROL_STOP_CHARGING_CODE.getBytes()),
|
||||
YKCUtils.frameType2Str(YKCFrameTypeCode.RESERVATION_CHARGING_SETUP_CODE.getBytes())
|
||||
);
|
||||
|
||||
public static RpcResponse send(RpcRequest rpcRequest, long timeout, TimeUnit unit) throws Exception{
|
||||
|
||||
if(channel == null) {
|
||||
// if(channel == null) {
|
||||
// throw new NullPointerException("channel");
|
||||
// }
|
||||
//
|
||||
// if(rpcRequest == null) {
|
||||
// throw new NullPointerException("rpcRequest");
|
||||
// }
|
||||
//
|
||||
// if(timeout <= 0) {
|
||||
// throw new IllegalArgumentException("timeout must greater than 0");
|
||||
// }
|
||||
//
|
||||
// // 创造一个容器,用于存放当前线程与rpcClient中的线程交互
|
||||
// SyncPromise syncPromise = new SyncPromise();
|
||||
// syncPromiseMap.put(rpcRequest.getId(), syncPromise);
|
||||
//
|
||||
// // 发送消息,此处如果发送玩消息并且在get之前返回了结果,下一行的get将不会进入阻塞,也可以顺利拿到结果
|
||||
// channel.writeAndFlush(rpcRequest);
|
||||
//
|
||||
// // 等待获取结果
|
||||
// RpcResponse rpcResponse = syncPromise.get2(timeout, unit);
|
||||
//
|
||||
// if(rpcResponse == null) {
|
||||
// if(syncPromise.isTimeout()) {
|
||||
// throw new TimeoutException("等待响应结果超时");
|
||||
// } else{
|
||||
// throw new Exception("其他异常");
|
||||
// }
|
||||
// }
|
||||
//
|
||||
// // 移除容器
|
||||
// syncPromiseMap.remove(rpcRequest.getId());
|
||||
|
||||
return null;
|
||||
}
|
||||
|
||||
public static byte[] send2(byte[] msg, String pileSn, Enum<YKCFrameTypeCode> frameTypeCode, long timeout, TimeUnit unit) throws Exception{
|
||||
// 通过桩编号获取channel
|
||||
ChannelHandlerContext ctx = PileChannelEntity.getChannelByPileSn(pileSn);
|
||||
String value = ((YKCFrameTypeCode) frameTypeCode).getValue(); // 帧类型名称
|
||||
if (Objects.isNull(ctx)) {
|
||||
log.error("push命令[{}]失败, 桩号:{}无法获取到长连接, 请检查充电桩连接状态!", value, pileSn);
|
||||
throw new NullPointerException("channel");
|
||||
}
|
||||
|
||||
if(rpcRequest == null) {
|
||||
throw new NullPointerException("rpcRequest");
|
||||
if(msg == null) {
|
||||
throw new NullPointerException("msg");
|
||||
}
|
||||
|
||||
if(timeout <= 0) {
|
||||
throw new IllegalArgumentException("timeout must greater than 0");
|
||||
}
|
||||
|
||||
|
||||
// 创造一个容器,用于存放当前线程与rpcClient中的线程交互
|
||||
SyncPromise syncPromise = new SyncPromise();
|
||||
syncPromiseMap.put(rpcRequest.getId(), syncPromise);
|
||||
|
||||
|
||||
// 消息id = channelId + 帧类型(例如: "0x34")
|
||||
String msgId = ctx.channel().id().toString() + "_" + YKCUtils.frameType2Str(((YKCFrameTypeCode) frameTypeCode).getBytes());
|
||||
|
||||
syncPromiseMap.put(msgId, syncPromise);
|
||||
|
||||
// 发送消息,此处如果发送玩消息并且在get之前返回了结果,下一行的get将不会进入阻塞,也可以顺利拿到结果
|
||||
channel.writeAndFlush(rpcRequest);
|
||||
|
||||
/*
|
||||
拼接报文
|
||||
*/
|
||||
// 起始标志
|
||||
byte[] head = new byte[]{0x68};
|
||||
|
||||
// 序列号域
|
||||
byte[] serialNumber = new byte[]{0x00, 0x00};
|
||||
|
||||
// 加密标志
|
||||
byte[] encryptFlag = new byte[]{0x00};
|
||||
|
||||
// 帧类型标志
|
||||
byte[] frameType = new byte[]{(byte) ((YKCFrameTypeCode) frameTypeCode).getCode()};
|
||||
|
||||
// 序列号域+加密标志+帧类型标志+消息体
|
||||
byte[] temp = Bytes.concat(serialNumber, encryptFlag, frameType, msg);
|
||||
|
||||
// 数据长度
|
||||
byte[] length = BytesUtil.intToBytes(temp.length, 1);
|
||||
|
||||
// 帧校验域
|
||||
byte[] crc = BytesUtil.intToBytes(CRC16Util.calcCrc16(temp));
|
||||
|
||||
// 返回报文
|
||||
byte[] writeMsg = Bytes.concat(head, length, temp, crc);
|
||||
|
||||
// 返回完整的报文 string类型
|
||||
String wholeMsg = BytesUtil.binary(writeMsg, 16);
|
||||
ByteBuf byteBuf = ctx.channel().alloc().buffer().writeBytes(writeMsg);
|
||||
ChannelFuture channelFuture = ctx.channel().writeAndFlush(byteBuf);
|
||||
channelFuture.addListener((ChannelFutureListener) channelFutureListener -> {
|
||||
// 检查操作的状态
|
||||
if (channelFutureListener.isSuccess()) {
|
||||
log.info("【push结果===>成功】, pileSn:{}, remoteAddress:{}, channelId:{}, 帧类型:{}, 报文:{}",
|
||||
pileSn, ctx.channel().remoteAddress(), ctx.channel().id(), value, wholeMsg);
|
||||
} else {
|
||||
// 如果发生错误,则访问描述原因的Throwable
|
||||
Throwable cause = channelFutureListener.cause();
|
||||
log.info("【push结果===>失败】, pileSn:{}, remoteAddress:{}, channelId:{}, 帧类型:{}, 报文:{}",
|
||||
pileSn, ctx.channel().remoteAddress(), ctx.channel().id(), value, wholeMsg);
|
||||
log.error("push发送命令失败, pileSn:{}", pileSn, cause);
|
||||
}
|
||||
});
|
||||
|
||||
// 等待获取结果
|
||||
RpcResponse rpcResponse = syncPromise.get(timeout, unit);
|
||||
|
||||
byte[] rpcResponse = syncPromise.get2(timeout, unit);
|
||||
|
||||
if(rpcResponse == null) {
|
||||
if(syncPromise.isTimeout()) {
|
||||
throw new TimeoutException("等待响应结果超时");
|
||||
@@ -48,12 +163,73 @@ public class RpcUtil {
|
||||
throw new Exception("其他异常");
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
// 移除容器
|
||||
syncPromiseMap.remove(rpcRequest.getId());
|
||||
|
||||
syncPromiseMap.remove(msgId);
|
||||
|
||||
return rpcResponse;
|
||||
}
|
||||
|
||||
public boolean push(byte[] msg, String pileSn, Enum<YKCFrameTypeCode> frameTypeCode) {
|
||||
// 通过桩编号获取channel
|
||||
ChannelHandlerContext ctx = PileChannelEntity.getChannelByPileSn(pileSn);
|
||||
String value = ((YKCFrameTypeCode) frameTypeCode).getValue();
|
||||
if (Objects.isNull(ctx)) {
|
||||
log.error("push命令[{}]失败, 桩号:{}无法获取到长连接, 请检查充电桩连接状态!", value, pileSn);
|
||||
return false;
|
||||
}
|
||||
/*
|
||||
拼接报文
|
||||
*/
|
||||
// 起始标志
|
||||
byte[] head = new byte[]{0x68};
|
||||
|
||||
// 序列号域
|
||||
byte[] serialNumber = new byte[]{0x00, 0x00};
|
||||
|
||||
// 加密标志
|
||||
byte[] encryptFlag = new byte[]{0x00};
|
||||
|
||||
// 帧类型标志
|
||||
byte[] frameType = new byte[]{(byte) ((YKCFrameTypeCode) frameTypeCode).getCode()};
|
||||
|
||||
// 序列号域+加密标志+帧类型标志+消息体
|
||||
byte[] temp = Bytes.concat(serialNumber, encryptFlag, frameType, msg);
|
||||
|
||||
// 数据长度
|
||||
byte[] length = BytesUtil.intToBytes(temp.length, 1);
|
||||
|
||||
// 帧校验域
|
||||
byte[] crc = BytesUtil.intToBytes(CRC16Util.calcCrc16(temp));
|
||||
|
||||
// 返回报文
|
||||
byte[] writeMsg = Bytes.concat(head, length, temp, crc);
|
||||
|
||||
// 返回完整的报文 string类型
|
||||
String wholeMsg = BytesUtil.binary(writeMsg, 16);
|
||||
ByteBuf byteBuf = ctx.channel().alloc().buffer().writeBytes(writeMsg);
|
||||
ChannelFuture channelFuture = ctx.channel().writeAndFlush(byteBuf);
|
||||
channelFuture.addListener((ChannelFutureListener) channelFutureListener -> {
|
||||
// 检查操作的状态
|
||||
if (channelFutureListener.isSuccess()) {
|
||||
log.info("【push结果===>成功】, pileSn:{}, remoteAddress:{}, channelId:{}, 帧类型:{}, 报文:{}",
|
||||
pileSn, ctx.channel().remoteAddress(), ctx.channel().id(), value, wholeMsg);
|
||||
} else {
|
||||
// 如果发生错误,则访问描述原因的Throwable
|
||||
Throwable cause = channelFutureListener.cause();
|
||||
log.info("【push结果===>失败】, pileSn:{}, remoteAddress:{}, channelId:{}, 帧类型:{}, 报文:{}",
|
||||
pileSn, ctx.channel().remoteAddress(), ctx.channel().id(), value, wholeMsg);
|
||||
log.error("push发送命令失败, pileSn:{}", pileSn, cause);
|
||||
}
|
||||
});
|
||||
|
||||
// 保存报文
|
||||
String frameTypeStr = YKCUtils.frameType2Str(((YKCFrameTypeCode) frameTypeCode).getBytes());
|
||||
if (frameTypeList.contains(frameTypeStr)) {
|
||||
pileMsgRecordService.save(pileSn, null, frameTypeStr, null, wholeMsg);
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
public static Map<String, SyncPromise> getSyncPromiseMap(){
|
||||
return syncPromiseMap;
|
||||
|
||||
@@ -1,49 +0,0 @@
|
||||
package com.jsowell.netty.rpc;
|
||||
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
public class SyncPromise {
|
||||
|
||||
// 用于接收结果
|
||||
private RpcResponse rpcResponse;
|
||||
|
||||
private final CountDownLatch countDownLatch = new CountDownLatch(1);
|
||||
|
||||
// 用于判断是否超时
|
||||
private boolean isTimeout = false;
|
||||
|
||||
/**
|
||||
* 同步等待返回结果
|
||||
*/
|
||||
public RpcResponse get(long timeout, TimeUnit unit) throws InterruptedException {
|
||||
// 等待阻塞,超时时间内countDownLatch减到0,将提前唤醒,以此作为是否超时判断
|
||||
boolean earlyWakeUp = countDownLatch.await(timeout, unit);
|
||||
|
||||
if(earlyWakeUp) {
|
||||
// 超时时间内countDownLatch减到0,提前唤醒,说明已有结果
|
||||
return rpcResponse;
|
||||
} else {
|
||||
// 超时时间内countDownLatch没有减到0,自动唤醒,说明超时时间内没有等到结果
|
||||
isTimeout = true;
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
public void wake() {
|
||||
countDownLatch.countDown();
|
||||
}
|
||||
|
||||
public RpcResponse getRpcResponse() {
|
||||
return rpcResponse;
|
||||
}
|
||||
|
||||
public void setRpcResponse(RpcResponse rpcResponse) {
|
||||
this.rpcResponse = rpcResponse;
|
||||
}
|
||||
|
||||
public boolean isTimeout() {
|
||||
return isTimeout;
|
||||
}
|
||||
|
||||
}
|
||||
@@ -6,6 +6,8 @@ import com.jsowell.common.enums.ykc.PileChannelEntity;
|
||||
import com.jsowell.common.util.BytesUtil;
|
||||
import com.jsowell.common.util.StringUtils;
|
||||
import com.jsowell.common.util.YKCUtils;
|
||||
import com.jsowell.netty.rpc.RpcUtil;
|
||||
import com.jsowell.pile.rpc.SyncPromise;
|
||||
import com.jsowell.netty.service.yunkuaichong.YKCBusinessService;
|
||||
import io.netty.buffer.ByteBuf;
|
||||
import io.netty.channel.*;
|
||||
@@ -28,7 +30,7 @@ import java.util.concurrent.ConcurrentHashMap;
|
||||
@ChannelHandler.Sharable
|
||||
@Slf4j
|
||||
@Component
|
||||
public class NettyServerHandler extends SimpleChannelInboundHandler {
|
||||
public class NettyServerHandler extends ChannelInboundHandlerAdapter {
|
||||
|
||||
@Autowired
|
||||
private YKCBusinessService ykcService;
|
||||
@@ -73,19 +75,29 @@ public class NettyServerHandler extends SimpleChannelInboundHandler {
|
||||
// 获取帧类型
|
||||
byte[] frameTypeBytes = BytesUtil.copyBytes(msg, 5, 1);
|
||||
String frameType = YKCUtils.frameType2Str(frameTypeBytes);
|
||||
|
||||
// 判断该帧类型是否为某请求帧的应答帧
|
||||
String requestFrameType = YKCFrameTypeCode.PileAnswersRelation.getRequestFrameType(frameType);
|
||||
if (StringUtils.isNotBlank(requestFrameType)) {
|
||||
// 根据请求id,在集合中找到与外部线程通信的SyncPromise对象
|
||||
String msgId = ctx.channel().id().toString() + "_" + requestFrameType;
|
||||
SyncPromise syncPromise = RpcUtil.getSyncPromiseMap().get(msgId);
|
||||
if(syncPromise != null) {
|
||||
// 设置响应结果
|
||||
syncPromise.setRpcResult(msg);
|
||||
// 唤醒外部线程
|
||||
syncPromise.wake();
|
||||
}
|
||||
}
|
||||
|
||||
// 获取序列号域
|
||||
int serialNumber = BytesUtil.bytesToIntLittle(BytesUtil.copyBytes(msg, 2, 2));
|
||||
|
||||
// 获取channel
|
||||
Channel channel = ctx.channel();
|
||||
|
||||
// new
|
||||
// String hexString = DatatypeConverter.printHexBinary(msg);
|
||||
|
||||
// 心跳包0x03日志太多,造成日志文件过大,改为不打印
|
||||
if (!CollectionUtils.containsAny(notPrintFrameTypeList, frameType)) {
|
||||
// log.info("【<<<<<平台收到消息<<<<<】channel:{}, 帧类型:{}, 帧名称:{}, 序列号域:{}, 报文:{}, new报文:{}",
|
||||
// channel.id(), frameType, YKCFrameTypeCode.getFrameTypeStr(frameType), serialNumber,
|
||||
// BytesUtil.binary(msg, 16), hexString);
|
||||
log.info("【<<<<<平台收到消息<<<<<】channel:{}, 帧类型:{}, 帧名称:{}, 序列号域:{}, 报文:{}",
|
||||
channel.id(), frameType, YKCFrameTypeCode.getFrameTypeStr(frameType), serialNumber,
|
||||
BytesUtil.binary(msg, 16));
|
||||
@@ -109,10 +121,10 @@ public class NettyServerHandler extends SimpleChannelInboundHandler {
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
|
||||
log.info("channelRead0=== channelId:" + ctx.channel().id() + ", msg:" + msg);
|
||||
}
|
||||
// @Override
|
||||
// protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
|
||||
// log.info("channelRead0=== channelId:" + ctx.channel().id() + ", msg:" + msg);
|
||||
// }
|
||||
|
||||
/**
|
||||
* 有客户端终止连接服务器会触发此函数
|
||||
|
||||
Reference in New Issue
Block a user