Files
jsowell-charger-web/doc/JCPP_JSON消息分区消费方案.md
2025-12-30 15:59:34 +08:00

323 lines
8.1 KiB
Markdown
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# JCPP JSON 消息分区消费方案
## 概述
本方案实现了基于 JSON 格式的 JCPP 消息分区消费,确保同一充电桩(相同 pileCode的消息按顺序处理避免并发导致的消息乱序问题。
## 核心特性
1. **消息顺序保证**:同一充电桩的消息按顺序处理
2. **分区消费**10 个分区,基于 pileCode 的 Hash 分区
3. **单线程消费**:每个分区单线程消费,保证顺序
4. **JSON 格式**:保持原有 JSON 格式,无需 Protobuf
5. **统一处理**:整合所有消息类型的处理逻辑
## 架构设计
### 1. 分区策略
- **分区数量**:默认 10 个分区(可配置)
- **分区算法**MurmurHash3_128与 JCPP 保持一致)
- **分区键**pileCode充电桩编码
- **消费模式**:每个分区单线程消费,保证顺序
### 2. 核心组件
```
jsowell-pile/src/main/java/com/jsowell/pile/jcpp/
├── config/
│ └── JcppPartitionQueueConfig.java # 分区队列配置
├── consumer/
│ └── JcppJsonPartitionConsumer.java # JSON 消息消费者
├── service/
│ ├── IJcppJsonMessageHandler.java # 消息处理器接口
│ └── impl/
│ └── JcppJsonMessageHandlerImpl.java # 消息处理器实现
└── util/
└── JcppPartitionCalculator.java # 分区计算器
```
## 使用说明
### 1. 配置文件
`application-{env}.yml` 中添加:
```yaml
# JCPP 配置
jcpp:
rabbitmq:
# 分区数量(与 JCPP 保持一致)
partition-count: 10
# Exchange 名称
exchange: jcpp.uplink.exchange
# 队列前缀
queue-prefix: jcpp.uplink.partition
```
### 2. RabbitMQ 队列
启动应用时会自动创建以下队列:
```
jcpp.uplink.partition.0
jcpp.uplink.partition.1
jcpp.uplink.partition.2
...
jcpp.uplink.partition.9
```
每个队列绑定到 `jcpp.uplink.exchange`routing key 为 `jcpp.uplink.#`
### 3. 消息格式
消息体为 JSON 格式:
```json
{
"pileCode": "20231212000010",
"messageType": "LOGIN",
"data": "{\"pileCode\":\"20231212000010\",\"remoteAddress\":\"192.168.1.100\"}"
}
```
**字段说明**
- `pileCode`:充电桩编码(用于分区计算)
- `messageType`消息类型LOGIN, HEARTBEAT, GUN_STATUS, REAL_TIME_DATA, TRANSACTION_RECORD 等)
- `data`具体消息内容JSON 字符串)
### 4. 支持的消息类型
| 消息类型 | messageType 值 | 说明 |
|---------|---------------|------|
| 登录请求 | LOGIN | 充电桩登录 |
| 心跳请求 | HEARTBEAT | 心跳保活 |
| 枪状态上报 | GUN_STATUS | 充电枪状态变化 |
| 充电进度 | REAL_TIME_DATA | 实时充电数据 |
| 交易记录 | TRANSACTION_RECORD | 充电完成记录 |
| 启动充电 | START_CHARGE | 刷卡启动充电 |
| 查询计费 | QUERY_PRICING | 查询计费模板 |
| 校验计费 | VERIFY_PRICING | 校验计费模板 |
| 会话关闭 | SESSION_CLOSE | 会话关闭事件 |
| 远程启动结果 | REMOTE_START_RESULT | 远程启动结果 |
| 远程停止结果 | REMOTE_STOP_RESULT | 远程停止结果 |
## 消息流程
### 1. JCPP 发送消息
JCPP 在发送消息时需要:
1. 将消息序列化为 JSON 格式
2. 根据 pileCode 计算分区编号
3. 将消息发送到对应的分区队列
```java
// JCPP 端示例代码
String pileCode = "20231212000010";
int partition = JcppPartitionCalculator.getPartition(pileCode);
String queueName = "jcpp.uplink.partition." + partition;
// 构建消息
JcppUplinkMessage message = new JcppUplinkMessage();
message.setPileCode(pileCode);
message.setMessageType("LOGIN");
message.setData("{...}");
// 发送到指定队列
rabbitTemplate.convertAndSend("jcpp.uplink.exchange", queueName, message);
```
### 2. Web 项目消费消息
```java
@Component
public class JcppJsonPartitionConsumer {
@RabbitListener(
queues = {
"jcpp.uplink.partition.0",
"jcpp.uplink.partition.1",
// ... 其他分区
},
concurrency = "1" // 单线程消费保证顺序
)
public void consumeMessage(JcppUplinkMessage uplinkMessage) {
// 处理消息
messageHandler.handleUplinkMessage(uplinkMessage);
}
}
```
### 3. 消息处理
```java
@Service
public class JcppJsonMessageHandlerImpl implements IJcppJsonMessageHandler {
@Override
public void handleUplinkMessage(JcppUplinkMessage message) {
// 根据消息类型分发处理
switch (message.getMessageType()) {
case "LOGIN":
handleLogin(message);
break;
case "HEARTBEAT":
handleHeartbeat(message);
break;
// ... 其他消息类型
}
}
}
```
## 分区算法
### Hash 计算
使用 MurmurHash3_128 算法计算分区:
```java
public static int getPartition(String pileCode) {
// 使用 MurmurHash3_128 算法
long hash = Hashing.murmur3_128()
.hashString(pileCode, StandardCharsets.UTF_8)
.asLong();
// 取绝对值并对分区数取模
return Math.abs((int) (hash % partitionCount));
}
```
### 分区示例
假设 `partitionCount = 10`
| pileCode | Hash 值 | 分区编号 |
|----------|---------|---------|
| 20231212000010 | 123456789 | 9 |
| 20231212000011 | 987654321 | 1 |
| 20231212000012 | 456789123 | 3 |
同一个 pileCode 的所有消息都会路由到同一个分区,由单线程顺序处理。
## 与原有方案的对比
### 原有方案(无分区)
- ❌ 多个消费者并发消费,可能导致消息乱序
- ❌ 同一充电桩的消息可能被不同消费者处理
- ✅ 实现简单,无需额外配置
### 新方案(分区消费)
- ✅ 同一充电桩的消息保证顺序处理
- ✅ 每个分区单线程消费,避免并发问题
- ✅ 支持水平扩展(增加分区数量)
- ⚠️ 需要 JCPP 端配合实现分区路由
## 监控与运维
### 1. 日志监控
```
[分区0] 收到消息: pileCode=20231212000010, messageType=LOGIN
[分区0] 消息处理完成: pileCode=20231212000010
```
### 2. 性能指标
- 消息处理延迟
- 队列堆积数量
- 消费速率
- 错误率
### 3. 告警规则
- 队列堆积超过阈值
- 消息处理失败率超过阈值
- 消费延迟超过阈值
## 注意事项
### 1. 分区数量
- 分区数量必须与 JCPP 保持一致
- 修改分区数量需要重新创建队列
- 建议在系统初始化时确定分区数量,后续不要修改
### 2. 消息顺序
- 每个分区单线程消费(`concurrency=1`
- 同一 pileCode 的消息保证顺序
- 不同 pileCode 的消息可能并发处理
### 3. 异常处理
- JSON 解析失败:记录日志并抛出异常
- 业务处理失败:根据业务需求决定是否重试
- 建议配置死信队列处理失败消息
### 4. 性能优化
- 合理设置分区数量(建议 10-20
- 避免在消息处理中执行耗时操作
- 使用 Redis 缓存减少数据库压力
## 故障排查
### 1. 消息无法消费
- 检查队列是否创建成功
- 检查队列绑定是否正确
- 检查 RabbitMQ 连接是否正常
### 2. 消息分区错误
- 检查 Hash 算法是否与 JCPP 一致
- 检查分区数量配置是否一致
- 查看日志中的分区不匹配警告
### 3. 消息处理失败
- 查看异常堆栈信息
- 检查业务逻辑是否正确
- 检查数据库连接是否正常
## 部署步骤
### 1. 更新配置
`application-{env}.yml` 中添加 JCPP 配置
### 2. 启动应用
```bash
mvn clean package -DskipTests
java -jar jsowell-admin/target/jsowell-admin.jar --spring.profiles.active=sit
```
### 3. 验证队列创建
登录 RabbitMQ 管理界面,检查是否创建了 10 个分区队列
### 4. 发送测试消息
使用 JCPP 发送测试消息,验证分区路由是否正确
### 5. 监控运行状态
查看日志,确认消息正常消费
## 参考资料
- [Spring AMQP 文档](https://docs.spring.io/spring-amqp/reference/)
- [RabbitMQ 分区队列](https://www.rabbitmq.com/partitions.html)
- [Guava Hashing](https://github.com/google/guava/wiki/HashingExplained)
---
**文档版本**v2.1JSON 格式)
**最后更新**2025-12-30
**维护人员**jsowell 团队