Files
jsowell-charger-web/doc/JCPP_JSON消息分区消费方案.md

323 lines
8.1 KiB
Markdown
Raw Normal View History

2025-12-30 15:59:34 +08:00
# JCPP JSON 消息分区消费方案
## 概述
本方案实现了基于 JSON 格式的 JCPP 消息分区消费,确保同一充电桩(相同 pileCode的消息按顺序处理避免并发导致的消息乱序问题。
## 核心特性
1. **消息顺序保证**:同一充电桩的消息按顺序处理
2. **分区消费**10 个分区,基于 pileCode 的 Hash 分区
3. **单线程消费**:每个分区单线程消费,保证顺序
4. **JSON 格式**:保持原有 JSON 格式,无需 Protobuf
5. **统一处理**:整合所有消息类型的处理逻辑
## 架构设计
### 1. 分区策略
- **分区数量**:默认 10 个分区(可配置)
- **分区算法**MurmurHash3_128与 JCPP 保持一致)
- **分区键**pileCode充电桩编码
- **消费模式**:每个分区单线程消费,保证顺序
### 2. 核心组件
```
jsowell-pile/src/main/java/com/jsowell/pile/jcpp/
├── config/
│ └── JcppPartitionQueueConfig.java # 分区队列配置
├── consumer/
│ └── JcppJsonPartitionConsumer.java # JSON 消息消费者
├── service/
│ ├── IJcppJsonMessageHandler.java # 消息处理器接口
│ └── impl/
│ └── JcppJsonMessageHandlerImpl.java # 消息处理器实现
└── util/
└── JcppPartitionCalculator.java # 分区计算器
```
## 使用说明
### 1. 配置文件
`application-{env}.yml` 中添加:
```yaml
# JCPP 配置
jcpp:
rabbitmq:
# 分区数量(与 JCPP 保持一致)
partition-count: 10
# Exchange 名称
exchange: jcpp.uplink.exchange
# 队列前缀
queue-prefix: jcpp.uplink.partition
```
### 2. RabbitMQ 队列
启动应用时会自动创建以下队列:
```
jcpp.uplink.partition.0
jcpp.uplink.partition.1
jcpp.uplink.partition.2
...
jcpp.uplink.partition.9
```
每个队列绑定到 `jcpp.uplink.exchange`routing key 为 `jcpp.uplink.#`
### 3. 消息格式
消息体为 JSON 格式:
```json
{
"pileCode": "20231212000010",
"messageType": "LOGIN",
"data": "{\"pileCode\":\"20231212000010\",\"remoteAddress\":\"192.168.1.100\"}"
}
```
**字段说明**
- `pileCode`:充电桩编码(用于分区计算)
- `messageType`消息类型LOGIN, HEARTBEAT, GUN_STATUS, REAL_TIME_DATA, TRANSACTION_RECORD 等)
- `data`具体消息内容JSON 字符串)
### 4. 支持的消息类型
| 消息类型 | messageType 值 | 说明 |
|---------|---------------|------|
| 登录请求 | LOGIN | 充电桩登录 |
| 心跳请求 | HEARTBEAT | 心跳保活 |
| 枪状态上报 | GUN_STATUS | 充电枪状态变化 |
| 充电进度 | REAL_TIME_DATA | 实时充电数据 |
| 交易记录 | TRANSACTION_RECORD | 充电完成记录 |
| 启动充电 | START_CHARGE | 刷卡启动充电 |
| 查询计费 | QUERY_PRICING | 查询计费模板 |
| 校验计费 | VERIFY_PRICING | 校验计费模板 |
| 会话关闭 | SESSION_CLOSE | 会话关闭事件 |
| 远程启动结果 | REMOTE_START_RESULT | 远程启动结果 |
| 远程停止结果 | REMOTE_STOP_RESULT | 远程停止结果 |
## 消息流程
### 1. JCPP 发送消息
JCPP 在发送消息时需要:
1. 将消息序列化为 JSON 格式
2. 根据 pileCode 计算分区编号
3. 将消息发送到对应的分区队列
```java
// JCPP 端示例代码
String pileCode = "20231212000010";
int partition = JcppPartitionCalculator.getPartition(pileCode);
String queueName = "jcpp.uplink.partition." + partition;
// 构建消息
JcppUplinkMessage message = new JcppUplinkMessage();
message.setPileCode(pileCode);
message.setMessageType("LOGIN");
message.setData("{...}");
// 发送到指定队列
rabbitTemplate.convertAndSend("jcpp.uplink.exchange", queueName, message);
```
### 2. Web 项目消费消息
```java
@Component
public class JcppJsonPartitionConsumer {
@RabbitListener(
queues = {
"jcpp.uplink.partition.0",
"jcpp.uplink.partition.1",
// ... 其他分区
},
concurrency = "1" // 单线程消费保证顺序
)
public void consumeMessage(JcppUplinkMessage uplinkMessage) {
// 处理消息
messageHandler.handleUplinkMessage(uplinkMessage);
}
}
```
### 3. 消息处理
```java
@Service
public class JcppJsonMessageHandlerImpl implements IJcppJsonMessageHandler {
@Override
public void handleUplinkMessage(JcppUplinkMessage message) {
// 根据消息类型分发处理
switch (message.getMessageType()) {
case "LOGIN":
handleLogin(message);
break;
case "HEARTBEAT":
handleHeartbeat(message);
break;
// ... 其他消息类型
}
}
}
```
## 分区算法
### Hash 计算
使用 MurmurHash3_128 算法计算分区:
```java
public static int getPartition(String pileCode) {
// 使用 MurmurHash3_128 算法
long hash = Hashing.murmur3_128()
.hashString(pileCode, StandardCharsets.UTF_8)
.asLong();
// 取绝对值并对分区数取模
return Math.abs((int) (hash % partitionCount));
}
```
### 分区示例
假设 `partitionCount = 10`
| pileCode | Hash 值 | 分区编号 |
|----------|---------|---------|
| 20231212000010 | 123456789 | 9 |
| 20231212000011 | 987654321 | 1 |
| 20231212000012 | 456789123 | 3 |
同一个 pileCode 的所有消息都会路由到同一个分区,由单线程顺序处理。
## 与原有方案的对比
### 原有方案(无分区)
- ❌ 多个消费者并发消费,可能导致消息乱序
- ❌ 同一充电桩的消息可能被不同消费者处理
- ✅ 实现简单,无需额外配置
### 新方案(分区消费)
- ✅ 同一充电桩的消息保证顺序处理
- ✅ 每个分区单线程消费,避免并发问题
- ✅ 支持水平扩展(增加分区数量)
- ⚠️ 需要 JCPP 端配合实现分区路由
## 监控与运维
### 1. 日志监控
```
[分区0] 收到消息: pileCode=20231212000010, messageType=LOGIN
[分区0] 消息处理完成: pileCode=20231212000010
```
### 2. 性能指标
- 消息处理延迟
- 队列堆积数量
- 消费速率
- 错误率
### 3. 告警规则
- 队列堆积超过阈值
- 消息处理失败率超过阈值
- 消费延迟超过阈值
## 注意事项
### 1. 分区数量
- 分区数量必须与 JCPP 保持一致
- 修改分区数量需要重新创建队列
- 建议在系统初始化时确定分区数量,后续不要修改
### 2. 消息顺序
- 每个分区单线程消费(`concurrency=1`
- 同一 pileCode 的消息保证顺序
- 不同 pileCode 的消息可能并发处理
### 3. 异常处理
- JSON 解析失败:记录日志并抛出异常
- 业务处理失败:根据业务需求决定是否重试
- 建议配置死信队列处理失败消息
### 4. 性能优化
- 合理设置分区数量(建议 10-20
- 避免在消息处理中执行耗时操作
- 使用 Redis 缓存减少数据库压力
## 故障排查
### 1. 消息无法消费
- 检查队列是否创建成功
- 检查队列绑定是否正确
- 检查 RabbitMQ 连接是否正常
### 2. 消息分区错误
- 检查 Hash 算法是否与 JCPP 一致
- 检查分区数量配置是否一致
- 查看日志中的分区不匹配警告
### 3. 消息处理失败
- 查看异常堆栈信息
- 检查业务逻辑是否正确
- 检查数据库连接是否正常
## 部署步骤
### 1. 更新配置
`application-{env}.yml` 中添加 JCPP 配置
### 2. 启动应用
```bash
mvn clean package -DskipTests
java -jar jsowell-admin/target/jsowell-admin.jar --spring.profiles.active=sit
```
### 3. 验证队列创建
登录 RabbitMQ 管理界面,检查是否创建了 10 个分区队列
### 4. 发送测试消息
使用 JCPP 发送测试消息,验证分区路由是否正确
### 5. 监控运行状态
查看日志,确认消息正常消费
## 参考资料
- [Spring AMQP 文档](https://docs.spring.io/spring-amqp/reference/)
- [RabbitMQ 分区队列](https://www.rabbitmq.com/partitions.html)
- [Guava Hashing](https://github.com/google/guava/wiki/HashingExplained)
---
**文档版本**v2.1JSON 格式)
**最后更新**2025-12-30
**维护人员**jsowell 团队