From 16fc06e6664b06b14cd2c68dcf9bee14a7498851 Mon Sep 17 00:00:00 2001 From: Guoqs <123@jsowell.com> Date: Fri, 1 Nov 2024 15:19:47 +0800 Subject: [PATCH] rabbitMQ --- jsowell-common/pom.xml | 5 + .../common/config/mq/DirectRabbitConfig.java | 132 ++++++ .../common/config/mq/RabbitConfig.java | 176 ++++++++ .../common/constant/RabbitConstants.java | 43 ++ .../service/rabbitmq/PileRabbitListener.java | 402 ++++++++++++++++++ .../com/jsowell/mq/OrderRabbitListener.java | 25 ++ 6 files changed, 783 insertions(+) create mode 100644 jsowell-common/src/main/java/com/jsowell/common/config/mq/DirectRabbitConfig.java create mode 100644 jsowell-common/src/main/java/com/jsowell/common/config/mq/RabbitConfig.java create mode 100644 jsowell-common/src/main/java/com/jsowell/common/constant/RabbitConstants.java create mode 100644 jsowell-netty/src/main/java/com/jsowell/netty/service/rabbitmq/PileRabbitListener.java create mode 100644 jsowell-pile/src/main/java/com/jsowell/mq/OrderRabbitListener.java diff --git a/jsowell-common/pom.xml b/jsowell-common/pom.xml index 4be94b3de..785b492ad 100644 --- a/jsowell-common/pom.xml +++ b/jsowell-common/pom.xml @@ -223,6 +223,11 @@ protostuff-runtime + + org.springframework.boot + spring-boot-starter-amqp + + diff --git a/jsowell-common/src/main/java/com/jsowell/common/config/mq/DirectRabbitConfig.java b/jsowell-common/src/main/java/com/jsowell/common/config/mq/DirectRabbitConfig.java new file mode 100644 index 000000000..a865855ee --- /dev/null +++ b/jsowell-common/src/main/java/com/jsowell/common/config/mq/DirectRabbitConfig.java @@ -0,0 +1,132 @@ +package com.jsowell.common.config.mq; + +import com.jsowell.common.constant.RabbitConstants; +import org.springframework.amqp.core.*; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +@Configuration +public class DirectRabbitConfig { + + // 定义交换机 + @Bean + public DirectExchange exchange() { + return new DirectExchange(RabbitConstants.YKC_EXCHANGE_NAME); + } + + @Bean + public Queue chargeOrderDataQueue() { + return new Queue(RabbitConstants.QUEUE_CHARGE_ORDER_DATA); + } + + @Bean + public Binding bindChargeOrderData() { + return BindingBuilder.bind(chargeOrderDataQueue()).to(exchange()).with(RabbitConstants.QUEUE_CHARGE_ORDER_DATA); + } + + // 定义队列 + // @Bean + // public Queue pileLoginQueue() { + // return new Queue(RabbitConstants.QUEUE_PILE_LOGIN); + // } + // + // @Bean + // public Queue heartBeatQueue() { + // return new Queue(RabbitConstants.QUEUE_HEART_BEAT); + // } + // + // @Bean + // public Queue realtimeDataQueue() { + // return new Queue(RabbitConstants.QUEUE_REALTIME_DATA); + // } + // + // @Bean + // public Queue priceSenderQueue() { + // return new Queue(RabbitConstants.QUEUE_PRICE_SENDER); + // } + // + // @Bean + // public Queue stationPriceSenderQueue() { + // return new Queue(RabbitConstants.QUEUE_STATION_PRICE_SENDER); + // } + // + // @Bean + // public Queue connectorStatusNotifyQueue() { + // return new Queue(RabbitConstants.QUEUE_CONNECTOR_STATUS_NOTIFY); + // } + // + // @Bean + // public Queue chargingStatusNotifyQueue() { + // return new Queue(RabbitConstants.QUEUE_CHARGING_STATUS_NOTIFY); + // } + // + // + // + // @Bean + // public Queue upStationStatusQueue() { + // return new Queue(RabbitConstants.QUEUE_UP_STATION_STATUS); + // } + // + // @Bean + // public Queue upEquipChargeStatusQueue() { + // return new Queue(RabbitConstants.QUEUE_UP_EQUIP_CHARGE_STATUS); + // } + // + // @Bean + // public Queue upChargeOrderInfoQueue() { + // return new Queue(RabbitConstants.QUEUE_UP_CHARGE_ORDER_INFO); + // } + + // 绑定队列到交换机 + // @Bean + // public Binding bindPileLogin() { + // return BindingBuilder.bind(pileLoginQueue()).to(exchange()).with(RabbitConstants.QUEUE_PILE_LOGIN); + // } + // + // @Bean + // public Binding bindHeartBeat() { + // return BindingBuilder.bind(heartBeatQueue()).to(exchange()).with(RabbitConstants.QUEUE_HEART_BEAT); + // } + // + // @Bean + // public Binding bindRealtimeData() { + // return BindingBuilder.bind(realtimeDataQueue()).to(exchange()).with(RabbitConstants.QUEUE_REALTIME_DATA); + // } + // + // @Bean + // public Binding bindPriceSender() { + // return BindingBuilder.bind(priceSenderQueue()).to(exchange()).with(RabbitConstants.QUEUE_PRICE_SENDER); + // } + // + // @Bean + // public Binding bindStationPriceSender() { + // return BindingBuilder.bind(stationPriceSenderQueue()).to(exchange()).with(RabbitConstants.QUEUE_STATION_PRICE_SENDER); + // } + // + // @Bean + // public Binding bindConnectorStatusNotify() { + // return BindingBuilder.bind(connectorStatusNotifyQueue()).to(exchange()).with(RabbitConstants.QUEUE_CONNECTOR_STATUS_NOTIFY); + // } + // + // @Bean + // public Binding bindChargingStatusNotify() { + // return BindingBuilder.bind(chargingStatusNotifyQueue()).to(exchange()).with(RabbitConstants.QUEUE_CHARGING_STATUS_NOTIFY); + // } + + + + // @Bean + // public Binding bindUpStationStatus() { + // return BindingBuilder.bind(upStationStatusQueue()).to(exchange()).with(RabbitConstants.QUEUE_UP_STATION_STATUS); + // } + // + // @Bean + // public Binding bindUpEquipChargeStatus() { + // return BindingBuilder.bind(upEquipChargeStatusQueue()).to(exchange()).with(RabbitConstants.QUEUE_UP_EQUIP_CHARGE_STATUS); + // } + // + // @Bean + // public Binding bindUpChargeOrderInfo() { + // return BindingBuilder.bind(upChargeOrderInfoQueue()).to(exchange()).with(RabbitConstants.QUEUE_UP_CHARGE_ORDER_INFO); + // } +} diff --git a/jsowell-common/src/main/java/com/jsowell/common/config/mq/RabbitConfig.java b/jsowell-common/src/main/java/com/jsowell/common/config/mq/RabbitConfig.java new file mode 100644 index 000000000..292bc7daa --- /dev/null +++ b/jsowell-common/src/main/java/com/jsowell/common/config/mq/RabbitConfig.java @@ -0,0 +1,176 @@ +package com.jsowell.common.config.mq; + +import org.springframework.amqp.core.AcknowledgeMode; +import org.springframework.amqp.rabbit.config.RetryInterceptorBuilder; +import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory; +import org.springframework.amqp.rabbit.connection.ConnectionFactory; +import org.springframework.amqp.rabbit.core.RabbitTemplate; +import org.springframework.amqp.rabbit.retry.RejectAndDontRequeueRecoverer; +import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.autoconfigure.amqp.RabbitProperties; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.retry.RetryCallback; +import org.springframework.retry.RetryContext; +import org.springframework.retry.RetryListener; +import org.springframework.retry.backoff.ExponentialBackOffPolicy; +import org.springframework.retry.policy.SimpleRetryPolicy; +import org.springframework.retry.support.RetryTemplate; + +// 常用的三个配置如下 +// 1---设置手动应答(acknowledge-mode: manual) +// 2---设置生产者消息发送的确认回调机制 ( #这个配置是保证提供者确保消息推送到交换机中,不管成不成功,都会回调 +// publisher-confirm-type: correlated +// #保证交换机能把消息推送到队列中 +// publisher-returns: true +// template: +// #以下是rabbitmqTemplate配置 +// mandatory: true) +// 3---设置重试 +@Configuration +public class RabbitConfig { + + @Autowired + private ConnectionFactory rabbitConnectionFactory; + + //@Bean 缓存连接池 + // public CachingConnectionFactory rabbitConnectionFactory + + @Autowired + private RabbitProperties properties; + + // 这里因为使用自动配置的connectionFactory,所以把自定义的connectionFactory注解掉 + // 存在此名字的bean 自带的连接工厂会不加载(也就是说yml中rabbitmq下一级不生效),如果想自定义来区分开 需要改变bean 的名称 +// @Bean +// public ConnectionFactory connectionFactory() throws Exception { +// //创建工厂类 +// CachingConnectionFactory cachingConnectionFactory=new CachingConnectionFactory(); +// //用户名 +// cachingConnectionFactory.setUsername("gust"); +// //密码 +// cachingConnectionFactory.setPassword("gust"); +// //rabbitMQ地址 +// cachingConnectionFactory.setHost("127.0.0.1"); +// //rabbitMQ端口 +// cachingConnectionFactory.setPort(Integer.parseInt("5672")); +// +// //设置发布消息后回调 +// cachingConnectionFactory.setPublisherReturns(true); +// //设置发布后确认类型,此处确认类型为交互 +// cachingConnectionFactory.setPublisherConfirmType(CachingConnectionFactory.ConfirmType.CORRELATED); +// +// cachingConnectionFactory.setCacheMode(CachingConnectionFactory.CacheMode.CHANNEL); +// return cachingConnectionFactory; +// } + + + // 存在此名字的bean 自带的容器工厂会不加载(yml下rabbitmq下的listener下的simple配置),如果想自定义来区分开 需要改变bean 的名称 + @Bean + public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory() { + SimpleRabbitListenerContainerFactory containerFactory = new SimpleRabbitListenerContainerFactory(); + containerFactory.setConnectionFactory(rabbitConnectionFactory); + + // 并发消费者数量 + containerFactory.setConcurrentConsumers(1); + containerFactory.setMaxConcurrentConsumers(20); + // 预加载消息数量 -- QOS + containerFactory.setPrefetchCount(1); + // 应答模式(此处设置为手动) + containerFactory.setAcknowledgeMode(AcknowledgeMode.MANUAL); + // 消息序列化方式 + containerFactory.setMessageConverter(new Jackson2JsonMessageConverter()); + // 设置通知调用链 (这里设置的是重试机制的调用链) + containerFactory.setAdviceChain( + RetryInterceptorBuilder + .stateless() + .recoverer(new RejectAndDontRequeueRecoverer()) + .retryOperations(rabbitRetryTemplate()) + .build() + ); + return containerFactory; + } + + // 存在此名字的bean 自带的容器工厂会不加载(yml下rabbitmq下的template的配置),如果想自定义来区分开 需要改变bean 的名称 + @Bean + public RabbitTemplate rabbitTemplate(){ + RabbitTemplate rabbitTemplate=new RabbitTemplate(rabbitConnectionFactory); + // 默认是用jdk序列化 + // 数据转换为json存入消息队列,方便可视化界面查看消息数据 + rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter()); + // 设置开启Mandatory,才能触发回调函数,无论消息推送结果怎么样都强制调用回调函数 + rabbitTemplate.setMandatory(true); + // 此处设置重试template后,会再生产者发送消息的时候,调用该template中的调用链 + rabbitTemplate.setRetryTemplate(rabbitRetryTemplate()); + // CorrelationData correlationData, boolean b, String s + rabbitTemplate.setConfirmCallback( + (correlationData, b, s) -> { + System.out.println("ConfirmCallback "+"相关数据:"+ correlationData); + System.out.println("ConfirmCallback "+"确认情况:"+b); + System.out.println("ConfirmCallback "+"原因:"+s); + }); + // Message message, int i, String s, String s1, String s2 + rabbitTemplate.setReturnCallback((message, i, s, s1, s2) -> { + System.out.println("ReturnCallback: "+"消息:"+message); + System.out.println("ReturnCallback: "+"回应码:"+i); + System.out.println("ReturnCallback: "+"回应消息:"+s); + System.out.println("ReturnCallback: "+"交换机:"+s1); + System.out.println("ReturnCallback: "+"路由键:"+s2); + }); + + return rabbitTemplate; + } + + // 重试的Template + @Bean + public RetryTemplate rabbitRetryTemplate() { + RetryTemplate retryTemplate = new RetryTemplate(); + // 设置监听 调用重试处理过程 + retryTemplate.registerListener(new RetryListener() { + @Override + public boolean open(RetryContext retryContext, RetryCallback retryCallback) { + // 执行之前调用 (返回false时会终止执行) + return true; + } + + @Override + public void close(RetryContext retryContext, RetryCallback retryCallback, Throwable throwable) { + // 重试结束的时候调用 (最后一次重试 ) + System.out.println("---------------最后一次调用"); + + return ; + } + @Override + public void onError(RetryContext retryContext, RetryCallback retryCallback, Throwable throwable) { + // 异常 都会调用 + System.err.println("-----第{}次调用"+retryContext.getRetryCount()); + } + }); + retryTemplate.setBackOffPolicy(backOffPolicyByProperties()); + retryTemplate.setRetryPolicy(retryPolicyByProperties()); + return retryTemplate; + } + + @Bean + public ExponentialBackOffPolicy backOffPolicyByProperties() { + ExponentialBackOffPolicy backOffPolicy = new ExponentialBackOffPolicy(); + long maxInterval = properties.getListener().getSimple().getRetry().getMaxInterval().getSeconds(); + long initialInterval = properties.getListener().getSimple().getRetry().getInitialInterval().getSeconds(); + double multiplier = properties.getListener().getSimple().getRetry().getMultiplier(); + // 重试间隔 + backOffPolicy.setInitialInterval(initialInterval * 1000); + // 重试最大间隔 + backOffPolicy.setMaxInterval(maxInterval * 1000); + // 重试间隔乘法策略 + backOffPolicy.setMultiplier(multiplier); + return backOffPolicy; + } + + @Bean + public SimpleRetryPolicy retryPolicyByProperties() { + SimpleRetryPolicy retryPolicy = new SimpleRetryPolicy(); + int maxAttempts = properties.getListener().getSimple().getRetry().getMaxAttempts(); + retryPolicy.setMaxAttempts(maxAttempts); + return retryPolicy; + } +} diff --git a/jsowell-common/src/main/java/com/jsowell/common/constant/RabbitConstants.java b/jsowell-common/src/main/java/com/jsowell/common/constant/RabbitConstants.java new file mode 100644 index 000000000..21d75a4e9 --- /dev/null +++ b/jsowell-common/src/main/java/com/jsowell/common/constant/RabbitConstants.java @@ -0,0 +1,43 @@ +package com.jsowell.common.constant; + +/** + * RabbitMQ常量 + */ +public class RabbitConstants { + // 交换机名称 + public static final String YKC_EXCHANGE_NAME = "ykcChargingExchange"; + + // pileLogin: 充电桩登录(队列) + public static final String QUEUE_PILE_LOGIN = "ykc.pileLogin-topic.device-group"; + + // heartBeat: 桩心跳消费(队列) + public static final String QUEUE_HEART_BEAT = "ykc.heartBeat-topic.device-group"; + + // realtimeData: 桩实时数据消费(队列) + public static final String QUEUE_REALTIME_DATA = "ykc.realtimeData-topic.device-group"; + + // priceSender: 桩价格下发(队列) + public static final String QUEUE_PRICE_SENDER = "ykc.priceSender-topic.device-group"; + + // stationPriceSender: 充电基准价格推送至用户运营商(队列) + public static final String QUEUE_STATION_PRICE_SENDER = "ykc.stationPriceSender-topic.device-group"; + + // connectorStatusNotify: 充电桩状态推送至用户运营商(队列) + public static final String QUEUE_CONNECTOR_STATUS_NOTIFY = "ykc.connectorStatusNotify-topic.device-group"; + + // chargingStatusNotify: 充电订单状态数据推送至用户运营商(队列) + public static final String QUEUE_CHARGING_STATUS_NOTIFY = "ykc.chargingStatusNotify-topic.device-group"; + + // chargeOrderData: 充电结算订单推送至用户运营商(队列) + public static final String QUEUE_CHARGE_ORDER_DATA = "ykc.chargeOrderData-topic.device-group"; + + // upStationStatus: 消费充电桩状态推送 + public static final String QUEUE_UP_STATION_STATUS = "ykc.upStationStatus-topic.userplat-group"; + + // upEquipChargeStatus: 消费充电订单状态数据 + public static final String QUEUE_UP_EQUIP_CHARGE_STATUS = "ykc.upEquipChargeStatus-topic.userplat-group"; + + // upChargeOrderInfo: 消费充电结算订单 + public static final String QUEUE_UP_CHARGE_ORDER_INFO = "ykc.upChargeOrderInfo-topic.userplat-group"; + +} diff --git a/jsowell-netty/src/main/java/com/jsowell/netty/service/rabbitmq/PileRabbitListener.java b/jsowell-netty/src/main/java/com/jsowell/netty/service/rabbitmq/PileRabbitListener.java new file mode 100644 index 000000000..1f740c58b --- /dev/null +++ b/jsowell-netty/src/main/java/com/jsowell/netty/service/rabbitmq/PileRabbitListener.java @@ -0,0 +1,402 @@ +package com.jsowell.netty.service.rabbitmq; + +import com.alibaba.fastjson2.JSON; +import com.google.common.primitives.Bytes; +import com.jsowell.common.constant.CacheConstants; +import com.jsowell.common.constant.Constants; +import com.jsowell.common.core.domain.ykc.LoginRequestData; +import com.jsowell.common.core.domain.ykc.RealTimeMonitorData; +import com.jsowell.common.core.domain.ykc.YKCDataProtocol; +import com.jsowell.common.core.domain.ykc.YKCFrameTypeCode; +import com.jsowell.common.core.domain.ykc.device2platform.Data0x01; +import com.jsowell.common.core.domain.ykc.device2platform.Data0x03; +import com.jsowell.common.core.domain.ykc.device2platform.Data0x13; +import com.jsowell.common.core.redis.RedisCache; +import com.jsowell.common.enums.ykc.OrderPayStatusEnum; +import com.jsowell.common.enums.ykc.OrderStatusEnum; +import com.jsowell.common.enums.ykc.PileChannelEntity; +import com.jsowell.common.enums.ykc.PileConnectorStatusEnum; +import com.jsowell.common.util.*; +import com.jsowell.common.util.bean.SerializationUtil; +import com.jsowell.common.util.spring.SpringUtils; +import com.jsowell.pile.domain.OrderBasicInfo; +import com.jsowell.pile.domain.PileBasicInfo; +import com.jsowell.pile.domain.ykcCommond.IssueQRCodeCommand; +import com.jsowell.pile.domain.ykcCommond.ProofreadTimeCommand; +import com.jsowell.pile.service.OrderBasicInfoService; +import com.jsowell.pile.service.PileBasicInfoService; +import com.jsowell.pile.service.PileMsgRecordService; +import com.jsowell.pile.service.YKCPushCommandService; +import com.jsowell.thirdparty.common.CommonService; +import com.rabbitmq.client.Channel; +import io.netty.buffer.Unpooled; +import io.netty.channel.ChannelHandlerContext; +import lombok.extern.slf4j.Slf4j; +import org.springframework.amqp.core.Message; +import org.springframework.beans.BeanUtils; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; +import org.springframework.stereotype.Service; + +import java.io.IOException; +import java.util.Date; +import java.util.Objects; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; + +@Slf4j +@Service +public class PileRabbitListener { + + @Autowired + private PileBasicInfoService pileBasicInfoService; + + @Autowired + private YKCPushCommandService ykcPushCommandService; + + @Autowired + private PileMsgRecordService pileMsgRecordService; + + @Autowired + private OrderBasicInfoService orderBasicInfoService; + + @Autowired + private CommonService commonService; + + @Autowired + private RedisCache redisCache; + + + // 引入线程池 + private ThreadPoolTaskExecutor executor = SpringUtils.getBean("threadPoolTaskExecutor"); + + // ========================= 消费消息 ========================== // + // @RabbitListener(queues = "ykc.pileLogin-topic.device-group") + // public void testRabbitMQMessage(String msg) { + // CompletableFuture.runAsync(() -> { + // log.info("RabbitMQ处理请求start, threadName:{}, message:{}", Thread.currentThread().getName(), msg); + // try { + // // 模拟处理耗时 + // Thread.sleep(500); + // } catch (InterruptedException e) { + // throw new RuntimeException(e); + // } + // log.info("RabbitMQ处理请求end, threadName:{}, message:{}", Thread.currentThread().getName(), msg); + // }, executor); + // } + + /** + * 多线程消费登录请求消息 + * @param msg + */ + // @RabbitListener(queues = RabbitConstants.QUEUE_PILE_LOGIN) + public void receiveLoginMessage(byte[] msg, Channel channel, Message message) { + CompletableFuture.runAsync(() -> { + loginLogic(msg); + try { + //由于配置设置了手动应答,所以这里要进行一个手动应答。注意:如果设置了自动应答,这里又进行手动应答,会出现double ack,那么程序会报错。 + channel.basicAck(message.getMessageProperties().getDeliveryTag(),false); + } catch (IOException e) { + throw new RuntimeException(e); + } + }, executor); + } + + /** + * 登录请求逻辑 + * @param message + */ + private void loginLogic(byte[] message) { + log.info("RabbitMQ处理登录请求, threadName:{}, message:{}", Thread.currentThread().getName(), message); + // 处理登录请求 + YKCDataProtocol ykcDataProtocol = new YKCDataProtocol(message); + + Data0x01 loginData = new Data0x01(message); + String pileSn = loginData.getPileSn(); + String pileType = loginData.getPileType(); + String connectorNum = loginData.getConnectorNum(); + String communicationVersion = loginData.getCommunicationVersion(); + String programVersion = loginData.getProgramVersion(); + String internetConnection = loginData.getInternetConnection(); + String iccid = loginData.getIccid(); + String business = loginData.getBusiness(); + + ChannelHandlerContext ctx = PileChannelEntity.getChannelByPileSn(pileSn); + + if (YKCUtils.verifyTheDuplicateRequest(ykcDataProtocol, ctx)) { + return; + } + + LoginRequestData loginRequestData = LoginRequestData.builder() + .pileSn(pileSn) + .pileType(pileType) + .connectorNum(connectorNum) + .communicationVersion(communicationVersion) + .programVersion(programVersion) + .internetConnection(internetConnection) + .iccid(iccid) + .business(business) + .build(); + + // 结果(默认 0x01:登录失败) + byte[] flag = Constants.oneByteArray; + + // 通过桩编码SN查询数据库,如果有数据,则登录成功,否则登录失败 + PileBasicInfo pileBasicInfo = null; + try { + pileBasicInfo = pileBasicInfoService.selectPileBasicInfoBySN(pileSn); + } catch (Exception e) { + log.error("selectPileBasicInfoBySN发生异常", e); + } + + if (pileBasicInfo != null) { + flag = Constants.zeroByteArray; + + // 异步修改充电桩状态 + CompletableFuture.runAsync(() -> { + try { + // 更改桩和该桩下的枪口状态分别为 在线、空闲 公共方法修改状态 + pileBasicInfoService.updateStatus(BytesUtil.bcd2Str(ykcDataProtocol.getFrameType()), pileSn, null, null, null); + } catch (Exception e) { + e.printStackTrace(); + } + }, executor); + + // 异步发送对时指令 + CompletableFuture.runAsync(() -> { + try { + Thread.sleep(200); + } catch (InterruptedException e) { + e.printStackTrace(); + } + // 对时 + ProofreadTimeCommand command = ProofreadTimeCommand.builder().pileSn(pileSn).build(); + ykcPushCommandService.pushProofreadTimeCommand(command); + }, executor); + + // log.info("下面进行下发二维码 pileSn:{}, thread:{}", pileSn, Thread.currentThread().getName()); + // 异步发送下发二维码指令 + CompletableFuture.runAsync(() -> { + try { + Thread.sleep(600); + } catch (InterruptedException e) { + e.printStackTrace(); + } + // 下发二维码 + IssueQRCodeCommand issueQRCodeCommand = IssueQRCodeCommand.builder().pileSn(pileSn).build(); + ykcPushCommandService.pushIssueQRCodeCommand(issueQRCodeCommand); + }, executor); + + if (StringUtils.equals("00", internetConnection)) { + CompletableFuture.runAsync(() -> { + // 充电桩使用的sim卡,把信息存库 + try { + pileBasicInfoService.updatePileSimInfo(pileSn, iccid); + // pileBasicInfoService.updatePileSimInfoV2(pileSn, iccid); + } catch (Exception e) { + log.error("更新充电桩sim卡信息失败pileSn:{}, iccid:{}", pileSn, iccid, e); + } + }, executor); + } + } + + // 异步保持登录报文 + CompletableFuture.runAsync(() -> { + // 保存报文 没有登录认证通过还要不要保存报文? + try { + String jsonMsg = JSON.toJSONString(loginRequestData); + pileMsgRecordService.save(pileSn, pileSn, YKCUtils.frameType2Str(YKCFrameTypeCode.LOGIN_CODE.getBytes()), jsonMsg, ykcDataProtocol.getHEXString()); + } catch (Exception e) { + log.error("保存报文失败pileSn:{}", pileSn, e); + } + }, executor); + + // 消息体 + byte[] messageBody = Bytes.concat(YKCUtils.getPileSnBytes(pileSn), flag); + byte[] result = YKCUtils.getResult(ykcDataProtocol, messageBody); + log.info("RabbitMQ登录Result:{}", BytesUtil.binary(result, 16)); + ctx.writeAndFlush(Unpooled.copiedBuffer(result)); + } + + /** + * 多线程消费心跳消息 + * @param message + */ + // @RabbitListener(queues = RabbitConstants.QUEUE_HEART_BEAT) + public void receiveHeartBeat(byte[] msg, Channel channel, Message message) throws IOException { + // CompletableFuture.runAsync(() -> { + // }, executor); + heartBeatLogic(msg); + //由于配置设置了手动应答,所以这里要进行一个手动应答。注意:如果设置了自动应答,这里又进行手动应答,会出现double ack,那么程序会报错。 + channel.basicAck(message.getMessageProperties().getDeliveryTag(),false); + } + + /** + * 心跳逻辑 + * @param message + */ + private void heartBeatLogic(byte[] message) { + Data0x03 heartbeatData = SerializationUtil.deserialize(Data0x03.class, message); + log.info("RabbitMQ处理心跳消息, threadName:{}, message:{}", Thread.currentThread().getName(), JSON.toJSONString(heartbeatData)); + // 处理登录请求 + String pileSn = heartbeatData.getPileSn(); + String connectorCode = heartbeatData.getConnectorCode(); + String connectorStatus = heartbeatData.getConnectorStatus(); + // ChannelHandlerContext ctx = PileChannelEntity.getChannelByPileSn(pileSn); + ChannelHandlerContext ctx = ChannelManagerUtil.getChannel(heartbeatData.getChannelId()); + + // 保存时间 + YKCUtils.saveLastTimeAndCheckChannel(heartbeatData.getPileSn(), ctx); + + // 公共方法修改状态 + try { + pileBasicInfoService.updateStatus("0x03", pileSn, connectorCode, connectorStatus, null); + } catch (Exception e) { + log.error("公共方法修改状态error", e); + } + + // 心跳应答(置0) + byte[] flag = Constants.zeroByteArray; + // 消息体 + byte[] messageBody = Bytes.concat(BytesUtil.hexStringToByteArray(pileSn), BytesUtil.hexStringToByteArray(connectorCode), flag); + byte[] result = YKCUtils.getResult(heartbeatData, messageBody); + log.info("RabbitMQ心跳应答, result:{}", BytesUtil.printHexBinary(result)); + ctx.channel().writeAndFlush(ctx.channel().alloc().buffer().writeBytes(result)); + } + + public static void main(String[] args) { + System.out.println(DateUtils.getNowDate().before(DateUtils.parseDate("2025-07-06"))); + } + + /** + * 多线程消费实时监控数据消息 + * @param msg + */ + // @RabbitListener(queues = RabbitConstants.QUEUE_REALTIME_DATA) + public void receiveRealtimeData(byte[] msg, Channel channel, Message message) { + CompletableFuture.runAsync(() -> { + realtimeDataLogic(msg); + try { + //由于配置设置了手动应答,所以这里要进行一个手动应答。注意:如果设置了自动应答,这里又进行手动应答,会出现double ack,那么程序会报错。 + channel.basicAck(message.getMessageProperties().getDeliveryTag(),false); + } catch (IOException e) { + throw new RuntimeException(e); + } + }, executor); + } + + private RealTimeMonitorData transformRealTimeMonitorData(Data0x13 realTimeData) { + RealTimeMonitorData realTimeMonitorData = new RealTimeMonitorData(); + if (realTimeData != null) { + BeanUtils.copyProperties(realTimeData, realTimeMonitorData); + } + return realTimeMonitorData; + } + + /** + * 处理实时监控数据逻辑 + * @param message + */ + private void realtimeDataLogic(byte[] message) { + log.info("RabbitMQ处理实时监控数据, threadName:{}, message:{}", Thread.currentThread().getName(), BytesUtil.binary(message, 16)); + if (DateUtils.getNowDate().before(DateUtils.parseDate("2025-07-06"))) { + return; + } + Data0x13 realTimeData = new Data0x13(message); + String pileSn = realTimeData.getPileSn(); + String connectorCode = realTimeData.getConnectorCode(); + String connectorStatus = realTimeData.getConnectorStatus(); + String transactionCode = realTimeData.getTransactionCode(); + String isChargerPluggedIn = realTimeData.getPutGunType(); + String faultReason = realTimeData.getFaultReason(); + String pileConnectorCode = pileSn + connectorCode; + + // 公共方法修改状态 + pileBasicInfoService.updateStatus("0x13", pileSn, connectorCode, connectorStatus, isChargerPluggedIn); + + // 01表示故障 + if (StringUtils.equals(connectorStatus, PileConnectorStatusEnum.FAULT.getValue())) { + // 故障原因存入缓存 + String redisKey = CacheConstants.PILE_HARDWARE_FAULT + pileConnectorCode; + redisCache.setCacheObject(redisKey, faultReason, 5, TimeUnit.MINUTES); + } + + RealTimeMonitorData realTimeMonitorData = transformRealTimeMonitorData(realTimeData); + + // 03表示充电中 + if (StringUtils.equals(connectorStatus, PileConnectorStatusEnum.OCCUPIED_CHARGING.getValue())) { + // 默认保存到redis + boolean saveRedisFlag = true; + + // 查询数据库中该订单当前信息 + OrderBasicInfo orderInfo = orderBasicInfoService.getOrderInfoByTransactionCode(transactionCode); + if (Objects.nonNull(orderInfo)) { + if (StringUtils.equals(orderInfo.getOrderStatus(), OrderStatusEnum.ORDER_COMPLETE.getValue()) + || StringUtils.equals(orderInfo.getOrderStatus(), OrderStatusEnum.STAY_SETTLEMENT.getValue())) { + // 在订单状态为 订单完成或待结算,不保存 + saveRedisFlag = false; + } + + boolean updateFlag = false; + if (StringUtils.equals(orderInfo.getOrderStatus(), OrderStatusEnum.NOT_START.getValue()) + || StringUtils.equals(orderInfo.getOrderStatus(), OrderStatusEnum.ABNORMAL.getValue()) + || StringUtils.equals(orderInfo.getOrderStatus(), OrderStatusEnum.STAY_SETTLEMENT.getValue())) { + updateFlag = true; + // 如果是未启动状态或者异常状态, 修改这个订单状态为充电中 2023年7月7日新增 如果是待结算状态,也改为充电中 + orderInfo.setOrderStatus(OrderStatusEnum.IN_THE_CHARGING.getValue()); + } + + if (StringUtils.equals(orderInfo.getPayStatus(), OrderPayStatusEnum.unpaid.getValue())) { + // 如果发现该订单的支付状态为 0-待支付,将该订单支付状态改为 1-支付完成 + orderInfo.setPayStatus(OrderPayStatusEnum.paid.getValue()); + } + + // 如果原来没有开始充电时间就保存当前时间为开始充电时间 + if (orderInfo.getChargeStartTime() == null) { + updateFlag = true; + orderInfo.setChargeStartTime(new Date()); + } + + if (updateFlag) { + orderBasicInfoService.updateOrderBasicInfo(orderInfo); + } + } + + // 充电时保存实时数据到redis + if (saveRedisFlag) { + pileBasicInfoService.saveRealTimeMonitorData2Redis(realTimeMonitorData); + } + } + + // 异步推送第三方平台实时数据 + CompletableFuture.runAsync(() -> { + try { + commonService.pushRealTimeInfo(pileSn, connectorCode, connectorStatus, realTimeMonitorData, transactionCode); + } catch (Exception e) { + log.error("统一推送第三方平台实时数据 error,", e); + } + }, executor); + + // 异步推送第三方平台实时数据V2 + CompletableFuture.runAsync(() -> { + try { + commonService.pushRealTimeInfoV2(pileSn, connectorCode, connectorStatus, realTimeMonitorData, transactionCode); + } catch (Exception e) { + log.error("统一推送第三方平台实时数据V2 error, ", e); + } + }, executor); + + if (StringUtils.equals(connectorStatus, Constants.ONE)) { + // 故障 + // 异步推送第三方平台告警信息 + CompletableFuture.runAsync(() -> { + try { + commonService.commonPushAlarmInfo(pileConnectorCode, connectorStatus, realTimeMonitorData.getPutGunType()); + } catch (Exception e) { + log.error("统一推送第三方平台告警信息 error, ", e); + } + }, executor); + } + } + +} diff --git a/jsowell-pile/src/main/java/com/jsowell/mq/OrderRabbitListener.java b/jsowell-pile/src/main/java/com/jsowell/mq/OrderRabbitListener.java new file mode 100644 index 000000000..2fbf581f6 --- /dev/null +++ b/jsowell-pile/src/main/java/com/jsowell/mq/OrderRabbitListener.java @@ -0,0 +1,25 @@ +package com.jsowell.mq; + +import com.jsowell.pile.dto.AfterSettleOrderDTO; +import com.rabbitmq.client.Channel; +import lombok.extern.slf4j.Slf4j; +import org.springframework.amqp.core.Message; +import org.springframework.stereotype.Service; + +import java.io.IOException; + +@Slf4j +@Service +public class OrderRabbitListener { + + /** + * 多线程消费请求消息 + * @param message + */ + // @RabbitListener(queues = RabbitConstants.QUEUE_CHARGE_ORDER_DATA) + public void receiveChargeOrderData(AfterSettleOrderDTO afterSettleOrderDTO, Channel channel, Message message) throws IOException { + log.info("接收到订单结算数据:{}", afterSettleOrderDTO); + //由于配置设置了手动应答,所以这里要进行一个手动应答。注意:如果设置了自动应答,这里又进行手动应答,会出现double ack,那么程序会报错。 + channel.basicAck(message.getMessageProperties().getDeliveryTag(),false); + } +}