From c3f58adae715bfe85f0b5afcc77d003094946114 Mon Sep 17 00:00:00 2001 From: Guoqs <123@jsowell.com> Date: Wed, 24 Dec 2025 16:14:21 +0800 Subject: [PATCH] =?UTF-8?q?=E9=80=9A=E8=BF=87=20RabbitMQ=E8=B0=83=E7=94=A8?= =?UTF-8?q?=E5=A2=9E=E5=8A=A0=E7=A7=AF=E5=88=86?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../common/config/mq/PointsRabbitConfig.java | 43 +++++++++++ .../common/constant/RabbitConstants.java | 9 +++ .../jsowell/pile/dto/PointsRewardMessage.java | 46 ++++++++++++ .../mq/consumer/PointsRewardConsumer.java | 71 +++++++++++++++++++ .../mq/producer/PointsRewardProducer.java | 41 +++++++++++ .../impl/OrderBasicInfoServiceImpl.java | 58 +++++++++++++++ 6 files changed, 268 insertions(+) create mode 100644 jsowell-common/src/main/java/com/jsowell/common/config/mq/PointsRabbitConfig.java create mode 100644 jsowell-pile/src/main/java/com/jsowell/pile/dto/PointsRewardMessage.java create mode 100644 jsowell-pile/src/main/java/com/jsowell/pile/mq/consumer/PointsRewardConsumer.java create mode 100644 jsowell-pile/src/main/java/com/jsowell/pile/mq/producer/PointsRewardProducer.java diff --git a/jsowell-common/src/main/java/com/jsowell/common/config/mq/PointsRabbitConfig.java b/jsowell-common/src/main/java/com/jsowell/common/config/mq/PointsRabbitConfig.java new file mode 100644 index 000000000..88fbd8f0e --- /dev/null +++ b/jsowell-common/src/main/java/com/jsowell/common/config/mq/PointsRabbitConfig.java @@ -0,0 +1,43 @@ +package com.jsowell.common.config.mq; + +import com.jsowell.common.constant.RabbitConstants; +import org.springframework.amqp.core.Binding; +import org.springframework.amqp.core.BindingBuilder; +import org.springframework.amqp.core.DirectExchange; +import org.springframework.amqp.core.Queue; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +/** + * 积分奖励 RabbitMQ 配置 + */ +@Configuration +public class PointsRabbitConfig { + + /** + * 积分奖励队列 + */ + @Bean + public Queue pointsRewardQueue() { + // durable: 持久化 + return new Queue(RabbitConstants.QUEUE_POINTS_REWARD, true); + } + + /** + * 积分奖励交换机 + */ + @Bean + public DirectExchange pointsRewardExchange() { + return new DirectExchange(RabbitConstants.EXCHANGE_POINTS_REWARD, true, false); + } + + /** + * 绑定队列到交换机 + */ + @Bean + public Binding pointsRewardBinding(Queue pointsRewardQueue, DirectExchange pointsRewardExchange) { + return BindingBuilder.bind(pointsRewardQueue) + .to(pointsRewardExchange) + .with(RabbitConstants.ROUTING_KEY_POINTS_REWARD); + } +} diff --git a/jsowell-common/src/main/java/com/jsowell/common/constant/RabbitConstants.java b/jsowell-common/src/main/java/com/jsowell/common/constant/RabbitConstants.java index 21d75a4e9..9ed41fecb 100644 --- a/jsowell-common/src/main/java/com/jsowell/common/constant/RabbitConstants.java +++ b/jsowell-common/src/main/java/com/jsowell/common/constant/RabbitConstants.java @@ -40,4 +40,13 @@ public class RabbitConstants { // upChargeOrderInfo: 消费充电结算订单 public static final String QUEUE_UP_CHARGE_ORDER_INFO = "ykc.upChargeOrderInfo-topic.userplat-group"; + // pointsReward: 积分奖励队列 + public static final String QUEUE_POINTS_REWARD = "ykc.pointsReward-topic.member-group"; + + // 积分奖励交换机 + public static final String EXCHANGE_POINTS_REWARD = "pointsRewardExchange"; + + // 积分奖励路由键 + public static final String ROUTING_KEY_POINTS_REWARD = "points.reward"; + } diff --git a/jsowell-pile/src/main/java/com/jsowell/pile/dto/PointsRewardMessage.java b/jsowell-pile/src/main/java/com/jsowell/pile/dto/PointsRewardMessage.java new file mode 100644 index 000000000..296392b12 --- /dev/null +++ b/jsowell-pile/src/main/java/com/jsowell/pile/dto/PointsRewardMessage.java @@ -0,0 +1,46 @@ +package com.jsowell.pile.dto; + +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; + +import java.io.Serializable; +import java.math.BigDecimal; + +/** + * 积分奖励消息 + */ +@Data +@Builder +@AllArgsConstructor +@NoArgsConstructor +public class PointsRewardMessage implements Serializable { + + private static final long serialVersionUID = 1L; + + /** + * 会员ID + */ + private String memberId; + + /** + * 积分数量(等于实际结算金额) + */ + private BigDecimal points; + + /** + * 关联订单号 + */ + private String orderCode; + + /** + * 支付方式 + */ + private String payMode; + + /** + * 结算金额 + */ + private BigDecimal settleAmount; +} diff --git a/jsowell-pile/src/main/java/com/jsowell/pile/mq/consumer/PointsRewardConsumer.java b/jsowell-pile/src/main/java/com/jsowell/pile/mq/consumer/PointsRewardConsumer.java new file mode 100644 index 000000000..5dbbc69e6 --- /dev/null +++ b/jsowell-pile/src/main/java/com/jsowell/pile/mq/consumer/PointsRewardConsumer.java @@ -0,0 +1,71 @@ +package com.jsowell.pile.mq.consumer; + +import com.alibaba.fastjson2.JSON; +import com.jsowell.common.constant.RabbitConstants; +import com.jsowell.pile.dto.PointsRewardMessage; +import com.jsowell.pile.service.MemberPointsInfoService; +import com.rabbitmq.client.Channel; +import lombok.extern.slf4j.Slf4j; +import org.springframework.amqp.core.Message; +import org.springframework.amqp.rabbit.annotation.RabbitListener; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; + +import java.math.BigDecimal; + +/** + * 积分奖励消息消费者 + */ +@Slf4j +@Component +public class PointsRewardConsumer { + + @Autowired + private MemberPointsInfoService memberPointsInfoService; + + /** + * 消费积分奖励消息 + * + * @param message 积分奖励消息 + * @param channel RabbitMQ通道 + * @param amqpMessage AMQP消息 + */ + @RabbitListener(queues = RabbitConstants.QUEUE_POINTS_REWARD) + public void receivePointsRewardMessage(PointsRewardMessage message, Channel channel, Message amqpMessage) { + log.info("接收到积分奖励消息: {}", JSON.toJSONString(message)); + + try { + // 校验消息参数 + if (message == null || message.getMemberId() == null || message.getPoints() == null) { + log.warn("积分奖励消息参数不完整: {}", JSON.toJSONString(message)); + return; + } + + // 校验积分数量 + if (message.getPoints().compareTo(BigDecimal.ZERO) <= 0) { + log.warn("积分数量必须大于0, orderCode: {}, points: {}", message.getOrderCode(), message.getPoints()); + return; + } + + // 调用积分服务增加积分 + boolean result = memberPointsInfoService.addPoints( + message.getMemberId(), + message.getPoints(), + message.getOrderCode() + ); + + if (result) { + log.info("积分奖励发放成功, orderCode: {}, memberId: {}, points: {}", + message.getOrderCode(), message.getMemberId(), message.getPoints()); + } else { + log.error("积分奖励发放失败, orderCode: {}, memberId: {}, points: {}", + message.getOrderCode(), message.getMemberId(), message.getPoints()); + } + } catch (Exception e) { + log.error("积分奖励消息处理异常, orderCode: {}, memberId: {}, error: {}", + message.getOrderCode(), message.getMemberId(), e.getMessage(), e); + // 抛出异常让 RabbitMQ 进行重试 + throw new RuntimeException("积分奖励消息处理失败", e); + } + } +} diff --git a/jsowell-pile/src/main/java/com/jsowell/pile/mq/producer/PointsRewardProducer.java b/jsowell-pile/src/main/java/com/jsowell/pile/mq/producer/PointsRewardProducer.java new file mode 100644 index 000000000..22cb095e8 --- /dev/null +++ b/jsowell-pile/src/main/java/com/jsowell/pile/mq/producer/PointsRewardProducer.java @@ -0,0 +1,41 @@ +package com.jsowell.pile.mq.producer; + +import com.alibaba.fastjson2.JSON; +import com.jsowell.common.constant.RabbitConstants; +import com.jsowell.pile.dto.PointsRewardMessage; +import lombok.extern.slf4j.Slf4j; +import org.springframework.amqp.rabbit.core.RabbitTemplate; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; + +/** + * 积分奖励消息生产者 + */ +@Slf4j +@Component +public class PointsRewardProducer { + + @Autowired + private RabbitTemplate rabbitTemplate; + + /** + * 发送积分奖励消息 + * + * @param message 积分奖励消息 + */ + public void sendPointsRewardMessage(PointsRewardMessage message) { + try { + log.info("发送积分奖励消息: {}", JSON.toJSONString(message)); + rabbitTemplate.convertAndSend( + RabbitConstants.EXCHANGE_POINTS_REWARD, + RabbitConstants.ROUTING_KEY_POINTS_REWARD, + message + ); + log.info("积分奖励消息发送成功, orderCode: {}, memberId: {}, points: {}", + message.getOrderCode(), message.getMemberId(), message.getPoints()); + } catch (Exception e) { + log.error("积分奖励消息发送失败, orderCode: {}, memberId: {}, points: {}, error: {}", + message.getOrderCode(), message.getMemberId(), message.getPoints(), e.getMessage(), e); + } + } +} diff --git a/jsowell-pile/src/main/java/com/jsowell/pile/service/impl/OrderBasicInfoServiceImpl.java b/jsowell-pile/src/main/java/com/jsowell/pile/service/impl/OrderBasicInfoServiceImpl.java index 0375336f3..2407a4a23 100644 --- a/jsowell-pile/src/main/java/com/jsowell/pile/service/impl/OrderBasicInfoServiceImpl.java +++ b/jsowell-pile/src/main/java/com/jsowell/pile/service/impl/OrderBasicInfoServiceImpl.java @@ -198,6 +198,9 @@ public class OrderBasicInfoServiceImpl implements OrderBasicInfoService { @Autowired private AdapayRefundRecordService adapayRefundRecordService; + @Autowired + private com.jsowell.pile.mq.producer.PointsRewardProducer pointsRewardProducer; + @Override public int deleteByPrimaryKey(Integer id) { return orderBasicInfoMapper.deleteByPrimaryKey(id); @@ -2229,12 +2232,67 @@ public class OrderBasicInfoServiceImpl implements OrderBasicInfoService { logger.error("realTimeOrderSplit-订单:{}, 订单结算金额,汇付分账异常", afterSettleOrderDTO.getOrderCode(), e); } + // 发放积分奖励(异步) + // 条件:1.结算金额大于0 2.在线支付(微信支付、支付宝支付) + try { + sendPointsRewardMessage(afterSettleOrderDTO, orderBasicInfo); + } catch (Exception e) { + logger.error("realTimeOrderSplit-订单:{}, 发送积分奖励消息异常", afterSettleOrderDTO.getOrderCode(), e); + } + // 组装结果集 OrderSplitResult result = new OrderSplitResult(); result.setOrderCode(afterSettleOrderDTO.getOrderCode()); return result; } + /** + * 发送积分奖励消息 + * 条件:1.结算金额大于0 2.在线支付(微信支付、支付宝支付) + * + * @param afterSettleOrderDTO 订单结算信息 + * @param orderBasicInfo 订单基本信息 + */ + private void sendPointsRewardMessage(AfterSettleOrderDTO afterSettleOrderDTO, OrderBasicInfo orderBasicInfo) { + String orderCode = afterSettleOrderDTO.getOrderCode(); + String payMode = orderBasicInfo.getPayMode(); + BigDecimal settleAmount = afterSettleOrderDTO.getOrderSettleAmount(); + + // 校验:结算金额必须大于0 + if (settleAmount == null || settleAmount.compareTo(BigDecimal.ZERO) <= 0) { + logger.info("积分奖励-订单:{}, 结算金额({})不大于0,不发放积分", orderCode, settleAmount); + return; + } + + // 校验:只有在线支付(微信支付、支付宝支付)才发放积分 + boolean isOnlinePayment = StringUtils.equals(payMode, OrderPayModeEnum.PAYMENT_OF_WECHATPAY.getValue()) + || StringUtils.equals(payMode, OrderPayModeEnum.PAYMENT_OF_ALIPAY.getValue()); + if (!isOnlinePayment) { + logger.info("积分奖励-订单:{}, 支付方式({})不是在线支付,不发放积分", orderCode, payMode); + return; + } + + // 获取会员ID + String memberId = orderBasicInfo.getMemberId(); + if (StringUtils.isBlank(memberId)) { + logger.warn("积分奖励-订单:{}, 会员ID为空,不发放积分", orderCode); + return; + } + + // 构建积分奖励消息,积分数量等于实际结算金额 + com.jsowell.pile.dto.PointsRewardMessage message = com.jsowell.pile.dto.PointsRewardMessage.builder() + .memberId(memberId) + .points(settleAmount) + .orderCode(orderCode) + .payMode(payMode) + .settleAmount(settleAmount) + .build(); + + // 发送积分奖励消息到 RabbitMQ + pointsRewardProducer.sendPointsRewardMessage(message); + logger.info("积分奖励-订单:{}, 会员:{}, 积分:{}, 消息已发送", orderCode, memberId, settleAmount); + } + /** * 获取订单待分账信息 */