This commit is contained in:
Guoqs
2024-11-01 15:19:47 +08:00
parent 1695b2c5c6
commit 16fc06e666
6 changed files with 783 additions and 0 deletions

View File

@@ -0,0 +1,402 @@
package com.jsowell.netty.service.rabbitmq;
import com.alibaba.fastjson2.JSON;
import com.google.common.primitives.Bytes;
import com.jsowell.common.constant.CacheConstants;
import com.jsowell.common.constant.Constants;
import com.jsowell.common.core.domain.ykc.LoginRequestData;
import com.jsowell.common.core.domain.ykc.RealTimeMonitorData;
import com.jsowell.common.core.domain.ykc.YKCDataProtocol;
import com.jsowell.common.core.domain.ykc.YKCFrameTypeCode;
import com.jsowell.common.core.domain.ykc.device2platform.Data0x01;
import com.jsowell.common.core.domain.ykc.device2platform.Data0x03;
import com.jsowell.common.core.domain.ykc.device2platform.Data0x13;
import com.jsowell.common.core.redis.RedisCache;
import com.jsowell.common.enums.ykc.OrderPayStatusEnum;
import com.jsowell.common.enums.ykc.OrderStatusEnum;
import com.jsowell.common.enums.ykc.PileChannelEntity;
import com.jsowell.common.enums.ykc.PileConnectorStatusEnum;
import com.jsowell.common.util.*;
import com.jsowell.common.util.bean.SerializationUtil;
import com.jsowell.common.util.spring.SpringUtils;
import com.jsowell.pile.domain.OrderBasicInfo;
import com.jsowell.pile.domain.PileBasicInfo;
import com.jsowell.pile.domain.ykcCommond.IssueQRCodeCommand;
import com.jsowell.pile.domain.ykcCommond.ProofreadTimeCommand;
import com.jsowell.pile.service.OrderBasicInfoService;
import com.jsowell.pile.service.PileBasicInfoService;
import com.jsowell.pile.service.PileMsgRecordService;
import com.jsowell.pile.service.YKCPushCommandService;
import com.jsowell.thirdparty.common.CommonService;
import com.rabbitmq.client.Channel;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.beans.BeanUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Service;
import java.io.IOException;
import java.util.Date;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
@Slf4j
@Service
public class PileRabbitListener {
@Autowired
private PileBasicInfoService pileBasicInfoService;
@Autowired
private YKCPushCommandService ykcPushCommandService;
@Autowired
private PileMsgRecordService pileMsgRecordService;
@Autowired
private OrderBasicInfoService orderBasicInfoService;
@Autowired
private CommonService commonService;
@Autowired
private RedisCache redisCache;
// 引入线程池
private ThreadPoolTaskExecutor executor = SpringUtils.getBean("threadPoolTaskExecutor");
// ========================= 消费消息 ========================== //
// @RabbitListener(queues = "ykc.pileLogin-topic.device-group")
// public void testRabbitMQMessage(String msg) {
// CompletableFuture.runAsync(() -> {
// log.info("RabbitMQ处理请求start, threadName:{}, message:{}", Thread.currentThread().getName(), msg);
// try {
// // 模拟处理耗时
// Thread.sleep(500);
// } catch (InterruptedException e) {
// throw new RuntimeException(e);
// }
// log.info("RabbitMQ处理请求end, threadName:{}, message:{}", Thread.currentThread().getName(), msg);
// }, executor);
// }
/**
* 多线程消费登录请求消息
* @param msg
*/
// @RabbitListener(queues = RabbitConstants.QUEUE_PILE_LOGIN)
public void receiveLoginMessage(byte[] msg, Channel channel, Message message) {
CompletableFuture.runAsync(() -> {
loginLogic(msg);
try {
//由于配置设置了手动应答所以这里要进行一个手动应答。注意如果设置了自动应答这里又进行手动应答会出现double ack那么程序会报错。
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
} catch (IOException e) {
throw new RuntimeException(e);
}
}, executor);
}
/**
* 登录请求逻辑
* @param message
*/
private void loginLogic(byte[] message) {
log.info("RabbitMQ处理登录请求, threadName:{}, message:{}", Thread.currentThread().getName(), message);
// 处理登录请求
YKCDataProtocol ykcDataProtocol = new YKCDataProtocol(message);
Data0x01 loginData = new Data0x01(message);
String pileSn = loginData.getPileSn();
String pileType = loginData.getPileType();
String connectorNum = loginData.getConnectorNum();
String communicationVersion = loginData.getCommunicationVersion();
String programVersion = loginData.getProgramVersion();
String internetConnection = loginData.getInternetConnection();
String iccid = loginData.getIccid();
String business = loginData.getBusiness();
ChannelHandlerContext ctx = PileChannelEntity.getChannelByPileSn(pileSn);
if (YKCUtils.verifyTheDuplicateRequest(ykcDataProtocol, ctx)) {
return;
}
LoginRequestData loginRequestData = LoginRequestData.builder()
.pileSn(pileSn)
.pileType(pileType)
.connectorNum(connectorNum)
.communicationVersion(communicationVersion)
.programVersion(programVersion)
.internetConnection(internetConnection)
.iccid(iccid)
.business(business)
.build();
// 结果(默认 0x01:登录失败)
byte[] flag = Constants.oneByteArray;
// 通过桩编码SN查询数据库如果有数据则登录成功否则登录失败
PileBasicInfo pileBasicInfo = null;
try {
pileBasicInfo = pileBasicInfoService.selectPileBasicInfoBySN(pileSn);
} catch (Exception e) {
log.error("selectPileBasicInfoBySN发生异常", e);
}
if (pileBasicInfo != null) {
flag = Constants.zeroByteArray;
// 异步修改充电桩状态
CompletableFuture.runAsync(() -> {
try {
// 更改桩和该桩下的枪口状态分别为 在线、空闲 公共方法修改状态
pileBasicInfoService.updateStatus(BytesUtil.bcd2Str(ykcDataProtocol.getFrameType()), pileSn, null, null, null);
} catch (Exception e) {
e.printStackTrace();
}
}, executor);
// 异步发送对时指令
CompletableFuture.runAsync(() -> {
try {
Thread.sleep(200);
} catch (InterruptedException e) {
e.printStackTrace();
}
// 对时
ProofreadTimeCommand command = ProofreadTimeCommand.builder().pileSn(pileSn).build();
ykcPushCommandService.pushProofreadTimeCommand(command);
}, executor);
// log.info("下面进行下发二维码 pileSn:{}, thread:{}", pileSn, Thread.currentThread().getName());
// 异步发送下发二维码指令
CompletableFuture.runAsync(() -> {
try {
Thread.sleep(600);
} catch (InterruptedException e) {
e.printStackTrace();
}
// 下发二维码
IssueQRCodeCommand issueQRCodeCommand = IssueQRCodeCommand.builder().pileSn(pileSn).build();
ykcPushCommandService.pushIssueQRCodeCommand(issueQRCodeCommand);
}, executor);
if (StringUtils.equals("00", internetConnection)) {
CompletableFuture.runAsync(() -> {
// 充电桩使用的sim卡把信息存库
try {
pileBasicInfoService.updatePileSimInfo(pileSn, iccid);
// pileBasicInfoService.updatePileSimInfoV2(pileSn, iccid);
} catch (Exception e) {
log.error("更新充电桩sim卡信息失败pileSn:{}, iccid:{}", pileSn, iccid, e);
}
}, executor);
}
}
// 异步保持登录报文
CompletableFuture.runAsync(() -> {
// 保存报文 没有登录认证通过还要不要保存报文?
try {
String jsonMsg = JSON.toJSONString(loginRequestData);
pileMsgRecordService.save(pileSn, pileSn, YKCUtils.frameType2Str(YKCFrameTypeCode.LOGIN_CODE.getBytes()), jsonMsg, ykcDataProtocol.getHEXString());
} catch (Exception e) {
log.error("保存报文失败pileSn:{}", pileSn, e);
}
}, executor);
// 消息体
byte[] messageBody = Bytes.concat(YKCUtils.getPileSnBytes(pileSn), flag);
byte[] result = YKCUtils.getResult(ykcDataProtocol, messageBody);
log.info("RabbitMQ登录Result:{}", BytesUtil.binary(result, 16));
ctx.writeAndFlush(Unpooled.copiedBuffer(result));
}
/**
* 多线程消费心跳消息
* @param message
*/
// @RabbitListener(queues = RabbitConstants.QUEUE_HEART_BEAT)
public void receiveHeartBeat(byte[] msg, Channel channel, Message message) throws IOException {
// CompletableFuture.runAsync(() -> {
// }, executor);
heartBeatLogic(msg);
//由于配置设置了手动应答所以这里要进行一个手动应答。注意如果设置了自动应答这里又进行手动应答会出现double ack那么程序会报错。
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
}
/**
* 心跳逻辑
* @param message
*/
private void heartBeatLogic(byte[] message) {
Data0x03 heartbeatData = SerializationUtil.deserialize(Data0x03.class, message);
log.info("RabbitMQ处理心跳消息, threadName:{}, message:{}", Thread.currentThread().getName(), JSON.toJSONString(heartbeatData));
// 处理登录请求
String pileSn = heartbeatData.getPileSn();
String connectorCode = heartbeatData.getConnectorCode();
String connectorStatus = heartbeatData.getConnectorStatus();
// ChannelHandlerContext ctx = PileChannelEntity.getChannelByPileSn(pileSn);
ChannelHandlerContext ctx = ChannelManagerUtil.getChannel(heartbeatData.getChannelId());
// 保存时间
YKCUtils.saveLastTimeAndCheckChannel(heartbeatData.getPileSn(), ctx);
// 公共方法修改状态
try {
pileBasicInfoService.updateStatus("0x03", pileSn, connectorCode, connectorStatus, null);
} catch (Exception e) {
log.error("公共方法修改状态error", e);
}
// 心跳应答置0
byte[] flag = Constants.zeroByteArray;
// 消息体
byte[] messageBody = Bytes.concat(BytesUtil.hexStringToByteArray(pileSn), BytesUtil.hexStringToByteArray(connectorCode), flag);
byte[] result = YKCUtils.getResult(heartbeatData, messageBody);
log.info("RabbitMQ心跳应答, result:{}", BytesUtil.printHexBinary(result));
ctx.channel().writeAndFlush(ctx.channel().alloc().buffer().writeBytes(result));
}
public static void main(String[] args) {
System.out.println(DateUtils.getNowDate().before(DateUtils.parseDate("2025-07-06")));
}
/**
* 多线程消费实时监控数据消息
* @param msg
*/
// @RabbitListener(queues = RabbitConstants.QUEUE_REALTIME_DATA)
public void receiveRealtimeData(byte[] msg, Channel channel, Message message) {
CompletableFuture.runAsync(() -> {
realtimeDataLogic(msg);
try {
//由于配置设置了手动应答所以这里要进行一个手动应答。注意如果设置了自动应答这里又进行手动应答会出现double ack那么程序会报错。
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
} catch (IOException e) {
throw new RuntimeException(e);
}
}, executor);
}
private RealTimeMonitorData transformRealTimeMonitorData(Data0x13 realTimeData) {
RealTimeMonitorData realTimeMonitorData = new RealTimeMonitorData();
if (realTimeData != null) {
BeanUtils.copyProperties(realTimeData, realTimeMonitorData);
}
return realTimeMonitorData;
}
/**
* 处理实时监控数据逻辑
* @param message
*/
private void realtimeDataLogic(byte[] message) {
log.info("RabbitMQ处理实时监控数据, threadName:{}, message:{}", Thread.currentThread().getName(), BytesUtil.binary(message, 16));
if (DateUtils.getNowDate().before(DateUtils.parseDate("2025-07-06"))) {
return;
}
Data0x13 realTimeData = new Data0x13(message);
String pileSn = realTimeData.getPileSn();
String connectorCode = realTimeData.getConnectorCode();
String connectorStatus = realTimeData.getConnectorStatus();
String transactionCode = realTimeData.getTransactionCode();
String isChargerPluggedIn = realTimeData.getPutGunType();
String faultReason = realTimeData.getFaultReason();
String pileConnectorCode = pileSn + connectorCode;
// 公共方法修改状态
pileBasicInfoService.updateStatus("0x13", pileSn, connectorCode, connectorStatus, isChargerPluggedIn);
// 01表示故障
if (StringUtils.equals(connectorStatus, PileConnectorStatusEnum.FAULT.getValue())) {
// 故障原因存入缓存
String redisKey = CacheConstants.PILE_HARDWARE_FAULT + pileConnectorCode;
redisCache.setCacheObject(redisKey, faultReason, 5, TimeUnit.MINUTES);
}
RealTimeMonitorData realTimeMonitorData = transformRealTimeMonitorData(realTimeData);
// 03表示充电中
if (StringUtils.equals(connectorStatus, PileConnectorStatusEnum.OCCUPIED_CHARGING.getValue())) {
// 默认保存到redis
boolean saveRedisFlag = true;
// 查询数据库中该订单当前信息
OrderBasicInfo orderInfo = orderBasicInfoService.getOrderInfoByTransactionCode(transactionCode);
if (Objects.nonNull(orderInfo)) {
if (StringUtils.equals(orderInfo.getOrderStatus(), OrderStatusEnum.ORDER_COMPLETE.getValue())
|| StringUtils.equals(orderInfo.getOrderStatus(), OrderStatusEnum.STAY_SETTLEMENT.getValue())) {
// 在订单状态为 订单完成或待结算,不保存
saveRedisFlag = false;
}
boolean updateFlag = false;
if (StringUtils.equals(orderInfo.getOrderStatus(), OrderStatusEnum.NOT_START.getValue())
|| StringUtils.equals(orderInfo.getOrderStatus(), OrderStatusEnum.ABNORMAL.getValue())
|| StringUtils.equals(orderInfo.getOrderStatus(), OrderStatusEnum.STAY_SETTLEMENT.getValue())) {
updateFlag = true;
// 如果是未启动状态或者异常状态, 修改这个订单状态为充电中 2023年7月7日新增 如果是待结算状态,也改为充电中
orderInfo.setOrderStatus(OrderStatusEnum.IN_THE_CHARGING.getValue());
}
if (StringUtils.equals(orderInfo.getPayStatus(), OrderPayStatusEnum.unpaid.getValue())) {
// 如果发现该订单的支付状态为 0-待支付,将该订单支付状态改为 1-支付完成
orderInfo.setPayStatus(OrderPayStatusEnum.paid.getValue());
}
// 如果原来没有开始充电时间就保存当前时间为开始充电时间
if (orderInfo.getChargeStartTime() == null) {
updateFlag = true;
orderInfo.setChargeStartTime(new Date());
}
if (updateFlag) {
orderBasicInfoService.updateOrderBasicInfo(orderInfo);
}
}
// 充电时保存实时数据到redis
if (saveRedisFlag) {
pileBasicInfoService.saveRealTimeMonitorData2Redis(realTimeMonitorData);
}
}
// 异步推送第三方平台实时数据
CompletableFuture.runAsync(() -> {
try {
commonService.pushRealTimeInfo(pileSn, connectorCode, connectorStatus, realTimeMonitorData, transactionCode);
} catch (Exception e) {
log.error("统一推送第三方平台实时数据 error,", e);
}
}, executor);
// 异步推送第三方平台实时数据V2
CompletableFuture.runAsync(() -> {
try {
commonService.pushRealTimeInfoV2(pileSn, connectorCode, connectorStatus, realTimeMonitorData, transactionCode);
} catch (Exception e) {
log.error("统一推送第三方平台实时数据V2 error, ", e);
}
}, executor);
if (StringUtils.equals(connectorStatus, Constants.ONE)) {
// 故障
// 异步推送第三方平台告警信息
CompletableFuture.runAsync(() -> {
try {
commonService.commonPushAlarmInfo(pileConnectorCode, connectorStatus, realTimeMonitorData.getPutGunType());
} catch (Exception e) {
log.error("统一推送第三方平台告警信息 error, ", e);
}
}, executor);
}
}
}