# JCPP 充电桩平台对接进度文档 > 文档创建时间:2025-12-30 > 最后更新时间:2025-12-30 > 对接平台:JCPP(Java Charging Pile Platform) --- ## 一、项目概述 ### 1.1 对接目标 将万车充运营管理平台与 JCPP 充电桩平台进行对接,实现充电桩设备的统一管理和充电业务的互联互通。 ### 1.2 技术架构 - **通信方式**:RabbitMQ 消息队列 - **消息格式**:JSON - **协议版本**:yunkuaichongV150 - **Exchange**:`jcpp.uplink.exchange`(上行消息)、`jcpp.downlink.exchange`(下行消息) ### 1.3 模块位置 - **核心代码**:`jsowell-pile` 模块 - **包路径**:`com.jsowell.pile.jcpp` - **配置类**:`JcppConfig`、`JcppRabbitConfig` --- ## 二、已完成功能 ### 2.1 基础架构搭建 ✅ #### 2.1.1 RabbitMQ 配置 - [x] 创建 `JcppRabbitConfig` 配置类 - [x] 定义上行消息 Exchange:`jcpp.uplink.exchange` - [x] 定义 9 个消息队列和对应的 Routing Key - [x] 配置队列绑定关系 **队列列表**: | 队列名称 | Routing Key | 用途 | |---------|-------------|------| | `jcpp.uplink.login` | `jcpp.uplink.login` | 充电桩登录 | | `jcpp.uplink.heartbeat` | `jcpp.uplink.heartbeat` | 心跳消息 | | `jcpp.uplink.startCharge` | `jcpp.uplink.startCharge` | 刷卡启动充电 | | `jcpp.uplink.realTimeData` | `jcpp.uplink.realTimeData` | 实时充电数据 | | `jcpp.uplink.transaction` | `jcpp.uplink.transaction` | 交易记录 | | `jcpp.uplink.gunStatus` | `jcpp.uplink.gunStatus` | 枪状态上报 | | `jcpp.uplink.pricing` | `jcpp.uplink.pricing.#` | 计费模板查询 | | `jcpp.uplink.remoteResult` | `jcpp.uplink.remoteResult.#` | 远程操作结果 | | `jcpp.uplink.sessionClose` | `jcpp.uplink.sessionClose` | 会话关闭 | #### 2.1.2 DTO 数据传输对象 - [x] `JcppUplinkMessage`:上行消息统一封装 - [x] `JcppDownlinkRequest`:下行请求封装 - [x] `JcppDownlinkCommand`:下行命令封装 - [x] `JcppLoginData`:登录数据 - [x] `JcppStartChargeData`:启动充电数据 - [x] `JcppRealTimeData`:实时数据 - [x] `JcppTransactionData`:交易记录数据 - [x] `JcppPricingModel`:计费模板数据 - [x] `JcppRemoteStartResultData`:远程启动结果数据 - [x] `JcppSessionInfo`:会话信息 #### 2.1.3 常量定义 - [x] `JcppConstants`:定义队列名称、Routing Key、消息类型等常量 --- ### 2.2 上行消息处理(JCPP → 万车充)✅ #### 2.2.1 充电桩登录 ✅ **消费者**:`JcppLoginConsumer` **队列**:`jcpp.uplink.login` **功能**: - [x] 接收充电桩登录消息 - [x] 验证充电桩是否存在于系统中 - [x] 发送登录应答(通过 `IJcppDownlinkService.sendLoginAck()`) - [x] 更新充电桩在线状态(已注释,待确认业务逻辑) **状态**:✅ 已完成基础功能 --- #### 2.2.2 心跳消息 ✅ **消费者**:`JcppHeartbeatConsumer` **队列**:`jcpp.uplink.heartbeat` **功能**: - [x] 接收充电桩心跳消息 - [x] 更新充电桩最后活跃时间到 Redis(避免频繁写数据库) - [x] 设置 Redis 过期时间为 180 秒 **状态**:✅ 已完成 --- #### 2.2.3 刷卡启动充电 ✅ **消费者**:`JcppStartChargeConsumer` **队列**:`jcpp.uplink.startCharge` **功能**: - [x] 接收刷卡启动充电请求 - [x] 验证充电桩状态 - [x] 查询授权卡信息(`PileAuthCard`) - [x] 验证卡状态和密码(密码验证已注释) - [x] 查询会员钱包余额 - [x] 检查白名单(免费充电) - [x] 验证余额是否充足 - [x] 生成交易流水号(`tradeNo`) - [x] 创建充电订单(`OrderBasicInfo`) - [x] 发送鉴权结果应答 **状态**:✅ 已完成核心功能 --- #### 2.2.4 实时充电数据 ✅ **消费者**:`JcppRealTimeDataConsumer` **队列**:`jcpp.uplink.realTimeData` **功能**: - [x] 接收实时充电数据(电压、电流、SOC、充电量、充电金额等) - [x] 将实时数据缓存到 Redis(5 分钟过期) - [x] 根据 `tradeNo` 查询订单 - [x] 更新订单实时数据(充电量、金额、时长) - [x] 更新枪状态为充电中 **状态**:✅ 已完成 --- #### 2.2.5 交易记录(订单结算)✅ **消费者**:`JcppTransactionConsumer` **队列**:`jcpp.uplink.transaction` **功能**: - [x] 接收充电完成的交易记录 - [x] 根据 `tradeNo` 查询订单 - [x] 幂等性检查(避免重复处理) - [x] 更新订单状态为"充电完成" - [x] 更新订单充电数据(开始时间、结束时间、充电量、金额、停止原因) - [x] 更新枪状态为空闲 - [x] 发送交易记录应答 - [ ] 触发结算流程(已注释,待实现) **状态**:✅ 基础功能已完成,⚠️ 结算流程待对接 --- #### 2.2.6 枪状态上报 ✅ **消费者**:`JcppGunStatusConsumer` **队列**:`jcpp.uplink.gunStatus` **功能**: - [x] 接收充电枪状态变化消息 - [x] 状态映射(JCPP 状态 → 系统状态) - `IDLE` → `1`(空闲) - `INSERTED` → `2`(占用未充电) - `CHARGING` → `3`(充电中) - `CHARGE_COMPLETE` → `2`(充电完成未拔枪) - `FAULT` → `255`(故障) - `UNKNOWN` → `0`(离网) - [x] 更新枪状态到数据库 - [x] 记录故障信息(日志) **状态**:✅ 已完成 --- #### 2.2.7 计费模板查询 ✅ **消费者**:`JcppPricingConsumer` **队列**:`jcpp.uplink.pricing` **功能**: - [x] 处理计费模板查询请求(`QUERY_PRICING`) - [x] 根据充电桩编码查询关联的计费模板 - [x] 将计费模板转换为 JCPP 格式(`PricingModelConverter`) - [x] 发送计费模板查询应答 - [x] 处理计费模板校验请求(`VERIFY_PRICING`) - [x] 发送计费模板校验应答 **状态**:✅ 已完成 --- #### 2.2.8 远程操作结果 ✅ **消费者**:`JcppRemoteResultConsumer` **队列**:`jcpp.uplink.remoteResult` **功能**: - [x] 处理远程启动结果(`REMOTE_START_RESULT`) - 启动成功:更新订单状态为"充电中",更新枪状态 - 启动失败:更新订单状态为"已取消",记录失败原因 - [x] 处理远程停止结果(`REMOTE_STOP_RESULT`) - 记录停止结果日志 - 等待交易记录消息进行最终结算 - [ ] 启动失败时触发退款流程(已注释,待实现) **状态**:✅ 基础功能已完成,⚠️ 退款流程待实现 --- #### 2.2.9 会话关闭 ✅ **消费者**:`JcppSessionCloseConsumer` **队列**:`jcpp.uplink.sessionClose` **功能**: - [x] 接收充电桩会话关闭消息 - [x] 更新充电桩离线状态(已注释) - [x] 更新所有枪状态为离线 - [ ] 查询正在充电的订单并标记为异常(待实现) **状态**:✅ 基础功能已完成,⚠️ 异常订单处理待实现 --- ### 2.3 下行消息发送(万车充 → JCPP)✅ #### 2.3.1 下行服务接口 **服务接口**:`IJcppDownlinkService` **实现类**:`JcppDownlinkServiceImpl` **已实现方法**: - [x] `sendLoginAck()`:发送登录应答 - [x] `sendStartChargeAck()`:发送启动充电鉴权应答 - [x] `sendTransactionRecordAck()`:发送交易记录应答 - [x] `sendQueryPricingAck()`:发送计费模板查询应答 - [x] `sendVerifyPricingAck()`:发送计费模板校验应答 - [x] `sendRemoteStart()`:发送远程启动充电命令 - [x] `sendRemoteStop()`:发送远程停止充电命令 **状态**:✅ 已完成核心下行命令 --- ### 2.4 远程充电控制 ✅ #### 2.4.1 远程充电服务 **服务接口**:`IJcppRemoteChargeService` **实现类**:`JcppRemoteChargeServiceImpl` **Controller**:`JcppRemoteChargeController`(路径:`/jcpp/remote`) **功能**: - [x] 远程启动充电(`POST /jcpp/remote/start`) - 验证充电桩和枪状态 - 验证会员余额 - 创建充电订单 - 发送远程启动命令到 JCPP - [x] 远程停止充电(`POST /jcpp/remote/stop`) - 查询订单状态 - 发送远程停止命令到 JCPP **状态**:✅ 已完成 --- ### 2.5 工具类 ✅ #### 2.5.1 计费模板转换器 **类名**:`PricingModelConverter` **功能**: - [x] 将系统计费模板(`PileBillingTemplate`)转换为 JCPP 格式(`JcppPricingModel`) - [x] 支持分时电价和服务费配置 - [x] 单元测试:`PricingModelConverterTest` **状态**:✅ 已完成并测试 --- ## 三、已修复的问题 ### 3.1 消息反序列化错误(第一次修复)✅ **问题描述**: 消费 JCPP 发来的消息时报错: ``` Caused by: com.fasterxml.jackson.databind.exc.MismatchedInputException: Cannot deserialize value of type `java.lang.String` from Object value (token `JsonToken.START_OBJECT`) ``` **原因分析**: JCPP 发送的消息中,`data` 字段是一个 **JSON 字符串**,而不是 JSON 对象。但在 `JcppUplinkMessage` 中定义为 `Object` 类型,导致 Jackson 反序列化时尝试将 JSON 字符串解析为对象,从而失败。 **解决方案**: 1. 修改 `JcppUplinkMessage.data` 字段类型从 `Object` 改为 `String` 2. 修改所有消费者中的数据解析逻辑: - 原代码:`JSONObject data = JSON.parseObject(JSON.toJSONString(uplinkMessage.getData()));` - 修改后:`JSONObject data = JSON.parseObject(uplinkMessage.getData());` **修复时间**:2025-12-30 **状态**:✅ 已修复 --- ### 3.2 RabbitMQ 消息转换器冲突(第二次修复)✅ **问题描述**: 修复第一个问题后,仍然出现反序列化错误: ``` Caused by: com.fasterxml.jackson.databind.exc.MismatchedInputException: Cannot deserialize value of type `java.lang.String` from Object value (token `JsonToken.START_OBJECT`) at [Source: (String)"{"pileCode":"20231212000010","messageType":"LOGIN","data":"{\n \"messageIdMSB\": ... ``` **原因分析**: 消费者方法签名是 `public void handleLogin(String message)`,但 RabbitMQ 配置了 `Jackson2JsonMessageConverter`。当 RabbitMQ 接收到 JSON 对象消息时: 1. `Jackson2JsonMessageConverter` 尝试将 JSON 对象反序列化为 `String` 类型 2. Jackson 无法将 JSON 对象转换为 String,导致 `MismatchedInputException` **根本原因**: - 消费者期望接收 `String` 类型参数 - RabbitMQ 配置了 `Jackson2JsonMessageConverter`,会自动进行 JSON 反序列化 - 两者冲突导致反序列化失败 **解决方案**: 将所有消费者的方法参数从 `String message` 改为 `JcppUplinkMessage uplinkMessage`,让 Jackson 自动反序列化为对象。 **修改内容**: 1. 修改所有消费者的方法签名: - 原代码:`public void handleXxx(String message)` - 修改后:`public void handleXxx(JcppUplinkMessage uplinkMessage)` 2. 简化消息解析逻辑: - 删除:`JcppUplinkMessage uplinkMessage = JSON.parseObject(message, JcppUplinkMessage.class);` - 直接使用:`uplinkMessage` 参数 3. 优化日志输出: - 原代码:`log.info("收到 JCPP 登录消息: {}", message);` - 修改后:`log.info("收到 JCPP 登录消息: pileCode={}, messageType={}", uplinkMessage.getPileCode(), uplinkMessage.getMessageType());` **修改文件**: - [x] `JcppLoginConsumer.java`:修改方法签名和日志 - [x] `JcppHeartbeatConsumer.java`:修改方法签名和日志 - [x] `JcppStartChargeConsumer.java`:修改方法签名和日志 - [x] `JcppRealTimeDataConsumer.java`:修改方法签名和日志 - [x] `JcppTransactionConsumer.java`:修改方法签名和日志 - [x] `JcppGunStatusConsumer.java`:修改方法签名和日志 - [x] `JcppPricingConsumer.java`:修改方法签名和日志 - [x] `JcppRemoteResultConsumer.java`:修改方法签名和日志 - [x] `JcppSessionCloseConsumer.java`:修改方法签名和日志 **优势**: 1. 代码更简洁:不需要手动解析 JSON 字符串 2. 性能更好:减少一次 JSON 解析操作 3. 类型安全:编译时就能发现类型错误 4. 符合 Spring AMQP 最佳实践:利用消息转换器自动反序列化 **修复时间**:2025-12-30 **状态**:✅ 已修复 --- ## 四、待完成功能 ### 4.1 订单结算流程 ⚠️ **优先级**:高 **描述**: 在 `JcppTransactionConsumer` 中,交易记录处理完成后需要触发订单结算流程,包括: - 计算实际费用(电费 + 服务费) - 扣除会员余额或赠金 - 执行分账(商户、平台、站点等) - 发放积分奖励 - 生成清算账单 **待实现**: ```java // TODO: 触发结算流程 // orderBasicInfoService.realTimeOrderSplit(order.getId()); ``` **相关类**: - `OrderBasicInfoServiceImpl.realTimeOrderSplit()` - `StationSplitConfigService` - `OrderSplitRecordService` - `PointsRewardProducer` --- ### 4.2 远程启动失败退款 ⚠️ **优先级**:中 **描述**: 在 `JcppRemoteResultConsumer` 中,如果远程启动失败且用户已预付费,需要触发退款流程。 **待实现**: ```java // TODO: 如果已预付费,触发退款流程 ``` **相关类**: - `MemberWalletInfoService` - `MemberWalletLogService` - 第三方支付退款接口(微信、支付宝) --- ### 4.3 会话关闭异常订单处理 ⚠️ **优先级**:中 **描述**: 在 `JcppSessionCloseConsumer` 中,当充电桩会话关闭时,需要查询是否有正在充电的订单,如果有则标记为异常结束。 **待实现**: ```java // TODO: 查询是否有正在充电的订单,如果有则标记为异常 // 可以调用 OrderBasicInfoService 查询并处理 ``` **相关类**: - `OrderBasicInfoService` - `OrderAbnormalRecordService` --- ### 4.4 充电桩在线状态管理 ⚠️ **优先级**:低 **描述**: 目前在登录和会话关闭时更新充电桩在线状态的代码已注释,需要确认业务逻辑后启用。 **待确认**: - 是否需要实时更新充电桩在线状态到数据库? - 还是仅通过 Redis 心跳判断在线状态? **相关代码**: - `JcppLoginConsumer.java:56-59`(已注释) - `JcppSessionCloseConsumer.java:59`(已注释) --- ### 4.5 枪故障信息持久化 ⚠️ **优先级**:低 **描述**: 在 `JcppGunStatusConsumer` 中,当接收到枪故障信息时,目前仅记录日志,需要将故障信息保存到数据库或发送告警。 **待实现**: ```java // TODO: 可以将故障信息保存到数据库或发送告警 ``` **相关表**: - `pile_connector_info`(可能需要添加故障信息字段) - 或创建新表 `pile_connector_fault_record` --- ### 4.6 消息服务接口 ⚠️ **优先级**:低 **描述**: `IJcppMessageService` 和 `JcppMessageServiceImpl` 已创建但功能未完善,可能用于消息查询、重发等功能。 **待实现**: - 消息历史查询 - 消息重发机制 - 消息统计分析 --- ## 五、测试情况 ### 5.1 单元测试 - [x] `PricingModelConverterTest`:计费模板转换器测试 - [ ] 其他消费者单元测试(待补充) ### 5.2 集成测试 - [ ] 充电桩登录流程测试 - [ ] 刷卡启动充电流程测试 - [ ] 远程启动充电流程测试 - [ ] 充电过程实时数据测试 - [ ] 充电完成结算流程测试 - [ ] 异常场景测试(网络断开、超时等) --- ## 六、部署配置 ### 6.1 RabbitMQ 配置 **配置文件**:`application-{env}.yml` ```yaml spring: rabbitmq: host: ${RABBITMQ_HOST} port: 5672 username: ${RABBITMQ_USERNAME} password: ${RABBITMQ_PASSWORD} virtual-host: / listener: simple: acknowledge-mode: auto # 自动应答模式 concurrency: 1 max-concurrency: 20 prefetch: 1 retry: enabled: true max-attempts: 3 initial-interval: 1s max-interval: 10s multiplier: 2.0 ``` ### 6.2 JCPP 配置 **配置类**:`JcppConfig` ```java @Configuration public class JcppConfig { // JCPP 平台相关配置 // 如:API 地址、认证信息等 } ``` --- ## 七、注意事项 ### 7.1 消息幂等性 - 所有消费者都需要做幂等性检查,避免重复处理 - 目前 `JcppTransactionConsumer` 已实现幂等性检查 - 其他消费者需要根据业务场景补充 ### 7.2 事务管理 - 涉及数据库操作的消费者需要添加 `@Transactional` 注解 - 目前 `JcppStartChargeConsumer`、`JcppTransactionConsumer`、`JcppRemoteResultConsumer` 已添加 ### 7.3 异常处理 - 所有消费者都需要捕获异常并记录日志 - 避免异常导致消息丢失或重复消费 ### 7.4 性能优化 - 实时数据和心跳消息使用 Redis 缓存,避免频繁写数据库 - 考虑使用批量更新减少数据库压力 --- ## 八、后续计划 ### 8.1 短期计划(1-2 周) 1. 完成订单结算流程对接 2. 实现远程启动失败退款功能 3. 完善会话关闭异常订单处理 4. 补充单元测试和集成测试 ### 8.2 中期计划(1 个月) 1. 优化消息处理性能 2. 完善监控和告警机制 3. 补充故障信息持久化功能 4. 完善消息服务接口 ### 8.3 长期计划(3 个月) 1. 支持更多 JCPP 协议功能 2. 实现消息重发和补偿机制 3. 优化数据统计和报表功能 4. 完善运维工具和管理界面 --- ## 九、联系人 - **开发负责人**:待补充 - **测试负责人**:待补充 - **运维负责人**:待补充 --- ## 十、变更记录 | 日期 | 版本 | 修改人 | 修改内容 | |------|------|--------|----------| | 2025-12-30 | v1.0 | Claude | 创建文档,记录当前对接进度 | | 2025-12-30 | v1.1 | Claude | 修复消息反序列化错误(第一次) | | 2025-12-30 | v1.2 | Claude | 修复 RabbitMQ 消息转换器冲突(第二次) | | 2025-12-30 | v2.1 | Claude | 实现 JSON 消息分区消费方案 | --- ## 十一、JSON 消息分区消费方案(v2.1 新增) ### 11.1 方案概述 为了确保同一充电桩的消息按顺序处理,实现了基于 JSON 格式的分区消费方案: - **消息格式**:保持原有 JSON 格式,无需 Protobuf - **分区策略**:基于 pileCode 的 Hash 分区 - **分区数量**:10 个分区(可配置) - **Hash 算法**:MurmurHash3_128(与 JCPP 保持一致) - **消费模式**:每个分区单线程消费,保证顺序 ### 11.2 核心组件 #### 11.2.1 分区计算器 **类名**:`JcppPartitionCalculator` **功能**: - 使用 MurmurHash3_128 算法计算分区 - 根据 pileCode 计算分区编号(0 到 partitionCount-1) - 提供队列名称生成方法 **核心代码**: ```java public static int getPartition(String pileCode) { long hash = Hashing.murmur3_128() .hashString(pileCode, StandardCharsets.UTF_8) .asLong(); return Math.abs((int) (hash % partitionCount)); } ``` #### 11.2.2 分区队列配置 **类名**:`JcppPartitionQueueConfig` **功能**: - 创建 10 个分区队列(`jcpp.uplink.partition.0` ~ `jcpp.uplink.partition.9`) - 创建 Topic Exchange(`jcpp.uplink.exchange`) - 绑定队列到 Exchange(routing key: `jcpp.uplink.#`) #### 11.2.3 JSON 消息消费者 **类名**:`JcppJsonPartitionConsumer` **功能**: - 监听所有分区队列 - 单线程消费(`concurrency=1`)保证顺序 - 自动反序列化 JSON 消息为 `JcppUplinkMessage` 对象 - 验证分区是否正确 - 调用消息处理器处理消息 #### 11.2.4 消息处理器 **接口**:`IJcppJsonMessageHandler` **实现类**:`JcppJsonMessageHandlerImpl` **功能**: - 根据消息类型分发处理 - 整合原有所有消费者的处理逻辑 - 处理登录、心跳、枪状态、充电进度、交易记录、刷卡启动、计费查询、会话关闭、远程操作结果等消息 - 复用原有业务逻辑(`PileBasicInfoService`、`OrderBasicInfoService` 等) ### 11.3 技术栈 - **Guava**:33.0.0-jre(提供 MurmurHash3_128) - **Spring AMQP**:2.5.14 - **RabbitMQ**:5672 - **FastJSON2**:2.0.23 ### 11.4 配置说明 **配置文件**:`application-{env}.yml` ```yaml # JCPP 配置 jcpp: rabbitmq: # 分区数量(与 JCPP 保持一致) partition-count: 10 # Exchange 名称 exchange: jcpp.uplink.exchange # 队列前缀 queue-prefix: jcpp.uplink.partition ``` ### 11.5 消息格式 ```json { "pileCode": "20231212000010", "messageType": "LOGIN", "data": "{\"pileCode\":\"20231212000010\",\"remoteAddress\":\"192.168.1.100\"}" } ``` **字段说明**: - `pileCode`:充电桩编码(用于分区计算) - `messageType`:消息类型 - `data`:具体消息内容(JSON 字符串) ### 11.6 优势 1. **消息顺序保证**:同一充电桩的消息按顺序处理 2. **简单易用**:保持原有 JSON 格式,无需 Protobuf 3. **统一处理**:整合所有消息类型的处理逻辑到一个处理器 4. **易于调试**:JSON 格式可读性强,便于排查问题 5. **可扩展性**:支持动态调整分区数量 ### 11.7 与原有方案的对比 | 特性 | 原有方案(无分区) | 新方案(分区消费) | |------|------------------|------------------| | 消息格式 | JSON 字符串 | JSON 字符串 | | 消息顺序 | 不保证 | 保证同一 pileCode 顺序 | | 分区消费 | 不支持 | 支持 | | 并发处理 | 多消费者并发 | 每个分区单线程 | | 代码复杂度 | 多个独立消费者 | 统一消息处理器 | | 易于调试 | 较难 | 较易 | ### 11.8 部署步骤 1. **更新配置**:在 `application-{env}.yml` 中添加 JCPP 配置 2. **启动应用**:应用会自动创建 10 个分区队列 3. **JCPP 端配置**:确保 JCPP 使用相同的 Hash 算法和分区数量 4. **验证测试**:发送测试消息,验证分区路由和消息处理 ### 11.9 注意事项 1. **分区数量**:必须与 JCPP 保持一致,建议在系统初始化时确定,后续不要修改 2. **Hash 算法**:必须使用 MurmurHash3_128,与 JCPP 保持一致 3. **消息顺序**:每个分区单线程消费(`concurrency=1`),不要修改此配置 4. **异常处理**:JSON 解析失败会抛出异常,建议配置死信队列 5. **性能监控**:监控队列堆积、消费延迟、错误率等指标 ### 11.10 相关文档 详细使用说明请参考:[JCPP JSON 消息分区消费方案](./JCPP_JSON消息分区消费方案.md) --- ## 十、变更记录