mirror of
https://codeup.aliyun.com/67c68d4e484ca2f0a13ac3c1/ydc/jsowell-charger-web.git
synced 2026-04-20 02:55:04 +08:00
8.1 KiB
8.1 KiB
JCPP JSON 消息分区消费方案
概述
本方案实现了基于 JSON 格式的 JCPP 消息分区消费,确保同一充电桩(相同 pileCode)的消息按顺序处理,避免并发导致的消息乱序问题。
核心特性
- 消息顺序保证:同一充电桩的消息按顺序处理
- 分区消费:10 个分区,基于 pileCode 的 Hash 分区
- 单线程消费:每个分区单线程消费,保证顺序
- JSON 格式:保持原有 JSON 格式,无需 Protobuf
- 统一处理:整合所有消息类型的处理逻辑
架构设计
1. 分区策略
- 分区数量:默认 10 个分区(可配置)
- 分区算法:MurmurHash3_128(与 JCPP 保持一致)
- 分区键:pileCode(充电桩编码)
- 消费模式:每个分区单线程消费,保证顺序
2. 核心组件
jsowell-pile/src/main/java/com/jsowell/pile/jcpp/
├── config/
│ └── JcppPartitionQueueConfig.java # 分区队列配置
├── consumer/
│ └── JcppJsonPartitionConsumer.java # JSON 消息消费者
├── service/
│ ├── IJcppJsonMessageHandler.java # 消息处理器接口
│ └── impl/
│ └── JcppJsonMessageHandlerImpl.java # 消息处理器实现
└── util/
└── JcppPartitionCalculator.java # 分区计算器
使用说明
1. 配置文件
在 application-{env}.yml 中添加:
# JCPP 配置
jcpp:
rabbitmq:
# 分区数量(与 JCPP 保持一致)
partition-count: 10
# Exchange 名称
exchange: jcpp.uplink.exchange
# 队列前缀
queue-prefix: jcpp.uplink.partition
2. RabbitMQ 队列
启动应用时会自动创建以下队列:
jcpp.uplink.partition.0
jcpp.uplink.partition.1
jcpp.uplink.partition.2
...
jcpp.uplink.partition.9
每个队列绑定到 jcpp.uplink.exchange,routing key 为 jcpp.uplink.#
3. 消息格式
消息体为 JSON 格式:
{
"pileCode": "20231212000010",
"messageType": "LOGIN",
"data": "{\"pileCode\":\"20231212000010\",\"remoteAddress\":\"192.168.1.100\"}"
}
字段说明:
pileCode:充电桩编码(用于分区计算)messageType:消息类型(LOGIN, HEARTBEAT, GUN_STATUS, REAL_TIME_DATA, TRANSACTION_RECORD 等)data:具体消息内容(JSON 字符串)
4. 支持的消息类型
| 消息类型 | messageType 值 | 说明 |
|---|---|---|
| 登录请求 | LOGIN | 充电桩登录 |
| 心跳请求 | HEARTBEAT | 心跳保活 |
| 枪状态上报 | GUN_STATUS | 充电枪状态变化 |
| 充电进度 | REAL_TIME_DATA | 实时充电数据 |
| 交易记录 | TRANSACTION_RECORD | 充电完成记录 |
| 启动充电 | START_CHARGE | 刷卡启动充电 |
| 查询计费 | QUERY_PRICING | 查询计费模板 |
| 校验计费 | VERIFY_PRICING | 校验计费模板 |
| 会话关闭 | SESSION_CLOSE | 会话关闭事件 |
| 远程启动结果 | REMOTE_START_RESULT | 远程启动结果 |
| 远程停止结果 | REMOTE_STOP_RESULT | 远程停止结果 |
消息流程
1. JCPP 发送消息
JCPP 在发送消息时需要:
- 将消息序列化为 JSON 格式
- 根据 pileCode 计算分区编号
- 将消息发送到对应的分区队列
// JCPP 端示例代码
String pileCode = "20231212000010";
int partition = JcppPartitionCalculator.getPartition(pileCode);
String queueName = "jcpp.uplink.partition." + partition;
// 构建消息
JcppUplinkMessage message = new JcppUplinkMessage();
message.setPileCode(pileCode);
message.setMessageType("LOGIN");
message.setData("{...}");
// 发送到指定队列
rabbitTemplate.convertAndSend("jcpp.uplink.exchange", queueName, message);
2. Web 项目消费消息
@Component
public class JcppJsonPartitionConsumer {
@RabbitListener(
queues = {
"jcpp.uplink.partition.0",
"jcpp.uplink.partition.1",
// ... 其他分区
},
concurrency = "1" // 单线程消费保证顺序
)
public void consumeMessage(JcppUplinkMessage uplinkMessage) {
// 处理消息
messageHandler.handleUplinkMessage(uplinkMessage);
}
}
3. 消息处理
@Service
public class JcppJsonMessageHandlerImpl implements IJcppJsonMessageHandler {
@Override
public void handleUplinkMessage(JcppUplinkMessage message) {
// 根据消息类型分发处理
switch (message.getMessageType()) {
case "LOGIN":
handleLogin(message);
break;
case "HEARTBEAT":
handleHeartbeat(message);
break;
// ... 其他消息类型
}
}
}
分区算法
Hash 计算
使用 MurmurHash3_128 算法计算分区:
public static int getPartition(String pileCode) {
// 使用 MurmurHash3_128 算法
long hash = Hashing.murmur3_128()
.hashString(pileCode, StandardCharsets.UTF_8)
.asLong();
// 取绝对值并对分区数取模
return Math.abs((int) (hash % partitionCount));
}
分区示例
假设 partitionCount = 10:
| pileCode | Hash 值 | 分区编号 |
|---|---|---|
| 20231212000010 | 123456789 | 9 |
| 20231212000011 | 987654321 | 1 |
| 20231212000012 | 456789123 | 3 |
同一个 pileCode 的所有消息都会路由到同一个分区,由单线程顺序处理。
与原有方案的对比
原有方案(无分区)
- ❌ 多个消费者并发消费,可能导致消息乱序
- ❌ 同一充电桩的消息可能被不同消费者处理
- ✅ 实现简单,无需额外配置
新方案(分区消费)
- ✅ 同一充电桩的消息保证顺序处理
- ✅ 每个分区单线程消费,避免并发问题
- ✅ 支持水平扩展(增加分区数量)
- ⚠️ 需要 JCPP 端配合实现分区路由
监控与运维
1. 日志监控
[分区0] 收到消息: pileCode=20231212000010, messageType=LOGIN
[分区0] 消息处理完成: pileCode=20231212000010
2. 性能指标
- 消息处理延迟
- 队列堆积数量
- 消费速率
- 错误率
3. 告警规则
- 队列堆积超过阈值
- 消息处理失败率超过阈值
- 消费延迟超过阈值
注意事项
1. 分区数量
- 分区数量必须与 JCPP 保持一致
- 修改分区数量需要重新创建队列
- 建议在系统初始化时确定分区数量,后续不要修改
2. 消息顺序
- 每个分区单线程消费(
concurrency=1) - 同一 pileCode 的消息保证顺序
- 不同 pileCode 的消息可能并发处理
3. 异常处理
- JSON 解析失败:记录日志并抛出异常
- 业务处理失败:根据业务需求决定是否重试
- 建议配置死信队列处理失败消息
4. 性能优化
- 合理设置分区数量(建议 10-20)
- 避免在消息处理中执行耗时操作
- 使用 Redis 缓存减少数据库压力
故障排查
1. 消息无法消费
- 检查队列是否创建成功
- 检查队列绑定是否正确
- 检查 RabbitMQ 连接是否正常
2. 消息分区错误
- 检查 Hash 算法是否与 JCPP 一致
- 检查分区数量配置是否一致
- 查看日志中的分区不匹配警告
3. 消息处理失败
- 查看异常堆栈信息
- 检查业务逻辑是否正确
- 检查数据库连接是否正常
部署步骤
1. 更新配置
在 application-{env}.yml 中添加 JCPP 配置
2. 启动应用
mvn clean package -DskipTests
java -jar jsowell-admin/target/jsowell-admin.jar --spring.profiles.active=sit
3. 验证队列创建
登录 RabbitMQ 管理界面,检查是否创建了 10 个分区队列
4. 发送测试消息
使用 JCPP 发送测试消息,验证分区路由是否正确
5. 监控运行状态
查看日志,确认消息正常消费
参考资料
文档版本:v2.1(JSON 格式) 最后更新:2025-12-30 维护人员:jsowell 团队