# JCPP JSON 消息分区消费方案 ## 概述 本方案实现了基于 JSON 格式的 JCPP 消息分区消费,确保同一充电桩(相同 pileCode)的消息按顺序处理,避免并发导致的消息乱序问题。 ## 核心特性 1. **消息顺序保证**:同一充电桩的消息按顺序处理 2. **分区消费**:10 个分区,基于 pileCode 的 Hash 分区 3. **单线程消费**:每个分区单线程消费,保证顺序 4. **JSON 格式**:保持原有 JSON 格式,无需 Protobuf 5. **统一处理**:整合所有消息类型的处理逻辑 ## 架构设计 ### 1. 分区策略 - **分区数量**:默认 10 个分区(可配置) - **分区算法**:MurmurHash3_128(与 JCPP 保持一致) - **分区键**:pileCode(充电桩编码) - **消费模式**:每个分区单线程消费,保证顺序 ### 2. 核心组件 ``` jsowell-pile/src/main/java/com/jsowell/pile/jcpp/ ├── config/ │ └── JcppPartitionQueueConfig.java # 分区队列配置 ├── consumer/ │ └── JcppJsonPartitionConsumer.java # JSON 消息消费者 ├── service/ │ ├── IJcppJsonMessageHandler.java # 消息处理器接口 │ └── impl/ │ └── JcppJsonMessageHandlerImpl.java # 消息处理器实现 └── util/ └── JcppPartitionCalculator.java # 分区计算器 ``` ## 使用说明 ### 1. 配置文件 在 `application-{env}.yml` 中添加: ```yaml # JCPP 配置 jcpp: rabbitmq: # 分区数量(与 JCPP 保持一致) partition-count: 10 # Exchange 名称 exchange: jcpp.uplink.exchange # 队列前缀 queue-prefix: jcpp.uplink.partition ``` ### 2. RabbitMQ 队列 启动应用时会自动创建以下队列: ``` jcpp.uplink.partition.0 jcpp.uplink.partition.1 jcpp.uplink.partition.2 ... jcpp.uplink.partition.9 ``` 每个队列绑定到 `jcpp.uplink.exchange`,routing key 为 `jcpp.uplink.#` ### 3. 消息格式 消息体为 JSON 格式: ```json { "pileCode": "20231212000010", "messageType": "LOGIN", "data": "{\"pileCode\":\"20231212000010\",\"remoteAddress\":\"192.168.1.100\"}" } ``` **字段说明**: - `pileCode`:充电桩编码(用于分区计算) - `messageType`:消息类型(LOGIN, HEARTBEAT, GUN_STATUS, REAL_TIME_DATA, TRANSACTION_RECORD 等) - `data`:具体消息内容(JSON 字符串) ### 4. 支持的消息类型 | 消息类型 | messageType 值 | 说明 | |---------|---------------|------| | 登录请求 | LOGIN | 充电桩登录 | | 心跳请求 | HEARTBEAT | 心跳保活 | | 枪状态上报 | GUN_STATUS | 充电枪状态变化 | | 充电进度 | REAL_TIME_DATA | 实时充电数据 | | 交易记录 | TRANSACTION_RECORD | 充电完成记录 | | 启动充电 | START_CHARGE | 刷卡启动充电 | | 查询计费 | QUERY_PRICING | 查询计费模板 | | 校验计费 | VERIFY_PRICING | 校验计费模板 | | 会话关闭 | SESSION_CLOSE | 会话关闭事件 | | 远程启动结果 | REMOTE_START_RESULT | 远程启动结果 | | 远程停止结果 | REMOTE_STOP_RESULT | 远程停止结果 | ## 消息流程 ### 1. JCPP 发送消息 JCPP 在发送消息时需要: 1. 将消息序列化为 JSON 格式 2. 根据 pileCode 计算分区编号 3. 将消息发送到对应的分区队列 ```java // JCPP 端示例代码 String pileCode = "20231212000010"; int partition = JcppPartitionCalculator.getPartition(pileCode); String queueName = "jcpp.uplink.partition." + partition; // 构建消息 JcppUplinkMessage message = new JcppUplinkMessage(); message.setPileCode(pileCode); message.setMessageType("LOGIN"); message.setData("{...}"); // 发送到指定队列 rabbitTemplate.convertAndSend("jcpp.uplink.exchange", queueName, message); ``` ### 2. Web 项目消费消息 ```java @Component public class JcppJsonPartitionConsumer { @RabbitListener( queues = { "jcpp.uplink.partition.0", "jcpp.uplink.partition.1", // ... 其他分区 }, concurrency = "1" // 单线程消费保证顺序 ) public void consumeMessage(JcppUplinkMessage uplinkMessage) { // 处理消息 messageHandler.handleUplinkMessage(uplinkMessage); } } ``` ### 3. 消息处理 ```java @Service public class JcppJsonMessageHandlerImpl implements IJcppJsonMessageHandler { @Override public void handleUplinkMessage(JcppUplinkMessage message) { // 根据消息类型分发处理 switch (message.getMessageType()) { case "LOGIN": handleLogin(message); break; case "HEARTBEAT": handleHeartbeat(message); break; // ... 其他消息类型 } } } ``` ## 分区算法 ### Hash 计算 使用 MurmurHash3_128 算法计算分区: ```java public static int getPartition(String pileCode) { // 使用 MurmurHash3_128 算法 long hash = Hashing.murmur3_128() .hashString(pileCode, StandardCharsets.UTF_8) .asLong(); // 取绝对值并对分区数取模 return Math.abs((int) (hash % partitionCount)); } ``` ### 分区示例 假设 `partitionCount = 10`: | pileCode | Hash 值 | 分区编号 | |----------|---------|---------| | 20231212000010 | 123456789 | 9 | | 20231212000011 | 987654321 | 1 | | 20231212000012 | 456789123 | 3 | 同一个 pileCode 的所有消息都会路由到同一个分区,由单线程顺序处理。 ## 与原有方案的对比 ### 原有方案(无分区) - ❌ 多个消费者并发消费,可能导致消息乱序 - ❌ 同一充电桩的消息可能被不同消费者处理 - ✅ 实现简单,无需额外配置 ### 新方案(分区消费) - ✅ 同一充电桩的消息保证顺序处理 - ✅ 每个分区单线程消费,避免并发问题 - ✅ 支持水平扩展(增加分区数量) - ⚠️ 需要 JCPP 端配合实现分区路由 ## 监控与运维 ### 1. 日志监控 ``` [分区0] 收到消息: pileCode=20231212000010, messageType=LOGIN [分区0] 消息处理完成: pileCode=20231212000010 ``` ### 2. 性能指标 - 消息处理延迟 - 队列堆积数量 - 消费速率 - 错误率 ### 3. 告警规则 - 队列堆积超过阈值 - 消息处理失败率超过阈值 - 消费延迟超过阈值 ## 注意事项 ### 1. 分区数量 - 分区数量必须与 JCPP 保持一致 - 修改分区数量需要重新创建队列 - 建议在系统初始化时确定分区数量,后续不要修改 ### 2. 消息顺序 - 每个分区单线程消费(`concurrency=1`) - 同一 pileCode 的消息保证顺序 - 不同 pileCode 的消息可能并发处理 ### 3. 异常处理 - JSON 解析失败:记录日志并抛出异常 - 业务处理失败:根据业务需求决定是否重试 - 建议配置死信队列处理失败消息 ### 4. 性能优化 - 合理设置分区数量(建议 10-20) - 避免在消息处理中执行耗时操作 - 使用 Redis 缓存减少数据库压力 ## 故障排查 ### 1. 消息无法消费 - 检查队列是否创建成功 - 检查队列绑定是否正确 - 检查 RabbitMQ 连接是否正常 ### 2. 消息分区错误 - 检查 Hash 算法是否与 JCPP 一致 - 检查分区数量配置是否一致 - 查看日志中的分区不匹配警告 ### 3. 消息处理失败 - 查看异常堆栈信息 - 检查业务逻辑是否正确 - 检查数据库连接是否正常 ## 部署步骤 ### 1. 更新配置 在 `application-{env}.yml` 中添加 JCPP 配置 ### 2. 启动应用 ```bash mvn clean package -DskipTests java -jar jsowell-admin/target/jsowell-admin.jar --spring.profiles.active=sit ``` ### 3. 验证队列创建 登录 RabbitMQ 管理界面,检查是否创建了 10 个分区队列 ### 4. 发送测试消息 使用 JCPP 发送测试消息,验证分区路由是否正确 ### 5. 监控运行状态 查看日志,确认消息正常消费 ## 参考资料 - [Spring AMQP 文档](https://docs.spring.io/spring-amqp/reference/) - [RabbitMQ 分区队列](https://www.rabbitmq.com/partitions.html) - [Guava Hashing](https://github.com/google/guava/wiki/HashingExplained) --- **文档版本**:v2.1(JSON 格式) **最后更新**:2025-12-30 **维护人员**:jsowell 团队