mirror of
https://codeup.aliyun.com/67c68d4e484ca2f0a13ac3c1/ydc/jsowell-charger-web.git
synced 2026-04-19 18:45:03 +08:00
711 lines
22 KiB
Markdown
711 lines
22 KiB
Markdown
# JCPP 充电桩平台对接进度文档
|
||
|
||
> 文档创建时间:2025-12-30
|
||
> 最后更新时间:2025-12-30
|
||
> 对接平台:JCPP(Java Charging Pile Platform)
|
||
|
||
---
|
||
|
||
## 一、项目概述
|
||
|
||
### 1.1 对接目标
|
||
将万车充运营管理平台与 JCPP 充电桩平台进行对接,实现充电桩设备的统一管理和充电业务的互联互通。
|
||
|
||
### 1.2 技术架构
|
||
- **通信方式**:RabbitMQ 消息队列
|
||
- **消息格式**:JSON
|
||
- **协议版本**:yunkuaichongV150
|
||
- **Exchange**:`jcpp.uplink.exchange`(上行消息)、`jcpp.downlink.exchange`(下行消息)
|
||
|
||
### 1.3 模块位置
|
||
- **核心代码**:`jsowell-pile` 模块
|
||
- **包路径**:`com.jsowell.pile.jcpp`
|
||
- **配置类**:`JcppConfig`、`JcppRabbitConfig`
|
||
|
||
---
|
||
|
||
## 二、已完成功能
|
||
|
||
### 2.1 基础架构搭建 ✅
|
||
|
||
#### 2.1.1 RabbitMQ 配置
|
||
- [x] 创建 `JcppRabbitConfig` 配置类
|
||
- [x] 定义上行消息 Exchange:`jcpp.uplink.exchange`
|
||
- [x] 定义 9 个消息队列和对应的 Routing Key
|
||
- [x] 配置队列绑定关系
|
||
|
||
**队列列表**:
|
||
| 队列名称 | Routing Key | 用途 |
|
||
|---------|-------------|------|
|
||
| `jcpp.uplink.login` | `jcpp.uplink.login` | 充电桩登录 |
|
||
| `jcpp.uplink.heartbeat` | `jcpp.uplink.heartbeat` | 心跳消息 |
|
||
| `jcpp.uplink.startCharge` | `jcpp.uplink.startCharge` | 刷卡启动充电 |
|
||
| `jcpp.uplink.realTimeData` | `jcpp.uplink.realTimeData` | 实时充电数据 |
|
||
| `jcpp.uplink.transaction` | `jcpp.uplink.transaction` | 交易记录 |
|
||
| `jcpp.uplink.gunStatus` | `jcpp.uplink.gunStatus` | 枪状态上报 |
|
||
| `jcpp.uplink.pricing` | `jcpp.uplink.pricing.#` | 计费模板查询 |
|
||
| `jcpp.uplink.remoteResult` | `jcpp.uplink.remoteResult.#` | 远程操作结果 |
|
||
| `jcpp.uplink.sessionClose` | `jcpp.uplink.sessionClose` | 会话关闭 |
|
||
|
||
#### 2.1.2 DTO 数据传输对象
|
||
- [x] `JcppUplinkMessage`:上行消息统一封装
|
||
- [x] `JcppDownlinkRequest`:下行请求封装
|
||
- [x] `JcppDownlinkCommand`:下行命令封装
|
||
- [x] `JcppLoginData`:登录数据
|
||
- [x] `JcppStartChargeData`:启动充电数据
|
||
- [x] `JcppRealTimeData`:实时数据
|
||
- [x] `JcppTransactionData`:交易记录数据
|
||
- [x] `JcppPricingModel`:计费模板数据
|
||
- [x] `JcppRemoteStartResultData`:远程启动结果数据
|
||
- [x] `JcppSessionInfo`:会话信息
|
||
|
||
#### 2.1.3 常量定义
|
||
- [x] `JcppConstants`:定义队列名称、Routing Key、消息类型等常量
|
||
|
||
---
|
||
|
||
### 2.2 上行消息处理(JCPP → 万车充)✅
|
||
|
||
#### 2.2.1 充电桩登录 ✅
|
||
**消费者**:`JcppLoginConsumer`
|
||
**队列**:`jcpp.uplink.login`
|
||
**功能**:
|
||
- [x] 接收充电桩登录消息
|
||
- [x] 验证充电桩是否存在于系统中
|
||
- [x] 发送登录应答(通过 `IJcppDownlinkService.sendLoginAck()`)
|
||
- [x] 更新充电桩在线状态(已注释,待确认业务逻辑)
|
||
|
||
**状态**:✅ 已完成基础功能
|
||
|
||
---
|
||
|
||
#### 2.2.2 心跳消息 ✅
|
||
**消费者**:`JcppHeartbeatConsumer`
|
||
**队列**:`jcpp.uplink.heartbeat`
|
||
**功能**:
|
||
- [x] 接收充电桩心跳消息
|
||
- [x] 更新充电桩最后活跃时间到 Redis(避免频繁写数据库)
|
||
- [x] 设置 Redis 过期时间为 180 秒
|
||
|
||
**状态**:✅ 已完成
|
||
|
||
---
|
||
|
||
#### 2.2.3 刷卡启动充电 ✅
|
||
**消费者**:`JcppStartChargeConsumer`
|
||
**队列**:`jcpp.uplink.startCharge`
|
||
**功能**:
|
||
- [x] 接收刷卡启动充电请求
|
||
- [x] 验证充电桩状态
|
||
- [x] 查询授权卡信息(`PileAuthCard`)
|
||
- [x] 验证卡状态和密码(密码验证已注释)
|
||
- [x] 查询会员钱包余额
|
||
- [x] 检查白名单(免费充电)
|
||
- [x] 验证余额是否充足
|
||
- [x] 生成交易流水号(`tradeNo`)
|
||
- [x] 创建充电订单(`OrderBasicInfo`)
|
||
- [x] 发送鉴权结果应答
|
||
|
||
**状态**:✅ 已完成核心功能
|
||
|
||
---
|
||
|
||
#### 2.2.4 实时充电数据 ✅
|
||
**消费者**:`JcppRealTimeDataConsumer`
|
||
**队列**:`jcpp.uplink.realTimeData`
|
||
**功能**:
|
||
- [x] 接收实时充电数据(电压、电流、SOC、充电量、充电金额等)
|
||
- [x] 将实时数据缓存到 Redis(5 分钟过期)
|
||
- [x] 根据 `tradeNo` 查询订单
|
||
- [x] 更新订单实时数据(充电量、金额、时长)
|
||
- [x] 更新枪状态为充电中
|
||
|
||
**状态**:✅ 已完成
|
||
|
||
---
|
||
|
||
#### 2.2.5 交易记录(订单结算)✅
|
||
**消费者**:`JcppTransactionConsumer`
|
||
**队列**:`jcpp.uplink.transaction`
|
||
**功能**:
|
||
- [x] 接收充电完成的交易记录
|
||
- [x] 根据 `tradeNo` 查询订单
|
||
- [x] 幂等性检查(避免重复处理)
|
||
- [x] 更新订单状态为"充电完成"
|
||
- [x] 更新订单充电数据(开始时间、结束时间、充电量、金额、停止原因)
|
||
- [x] 更新枪状态为空闲
|
||
- [x] 发送交易记录应答
|
||
- [ ] 触发结算流程(已注释,待实现)
|
||
|
||
**状态**:✅ 基础功能已完成,⚠️ 结算流程待对接
|
||
|
||
---
|
||
|
||
#### 2.2.6 枪状态上报 ✅
|
||
**消费者**:`JcppGunStatusConsumer`
|
||
**队列**:`jcpp.uplink.gunStatus`
|
||
**功能**:
|
||
- [x] 接收充电枪状态变化消息
|
||
- [x] 状态映射(JCPP 状态 → 系统状态)
|
||
- `IDLE` → `1`(空闲)
|
||
- `INSERTED` → `2`(占用未充电)
|
||
- `CHARGING` → `3`(充电中)
|
||
- `CHARGE_COMPLETE` → `2`(充电完成未拔枪)
|
||
- `FAULT` → `255`(故障)
|
||
- `UNKNOWN` → `0`(离网)
|
||
- [x] 更新枪状态到数据库
|
||
- [x] 记录故障信息(日志)
|
||
|
||
**状态**:✅ 已完成
|
||
|
||
---
|
||
|
||
#### 2.2.7 计费模板查询 ✅
|
||
**消费者**:`JcppPricingConsumer`
|
||
**队列**:`jcpp.uplink.pricing`
|
||
**功能**:
|
||
- [x] 处理计费模板查询请求(`QUERY_PRICING`)
|
||
- [x] 根据充电桩编码查询关联的计费模板
|
||
- [x] 将计费模板转换为 JCPP 格式(`PricingModelConverter`)
|
||
- [x] 发送计费模板查询应答
|
||
- [x] 处理计费模板校验请求(`VERIFY_PRICING`)
|
||
- [x] 发送计费模板校验应答
|
||
|
||
**状态**:✅ 已完成
|
||
|
||
---
|
||
|
||
#### 2.2.8 远程操作结果 ✅
|
||
**消费者**:`JcppRemoteResultConsumer`
|
||
**队列**:`jcpp.uplink.remoteResult`
|
||
**功能**:
|
||
- [x] 处理远程启动结果(`REMOTE_START_RESULT`)
|
||
- 启动成功:更新订单状态为"充电中",更新枪状态
|
||
- 启动失败:更新订单状态为"已取消",记录失败原因
|
||
- [x] 处理远程停止结果(`REMOTE_STOP_RESULT`)
|
||
- 记录停止结果日志
|
||
- 等待交易记录消息进行最终结算
|
||
- [ ] 启动失败时触发退款流程(已注释,待实现)
|
||
|
||
**状态**:✅ 基础功能已完成,⚠️ 退款流程待实现
|
||
|
||
---
|
||
|
||
#### 2.2.9 会话关闭 ✅
|
||
**消费者**:`JcppSessionCloseConsumer`
|
||
**队列**:`jcpp.uplink.sessionClose`
|
||
**功能**:
|
||
- [x] 接收充电桩会话关闭消息
|
||
- [x] 更新充电桩离线状态(已注释)
|
||
- [x] 更新所有枪状态为离线
|
||
- [ ] 查询正在充电的订单并标记为异常(待实现)
|
||
|
||
**状态**:✅ 基础功能已完成,⚠️ 异常订单处理待实现
|
||
|
||
---
|
||
|
||
### 2.3 下行消息发送(万车充 → JCPP)✅
|
||
|
||
#### 2.3.1 下行服务接口
|
||
**服务接口**:`IJcppDownlinkService`
|
||
**实现类**:`JcppDownlinkServiceImpl`
|
||
|
||
**已实现方法**:
|
||
- [x] `sendLoginAck()`:发送登录应答
|
||
- [x] `sendStartChargeAck()`:发送启动充电鉴权应答
|
||
- [x] `sendTransactionRecordAck()`:发送交易记录应答
|
||
- [x] `sendQueryPricingAck()`:发送计费模板查询应答
|
||
- [x] `sendVerifyPricingAck()`:发送计费模板校验应答
|
||
- [x] `sendRemoteStart()`:发送远程启动充电命令
|
||
- [x] `sendRemoteStop()`:发送远程停止充电命令
|
||
|
||
**状态**:✅ 已完成核心下行命令
|
||
|
||
---
|
||
|
||
### 2.4 远程充电控制 ✅
|
||
|
||
#### 2.4.1 远程充电服务
|
||
**服务接口**:`IJcppRemoteChargeService`
|
||
**实现类**:`JcppRemoteChargeServiceImpl`
|
||
**Controller**:`JcppRemoteChargeController`(路径:`/jcpp/remote`)
|
||
|
||
**功能**:
|
||
- [x] 远程启动充电(`POST /jcpp/remote/start`)
|
||
- 验证充电桩和枪状态
|
||
- 验证会员余额
|
||
- 创建充电订单
|
||
- 发送远程启动命令到 JCPP
|
||
- [x] 远程停止充电(`POST /jcpp/remote/stop`)
|
||
- 查询订单状态
|
||
- 发送远程停止命令到 JCPP
|
||
|
||
**状态**:✅ 已完成
|
||
|
||
---
|
||
|
||
### 2.5 工具类 ✅
|
||
|
||
#### 2.5.1 计费模板转换器
|
||
**类名**:`PricingModelConverter`
|
||
**功能**:
|
||
- [x] 将系统计费模板(`PileBillingTemplate`)转换为 JCPP 格式(`JcppPricingModel`)
|
||
- [x] 支持分时电价和服务费配置
|
||
- [x] 单元测试:`PricingModelConverterTest`
|
||
|
||
**状态**:✅ 已完成并测试
|
||
|
||
---
|
||
|
||
## 三、已修复的问题
|
||
|
||
### 3.1 消息反序列化错误(第一次修复)✅
|
||
|
||
**问题描述**:
|
||
消费 JCPP 发来的消息时报错:
|
||
```
|
||
Caused by: com.fasterxml.jackson.databind.exc.MismatchedInputException:
|
||
Cannot deserialize value of type `java.lang.String` from Object value (token `JsonToken.START_OBJECT`)
|
||
```
|
||
|
||
**原因分析**:
|
||
JCPP 发送的消息中,`data` 字段是一个 **JSON 字符串**,而不是 JSON 对象。但在 `JcppUplinkMessage` 中定义为 `Object` 类型,导致 Jackson 反序列化时尝试将 JSON 字符串解析为对象,从而失败。
|
||
|
||
**解决方案**:
|
||
1. 修改 `JcppUplinkMessage.data` 字段类型从 `Object` 改为 `String`
|
||
2. 修改所有消费者中的数据解析逻辑:
|
||
- 原代码:`JSONObject data = JSON.parseObject(JSON.toJSONString(uplinkMessage.getData()));`
|
||
- 修改后:`JSONObject data = JSON.parseObject(uplinkMessage.getData());`
|
||
|
||
**修复时间**:2025-12-30
|
||
**状态**:✅ 已修复
|
||
|
||
---
|
||
|
||
### 3.2 RabbitMQ 消息转换器冲突(第二次修复)✅
|
||
|
||
**问题描述**:
|
||
修复第一个问题后,仍然出现反序列化错误:
|
||
```
|
||
Caused by: com.fasterxml.jackson.databind.exc.MismatchedInputException:
|
||
Cannot deserialize value of type `java.lang.String` from Object value (token `JsonToken.START_OBJECT`)
|
||
at [Source: (String)"{"pileCode":"20231212000010","messageType":"LOGIN","data":"{\n \"messageIdMSB\": ...
|
||
```
|
||
|
||
**原因分析**:
|
||
消费者方法签名是 `public void handleLogin(String message)`,但 RabbitMQ 配置了 `Jackson2JsonMessageConverter`。当 RabbitMQ 接收到 JSON 对象消息时:
|
||
1. `Jackson2JsonMessageConverter` 尝试将 JSON 对象反序列化为 `String` 类型
|
||
2. Jackson 无法将 JSON 对象转换为 String,导致 `MismatchedInputException`
|
||
|
||
**根本原因**:
|
||
- 消费者期望接收 `String` 类型参数
|
||
- RabbitMQ 配置了 `Jackson2JsonMessageConverter`,会自动进行 JSON 反序列化
|
||
- 两者冲突导致反序列化失败
|
||
|
||
**解决方案**:
|
||
将所有消费者的方法参数从 `String message` 改为 `JcppUplinkMessage uplinkMessage`,让 Jackson 自动反序列化为对象。
|
||
|
||
**修改内容**:
|
||
1. 修改所有消费者的方法签名:
|
||
- 原代码:`public void handleXxx(String message)`
|
||
- 修改后:`public void handleXxx(JcppUplinkMessage uplinkMessage)`
|
||
|
||
2. 简化消息解析逻辑:
|
||
- 删除:`JcppUplinkMessage uplinkMessage = JSON.parseObject(message, JcppUplinkMessage.class);`
|
||
- 直接使用:`uplinkMessage` 参数
|
||
|
||
3. 优化日志输出:
|
||
- 原代码:`log.info("收到 JCPP 登录消息: {}", message);`
|
||
- 修改后:`log.info("收到 JCPP 登录消息: pileCode={}, messageType={}", uplinkMessage.getPileCode(), uplinkMessage.getMessageType());`
|
||
|
||
**修改文件**:
|
||
- [x] `JcppLoginConsumer.java`:修改方法签名和日志
|
||
- [x] `JcppHeartbeatConsumer.java`:修改方法签名和日志
|
||
- [x] `JcppStartChargeConsumer.java`:修改方法签名和日志
|
||
- [x] `JcppRealTimeDataConsumer.java`:修改方法签名和日志
|
||
- [x] `JcppTransactionConsumer.java`:修改方法签名和日志
|
||
- [x] `JcppGunStatusConsumer.java`:修改方法签名和日志
|
||
- [x] `JcppPricingConsumer.java`:修改方法签名和日志
|
||
- [x] `JcppRemoteResultConsumer.java`:修改方法签名和日志
|
||
- [x] `JcppSessionCloseConsumer.java`:修改方法签名和日志
|
||
|
||
**优势**:
|
||
1. 代码更简洁:不需要手动解析 JSON 字符串
|
||
2. 性能更好:减少一次 JSON 解析操作
|
||
3. 类型安全:编译时就能发现类型错误
|
||
4. 符合 Spring AMQP 最佳实践:利用消息转换器自动反序列化
|
||
|
||
**修复时间**:2025-12-30
|
||
**状态**:✅ 已修复
|
||
|
||
---
|
||
|
||
## 四、待完成功能
|
||
|
||
### 4.1 订单结算流程 ⚠️
|
||
|
||
**优先级**:高
|
||
**描述**:
|
||
在 `JcppTransactionConsumer` 中,交易记录处理完成后需要触发订单结算流程,包括:
|
||
- 计算实际费用(电费 + 服务费)
|
||
- 扣除会员余额或赠金
|
||
- 执行分账(商户、平台、站点等)
|
||
- 发放积分奖励
|
||
- 生成清算账单
|
||
|
||
**待实现**:
|
||
```java
|
||
// TODO: 触发结算流程
|
||
// orderBasicInfoService.realTimeOrderSplit(order.getId());
|
||
```
|
||
|
||
**相关类**:
|
||
- `OrderBasicInfoServiceImpl.realTimeOrderSplit()`
|
||
- `StationSplitConfigService`
|
||
- `OrderSplitRecordService`
|
||
- `PointsRewardProducer`
|
||
|
||
---
|
||
|
||
### 4.2 远程启动失败退款 ⚠️
|
||
|
||
**优先级**:中
|
||
**描述**:
|
||
在 `JcppRemoteResultConsumer` 中,如果远程启动失败且用户已预付费,需要触发退款流程。
|
||
|
||
**待实现**:
|
||
```java
|
||
// TODO: 如果已预付费,触发退款流程
|
||
```
|
||
|
||
**相关类**:
|
||
- `MemberWalletInfoService`
|
||
- `MemberWalletLogService`
|
||
- 第三方支付退款接口(微信、支付宝)
|
||
|
||
---
|
||
|
||
### 4.3 会话关闭异常订单处理 ⚠️
|
||
|
||
**优先级**:中
|
||
**描述**:
|
||
在 `JcppSessionCloseConsumer` 中,当充电桩会话关闭时,需要查询是否有正在充电的订单,如果有则标记为异常结束。
|
||
|
||
**待实现**:
|
||
```java
|
||
// TODO: 查询是否有正在充电的订单,如果有则标记为异常
|
||
// 可以调用 OrderBasicInfoService 查询并处理
|
||
```
|
||
|
||
**相关类**:
|
||
- `OrderBasicInfoService`
|
||
- `OrderAbnormalRecordService`
|
||
|
||
---
|
||
|
||
### 4.4 充电桩在线状态管理 ⚠️
|
||
|
||
**优先级**:低
|
||
**描述**:
|
||
目前在登录和会话关闭时更新充电桩在线状态的代码已注释,需要确认业务逻辑后启用。
|
||
|
||
**待确认**:
|
||
- 是否需要实时更新充电桩在线状态到数据库?
|
||
- 还是仅通过 Redis 心跳判断在线状态?
|
||
|
||
**相关代码**:
|
||
- `JcppLoginConsumer.java:56-59`(已注释)
|
||
- `JcppSessionCloseConsumer.java:59`(已注释)
|
||
|
||
---
|
||
|
||
### 4.5 枪故障信息持久化 ⚠️
|
||
|
||
**优先级**:低
|
||
**描述**:
|
||
在 `JcppGunStatusConsumer` 中,当接收到枪故障信息时,目前仅记录日志,需要将故障信息保存到数据库或发送告警。
|
||
|
||
**待实现**:
|
||
```java
|
||
// TODO: 可以将故障信息保存到数据库或发送告警
|
||
```
|
||
|
||
**相关表**:
|
||
- `pile_connector_info`(可能需要添加故障信息字段)
|
||
- 或创建新表 `pile_connector_fault_record`
|
||
|
||
---
|
||
|
||
### 4.6 消息服务接口 ⚠️
|
||
|
||
**优先级**:低
|
||
**描述**:
|
||
`IJcppMessageService` 和 `JcppMessageServiceImpl` 已创建但功能未完善,可能用于消息查询、重发等功能。
|
||
|
||
**待实现**:
|
||
- 消息历史查询
|
||
- 消息重发机制
|
||
- 消息统计分析
|
||
|
||
---
|
||
|
||
## 五、测试情况
|
||
|
||
### 5.1 单元测试
|
||
- [x] `PricingModelConverterTest`:计费模板转换器测试
|
||
- [ ] 其他消费者单元测试(待补充)
|
||
|
||
### 5.2 集成测试
|
||
- [ ] 充电桩登录流程测试
|
||
- [ ] 刷卡启动充电流程测试
|
||
- [ ] 远程启动充电流程测试
|
||
- [ ] 充电过程实时数据测试
|
||
- [ ] 充电完成结算流程测试
|
||
- [ ] 异常场景测试(网络断开、超时等)
|
||
|
||
---
|
||
|
||
## 六、部署配置
|
||
|
||
### 6.1 RabbitMQ 配置
|
||
|
||
**配置文件**:`application-{env}.yml`
|
||
|
||
```yaml
|
||
spring:
|
||
rabbitmq:
|
||
host: ${RABBITMQ_HOST}
|
||
port: 5672
|
||
username: ${RABBITMQ_USERNAME}
|
||
password: ${RABBITMQ_PASSWORD}
|
||
virtual-host: /
|
||
listener:
|
||
simple:
|
||
acknowledge-mode: auto # 自动应答模式
|
||
concurrency: 1
|
||
max-concurrency: 20
|
||
prefetch: 1
|
||
retry:
|
||
enabled: true
|
||
max-attempts: 3
|
||
initial-interval: 1s
|
||
max-interval: 10s
|
||
multiplier: 2.0
|
||
```
|
||
|
||
### 6.2 JCPP 配置
|
||
|
||
**配置类**:`JcppConfig`
|
||
|
||
```java
|
||
@Configuration
|
||
public class JcppConfig {
|
||
// JCPP 平台相关配置
|
||
// 如:API 地址、认证信息等
|
||
}
|
||
```
|
||
|
||
---
|
||
|
||
## 七、注意事项
|
||
|
||
### 7.1 消息幂等性
|
||
- 所有消费者都需要做幂等性检查,避免重复处理
|
||
- 目前 `JcppTransactionConsumer` 已实现幂等性检查
|
||
- 其他消费者需要根据业务场景补充
|
||
|
||
### 7.2 事务管理
|
||
- 涉及数据库操作的消费者需要添加 `@Transactional` 注解
|
||
- 目前 `JcppStartChargeConsumer`、`JcppTransactionConsumer`、`JcppRemoteResultConsumer` 已添加
|
||
|
||
### 7.3 异常处理
|
||
- 所有消费者都需要捕获异常并记录日志
|
||
- 避免异常导致消息丢失或重复消费
|
||
|
||
### 7.4 性能优化
|
||
- 实时数据和心跳消息使用 Redis 缓存,避免频繁写数据库
|
||
- 考虑使用批量更新减少数据库压力
|
||
|
||
---
|
||
|
||
## 八、后续计划
|
||
|
||
### 8.1 短期计划(1-2 周)
|
||
1. 完成订单结算流程对接
|
||
2. 实现远程启动失败退款功能
|
||
3. 完善会话关闭异常订单处理
|
||
4. 补充单元测试和集成测试
|
||
|
||
### 8.2 中期计划(1 个月)
|
||
1. 优化消息处理性能
|
||
2. 完善监控和告警机制
|
||
3. 补充故障信息持久化功能
|
||
4. 完善消息服务接口
|
||
|
||
### 8.3 长期计划(3 个月)
|
||
1. 支持更多 JCPP 协议功能
|
||
2. 实现消息重发和补偿机制
|
||
3. 优化数据统计和报表功能
|
||
4. 完善运维工具和管理界面
|
||
|
||
---
|
||
|
||
## 九、联系人
|
||
|
||
- **开发负责人**:待补充
|
||
- **测试负责人**:待补充
|
||
- **运维负责人**:待补充
|
||
|
||
---
|
||
|
||
## 十、变更记录
|
||
|
||
| 日期 | 版本 | 修改人 | 修改内容 |
|
||
|------|------|--------|----------|
|
||
| 2025-12-30 | v1.0 | Claude | 创建文档,记录当前对接进度 |
|
||
| 2025-12-30 | v1.1 | Claude | 修复消息反序列化错误(第一次) |
|
||
| 2025-12-30 | v1.2 | Claude | 修复 RabbitMQ 消息转换器冲突(第二次) |
|
||
| 2025-12-30 | v2.1 | Claude | 实现 JSON 消息分区消费方案 |
|
||
|
||
---
|
||
|
||
## 十一、JSON 消息分区消费方案(v2.1 新增)
|
||
|
||
### 11.1 方案概述
|
||
|
||
为了确保同一充电桩的消息按顺序处理,实现了基于 JSON 格式的分区消费方案:
|
||
|
||
- **消息格式**:保持原有 JSON 格式,无需 Protobuf
|
||
- **分区策略**:基于 pileCode 的 Hash 分区
|
||
- **分区数量**:10 个分区(可配置)
|
||
- **Hash 算法**:MurmurHash3_128(与 JCPP 保持一致)
|
||
- **消费模式**:每个分区单线程消费,保证顺序
|
||
|
||
### 11.2 核心组件
|
||
|
||
#### 11.2.1 分区计算器
|
||
|
||
**类名**:`JcppPartitionCalculator`
|
||
**功能**:
|
||
- 使用 MurmurHash3_128 算法计算分区
|
||
- 根据 pileCode 计算分区编号(0 到 partitionCount-1)
|
||
- 提供队列名称生成方法
|
||
|
||
**核心代码**:
|
||
```java
|
||
public static int getPartition(String pileCode) {
|
||
long hash = Hashing.murmur3_128()
|
||
.hashString(pileCode, StandardCharsets.UTF_8)
|
||
.asLong();
|
||
return Math.abs((int) (hash % partitionCount));
|
||
}
|
||
```
|
||
|
||
#### 11.2.2 分区队列配置
|
||
|
||
**类名**:`JcppPartitionQueueConfig`
|
||
**功能**:
|
||
- 创建 10 个分区队列(`jcpp.uplink.partition.0` ~ `jcpp.uplink.partition.9`)
|
||
- 创建 Topic Exchange(`jcpp.uplink.exchange`)
|
||
- 绑定队列到 Exchange(routing key: `jcpp.uplink.#`)
|
||
|
||
#### 11.2.3 JSON 消息消费者
|
||
|
||
**类名**:`JcppJsonPartitionConsumer`
|
||
**功能**:
|
||
- 监听所有分区队列
|
||
- 单线程消费(`concurrency=1`)保证顺序
|
||
- 自动反序列化 JSON 消息为 `JcppUplinkMessage` 对象
|
||
- 验证分区是否正确
|
||
- 调用消息处理器处理消息
|
||
|
||
#### 11.2.4 消息处理器
|
||
|
||
**接口**:`IJcppJsonMessageHandler`
|
||
**实现类**:`JcppJsonMessageHandlerImpl`
|
||
**功能**:
|
||
- 根据消息类型分发处理
|
||
- 整合原有所有消费者的处理逻辑
|
||
- 处理登录、心跳、枪状态、充电进度、交易记录、刷卡启动、计费查询、会话关闭、远程操作结果等消息
|
||
- 复用原有业务逻辑(`PileBasicInfoService`、`OrderBasicInfoService` 等)
|
||
|
||
### 11.3 技术栈
|
||
|
||
- **Guava**:33.0.0-jre(提供 MurmurHash3_128)
|
||
- **Spring AMQP**:2.5.14
|
||
- **RabbitMQ**:5672
|
||
- **FastJSON2**:2.0.23
|
||
|
||
### 11.4 配置说明
|
||
|
||
**配置文件**:`application-{env}.yml`
|
||
|
||
```yaml
|
||
# JCPP 配置
|
||
jcpp:
|
||
rabbitmq:
|
||
# 分区数量(与 JCPP 保持一致)
|
||
partition-count: 10
|
||
# Exchange 名称
|
||
exchange: jcpp.uplink.exchange
|
||
# 队列前缀
|
||
queue-prefix: jcpp.uplink.partition
|
||
```
|
||
|
||
### 11.5 消息格式
|
||
|
||
```json
|
||
{
|
||
"pileCode": "20231212000010",
|
||
"messageType": "LOGIN",
|
||
"data": "{\"pileCode\":\"20231212000010\",\"remoteAddress\":\"192.168.1.100\"}"
|
||
}
|
||
```
|
||
|
||
**字段说明**:
|
||
- `pileCode`:充电桩编码(用于分区计算)
|
||
- `messageType`:消息类型
|
||
- `data`:具体消息内容(JSON 字符串)
|
||
|
||
### 11.6 优势
|
||
|
||
1. **消息顺序保证**:同一充电桩的消息按顺序处理
|
||
2. **简单易用**:保持原有 JSON 格式,无需 Protobuf
|
||
3. **统一处理**:整合所有消息类型的处理逻辑到一个处理器
|
||
4. **易于调试**:JSON 格式可读性强,便于排查问题
|
||
5. **可扩展性**:支持动态调整分区数量
|
||
|
||
### 11.7 与原有方案的对比
|
||
|
||
| 特性 | 原有方案(无分区) | 新方案(分区消费) |
|
||
|------|------------------|------------------|
|
||
| 消息格式 | JSON 字符串 | JSON 字符串 |
|
||
| 消息顺序 | 不保证 | 保证同一 pileCode 顺序 |
|
||
| 分区消费 | 不支持 | 支持 |
|
||
| 并发处理 | 多消费者并发 | 每个分区单线程 |
|
||
| 代码复杂度 | 多个独立消费者 | 统一消息处理器 |
|
||
| 易于调试 | 较难 | 较易 |
|
||
|
||
### 11.8 部署步骤
|
||
|
||
1. **更新配置**:在 `application-{env}.yml` 中添加 JCPP 配置
|
||
2. **启动应用**:应用会自动创建 10 个分区队列
|
||
3. **JCPP 端配置**:确保 JCPP 使用相同的 Hash 算法和分区数量
|
||
4. **验证测试**:发送测试消息,验证分区路由和消息处理
|
||
|
||
### 11.9 注意事项
|
||
|
||
1. **分区数量**:必须与 JCPP 保持一致,建议在系统初始化时确定,后续不要修改
|
||
2. **Hash 算法**:必须使用 MurmurHash3_128,与 JCPP 保持一致
|
||
3. **消息顺序**:每个分区单线程消费(`concurrency=1`),不要修改此配置
|
||
4. **异常处理**:JSON 解析失败会抛出异常,建议配置死信队列
|
||
5. **性能监控**:监控队列堆积、消费延迟、错误率等指标
|
||
|
||
### 11.10 相关文档
|
||
|
||
详细使用说明请参考:[JCPP JSON 消息分区消费方案](./JCPP_JSON消息分区消费方案.md)
|
||
|
||
---
|
||
|
||
## 十、变更记录
|