22 KiB
JCPP 充电桩平台对接进度文档
文档创建时间:2025-12-30 最后更新时间:2025-12-30 对接平台:JCPP(Java Charging Pile Platform)
一、项目概述
1.1 对接目标
将万车充运营管理平台与 JCPP 充电桩平台进行对接,实现充电桩设备的统一管理和充电业务的互联互通。
1.2 技术架构
- 通信方式:RabbitMQ 消息队列
- 消息格式:JSON
- 协议版本:yunkuaichongV150
- Exchange:
jcpp.uplink.exchange(上行消息)、jcpp.downlink.exchange(下行消息)
1.3 模块位置
- 核心代码:
jsowell-pile模块 - 包路径:
com.jsowell.pile.jcpp - 配置类:
JcppConfig、JcppRabbitConfig
二、已完成功能
2.1 基础架构搭建 ✅
2.1.1 RabbitMQ 配置
- 创建
JcppRabbitConfig配置类 - 定义上行消息 Exchange:
jcpp.uplink.exchange - 定义 9 个消息队列和对应的 Routing Key
- 配置队列绑定关系
队列列表:
| 队列名称 | Routing Key | 用途 |
|---|---|---|
jcpp.uplink.login |
jcpp.uplink.login |
充电桩登录 |
jcpp.uplink.heartbeat |
jcpp.uplink.heartbeat |
心跳消息 |
jcpp.uplink.startCharge |
jcpp.uplink.startCharge |
刷卡启动充电 |
jcpp.uplink.realTimeData |
jcpp.uplink.realTimeData |
实时充电数据 |
jcpp.uplink.transaction |
jcpp.uplink.transaction |
交易记录 |
jcpp.uplink.gunStatus |
jcpp.uplink.gunStatus |
枪状态上报 |
jcpp.uplink.pricing |
jcpp.uplink.pricing.# |
计费模板查询 |
jcpp.uplink.remoteResult |
jcpp.uplink.remoteResult.# |
远程操作结果 |
jcpp.uplink.sessionClose |
jcpp.uplink.sessionClose |
会话关闭 |
2.1.2 DTO 数据传输对象
JcppUplinkMessage:上行消息统一封装JcppDownlinkRequest:下行请求封装JcppDownlinkCommand:下行命令封装JcppLoginData:登录数据JcppStartChargeData:启动充电数据JcppRealTimeData:实时数据JcppTransactionData:交易记录数据JcppPricingModel:计费模板数据JcppRemoteStartResultData:远程启动结果数据JcppSessionInfo:会话信息
2.1.3 常量定义
JcppConstants:定义队列名称、Routing Key、消息类型等常量
2.2 上行消息处理(JCPP → 万车充)✅
2.2.1 充电桩登录 ✅
消费者:JcppLoginConsumer
队列:jcpp.uplink.login
功能:
- 接收充电桩登录消息
- 验证充电桩是否存在于系统中
- 发送登录应答(通过
IJcppDownlinkService.sendLoginAck()) - 更新充电桩在线状态(已注释,待确认业务逻辑)
状态:✅ 已完成基础功能
2.2.2 心跳消息 ✅
消费者:JcppHeartbeatConsumer
队列:jcpp.uplink.heartbeat
功能:
- 接收充电桩心跳消息
- 更新充电桩最后活跃时间到 Redis(避免频繁写数据库)
- 设置 Redis 过期时间为 180 秒
状态:✅ 已完成
2.2.3 刷卡启动充电 ✅
消费者:JcppStartChargeConsumer
队列:jcpp.uplink.startCharge
功能:
- 接收刷卡启动充电请求
- 验证充电桩状态
- 查询授权卡信息(
PileAuthCard) - 验证卡状态和密码(密码验证已注释)
- 查询会员钱包余额
- 检查白名单(免费充电)
- 验证余额是否充足
- 生成交易流水号(
tradeNo) - 创建充电订单(
OrderBasicInfo) - 发送鉴权结果应答
状态:✅ 已完成核心功能
2.2.4 实时充电数据 ✅
消费者:JcppRealTimeDataConsumer
队列:jcpp.uplink.realTimeData
功能:
- 接收实时充电数据(电压、电流、SOC、充电量、充电金额等)
- 将实时数据缓存到 Redis(5 分钟过期)
- 根据
tradeNo查询订单 - 更新订单实时数据(充电量、金额、时长)
- 更新枪状态为充电中
状态:✅ 已完成
2.2.5 交易记录(订单结算)✅
消费者:JcppTransactionConsumer
队列:jcpp.uplink.transaction
功能:
- 接收充电完成的交易记录
- 根据
tradeNo查询订单 - 幂等性检查(避免重复处理)
- 更新订单状态为"充电完成"
- 更新订单充电数据(开始时间、结束时间、充电量、金额、停止原因)
- 更新枪状态为空闲
- 发送交易记录应答
- 触发结算流程(已注释,待实现)
状态:✅ 基础功能已完成,⚠️ 结算流程待对接
2.2.6 枪状态上报 ✅
消费者:JcppGunStatusConsumer
队列:jcpp.uplink.gunStatus
功能:
- 接收充电枪状态变化消息
- 状态映射(JCPP 状态 → 系统状态)
IDLE→1(空闲)INSERTED→2(占用未充电)CHARGING→3(充电中)CHARGE_COMPLETE→2(充电完成未拔枪)FAULT→255(故障)UNKNOWN→0(离网)
- 更新枪状态到数据库
- 记录故障信息(日志)
状态:✅ 已完成
2.2.7 计费模板查询 ✅
消费者:JcppPricingConsumer
队列:jcpp.uplink.pricing
功能:
- 处理计费模板查询请求(
QUERY_PRICING) - 根据充电桩编码查询关联的计费模板
- 将计费模板转换为 JCPP 格式(
PricingModelConverter) - 发送计费模板查询应答
- 处理计费模板校验请求(
VERIFY_PRICING) - 发送计费模板校验应答
状态:✅ 已完成
2.2.8 远程操作结果 ✅
消费者:JcppRemoteResultConsumer
队列:jcpp.uplink.remoteResult
功能:
- 处理远程启动结果(
REMOTE_START_RESULT)- 启动成功:更新订单状态为"充电中",更新枪状态
- 启动失败:更新订单状态为"已取消",记录失败原因
- 处理远程停止结果(
REMOTE_STOP_RESULT)- 记录停止结果日志
- 等待交易记录消息进行最终结算
- 启动失败时触发退款流程(已注释,待实现)
状态:✅ 基础功能已完成,⚠️ 退款流程待实现
2.2.9 会话关闭 ✅
消费者:JcppSessionCloseConsumer
队列:jcpp.uplink.sessionClose
功能:
- 接收充电桩会话关闭消息
- 更新充电桩离线状态(已注释)
- 更新所有枪状态为离线
- 查询正在充电的订单并标记为异常(待实现)
状态:✅ 基础功能已完成,⚠️ 异常订单处理待实现
2.3 下行消息发送(万车充 → JCPP)✅
2.3.1 下行服务接口
服务接口:IJcppDownlinkService
实现类:JcppDownlinkServiceImpl
已实现方法:
sendLoginAck():发送登录应答sendStartChargeAck():发送启动充电鉴权应答sendTransactionRecordAck():发送交易记录应答sendQueryPricingAck():发送计费模板查询应答sendVerifyPricingAck():发送计费模板校验应答sendRemoteStart():发送远程启动充电命令sendRemoteStop():发送远程停止充电命令
状态:✅ 已完成核心下行命令
2.4 远程充电控制 ✅
2.4.1 远程充电服务
服务接口:IJcppRemoteChargeService
实现类:JcppRemoteChargeServiceImpl
Controller:JcppRemoteChargeController(路径:/jcpp/remote)
功能:
- 远程启动充电(
POST /jcpp/remote/start)- 验证充电桩和枪状态
- 验证会员余额
- 创建充电订单
- 发送远程启动命令到 JCPP
- 远程停止充电(
POST /jcpp/remote/stop)- 查询订单状态
- 发送远程停止命令到 JCPP
状态:✅ 已完成
2.5 工具类 ✅
2.5.1 计费模板转换器
类名:PricingModelConverter
功能:
- 将系统计费模板(
PileBillingTemplate)转换为 JCPP 格式(JcppPricingModel) - 支持分时电价和服务费配置
- 单元测试:
PricingModelConverterTest
状态:✅ 已完成并测试
三、已修复的问题
3.1 消息反序列化错误(第一次修复)✅
问题描述: 消费 JCPP 发来的消息时报错:
Caused by: com.fasterxml.jackson.databind.exc.MismatchedInputException:
Cannot deserialize value of type `java.lang.String` from Object value (token `JsonToken.START_OBJECT`)
原因分析:
JCPP 发送的消息中,data 字段是一个 JSON 字符串,而不是 JSON 对象。但在 JcppUplinkMessage 中定义为 Object 类型,导致 Jackson 反序列化时尝试将 JSON 字符串解析为对象,从而失败。
解决方案:
- 修改
JcppUplinkMessage.data字段类型从Object改为String - 修改所有消费者中的数据解析逻辑:
- 原代码:
JSONObject data = JSON.parseObject(JSON.toJSONString(uplinkMessage.getData())); - 修改后:
JSONObject data = JSON.parseObject(uplinkMessage.getData());
- 原代码:
修复时间:2025-12-30 状态:✅ 已修复
3.2 RabbitMQ 消息转换器冲突(第二次修复)✅
问题描述: 修复第一个问题后,仍然出现反序列化错误:
Caused by: com.fasterxml.jackson.databind.exc.MismatchedInputException:
Cannot deserialize value of type `java.lang.String` from Object value (token `JsonToken.START_OBJECT`)
at [Source: (String)"{"pileCode":"20231212000010","messageType":"LOGIN","data":"{\n \"messageIdMSB\": ...
原因分析:
消费者方法签名是 public void handleLogin(String message),但 RabbitMQ 配置了 Jackson2JsonMessageConverter。当 RabbitMQ 接收到 JSON 对象消息时:
Jackson2JsonMessageConverter尝试将 JSON 对象反序列化为String类型- Jackson 无法将 JSON 对象转换为 String,导致
MismatchedInputException
根本原因:
- 消费者期望接收
String类型参数 - RabbitMQ 配置了
Jackson2JsonMessageConverter,会自动进行 JSON 反序列化 - 两者冲突导致反序列化失败
解决方案:
将所有消费者的方法参数从 String message 改为 JcppUplinkMessage uplinkMessage,让 Jackson 自动反序列化为对象。
修改内容:
-
修改所有消费者的方法签名:
- 原代码:
public void handleXxx(String message) - 修改后:
public void handleXxx(JcppUplinkMessage uplinkMessage)
- 原代码:
-
简化消息解析逻辑:
- 删除:
JcppUplinkMessage uplinkMessage = JSON.parseObject(message, JcppUplinkMessage.class); - 直接使用:
uplinkMessage参数
- 删除:
-
优化日志输出:
- 原代码:
log.info("收到 JCPP 登录消息: {}", message); - 修改后:
log.info("收到 JCPP 登录消息: pileCode={}, messageType={}", uplinkMessage.getPileCode(), uplinkMessage.getMessageType());
- 原代码:
修改文件:
JcppLoginConsumer.java:修改方法签名和日志JcppHeartbeatConsumer.java:修改方法签名和日志JcppStartChargeConsumer.java:修改方法签名和日志JcppRealTimeDataConsumer.java:修改方法签名和日志JcppTransactionConsumer.java:修改方法签名和日志JcppGunStatusConsumer.java:修改方法签名和日志JcppPricingConsumer.java:修改方法签名和日志JcppRemoteResultConsumer.java:修改方法签名和日志JcppSessionCloseConsumer.java:修改方法签名和日志
优势:
- 代码更简洁:不需要手动解析 JSON 字符串
- 性能更好:减少一次 JSON 解析操作
- 类型安全:编译时就能发现类型错误
- 符合 Spring AMQP 最佳实践:利用消息转换器自动反序列化
修复时间:2025-12-30 状态:✅ 已修复
四、待完成功能
4.1 订单结算流程 ⚠️
优先级:高
描述:
在 JcppTransactionConsumer 中,交易记录处理完成后需要触发订单结算流程,包括:
- 计算实际费用(电费 + 服务费)
- 扣除会员余额或赠金
- 执行分账(商户、平台、站点等)
- 发放积分奖励
- 生成清算账单
待实现:
// TODO: 触发结算流程
// orderBasicInfoService.realTimeOrderSplit(order.getId());
相关类:
OrderBasicInfoServiceImpl.realTimeOrderSplit()StationSplitConfigServiceOrderSplitRecordServicePointsRewardProducer
4.2 远程启动失败退款 ⚠️
优先级:中
描述:
在 JcppRemoteResultConsumer 中,如果远程启动失败且用户已预付费,需要触发退款流程。
待实现:
// TODO: 如果已预付费,触发退款流程
相关类:
MemberWalletInfoServiceMemberWalletLogService- 第三方支付退款接口(微信、支付宝)
4.3 会话关闭异常订单处理 ⚠️
优先级:中
描述:
在 JcppSessionCloseConsumer 中,当充电桩会话关闭时,需要查询是否有正在充电的订单,如果有则标记为异常结束。
待实现:
// TODO: 查询是否有正在充电的订单,如果有则标记为异常
// 可以调用 OrderBasicInfoService 查询并处理
相关类:
OrderBasicInfoServiceOrderAbnormalRecordService
4.4 充电桩在线状态管理 ⚠️
优先级:低 描述: 目前在登录和会话关闭时更新充电桩在线状态的代码已注释,需要确认业务逻辑后启用。
待确认:
- 是否需要实时更新充电桩在线状态到数据库?
- 还是仅通过 Redis 心跳判断在线状态?
相关代码:
JcppLoginConsumer.java:56-59(已注释)JcppSessionCloseConsumer.java:59(已注释)
4.5 枪故障信息持久化 ⚠️
优先级:低
描述:
在 JcppGunStatusConsumer 中,当接收到枪故障信息时,目前仅记录日志,需要将故障信息保存到数据库或发送告警。
待实现:
// TODO: 可以将故障信息保存到数据库或发送告警
相关表:
pile_connector_info(可能需要添加故障信息字段)- 或创建新表
pile_connector_fault_record
4.6 消息服务接口 ⚠️
优先级:低
描述:
IJcppMessageService 和 JcppMessageServiceImpl 已创建但功能未完善,可能用于消息查询、重发等功能。
待实现:
- 消息历史查询
- 消息重发机制
- 消息统计分析
五、测试情况
5.1 单元测试
PricingModelConverterTest:计费模板转换器测试- 其他消费者单元测试(待补充)
5.2 集成测试
- 充电桩登录流程测试
- 刷卡启动充电流程测试
- 远程启动充电流程测试
- 充电过程实时数据测试
- 充电完成结算流程测试
- 异常场景测试(网络断开、超时等)
六、部署配置
6.1 RabbitMQ 配置
配置文件:application-{env}.yml
spring:
rabbitmq:
host: ${RABBITMQ_HOST}
port: 5672
username: ${RABBITMQ_USERNAME}
password: ${RABBITMQ_PASSWORD}
virtual-host: /
listener:
simple:
acknowledge-mode: auto # 自动应答模式
concurrency: 1
max-concurrency: 20
prefetch: 1
retry:
enabled: true
max-attempts: 3
initial-interval: 1s
max-interval: 10s
multiplier: 2.0
6.2 JCPP 配置
配置类:JcppConfig
@Configuration
public class JcppConfig {
// JCPP 平台相关配置
// 如:API 地址、认证信息等
}
七、注意事项
7.1 消息幂等性
- 所有消费者都需要做幂等性检查,避免重复处理
- 目前
JcppTransactionConsumer已实现幂等性检查 - 其他消费者需要根据业务场景补充
7.2 事务管理
- 涉及数据库操作的消费者需要添加
@Transactional注解 - 目前
JcppStartChargeConsumer、JcppTransactionConsumer、JcppRemoteResultConsumer已添加
7.3 异常处理
- 所有消费者都需要捕获异常并记录日志
- 避免异常导致消息丢失或重复消费
7.4 性能优化
- 实时数据和心跳消息使用 Redis 缓存,避免频繁写数据库
- 考虑使用批量更新减少数据库压力
八、后续计划
8.1 短期计划(1-2 周)
- 完成订单结算流程对接
- 实现远程启动失败退款功能
- 完善会话关闭异常订单处理
- 补充单元测试和集成测试
8.2 中期计划(1 个月)
- 优化消息处理性能
- 完善监控和告警机制
- 补充故障信息持久化功能
- 完善消息服务接口
8.3 长期计划(3 个月)
- 支持更多 JCPP 协议功能
- 实现消息重发和补偿机制
- 优化数据统计和报表功能
- 完善运维工具和管理界面
九、联系人
- 开发负责人:待补充
- 测试负责人:待补充
- 运维负责人:待补充
十、变更记录
| 日期 | 版本 | 修改人 | 修改内容 |
|---|---|---|---|
| 2025-12-30 | v1.0 | Claude | 创建文档,记录当前对接进度 |
| 2025-12-30 | v1.1 | Claude | 修复消息反序列化错误(第一次) |
| 2025-12-30 | v1.2 | Claude | 修复 RabbitMQ 消息转换器冲突(第二次) |
| 2025-12-30 | v2.1 | Claude | 实现 JSON 消息分区消费方案 |
十一、JSON 消息分区消费方案(v2.1 新增)
11.1 方案概述
为了确保同一充电桩的消息按顺序处理,实现了基于 JSON 格式的分区消费方案:
- 消息格式:保持原有 JSON 格式,无需 Protobuf
- 分区策略:基于 pileCode 的 Hash 分区
- 分区数量:10 个分区(可配置)
- Hash 算法:MurmurHash3_128(与 JCPP 保持一致)
- 消费模式:每个分区单线程消费,保证顺序
11.2 核心组件
11.2.1 分区计算器
类名:JcppPartitionCalculator
功能:
- 使用 MurmurHash3_128 算法计算分区
- 根据 pileCode 计算分区编号(0 到 partitionCount-1)
- 提供队列名称生成方法
核心代码:
public static int getPartition(String pileCode) {
long hash = Hashing.murmur3_128()
.hashString(pileCode, StandardCharsets.UTF_8)
.asLong();
return Math.abs((int) (hash % partitionCount));
}
11.2.2 分区队列配置
类名:JcppPartitionQueueConfig
功能:
- 创建 10 个分区队列(
jcpp.uplink.partition.0~jcpp.uplink.partition.9) - 创建 Topic Exchange(
jcpp.uplink.exchange) - 绑定队列到 Exchange(routing key:
jcpp.uplink.#)
11.2.3 JSON 消息消费者
类名:JcppJsonPartitionConsumer
功能:
- 监听所有分区队列
- 单线程消费(
concurrency=1)保证顺序 - 自动反序列化 JSON 消息为
JcppUplinkMessage对象 - 验证分区是否正确
- 调用消息处理器处理消息
11.2.4 消息处理器
接口:IJcppJsonMessageHandler
实现类:JcppJsonMessageHandlerImpl
功能:
- 根据消息类型分发处理
- 整合原有所有消费者的处理逻辑
- 处理登录、心跳、枪状态、充电进度、交易记录、刷卡启动、计费查询、会话关闭、远程操作结果等消息
- 复用原有业务逻辑(
PileBasicInfoService、OrderBasicInfoService等)
11.3 技术栈
- Guava:33.0.0-jre(提供 MurmurHash3_128)
- Spring AMQP:2.5.14
- RabbitMQ:5672
- FastJSON2:2.0.23
11.4 配置说明
配置文件:application-{env}.yml
# JCPP 配置
jcpp:
rabbitmq:
# 分区数量(与 JCPP 保持一致)
partition-count: 10
# Exchange 名称
exchange: jcpp.uplink.exchange
# 队列前缀
queue-prefix: jcpp.uplink.partition
11.5 消息格式
{
"pileCode": "20231212000010",
"messageType": "LOGIN",
"data": "{\"pileCode\":\"20231212000010\",\"remoteAddress\":\"192.168.1.100\"}"
}
字段说明:
pileCode:充电桩编码(用于分区计算)messageType:消息类型data:具体消息内容(JSON 字符串)
11.6 优势
- 消息顺序保证:同一充电桩的消息按顺序处理
- 简单易用:保持原有 JSON 格式,无需 Protobuf
- 统一处理:整合所有消息类型的处理逻辑到一个处理器
- 易于调试:JSON 格式可读性强,便于排查问题
- 可扩展性:支持动态调整分区数量
11.7 与原有方案的对比
| 特性 | 原有方案(无分区) | 新方案(分区消费) |
|---|---|---|
| 消息格式 | JSON 字符串 | JSON 字符串 |
| 消息顺序 | 不保证 | 保证同一 pileCode 顺序 |
| 分区消费 | 不支持 | 支持 |
| 并发处理 | 多消费者并发 | 每个分区单线程 |
| 代码复杂度 | 多个独立消费者 | 统一消息处理器 |
| 易于调试 | 较难 | 较易 |
11.8 部署步骤
- 更新配置:在
application-{env}.yml中添加 JCPP 配置 - 启动应用:应用会自动创建 10 个分区队列
- JCPP 端配置:确保 JCPP 使用相同的 Hash 算法和分区数量
- 验证测试:发送测试消息,验证分区路由和消息处理
11.9 注意事项
- 分区数量:必须与 JCPP 保持一致,建议在系统初始化时确定,后续不要修改
- Hash 算法:必须使用 MurmurHash3_128,与 JCPP 保持一致
- 消息顺序:每个分区单线程消费(
concurrency=1),不要修改此配置 - 异常处理:JSON 解析失败会抛出异常,建议配置死信队列
- 性能监控:监控队列堆积、消费延迟、错误率等指标
11.10 相关文档
详细使用说明请参考:JCPP JSON 消息分区消费方案