!6 优化云快充ByteBuf

* 优化云快充ByteBuf
This commit is contained in:
三丙
2025-03-24 02:58:11 +00:00
parent f8baacdc38
commit d326a41963
16 changed files with 146 additions and 119 deletions

View File

@@ -15,6 +15,8 @@ import sanbing.jcpp.infrastructure.cache.CacheValueWrapper;
import sanbing.jcpp.infrastructure.cache.TransactionalCache;
import sanbing.jcpp.infrastructure.queue.discovery.ServiceInfoProvider;
import sanbing.jcpp.proto.gen.ProtocolProto.DownlinkRequestMessage;
import sanbing.jcpp.proto.gen.ProtocolProto.LoginRequest;
import sanbing.jcpp.proto.gen.ProtocolProto.UplinkQueueMessage;
import sanbing.jcpp.protocol.adapter.DownlinkController;
import java.util.UUID;
@@ -55,22 +57,50 @@ public abstract class DownlinkCallService {
if (downlinkMessageBuilder.getSessionIdLSB() == 0) {
downlinkMessageBuilder.setSessionIdLSB(protocolSessionId.getLeastSignificantBits());
}
if(downlinkMessageBuilder.getProtocolName() == null){
if (downlinkMessageBuilder.getProtocolName() == null) {
downlinkMessageBuilder.setProtocolName(pileSession.getProtocolName());
}
String nodeId = pileSession.getNodeId();
String nodeIp = pileSession.getNodeIp();
int nodeRestPort = pileSession.getNodeRestPort();
int nodeGrpcPort = pileSession.getNodeGrpcPort();
sendDownlinkMessage(downlinkMessageBuilder, nodeId, nodeIp, nodeRestPort, nodeGrpcPort);
}
public void sendDownlinkMessage(DownlinkRequestMessage.Builder downlinkMessageBuilder, UplinkQueueMessage uplinkQueueMessage, LoginRequest loginRequest) {
if (downlinkMessageBuilder.getSessionIdMSB() == 0) {
downlinkMessageBuilder.setSessionIdMSB(uplinkQueueMessage.getSessionIdMSB());
}
if (downlinkMessageBuilder.getSessionIdLSB() == 0) {
downlinkMessageBuilder.setSessionIdLSB(uplinkQueueMessage.getSessionIdLSB());
}
if (downlinkMessageBuilder.getProtocolName() == null) {
downlinkMessageBuilder.setProtocolName(uplinkQueueMessage.getProtocolName());
}
String nodeId = loginRequest.getNodeId();
String nodeIp = loginRequest.getNodeHostAddress();
int nodeRestPort = loginRequest.getNodeRestPort();
int nodeGrpcPort = loginRequest.getNodeGrpcPort();
sendDownlinkMessage(downlinkMessageBuilder, nodeId, nodeIp, nodeRestPort, nodeGrpcPort);
}
private void sendDownlinkMessage(DownlinkRequestMessage.Builder downlinkMessageBuilder, String nodeId, String nodeIp, int nodeRestPort, int nodeGrpcPort) {
if (serviceInfoProvider.isMonolith() &&
("caffeine".equalsIgnoreCase(cacheType) || serviceInfoProvider.getServiceId().equalsIgnoreCase(pileSession.getNodeId()))) {
("caffeine".equalsIgnoreCase(cacheType) || serviceInfoProvider.getServiceId().equalsIgnoreCase(nodeId))) {
downlinkController.onDownlink(downlinkMessageBuilder.build())
.setResultHandler(result -> log.debug("下行消息发送完成"));
} else {
_sendDownlinkMessage(downlinkMessageBuilder.build(), pileSession);
_sendDownlinkMessage(downlinkMessageBuilder.build(), nodeIp, nodeRestPort, nodeGrpcPort);
}
}
protected abstract void _sendDownlinkMessage(DownlinkRequestMessage downlinkMessage, PileSession pileSession);
protected abstract void _sendDownlinkMessage(DownlinkRequestMessage downlinkMessage, String nodeIp, int nodeRestPort, int nodeGrpcPort);
}

View File

@@ -64,27 +64,36 @@ public class DefaultPileProtocolService implements PileProtocolService {
DownlinkRequestMessage.Builder downlinkMessageBuilder = createDownlinkMessageBuilder(uplinkQueueMessage, loginRequest.getPileCode());
downlinkMessageBuilder.setDownlinkCmd(DownlinkCmdEnum.LOGIN_ACK.name());
if (pile != null) {
// 保存到缓存
cacheSession(uplinkQueueMessage, pile,
PileSession pileSession = createSession(uplinkQueueMessage, pile,
loginRequest.getRemoteAddress(),
loginRequest.getNodeId(),
loginRequest.getNodeHostAddress(),
loginRequest.getNodeRestPort(),
loginRequest.getNodeGrpcPort());
// 保存到缓存
pileSessionCache.put(new PileSessionCacheKey(pile.getPileCode()), pileSession);
downlinkMessageBuilder.setLoginResponse(LoginResponse.newBuilder()
.setSuccess(true)
.setPileCode(loginRequest.getPileCode())
.build());
downlinkCallService.sendDownlinkMessage(downlinkMessageBuilder, pileCode);
} else {
downlinkMessageBuilder.setLoginResponse(LoginResponse.newBuilder()
.setSuccess(false)
.setPileCode(loginRequest.getPileCode())
.build());
downlinkCallService.sendDownlinkMessage(downlinkMessageBuilder, uplinkQueueMessage, loginRequest);
}
downlinkCallService.sendDownlinkMessage(downlinkMessageBuilder, pileCode);
callback.onSuccess();
}
@@ -99,7 +108,7 @@ public class DefaultPileProtocolService implements PileProtocolService {
if (pile != null) {
// 重新保存到缓存
cacheSession(uplinkQueueMessage, pile,
createSession(uplinkQueueMessage, pile,
heartBeatRequest.getRemoteAddress(),
heartBeatRequest.getNodeId(),
heartBeatRequest.getNodeHostAddress(),
@@ -110,13 +119,13 @@ public class DefaultPileProtocolService implements PileProtocolService {
callback.onSuccess();
}
private void cacheSession(UplinkQueueMessage uplinkQueueMessage,
Pile pile,
String remoteAddress,
String nodeId,
String nodeIp,
int restPort,
int grpcPort) {
private PileSession createSession(UplinkQueueMessage uplinkQueueMessage,
Pile pile,
String remoteAddress,
String nodeId,
String nodeIp,
int restPort,
int grpcPort) {
PileSession pileSession = new PileSession(pile.getId(), pile.getPileCode(), uplinkQueueMessage.getProtocolName());
pileSession.setProtocolSessionId(new UUID(uplinkQueueMessage.getSessionIdMSB(), uplinkQueueMessage.getSessionIdLSB()));
pileSession.setRemoteAddress(remoteAddress);
@@ -124,7 +133,8 @@ public class DefaultPileProtocolService implements PileProtocolService {
pileSession.setNodeIp(nodeIp);
pileSession.setNodeRestPort(restPort);
pileSession.setNodeGrpcPort(grpcPort);
pileSessionCache.put(new PileSessionCacheKey(pile.getPileCode()), pileSession);
return pileSession;
}
@Override

View File

@@ -11,7 +11,6 @@ import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
import org.springframework.stereotype.Service;
import sanbing.jcpp.app.data.PileSession;
import sanbing.jcpp.app.service.DownlinkCallService;
import sanbing.jcpp.app.service.grpc.DownlinkGrpcClient;
import sanbing.jcpp.proto.gen.ProtocolProto.DownlinkRequestMessage;
@@ -31,7 +30,7 @@ public class GrpcDownlinkCallService extends DownlinkCallService {
DownlinkGrpcClient downlinkGrpcClient;
@Override
protected void _sendDownlinkMessage(DownlinkRequestMessage downlinkMessage, PileSession pileSession) {
protected void _sendDownlinkMessage(DownlinkRequestMessage downlinkMessage, String nodeIp, int nodeRestPort, int nodeGrpcPort) {
try {
RequestMsg requestMsg = RequestMsg.newBuilder()
@@ -40,7 +39,7 @@ public class GrpcDownlinkCallService extends DownlinkCallService {
.setDownlinkRequestMessage(downlinkMessage)
.build();
downlinkGrpcClient.sendDownlinkRequest(HostAndPort.fromParts(pileSession.getNodeIp(), pileSession.getNodeGrpcPort()),
downlinkGrpcClient.sendDownlinkRequest(HostAndPort.fromParts(nodeIp, nodeGrpcPort),
requestMsg);
} catch (Exception e) {

View File

@@ -16,7 +16,6 @@ import org.springframework.http.ResponseEntity;
import org.springframework.stereotype.Service;
import org.springframework.web.client.RestClientException;
import org.springframework.web.client.RestTemplate;
import sanbing.jcpp.app.data.PileSession;
import sanbing.jcpp.app.service.DownlinkCallService;
import sanbing.jcpp.infrastructure.util.trace.TracerContextUtil;
import sanbing.jcpp.proto.gen.ProtocolProto.DownlinkRequestMessage;
@@ -35,10 +34,10 @@ public class RestDownlinkCallService extends DownlinkCallService {
RestTemplate downlinkRestTemplate;
@Override
protected void _sendDownlinkMessage(DownlinkRequestMessage downlinkMessage, PileSession pileSession) {
protected void _sendDownlinkMessage(DownlinkRequestMessage downlinkMessage, String nodeIp, int nodeRestPort, int nodeGrpcPort) {
try {
invokeDownlinkRestApi(downlinkMessage, pileSession.getNodeIp(), pileSession.getNodeRestPort());
invokeDownlinkRestApi(downlinkMessage, nodeIp, nodeGrpcPort);
} catch (RestClientException e) {
log.error("下行消息发送异常", e);

View File

@@ -23,6 +23,7 @@ import sanbing.jcpp.protocol.yunkuaichong.annotation.YunKuaiChongCmd;
import sanbing.jcpp.protocol.yunkuaichong.enums.YunKuaiChongDownlinkCmdEnum;
import java.lang.reflect.InvocationTargetException;
import java.util.Arrays;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
@@ -70,93 +71,81 @@ public class YunKuaiChongProtocolMessageProcessor extends ProtocolMessageProcess
@Override
public void uplinkHandle(ListenerToHandlerMsg listenerToHandlerMsg) {
UUID msgId = listenerToHandlerMsg.id();
byte[] msg = listenerToHandlerMsg.msg();
TcpSession session = (TcpSession) listenerToHandlerMsg.session();
final UUID msgId = listenerToHandlerMsg.id();
final byte[] msg = listenerToHandlerMsg.msg();
final TcpSession session = (TcpSession) listenerToHandlerMsg.session();
ByteBuf in = Unpooled.copiedBuffer(msg);
// ================== 前置快速失败检查 ==================
if (msg.length < 8 || msg[0] != 0x68) {
return;
}
// 判断是否可以读取报头8个字节
if (in.readableBytes() < 8) {
ByteBuf in = Unpooled.wrappedBuffer(msg);
try {
// ================== 协议头解析 ==================
final int dataLength = in.getUnsignedByte(1);
final int bodyLength = dataLength - 4;
final int checksumPos = 6 + bodyLength;
// ================== 组合边界检查 ==================
if (dataLength < 4 || in.readableBytes() < checksumPos + 2) {
return;
}
// ================== 字段快速解析 ==================
final int seqNo = in.getUnsignedShort(2);
final int encryptFlag = in.getUnsignedByte(4);
final int frameType = in.getUnsignedByte(5);
// ================== 校验和双模式处理 ==================
final int checkSumLE = in.getUnsignedShortLE(checksumPos);
final int checkSumBE = in.getUnsignedShort(checksumPos);
// ================== 校验数据智能拷贝 ==================
final byte[] checkData = Arrays.copyOfRange(msg, 2, 2 + dataLength);
// ================== 短路校验流程 ==================
JCPPPair<Boolean, Integer> checkResult = checkCrcSum(checkData, checkSumLE);
if (!checkResult.getFirst()) {
checkResult = checkCrcSum(checkData, checkSumBE);
if (log.isDebugEnabled()) { // 日志惰性计算
log.debug("{} 云快充校验域一次校验失败 CMD:{} 校验和0x{} 期望校验和:0x{}",
session, frameType, Integer.toHexString(checkSumBE), Integer.toHexString(checkResult.getSecond()));
}
}
// ================== 最终校验失败处理 ==================
if (!checkResult.getFirst()) {
log.info("{} 云快充校验域二次校验失败 CMD:{} 校验和0x{} 期望校验和:0x{}",
session, frameType, Integer.toHexString(checkSumBE), Integer.toHexString(checkResult.getSecond()));
return;
}
// ================== 消息对象智能构建 ==================
ByteBuf slicedBuf = in.slice(6, bodyLength);
if (slicedBuf.readableBytes() != bodyLength) {
log.error("协议体长度异常: expected={}, actual={}",
bodyLength, slicedBuf.readableBytes());
return;
}
byte[] msgBody = new byte[bodyLength];
slicedBuf.readBytes(msgBody);
exeCmd(new YunKuaiChongUplinkMessage(msgId)
.setHead(0x68)
.setDataLength(dataLength)
.setSequenceNumber(seqNo)
.setEncryptionFlag(encryptFlag)
.setCmd(frameType)
.setMsgBody(msgBody) // 使用正确长度的数组
.setCheckSum(checkResult.getSecond())
.setRawFrame(msg),
session);
} finally {
in.release();
return;
}
// 起始标识, 固定为0x68
int startFlag = in.readUnsignedByte();
if (startFlag != 0x68) {
in.release();
return;
}
// 数据长度 = 序列号域+加密标志+帧类型标志+消息体
int dataLength = in.readUnsignedByte();
// 报文的流水号
int seqNo = in.readUnsignedShortLE();
// 加密标志
int encryptFlag = in.readUnsignedByte();
// 帧类型标志
int frameType = in.readUnsignedByte();
// 判断是否可以读取消息体N-4个字节
int msgBodyLength = dataLength - 4;
if (in.readableBytes() < msgBodyLength) {
in.release();
return;
}
// 消息体
byte[] msgBody = new byte[msgBodyLength];
in.readBytes(msgBody);
// 判断是否可以读取校验和, 2个字节
if (in.readableBytes() < 2) {
in.release();
return;
}
// 读取两字节校验域
byte[] byCheckSum = new byte[2];
in.readBytes(byCheckSum);
ByteBuf csTemp = Unpooled.buffer();
csTemp.writeBytes(byCheckSum);
// 校验校验和,先用小端获取做短路校验
int checkSum = csTemp.readUnsignedShortLE();
byte[] checkData = new byte[dataLength];
System.arraycopy(msg, 2, checkData, 0, dataLength);
JCPPPair<Boolean, Integer> checkResult = checkCrcSum(checkData, checkSum);
if (Boolean.FALSE.equals(checkResult.getFirst())) {
csTemp.writeBytes(byCheckSum);
checkSum = csTemp.readUnsignedShort();
checkResult = checkCrcSum(checkData, checkSum);
log.debug("云快充检验和 第二次检查: checkResult:{}, checkSum:{}", checkResult, checkSum);
}
if (Boolean.FALSE.equals(checkResult.getFirst())) {
log.info("云快充检验和不一致两次不通过 不处理! CMD{},校验域:{},正确校验和:{}", frameType, checkSum, checkResult.getSecond());
return;
}
YunKuaiChongUplinkMessage message = new YunKuaiChongUplinkMessage(msgId);
message.setHead(startFlag);
message.setDataLength(dataLength);
message.setSequenceNumber(seqNo);
message.setEncryptionFlag(encryptFlag);
message.setCmd(frameType);
message.setMsgBody(msgBody);
message.setCheckSum(checkSum);
message.setRawFrame(msg);
exeCmd(message, session);
}
@Override
@@ -188,7 +177,7 @@ public class YunKuaiChongProtocolMessageProcessor extends ProtocolMessageProcess
if (uplinkCmdExe == null) {
log.info("[{}] 云快充协议接收到未知的上行指令 {}", session, message.getCmd());
log.info("{} 云快充协议接收到未知的上行指令 {}", session, message.getCmd());
return;
}
@@ -201,7 +190,7 @@ public class YunKuaiChongProtocolMessageProcessor extends ProtocolMessageProcess
if (downlinkCmdExe == null) {
log.info("[{}] 云快充协议接收到未知的下行指令 {}", session, message.getCmd());
log.info("{} 云快充协议接收到未知的下行指令 {}", session, message.getCmd());
return;
}

View File

@@ -32,7 +32,7 @@ public class YunKuaiChongV150BmsHandshakeULCmd extends YunKuaiChongUplinkCmdExe
@Override
public void execute(TcpSession tcpSession, YunKuaiChongUplinkMessage yunKuaiChongUplinkMessage, ProtocolContext ctx) {
log.debug("{} 云快充1.5.0充电握手", tcpSession);
ByteBuf byteBuf = Unpooled.copiedBuffer(yunKuaiChongUplinkMessage.getMsgBody());
ByteBuf byteBuf = Unpooled.wrappedBuffer(yunKuaiChongUplinkMessage.getMsgBody());
ObjectNode additionalInfo = JacksonUtil.newObjectNode();

View File

@@ -33,7 +33,7 @@ public class YunKuaiChongV150HeartbeatULCmd extends YunKuaiChongUplinkCmdExe {
@Override
public void execute(TcpSession tcpSession, YunKuaiChongUplinkMessage yunKuaiChongUplinkMessage, ProtocolContext ctx) {
log.debug("{} 云快充1.5.0充电桩心跳包", tcpSession);
ByteBuf byteBuf = Unpooled.copiedBuffer(yunKuaiChongUplinkMessage.getMsgBody());
ByteBuf byteBuf = Unpooled.wrappedBuffer(yunKuaiChongUplinkMessage.getMsgBody());
ObjectNode additionalInfo = JacksonUtil.newObjectNode();

View File

@@ -32,7 +32,7 @@ public class YunKuaiChongV150LoginULCmd extends YunKuaiChongUplinkCmdExe {
@Override
public void execute(TcpSession tcpSession, YunKuaiChongUplinkMessage yunKuaiChongUplinkMessage, ProtocolContext ctx) {
log.debug("{} 云快充1.5.0登录认证请求", tcpSession);
ByteBuf byteBuf = Unpooled.copiedBuffer(yunKuaiChongUplinkMessage.getMsgBody());
ByteBuf byteBuf = Unpooled.wrappedBuffer(yunKuaiChongUplinkMessage.getMsgBody());
ObjectNode additionalInfo = JacksonUtil.newObjectNode();

View File

@@ -31,7 +31,7 @@ public class YunKuaiChongV150QueryPricingModelULCmd extends YunKuaiChongUplinkCm
@Override
public void execute(TcpSession tcpSession, YunKuaiChongUplinkMessage yunKuaiChongUplinkMessage, ProtocolContext ctx) {
log.info("{} 云快充1.5.0充电桩计费模型请求", tcpSession);
ByteBuf byteBuf = Unpooled.copiedBuffer(yunKuaiChongUplinkMessage.getMsgBody());
ByteBuf byteBuf = Unpooled.wrappedBuffer(yunKuaiChongUplinkMessage.getMsgBody());
ObjectNode additionalInfo = JacksonUtil.newObjectNode();

View File

@@ -57,7 +57,7 @@ public class YunKuaiChongV150RealTimeDataULCmd extends YunKuaiChongUplinkCmdExe
@Override
public void execute(TcpSession tcpSession, YunKuaiChongUplinkMessage yunKuaiChongUplinkMessage, ProtocolContext ctx) {
log.info("{} 云快充1.5.0上传实时监测数据", tcpSession);
ByteBuf byteBuf = Unpooled.copiedBuffer(yunKuaiChongUplinkMessage.getMsgBody());
ByteBuf byteBuf = Unpooled.wrappedBuffer(yunKuaiChongUplinkMessage.getMsgBody());
// 从Tracer总获取当前时间
long ts = TracerContextUtil.getCurrentTracer().getTracerTs();

View File

@@ -33,7 +33,7 @@ public class YunKuaiChongV150RemoteStartResultULCmd extends YunKuaiChongUplinkCm
@Override
public void execute(TcpSession tcpSession, YunKuaiChongUplinkMessage yunKuaiChongUplinkMessage, ProtocolContext ctx) {
log.info("{} 云快充1.5.0远程启动充电命令回复", tcpSession);
ByteBuf byteBuf = Unpooled.copiedBuffer(yunKuaiChongUplinkMessage.getMsgBody());
ByteBuf byteBuf = Unpooled.wrappedBuffer(yunKuaiChongUplinkMessage.getMsgBody());
// 从Tracer总获取当前时间
long ts = TracerContextUtil.getCurrentTracer().getTracerTs();

View File

@@ -32,7 +32,7 @@ public class YunKuaiChongV150RemoteStopResultULCmd extends YunKuaiChongUplinkCmd
@Override
public void execute(TcpSession tcpSession, YunKuaiChongUplinkMessage yunKuaiChongUplinkMessage, ProtocolContext ctx) {
log.info("{} 云快充1.5.0远程停机命令回复", tcpSession);
ByteBuf byteBuf = Unpooled.copiedBuffer(yunKuaiChongUplinkMessage.getMsgBody());
ByteBuf byteBuf = Unpooled.wrappedBuffer(yunKuaiChongUplinkMessage.getMsgBody());
// 从Tracer总获取当前时间
long ts = TracerContextUtil.getCurrentTracer().getTracerTs();

View File

@@ -31,7 +31,7 @@ public class YunKuaiChongV150SetPricingModelAckULCmd extends YunKuaiChongUplinkC
@Override
public void execute(TcpSession tcpSession, YunKuaiChongUplinkMessage yunKuaiChongUplinkMessage, ProtocolContext ctx) {
log.info("{} 云快充1.5.0计费模型应答", tcpSession);
ByteBuf byteBuf = Unpooled.copiedBuffer(yunKuaiChongUplinkMessage.getMsgBody());
ByteBuf byteBuf = Unpooled.wrappedBuffer(yunKuaiChongUplinkMessage.getMsgBody());
// 1.桩编号
byte[] pileCodeBytes = new byte[7];

View File

@@ -37,7 +37,7 @@ public class YunKuaiChongV150TransactionRecordULCmd extends YunKuaiChongUplinkCm
@Override
public void execute(TcpSession tcpSession, YunKuaiChongUplinkMessage yunKuaiChongUplinkMessage, ProtocolContext ctx) {
log.info("{} 云快充1.5.0交易记录", tcpSession);
ByteBuf byteBuf = Unpooled.copiedBuffer(yunKuaiChongUplinkMessage.getMsgBody());
ByteBuf byteBuf = Unpooled.wrappedBuffer(yunKuaiChongUplinkMessage.getMsgBody());
ObjectNode additionalInfo = JacksonUtil.newObjectNode();

View File

@@ -34,7 +34,7 @@ public class YunKuaiChongV150VerifyPricingModelULCmd extends YunKuaiChongUplinkC
@Override
public void execute(TcpSession tcpSession, YunKuaiChongUplinkMessage yunKuaiChongUplinkMessage, ProtocolContext ctx) {
log.info("{} 云快充1.5.0计费模型验证请求", tcpSession);
ByteBuf byteBuf = Unpooled.copiedBuffer(yunKuaiChongUplinkMessage.getMsgBody());
ByteBuf byteBuf = Unpooled.wrappedBuffer(yunKuaiChongUplinkMessage.getMsgBody());
ObjectNode additionalInfo = JacksonUtil.newObjectNode();

View File

@@ -33,7 +33,7 @@ public class YunKuaiChongV160RemoteParallelStartResultULCmd extends YunKuaiChong
@Override
public void execute(TcpSession tcpSession, YunKuaiChongUplinkMessage yunKuaiChongUplinkMessage, ProtocolContext ctx) {
log.info("{} 云快充1.6.远程并充启机命令回复", tcpSession);
ByteBuf byteBuf = Unpooled.copiedBuffer(yunKuaiChongUplinkMessage.getMsgBody());
ByteBuf byteBuf = Unpooled.wrappedBuffer(yunKuaiChongUplinkMessage.getMsgBody());
// 从Tracer总获取当前时间
long ts = TracerContextUtil.getCurrentTracer().getTracerTs();