Files
jsowell-charger-web/doc/JCPP_JSON消息分区消费方案.md
2025-12-30 15:59:34 +08:00

8.1 KiB
Raw Blame History

JCPP JSON 消息分区消费方案

概述

本方案实现了基于 JSON 格式的 JCPP 消息分区消费,确保同一充电桩(相同 pileCode的消息按顺序处理避免并发导致的消息乱序问题。

核心特性

  1. 消息顺序保证:同一充电桩的消息按顺序处理
  2. 分区消费10 个分区,基于 pileCode 的 Hash 分区
  3. 单线程消费:每个分区单线程消费,保证顺序
  4. JSON 格式:保持原有 JSON 格式,无需 Protobuf
  5. 统一处理:整合所有消息类型的处理逻辑

架构设计

1. 分区策略

  • 分区数量:默认 10 个分区(可配置)
  • 分区算法MurmurHash3_128与 JCPP 保持一致)
  • 分区键pileCode充电桩编码
  • 消费模式:每个分区单线程消费,保证顺序

2. 核心组件

jsowell-pile/src/main/java/com/jsowell/pile/jcpp/
├── config/
│   └── JcppPartitionQueueConfig.java  # 分区队列配置
├── consumer/
│   └── JcppJsonPartitionConsumer.java # JSON 消息消费者
├── service/
│   ├── IJcppJsonMessageHandler.java   # 消息处理器接口
│   └── impl/
│       └── JcppJsonMessageHandlerImpl.java  # 消息处理器实现
└── util/
    └── JcppPartitionCalculator.java   # 分区计算器

使用说明

1. 配置文件

application-{env}.yml 中添加:

# JCPP 配置
jcpp:
  rabbitmq:
    # 分区数量(与 JCPP 保持一致)
    partition-count: 10
    # Exchange 名称
    exchange: jcpp.uplink.exchange
    # 队列前缀
    queue-prefix: jcpp.uplink.partition

2. RabbitMQ 队列

启动应用时会自动创建以下队列:

jcpp.uplink.partition.0
jcpp.uplink.partition.1
jcpp.uplink.partition.2
...
jcpp.uplink.partition.9

每个队列绑定到 jcpp.uplink.exchangerouting key 为 jcpp.uplink.#

3. 消息格式

消息体为 JSON 格式:

{
  "pileCode": "20231212000010",
  "messageType": "LOGIN",
  "data": "{\"pileCode\":\"20231212000010\",\"remoteAddress\":\"192.168.1.100\"}"
}

字段说明

  • pileCode:充电桩编码(用于分区计算)
  • messageType消息类型LOGIN, HEARTBEAT, GUN_STATUS, REAL_TIME_DATA, TRANSACTION_RECORD 等)
  • data具体消息内容JSON 字符串)

4. 支持的消息类型

消息类型 messageType 值 说明
登录请求 LOGIN 充电桩登录
心跳请求 HEARTBEAT 心跳保活
枪状态上报 GUN_STATUS 充电枪状态变化
充电进度 REAL_TIME_DATA 实时充电数据
交易记录 TRANSACTION_RECORD 充电完成记录
启动充电 START_CHARGE 刷卡启动充电
查询计费 QUERY_PRICING 查询计费模板
校验计费 VERIFY_PRICING 校验计费模板
会话关闭 SESSION_CLOSE 会话关闭事件
远程启动结果 REMOTE_START_RESULT 远程启动结果
远程停止结果 REMOTE_STOP_RESULT 远程停止结果

消息流程

1. JCPP 发送消息

JCPP 在发送消息时需要:

  1. 将消息序列化为 JSON 格式
  2. 根据 pileCode 计算分区编号
  3. 将消息发送到对应的分区队列
// JCPP 端示例代码
String pileCode = "20231212000010";
int partition = JcppPartitionCalculator.getPartition(pileCode);
String queueName = "jcpp.uplink.partition." + partition;

// 构建消息
JcppUplinkMessage message = new JcppUplinkMessage();
message.setPileCode(pileCode);
message.setMessageType("LOGIN");
message.setData("{...}");

// 发送到指定队列
rabbitTemplate.convertAndSend("jcpp.uplink.exchange", queueName, message);

2. Web 项目消费消息

@Component
public class JcppJsonPartitionConsumer {

    @RabbitListener(
        queues = {
            "jcpp.uplink.partition.0",
            "jcpp.uplink.partition.1",
            // ... 其他分区
        },
        concurrency = "1"  // 单线程消费保证顺序
    )
    public void consumeMessage(JcppUplinkMessage uplinkMessage) {
        // 处理消息
        messageHandler.handleUplinkMessage(uplinkMessage);
    }
}

3. 消息处理

@Service
public class JcppJsonMessageHandlerImpl implements IJcppJsonMessageHandler {

    @Override
    public void handleUplinkMessage(JcppUplinkMessage message) {
        // 根据消息类型分发处理
        switch (message.getMessageType()) {
            case "LOGIN":
                handleLogin(message);
                break;
            case "HEARTBEAT":
                handleHeartbeat(message);
                break;
            // ... 其他消息类型
        }
    }
}

分区算法

Hash 计算

使用 MurmurHash3_128 算法计算分区:

public static int getPartition(String pileCode) {
    // 使用 MurmurHash3_128 算法
    long hash = Hashing.murmur3_128()
            .hashString(pileCode, StandardCharsets.UTF_8)
            .asLong();

    // 取绝对值并对分区数取模
    return Math.abs((int) (hash % partitionCount));
}

分区示例

假设 partitionCount = 10

pileCode Hash 值 分区编号
20231212000010 123456789 9
20231212000011 987654321 1
20231212000012 456789123 3

同一个 pileCode 的所有消息都会路由到同一个分区,由单线程顺序处理。

与原有方案的对比

原有方案(无分区)

  • 多个消费者并发消费,可能导致消息乱序
  • 同一充电桩的消息可能被不同消费者处理
  • 实现简单,无需额外配置

新方案(分区消费)

  • 同一充电桩的消息保证顺序处理
  • 每个分区单线程消费,避免并发问题
  • 支持水平扩展(增加分区数量)
  • ⚠️ 需要 JCPP 端配合实现分区路由

监控与运维

1. 日志监控

[分区0] 收到消息: pileCode=20231212000010, messageType=LOGIN
[分区0] 消息处理完成: pileCode=20231212000010

2. 性能指标

  • 消息处理延迟
  • 队列堆积数量
  • 消费速率
  • 错误率

3. 告警规则

  • 队列堆积超过阈值
  • 消息处理失败率超过阈值
  • 消费延迟超过阈值

注意事项

1. 分区数量

  • 分区数量必须与 JCPP 保持一致
  • 修改分区数量需要重新创建队列
  • 建议在系统初始化时确定分区数量,后续不要修改

2. 消息顺序

  • 每个分区单线程消费(concurrency=1
  • 同一 pileCode 的消息保证顺序
  • 不同 pileCode 的消息可能并发处理

3. 异常处理

  • JSON 解析失败:记录日志并抛出异常
  • 业务处理失败:根据业务需求决定是否重试
  • 建议配置死信队列处理失败消息

4. 性能优化

  • 合理设置分区数量(建议 10-20
  • 避免在消息处理中执行耗时操作
  • 使用 Redis 缓存减少数据库压力

故障排查

1. 消息无法消费

  • 检查队列是否创建成功
  • 检查队列绑定是否正确
  • 检查 RabbitMQ 连接是否正常

2. 消息分区错误

  • 检查 Hash 算法是否与 JCPP 一致
  • 检查分区数量配置是否一致
  • 查看日志中的分区不匹配警告

3. 消息处理失败

  • 查看异常堆栈信息
  • 检查业务逻辑是否正确
  • 检查数据库连接是否正常

部署步骤

1. 更新配置

application-{env}.yml 中添加 JCPP 配置

2. 启动应用

mvn clean package -DskipTests
java -jar jsowell-admin/target/jsowell-admin.jar --spring.profiles.active=sit

3. 验证队列创建

登录 RabbitMQ 管理界面,检查是否创建了 10 个分区队列

4. 发送测试消息

使用 JCPP 发送测试消息,验证分区路由是否正确

5. 监控运行状态

查看日志,确认消息正常消费

参考资料


文档版本v2.1JSON 格式) 最后更新2025-12-30 维护人员jsowell 团队