mirror of
https://codeup.aliyun.com/67c68d4e484ca2f0a13ac3c1/ydc/jsowell-charger-web.git
synced 2026-06-12 19:29:52 +08:00
update
This commit is contained in:
322
docs/JCPP_JSON消息分区消费方案.md
Normal file
322
docs/JCPP_JSON消息分区消费方案.md
Normal file
@@ -0,0 +1,322 @@
|
||||
# JCPP JSON 消息分区消费方案
|
||||
|
||||
## 概述
|
||||
|
||||
本方案实现了基于 JSON 格式的 JCPP 消息分区消费,确保同一充电桩(相同 pileCode)的消息按顺序处理,避免并发导致的消息乱序问题。
|
||||
|
||||
## 核心特性
|
||||
|
||||
1. **消息顺序保证**:同一充电桩的消息按顺序处理
|
||||
2. **分区消费**:10 个分区,基于 pileCode 的 Hash 分区
|
||||
3. **单线程消费**:每个分区单线程消费,保证顺序
|
||||
4. **JSON 格式**:保持原有 JSON 格式,无需 Protobuf
|
||||
5. **统一处理**:整合所有消息类型的处理逻辑
|
||||
|
||||
## 架构设计
|
||||
|
||||
### 1. 分区策略
|
||||
|
||||
- **分区数量**:默认 10 个分区(可配置)
|
||||
- **分区算法**:MurmurHash3_128(与 JCPP 保持一致)
|
||||
- **分区键**:pileCode(充电桩编码)
|
||||
- **消费模式**:每个分区单线程消费,保证顺序
|
||||
|
||||
### 2. 核心组件
|
||||
|
||||
```
|
||||
jsowell-pile/src/main/java/com/jsowell/pile/jcpp/
|
||||
├── config/
|
||||
│ └── JcppPartitionQueueConfig.java # 分区队列配置
|
||||
├── consumer/
|
||||
│ └── JcppJsonPartitionConsumer.java # JSON 消息消费者
|
||||
├── service/
|
||||
│ ├── IJcppJsonMessageHandler.java # 消息处理器接口
|
||||
│ └── impl/
|
||||
│ └── JcppJsonMessageHandlerImpl.java # 消息处理器实现
|
||||
└── util/
|
||||
└── JcppPartitionCalculator.java # 分区计算器
|
||||
```
|
||||
|
||||
## 使用说明
|
||||
|
||||
### 1. 配置文件
|
||||
|
||||
在 `application-{env}.yml` 中添加:
|
||||
|
||||
```yaml
|
||||
# JCPP 配置
|
||||
jcpp:
|
||||
rabbitmq:
|
||||
# 分区数量(与 JCPP 保持一致)
|
||||
partition-count: 10
|
||||
# Exchange 名称
|
||||
exchange: jcpp.uplink.exchange
|
||||
# 队列前缀
|
||||
queue-prefix: jcpp.uplink.partition
|
||||
```
|
||||
|
||||
### 2. RabbitMQ 队列
|
||||
|
||||
启动应用时会自动创建以下队列:
|
||||
|
||||
```
|
||||
jcpp.uplink.partition.0
|
||||
jcpp.uplink.partition.1
|
||||
jcpp.uplink.partition.2
|
||||
...
|
||||
jcpp.uplink.partition.9
|
||||
```
|
||||
|
||||
每个队列绑定到 `jcpp.uplink.exchange`,routing key 为 `jcpp.uplink.#`
|
||||
|
||||
### 3. 消息格式
|
||||
|
||||
消息体为 JSON 格式:
|
||||
|
||||
```json
|
||||
{
|
||||
"pileCode": "20231212000010",
|
||||
"messageType": "LOGIN",
|
||||
"data": "{\"pileCode\":\"20231212000010\",\"remoteAddress\":\"192.168.1.100\"}"
|
||||
}
|
||||
```
|
||||
|
||||
**字段说明**:
|
||||
- `pileCode`:充电桩编码(用于分区计算)
|
||||
- `messageType`:消息类型(LOGIN, HEARTBEAT, GUN_STATUS, REAL_TIME_DATA, TRANSACTION_RECORD 等)
|
||||
- `data`:具体消息内容(JSON 字符串)
|
||||
|
||||
### 4. 支持的消息类型
|
||||
|
||||
| 消息类型 | messageType 值 | 说明 |
|
||||
|---------|---------------|------|
|
||||
| 登录请求 | LOGIN | 充电桩登录 |
|
||||
| 心跳请求 | HEARTBEAT | 心跳保活 |
|
||||
| 枪状态上报 | GUN_STATUS | 充电枪状态变化 |
|
||||
| 充电进度 | REAL_TIME_DATA | 实时充电数据 |
|
||||
| 交易记录 | TRANSACTION_RECORD | 充电完成记录 |
|
||||
| 启动充电 | START_CHARGE | 刷卡启动充电 |
|
||||
| 查询计费 | QUERY_PRICING | 查询计费模板 |
|
||||
| 校验计费 | VERIFY_PRICING | 校验计费模板 |
|
||||
| 会话关闭 | SESSION_CLOSE | 会话关闭事件 |
|
||||
| 远程启动结果 | REMOTE_START_RESULT | 远程启动结果 |
|
||||
| 远程停止结果 | REMOTE_STOP_RESULT | 远程停止结果 |
|
||||
|
||||
## 消息流程
|
||||
|
||||
### 1. JCPP 发送消息
|
||||
|
||||
JCPP 在发送消息时需要:
|
||||
|
||||
1. 将消息序列化为 JSON 格式
|
||||
2. 根据 pileCode 计算分区编号
|
||||
3. 将消息发送到对应的分区队列
|
||||
|
||||
```java
|
||||
// JCPP 端示例代码
|
||||
String pileCode = "20231212000010";
|
||||
int partition = JcppPartitionCalculator.getPartition(pileCode);
|
||||
String queueName = "jcpp.uplink.partition." + partition;
|
||||
|
||||
// 构建消息
|
||||
JcppUplinkMessage message = new JcppUplinkMessage();
|
||||
message.setPileCode(pileCode);
|
||||
message.setMessageType("LOGIN");
|
||||
message.setData("{...}");
|
||||
|
||||
// 发送到指定队列
|
||||
rabbitTemplate.convertAndSend("jcpp.uplink.exchange", queueName, message);
|
||||
```
|
||||
|
||||
### 2. Web 项目消费消息
|
||||
|
||||
```java
|
||||
@Component
|
||||
public class JcppJsonPartitionConsumer {
|
||||
|
||||
@RabbitListener(
|
||||
queues = {
|
||||
"jcpp.uplink.partition.0",
|
||||
"jcpp.uplink.partition.1",
|
||||
// ... 其他分区
|
||||
},
|
||||
concurrency = "1" // 单线程消费保证顺序
|
||||
)
|
||||
public void consumeMessage(JcppUplinkMessage uplinkMessage) {
|
||||
// 处理消息
|
||||
messageHandler.handleUplinkMessage(uplinkMessage);
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
### 3. 消息处理
|
||||
|
||||
```java
|
||||
@Service
|
||||
public class JcppJsonMessageHandlerImpl implements IJcppJsonMessageHandler {
|
||||
|
||||
@Override
|
||||
public void handleUplinkMessage(JcppUplinkMessage message) {
|
||||
// 根据消息类型分发处理
|
||||
switch (message.getMessageType()) {
|
||||
case "LOGIN":
|
||||
handleLogin(message);
|
||||
break;
|
||||
case "HEARTBEAT":
|
||||
handleHeartbeat(message);
|
||||
break;
|
||||
// ... 其他消息类型
|
||||
}
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
## 分区算法
|
||||
|
||||
### Hash 计算
|
||||
|
||||
使用 MurmurHash3_128 算法计算分区:
|
||||
|
||||
```java
|
||||
public static int getPartition(String pileCode) {
|
||||
// 使用 MurmurHash3_128 算法
|
||||
long hash = Hashing.murmur3_128()
|
||||
.hashString(pileCode, StandardCharsets.UTF_8)
|
||||
.asLong();
|
||||
|
||||
// 取绝对值并对分区数取模
|
||||
return Math.abs((int) (hash % partitionCount));
|
||||
}
|
||||
```
|
||||
|
||||
### 分区示例
|
||||
|
||||
假设 `partitionCount = 10`:
|
||||
|
||||
| pileCode | Hash 值 | 分区编号 |
|
||||
|----------|---------|---------|
|
||||
| 20231212000010 | 123456789 | 9 |
|
||||
| 20231212000011 | 987654321 | 1 |
|
||||
| 20231212000012 | 456789123 | 3 |
|
||||
|
||||
同一个 pileCode 的所有消息都会路由到同一个分区,由单线程顺序处理。
|
||||
|
||||
## 与原有方案的对比
|
||||
|
||||
### 原有方案(无分区)
|
||||
|
||||
- ❌ 多个消费者并发消费,可能导致消息乱序
|
||||
- ❌ 同一充电桩的消息可能被不同消费者处理
|
||||
- ✅ 实现简单,无需额外配置
|
||||
|
||||
### 新方案(分区消费)
|
||||
|
||||
- ✅ 同一充电桩的消息保证顺序处理
|
||||
- ✅ 每个分区单线程消费,避免并发问题
|
||||
- ✅ 支持水平扩展(增加分区数量)
|
||||
- ⚠️ 需要 JCPP 端配合实现分区路由
|
||||
|
||||
## 监控与运维
|
||||
|
||||
### 1. 日志监控
|
||||
|
||||
```
|
||||
[分区0] 收到消息: pileCode=20231212000010, messageType=LOGIN
|
||||
[分区0] 消息处理完成: pileCode=20231212000010
|
||||
```
|
||||
|
||||
### 2. 性能指标
|
||||
|
||||
- 消息处理延迟
|
||||
- 队列堆积数量
|
||||
- 消费速率
|
||||
- 错误率
|
||||
|
||||
### 3. 告警规则
|
||||
|
||||
- 队列堆积超过阈值
|
||||
- 消息处理失败率超过阈值
|
||||
- 消费延迟超过阈值
|
||||
|
||||
## 注意事项
|
||||
|
||||
### 1. 分区数量
|
||||
|
||||
- 分区数量必须与 JCPP 保持一致
|
||||
- 修改分区数量需要重新创建队列
|
||||
- 建议在系统初始化时确定分区数量,后续不要修改
|
||||
|
||||
### 2. 消息顺序
|
||||
|
||||
- 每个分区单线程消费(`concurrency=1`)
|
||||
- 同一 pileCode 的消息保证顺序
|
||||
- 不同 pileCode 的消息可能并发处理
|
||||
|
||||
### 3. 异常处理
|
||||
|
||||
- JSON 解析失败:记录日志并抛出异常
|
||||
- 业务处理失败:根据业务需求决定是否重试
|
||||
- 建议配置死信队列处理失败消息
|
||||
|
||||
### 4. 性能优化
|
||||
|
||||
- 合理设置分区数量(建议 10-20)
|
||||
- 避免在消息处理中执行耗时操作
|
||||
- 使用 Redis 缓存减少数据库压力
|
||||
|
||||
## 故障排查
|
||||
|
||||
### 1. 消息无法消费
|
||||
|
||||
- 检查队列是否创建成功
|
||||
- 检查队列绑定是否正确
|
||||
- 检查 RabbitMQ 连接是否正常
|
||||
|
||||
### 2. 消息分区错误
|
||||
|
||||
- 检查 Hash 算法是否与 JCPP 一致
|
||||
- 检查分区数量配置是否一致
|
||||
- 查看日志中的分区不匹配警告
|
||||
|
||||
### 3. 消息处理失败
|
||||
|
||||
- 查看异常堆栈信息
|
||||
- 检查业务逻辑是否正确
|
||||
- 检查数据库连接是否正常
|
||||
|
||||
## 部署步骤
|
||||
|
||||
### 1. 更新配置
|
||||
|
||||
在 `application-{env}.yml` 中添加 JCPP 配置
|
||||
|
||||
### 2. 启动应用
|
||||
|
||||
```bash
|
||||
mvn clean package -DskipTests
|
||||
java -jar jsowell-admin/target/jsowell-admin.jar --spring.profiles.active=sit
|
||||
```
|
||||
|
||||
### 3. 验证队列创建
|
||||
|
||||
登录 RabbitMQ 管理界面,检查是否创建了 10 个分区队列
|
||||
|
||||
### 4. 发送测试消息
|
||||
|
||||
使用 JCPP 发送测试消息,验证分区路由是否正确
|
||||
|
||||
### 5. 监控运行状态
|
||||
|
||||
查看日志,确认消息正常消费
|
||||
|
||||
## 参考资料
|
||||
|
||||
- [Spring AMQP 文档](https://docs.spring.io/spring-amqp/reference/)
|
||||
- [RabbitMQ 分区队列](https://www.rabbitmq.com/partitions.html)
|
||||
- [Guava Hashing](https://github.com/google/guava/wiki/HashingExplained)
|
||||
|
||||
---
|
||||
|
||||
**文档版本**:v2.1(JSON 格式)
|
||||
**最后更新**:2025-12-30
|
||||
**维护人员**:jsowell 团队
|
||||
Reference in New Issue
Block a user