diff --git a/jsowell-admin/src/main/java/com/jsowell/service/OrderService.java b/jsowell-admin/src/main/java/com/jsowell/service/OrderService.java index 0cf63bfd7..4c8874625 100644 --- a/jsowell-admin/src/main/java/com/jsowell/service/OrderService.java +++ b/jsowell-admin/src/main/java/com/jsowell/service/OrderService.java @@ -17,6 +17,7 @@ import com.jsowell.adapay.vo.PaymentInfo; import com.jsowell.common.YouDianUtils; import com.jsowell.common.constant.CacheConstants; import com.jsowell.common.constant.Constants; +import com.jsowell.common.constant.mq.ThirdPartyRabbitConstants; import com.jsowell.common.core.domain.vo.AuthorizedDeptVO; import com.jsowell.common.core.domain.ykc.RealTimeMonitorData; import com.jsowell.common.core.domain.ykc.TransactionRecordsData; @@ -58,6 +59,7 @@ import com.jsowell.wxpay.response.WechatPayNotifyParameter; import org.apache.commons.collections4.CollectionUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import org.springframework.stereotype.Service; @@ -146,6 +148,9 @@ public class OrderService { @Autowired private PersonalChargingRecordService personalChargingRecordService; + @Autowired + private RabbitTemplate rabbitTemplate; + // 引入线程池 private ThreadPoolTaskExecutor executor = SpringUtils.getBean("threadPoolTaskExecutor"); @@ -805,13 +810,24 @@ public class OrderService { log.info("人工结算订单-end orderCode:{}", dto.getOrderCode()); // 异步推送第三方平台订单信息 +// CompletableFuture.runAsync(() -> { +// try { +// commonService.commonPushOrderInfo(orderBasicInfo); +// } catch (Exception e) { +// log.error("人工结算订单 推送第三方平台订单信息error, orderCode:{}", orderBasicInfo.getOrderCode(), e); +// } +// }, thirdpartyTaskExecutor); + + // TODO 测试mq + // 向mq中发送实时数据消息,给第三方服务消费 CompletableFuture.runAsync(() -> { try { - commonService.commonPushOrderInfo(orderBasicInfo); + rabbitTemplate.convertAndSend(ThirdPartyRabbitConstants.WCC_THIRDPARTY_NAME , ThirdPartyRabbitConstants.QUEUE_CHARGE_ORDER_PUSH ,orderBasicInfo ); } catch (Exception e) { - log.error("人工结算订单 推送第三方平台订单信息error, orderCode:{}", orderBasicInfo.getOrderCode(), e); + log.error("向mq中发送实时数据消息供第三方服务消费 error, " , e); } }, thirdpartyTaskExecutor); + return true; } diff --git a/jsowell-netty/src/main/java/com/jsowell/netty/handler/yunkuaichong/RemoteStartChargingRequestHandler.java b/jsowell-netty/src/main/java/com/jsowell/netty/handler/yunkuaichong/RemoteStartChargingRequestHandler.java index 84164030f..aa48bf15d 100644 --- a/jsowell-netty/src/main/java/com/jsowell/netty/handler/yunkuaichong/RemoteStartChargingRequestHandler.java +++ b/jsowell-netty/src/main/java/com/jsowell/netty/handler/yunkuaichong/RemoteStartChargingRequestHandler.java @@ -109,27 +109,27 @@ public class RemoteStartChargingRequestHandler extends AbstractYkcHandler { orderBasicInfoService.chargingPileStartedSuccessfully(transactionCode); } // 异步推送第三方平台 - CompletableFuture.runAsync(() -> { - OrderBasicInfo orderInfo = orderBasicInfoService.getOrderInfoByTransactionCode(transactionCode); - if (orderInfo == null) { - return; - } - try { - // 启动结果回复 - commonService.commonPushStartChargeResultV2(orderInfo); - } catch (Exception e) { - e.printStackTrace(); - } - // 启动失败, 推送第三方订单信息 - if (StringUtils.equals(startResult, Constants.DOUBLE_ZERO)) { - try { - Thread.sleep(500); - commonService.commonPushOrderInfoV2(orderInfo); - } catch (Exception e) { - log.error("统一推送第三方平台订单信息error, ", e); - } - } - }, thirdpartyTaskExecutor); +// CompletableFuture.runAsync(() -> { +// OrderBasicInfo orderInfo = orderBasicInfoService.getOrderInfoByTransactionCode(transactionCode); +// if (orderInfo == null) { +// return; +// } +// try { +// // 启动结果回复 +// commonService.commonPushStartChargeResultV2(orderInfo); +// } catch (Exception e) { +// e.printStackTrace(); +// } +// // 启动失败, 推送第三方订单信息 +// if (StringUtils.equals(startResult, Constants.DOUBLE_ZERO)) { +// try { +// Thread.sleep(500); +// commonService.commonPushOrderInfoV2(orderInfo); +// } catch (Exception e) { +// log.error("统一推送第三方平台订单信息error, ", e); +// } +// } +// }, thirdpartyTaskExecutor); // TODO 测试mq // 异步发送mq diff --git a/jsowell-netty/src/main/java/com/jsowell/netty/handler/yunkuaichong/TransactionRecordsRequestHandler.java b/jsowell-netty/src/main/java/com/jsowell/netty/handler/yunkuaichong/TransactionRecordsRequestHandler.java index 775970279..95c865c86 100644 --- a/jsowell-netty/src/main/java/com/jsowell/netty/handler/yunkuaichong/TransactionRecordsRequestHandler.java +++ b/jsowell-netty/src/main/java/com/jsowell/netty/handler/yunkuaichong/TransactionRecordsRequestHandler.java @@ -673,36 +673,24 @@ public class TransactionRecordsRequestHandler extends AbstractYkcHandler { OrderBasicInfo finalOrderBasicInfo = orderBasicInfo; - // TODO 异步推送第三方平台订单信息 - CompletableFuture.runAsync(() -> { - try { - commonService.commonPushOrderInfo(finalOrderBasicInfo); - } catch (Exception e) { - log.error("推送第三方平台订单信息error, ", e); - } - }, thirdpartyTaskExecutor); +// // TODO 异步推送第三方平台订单信息 +// CompletableFuture.runAsync(() -> { +// try { +// commonService.commonPushOrderInfo(finalOrderBasicInfo); +// } catch (Exception e) { +// log.error("推送第三方平台订单信息error, ", e); +// } +// }, thirdpartyTaskExecutor); +// +// // TODO 异步推送第三方平台订单信息V2 +// CompletableFuture.runAsync(() -> { +// try { +// commonService.commonPushOrderInfoV2(finalOrderBasicInfo); +// } catch (Exception e) { +// log.error("推送第三方平台订单信息error, ", e); +// } +// }, thirdpartyTaskExecutor); - // TODO 异步推送第三方平台订单信息V2 - CompletableFuture.runAsync(() -> { - try { - commonService.commonPushOrderInfoV2(finalOrderBasicInfo); - } catch (Exception e) { - log.error("推送第三方平台订单信息error, ", e); - } - }, thirdpartyTaskExecutor); - - // 异步推送充电订单算法平台 - CompletableFuture.runAsync(() -> { - try { - String result = chargeAlgorithmService.pushOrderInfo(finalOrderBasicInfo.getOrderCode()); - log.info("异步推送充电订单算法平台 result:{}", result); - } catch (Exception e) { - log.error("异步推送充电订单算法平台 error, ", e); - } - }, thirdpartyTaskExecutor); - - - // TODO 测试mq // 异步推送充电订单算法平台 CompletableFuture.runAsync(() -> { try { diff --git a/jsowell-netty/src/main/java/com/jsowell/netty/handler/yunkuaichong/UploadRealTimeMonitorHandler.java b/jsowell-netty/src/main/java/com/jsowell/netty/handler/yunkuaichong/UploadRealTimeMonitorHandler.java index 008933a52..b98d2a9ea 100644 --- a/jsowell-netty/src/main/java/com/jsowell/netty/handler/yunkuaichong/UploadRealTimeMonitorHandler.java +++ b/jsowell-netty/src/main/java/com/jsowell/netty/handler/yunkuaichong/UploadRealTimeMonitorHandler.java @@ -361,26 +361,26 @@ public class UploadRealTimeMonitorHandler extends AbstractYkcHandler { } // 异步推送第三方平台实时数据V2 - CompletableFuture.runAsync(() -> { - try { - commonService.pushRealTimeInfoV2(pileSn, connectorCode, connectorStatus, realTimeMonitorData, transactionCode); -// log.info("统一推送第三方平台实时数据V2 success, pileSn:{}, connectorCode:{}, connectorStatus:{}, realTimeMonitorData:{}, transactionCode:{}", pileSn, connectorCode, connectorStatus, realTimeMonitorData, transactionCode); - } catch (Exception e) { - log.error("统一推送第三方平台实时数据V2 error, ", e); - } - }, thirdpartyTaskExecutor); - - if (StringUtils.equals(connectorStatus, Constants.ONE)) { - // 故障 - // 异步推送第三方平台告警信息 - CompletableFuture.runAsync(() -> { - try { - commonService.commonPushAlarmInfo(pileConnectorCode, connectorStatus, realTimeMonitorData.getPutGunType()); - } catch (Exception e) { - log.error("统一推送第三方平台告警信息 error, ", e); - } - }, thirdpartyTaskExecutor); - } +// CompletableFuture.runAsync(() -> { +// try { +// commonService.pushRealTimeInfoV2(pileSn, connectorCode, connectorStatus, realTimeMonitorData, transactionCode); +//// log.info("统一推送第三方平台实时数据V2 success, pileSn:{}, connectorCode:{}, connectorStatus:{}, realTimeMonitorData:{}, transactionCode:{}", pileSn, connectorCode, connectorStatus, realTimeMonitorData, transactionCode); +// } catch (Exception e) { +// log.error("统一推送第三方平台实时数据V2 error, ", e); +// } +// }, thirdpartyTaskExecutor); +// +// if (StringUtils.equals(connectorStatus, Constants.ONE)) { +// // 故障 +// // 异步推送第三方平台告警信息 +// CompletableFuture.runAsync(() -> { +// try { +// commonService.commonPushAlarmInfo(pileConnectorCode, connectorStatus, realTimeMonitorData.getPutGunType()); +// } catch (Exception e) { +// log.error("统一推送第三方平台告警信息 error, ", e); +// } +// }, thirdpartyTaskExecutor); +// } // TODO 测试mq // 向mq中发送实时数据消息,给第三方服务消费 diff --git a/jsowell-netty/src/main/java/com/jsowell/netty/service/rabbitmq/PileRabbitListener.java b/jsowell-netty/src/main/java/com/jsowell/netty/service/rabbitmq/PileRabbitListener.java index 94a827682..e41e123cd 100644 --- a/jsowell-netty/src/main/java/com/jsowell/netty/service/rabbitmq/PileRabbitListener.java +++ b/jsowell-netty/src/main/java/com/jsowell/netty/service/rabbitmq/PileRabbitListener.java @@ -4,6 +4,7 @@ import com.alibaba.fastjson2.JSON; import com.google.common.primitives.Bytes; import com.jsowell.common.constant.CacheConstants; import com.jsowell.common.constant.Constants; +import com.jsowell.common.constant.mq.ThirdPartyRabbitConstants; import com.jsowell.common.core.domain.ykc.LoginRequestData; import com.jsowell.common.core.domain.ykc.RealTimeMonitorData; import com.jsowell.common.core.domain.ykc.YKCDataProtocol; @@ -33,6 +34,7 @@ import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandlerContext; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.core.Message; +import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.BeanUtils; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; @@ -69,6 +71,9 @@ public class PileRabbitListener { @Autowired private RedisCache redisCache; + @Autowired + private RabbitTemplate rabbitTemplate; + // 引入线程池 private ThreadPoolTaskExecutor executor = SpringUtils.getBean("threadPoolTaskExecutor"); @@ -374,35 +379,56 @@ public class PileRabbitListener { } } - // 异步推送第三方平台实时数据 +// // 异步推送第三方平台实时数据 +// CompletableFuture.runAsync(() -> { +// try { +// commonService.pushRealTimeInfo(pileSn, connectorCode, connectorStatus, realTimeMonitorData, transactionCode); +// } catch (Exception e) { +// log.error("统一推送第三方平台实时数据 error,", e); +// } +// }, thirdpartyTaskExecutor); +// +// // 异步推送第三方平台实时数据V2 +// CompletableFuture.runAsync(() -> { +// try { +// commonService.pushRealTimeInfoV2(pileSn, connectorCode, connectorStatus, realTimeMonitorData, transactionCode); +// } catch (Exception e) { +// log.error("统一推送第三方平台实时数据V2 error, ", e); +// } +// }, thirdpartyTaskExecutor); +// +// if (StringUtils.equals(connectorStatus, Constants.ONE)) { +// // 故障 +// // 异步推送第三方平台告警信息 +// CompletableFuture.runAsync(() -> { +// try { +// commonService.commonPushAlarmInfo(pileConnectorCode, connectorStatus, realTimeMonitorData.getPutGunType()); +// } catch (Exception e) { +// log.error("统一推送第三方平台告警信息 error, ", e); +// } +// }, thirdpartyTaskExecutor); +// } + + // TODO 测试mq + // 向mq中发送实时数据消息,给第三方服务消费 CompletableFuture.runAsync(() -> { try { - commonService.pushRealTimeInfo(pileSn, connectorCode, connectorStatus, realTimeMonitorData, transactionCode); - } catch (Exception e) { - log.error("统一推送第三方平台实时数据 error,", e); + rabbitTemplate.convertAndSend(ThirdPartyRabbitConstants.WCC_THIRDPARTY_NAME,ThirdPartyRabbitConstants.QUEUE_REALTIME_DATA_PUSH, realTimeMonitorData); + } catch (Exception e){ + log.error("向mq中发送实时数据消息供第三方服务消费 error, ", e); } - }, thirdpartyTaskExecutor); - - // 异步推送第三方平台实时数据V2 - CompletableFuture.runAsync(() -> { - try { - commonService.pushRealTimeInfoV2(pileSn, connectorCode, connectorStatus, realTimeMonitorData, transactionCode); - } catch (Exception e) { - log.error("统一推送第三方平台实时数据V2 error, ", e); - } - }, thirdpartyTaskExecutor); - - if (StringUtils.equals(connectorStatus, Constants.ONE)) { - // 故障 - // 异步推送第三方平台告警信息 - CompletableFuture.runAsync(() -> { + if (StringUtils.equals(connectorStatus,Constants.ONE)){ + // 故障 + // 异步推送第三方平台告警信息 try { - commonService.commonPushAlarmInfo(pileConnectorCode, connectorStatus, realTimeMonitorData.getPutGunType()); + rabbitTemplate.convertAndSend(ThirdPartyRabbitConstants.WCC_THIRDPARTY_NAME,ThirdPartyRabbitConstants.QUEUE_ALARM_PUSH, realTimeMonitorData.getPutGunType()); } catch (Exception e) { log.error("统一推送第三方平台告警信息 error, ", e); } - }, thirdpartyTaskExecutor); - } + } + }, executor); + + } } diff --git a/jsowell-netty/src/main/java/com/jsowell/netty/strategy/ykc/RemoteStartChargingStrategy.java b/jsowell-netty/src/main/java/com/jsowell/netty/strategy/ykc/RemoteStartChargingStrategy.java index 9efb50731..35cb2f759 100644 --- a/jsowell-netty/src/main/java/com/jsowell/netty/strategy/ykc/RemoteStartChargingStrategy.java +++ b/jsowell-netty/src/main/java/com/jsowell/netty/strategy/ykc/RemoteStartChargingStrategy.java @@ -1,6 +1,7 @@ package com.jsowell.netty.strategy.ykc; import com.jsowell.common.constant.Constants; +import com.jsowell.common.constant.mq.ThirdPartyRabbitConstants; import com.jsowell.common.core.domain.ykc.YKCDataProtocol; import com.jsowell.common.core.domain.ykc.YKCFrameTypeCode; import com.jsowell.common.enums.ykc.ChargingFailedReasonEnum; @@ -14,6 +15,7 @@ import com.jsowell.pile.service.OrderBasicInfoService; import com.jsowell.thirdparty.common.CommonService; import io.netty.channel.ChannelHandlerContext; import lombok.extern.slf4j.Slf4j; +import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import org.springframework.stereotype.Component; @@ -38,6 +40,9 @@ public class RemoteStartChargingStrategy implements AbstractYkcStrategy { @Autowired private CommonService commonService; + @Autowired + private RabbitTemplate rabbitTemplate; + // 引入线程池 private ThreadPoolTaskExecutor executor = SpringUtils.getBean("threadPoolTaskExecutor"); @@ -107,28 +112,46 @@ public class RemoteStartChargingStrategy implements AbstractYkcStrategy { log.info("远程启动充电命令回复-交易流水号:{}, 桩编码:{}, 枪号:{}, 启动结果(00-失败, 01-成功):{}, 失败原因:{}", transactionCode, pileSn, connectorCode, startResult, failedReasonMsg); // 异步推送第三方平台 +// CompletableFuture.runAsync(() -> { +// OrderBasicInfo orderInfo = orderBasicInfoService.getOrderInfoByTransactionCode(transactionCode); +// if (orderInfo == null) { +// return; +// } +// try { +// // 启动结果回复 +// commonService.commonPushStartChargeResult(orderInfo); +// +// } catch (Exception e) { +// e.printStackTrace(); +// } +// // 启动失败, 推送第三方订单信息 +// if (StringUtils.equals(startResult, Constants.DOUBLE_ZERO)) { +// try { +// Thread.sleep(500); +// commonService.commonPushOrderInfo(orderInfo); +// } catch (Exception e) { +// log.error("统一推送第三方平台订单信息error, ", e); +// } +// } +// }, executor); + CompletableFuture.runAsync(() -> { OrderBasicInfo orderInfo = orderBasicInfoService.getOrderInfoByTransactionCode(transactionCode); if (orderInfo == null) { return; } - try { - // 启动结果回复 - commonService.commonPushStartChargeResult(orderInfo); - - } catch (Exception e) { - e.printStackTrace(); - } - // 启动失败, 推送第三方订单信息 - if (StringUtils.equals(startResult, Constants.DOUBLE_ZERO)) { + if (StringUtils.equals(startResult, Constants.DOUBLE_ZERO)){ + // 启动失败, 推送第三方订单信息 try { Thread.sleep(500); - commonService.commonPushOrderInfo(orderInfo); + rabbitTemplate.convertAndSend(ThirdPartyRabbitConstants.WCC_THIRDPARTY_NAME, ThirdPartyRabbitConstants.QUEUE_STARTUP_CHARGING_FAILED_PUSH, orderInfo); } catch (Exception e) { log.error("统一推送第三方平台订单信息error, ", e); } } }, executor); + + return null; } } diff --git a/jsowell-netty/src/main/java/com/jsowell/netty/strategy/ykc/UploadRealTimeMonitorStrategy.java b/jsowell-netty/src/main/java/com/jsowell/netty/strategy/ykc/UploadRealTimeMonitorStrategy.java index 154bb6bcd..dede07bbd 100644 --- a/jsowell-netty/src/main/java/com/jsowell/netty/strategy/ykc/UploadRealTimeMonitorStrategy.java +++ b/jsowell-netty/src/main/java/com/jsowell/netty/strategy/ykc/UploadRealTimeMonitorStrategy.java @@ -2,6 +2,7 @@ package com.jsowell.netty.strategy.ykc; import com.jsowell.common.constant.CacheConstants; import com.jsowell.common.constant.Constants; +import com.jsowell.common.constant.mq.ThirdPartyRabbitConstants; import com.jsowell.common.core.domain.ykc.RealTimeMonitorData; import com.jsowell.common.core.domain.ykc.YKCDataProtocol; import com.jsowell.common.core.domain.ykc.YKCFrameTypeCode; @@ -21,6 +22,7 @@ import com.jsowell.pile.service.PileBasicInfoService; import com.jsowell.thirdparty.common.CommonService; import io.netty.channel.ChannelHandlerContext; import lombok.extern.slf4j.Slf4j; +import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import org.springframework.stereotype.Component; @@ -64,6 +66,9 @@ public class UploadRealTimeMonitorStrategy implements AbstractYkcStrategy { @Autowired private RedisCache redisCache; + @Autowired + private RabbitTemplate rabbitTemplate; + @Override public byte[] supplyProcess(YKCDataProtocol ykcDataProtocol, ChannelHandlerContext channel) { @@ -341,37 +346,54 @@ public class UploadRealTimeMonitorStrategy implements AbstractYkcStrategy { } - // 异步推送第三方平台实时数据 +// // 异步推送第三方平台实时数据 +// CompletableFuture.runAsync(() -> { +// try { +// commonService.pushRealTimeInfo(pileSn, connectorCode, connectorStatus, realTimeMonitorData, transactionCode); +// } catch (Exception e) { +// log.error("统一推送第三方平台实时数据 error,", e); +// } +// }, executor); +// +// // 异步推送第三方平台实时数据V2 +// CompletableFuture.runAsync(() -> { +// try { +// commonService.pushRealTimeInfoV2(pileSn, connectorCode, connectorStatus, realTimeMonitorData, transactionCode); +// } catch (Exception e) { +// log.error("统一推送第三方平台实时数据V2 error, ", e); +// } +// }, executor); +// +// if (StringUtils.equals(connectorStatus, Constants.ONE)) { +// // 故障 +// // 异步推送第三方平台告警信息 +// CompletableFuture.runAsync(() -> { +// try { +// commonService.commonPushAlarmInfo(pileConnectorCode, connectorStatus, realTimeMonitorData.getPutGunType()); +// } catch (Exception e) { +// log.error("统一推送第三方平台告警信息 error, ", e); +// } +// }, executor); +// } + + // TODO 测试mq + // 向mq中发送实时数据消息,给第三方服务消费 CompletableFuture.runAsync(() -> { try { - commonService.pushRealTimeInfo(pileSn, connectorCode, connectorStatus, realTimeMonitorData, transactionCode); - } catch (Exception e) { - log.error("统一推送第三方平台实时数据 error,", e); + rabbitTemplate.convertAndSend(ThirdPartyRabbitConstants.WCC_THIRDPARTY_NAME,ThirdPartyRabbitConstants.QUEUE_REALTIME_DATA_PUSH, realTimeMonitorData); + } catch (Exception e){ + log.error("向mq中发送实时数据消息供第三方服务消费 error, ", e); } - }, executor); - - // 异步推送第三方平台实时数据V2 - CompletableFuture.runAsync(() -> { - try { - commonService.pushRealTimeInfoV2(pileSn, connectorCode, connectorStatus, realTimeMonitorData, transactionCode); - } catch (Exception e) { - log.error("统一推送第三方平台实时数据V2 error, ", e); - } - }, executor); - - if (StringUtils.equals(connectorStatus, Constants.ONE)) { - // 故障 - // 异步推送第三方平台告警信息 - CompletableFuture.runAsync(() -> { + if (StringUtils.equals(connectorStatus,Constants.ONE)){ + // 故障 + // 异步推送第三方平台告警信息 try { - commonService.commonPushAlarmInfo(pileConnectorCode, connectorStatus, realTimeMonitorData.getPutGunType()); + rabbitTemplate.convertAndSend(ThirdPartyRabbitConstants.WCC_THIRDPARTY_NAME,ThirdPartyRabbitConstants.QUEUE_ALARM_PUSH, realTimeMonitorData.getPutGunType()); } catch (Exception e) { log.error("统一推送第三方平台告警信息 error, ", e); } - }, executor); - } - - // TODO 测试 + } + }, executor); return null; }