Files
jsowell-charger-web/doc/JCPP对接进度文档.md
2025-12-30 15:59:34 +08:00

22 KiB
Raw Permalink Blame History

JCPP 充电桩平台对接进度文档

文档创建时间2025-12-30 最后更新时间2025-12-30 对接平台JCPPJava Charging Pile Platform


一、项目概述

1.1 对接目标

将万车充运营管理平台与 JCPP 充电桩平台进行对接,实现充电桩设备的统一管理和充电业务的互联互通。

1.2 技术架构

  • 通信方式RabbitMQ 消息队列
  • 消息格式JSON
  • 协议版本yunkuaichongV150
  • Exchangejcpp.uplink.exchange(上行消息)、jcpp.downlink.exchange(下行消息)

1.3 模块位置

  • 核心代码jsowell-pile 模块
  • 包路径com.jsowell.pile.jcpp
  • 配置类JcppConfigJcppRabbitConfig

二、已完成功能

2.1 基础架构搭建

2.1.1 RabbitMQ 配置

  • 创建 JcppRabbitConfig 配置类
  • 定义上行消息 Exchangejcpp.uplink.exchange
  • 定义 9 个消息队列和对应的 Routing Key
  • 配置队列绑定关系

队列列表

队列名称 Routing Key 用途
jcpp.uplink.login jcpp.uplink.login 充电桩登录
jcpp.uplink.heartbeat jcpp.uplink.heartbeat 心跳消息
jcpp.uplink.startCharge jcpp.uplink.startCharge 刷卡启动充电
jcpp.uplink.realTimeData jcpp.uplink.realTimeData 实时充电数据
jcpp.uplink.transaction jcpp.uplink.transaction 交易记录
jcpp.uplink.gunStatus jcpp.uplink.gunStatus 枪状态上报
jcpp.uplink.pricing jcpp.uplink.pricing.# 计费模板查询
jcpp.uplink.remoteResult jcpp.uplink.remoteResult.# 远程操作结果
jcpp.uplink.sessionClose jcpp.uplink.sessionClose 会话关闭

2.1.2 DTO 数据传输对象

  • JcppUplinkMessage:上行消息统一封装
  • JcppDownlinkRequest:下行请求封装
  • JcppDownlinkCommand:下行命令封装
  • JcppLoginData:登录数据
  • JcppStartChargeData:启动充电数据
  • JcppRealTimeData:实时数据
  • JcppTransactionData:交易记录数据
  • JcppPricingModel:计费模板数据
  • JcppRemoteStartResultData:远程启动结果数据
  • JcppSessionInfo:会话信息

2.1.3 常量定义

  • JcppConstants定义队列名称、Routing Key、消息类型等常量

2.2 上行消息处理JCPP → 万车充)

2.2.1 充电桩登录

消费者JcppLoginConsumer 队列jcpp.uplink.login 功能

  • 接收充电桩登录消息
  • 验证充电桩是否存在于系统中
  • 发送登录应答(通过 IJcppDownlinkService.sendLoginAck()
  • 更新充电桩在线状态(已注释,待确认业务逻辑)

状态 已完成基础功能


2.2.2 心跳消息

消费者JcppHeartbeatConsumer 队列jcpp.uplink.heartbeat 功能

  • 接收充电桩心跳消息
  • 更新充电桩最后活跃时间到 Redis避免频繁写数据库
  • 设置 Redis 过期时间为 180 秒

状态 已完成


2.2.3 刷卡启动充电

消费者JcppStartChargeConsumer 队列jcpp.uplink.startCharge 功能

  • 接收刷卡启动充电请求
  • 验证充电桩状态
  • 查询授权卡信息(PileAuthCard
  • 验证卡状态和密码(密码验证已注释)
  • 查询会员钱包余额
  • 检查白名单(免费充电)
  • 验证余额是否充足
  • 生成交易流水号(tradeNo
  • 创建充电订单(OrderBasicInfo
  • 发送鉴权结果应答

状态 已完成核心功能


2.2.4 实时充电数据

消费者JcppRealTimeDataConsumer 队列jcpp.uplink.realTimeData 功能

  • 接收实时充电数据电压、电流、SOC、充电量、充电金额等
  • 将实时数据缓存到 Redis5 分钟过期)
  • 根据 tradeNo 查询订单
  • 更新订单实时数据(充电量、金额、时长)
  • 更新枪状态为充电中

状态 已完成


2.2.5 交易记录(订单结算)

消费者JcppTransactionConsumer 队列jcpp.uplink.transaction 功能

  • 接收充电完成的交易记录
  • 根据 tradeNo 查询订单
  • 幂等性检查(避免重复处理)
  • 更新订单状态为"充电完成"
  • 更新订单充电数据(开始时间、结束时间、充电量、金额、停止原因)
  • 更新枪状态为空闲
  • 发送交易记录应答
  • 触发结算流程(已注释,待实现)

状态 基础功能已完成,⚠️ 结算流程待对接


2.2.6 枪状态上报

消费者JcppGunStatusConsumer 队列jcpp.uplink.gunStatus 功能

  • 接收充电枪状态变化消息
  • 状态映射JCPP 状态 → 系统状态)
    • IDLE1(空闲)
    • INSERTED2(占用未充电)
    • CHARGING3(充电中)
    • CHARGE_COMPLETE2(充电完成未拔枪)
    • FAULT255(故障)
    • UNKNOWN0(离网)
  • 更新枪状态到数据库
  • 记录故障信息(日志)

状态 已完成


2.2.7 计费模板查询

消费者JcppPricingConsumer 队列jcpp.uplink.pricing 功能

  • 处理计费模板查询请求(QUERY_PRICING
  • 根据充电桩编码查询关联的计费模板
  • 将计费模板转换为 JCPP 格式(PricingModelConverter
  • 发送计费模板查询应答
  • 处理计费模板校验请求(VERIFY_PRICING
  • 发送计费模板校验应答

状态 已完成


2.2.8 远程操作结果

消费者JcppRemoteResultConsumer 队列jcpp.uplink.remoteResult 功能

  • 处理远程启动结果(REMOTE_START_RESULT
    • 启动成功:更新订单状态为"充电中",更新枪状态
    • 启动失败:更新订单状态为"已取消",记录失败原因
  • 处理远程停止结果(REMOTE_STOP_RESULT
    • 记录停止结果日志
    • 等待交易记录消息进行最终结算
  • 启动失败时触发退款流程(已注释,待实现)

状态 基础功能已完成,⚠️ 退款流程待实现


2.2.9 会话关闭

消费者JcppSessionCloseConsumer 队列jcpp.uplink.sessionClose 功能

  • 接收充电桩会话关闭消息
  • 更新充电桩离线状态(已注释)
  • 更新所有枪状态为离线
  • 查询正在充电的订单并标记为异常(待实现)

状态 基础功能已完成,⚠️ 异常订单处理待实现


2.3 下行消息发送(万车充 → JCPP

2.3.1 下行服务接口

服务接口IJcppDownlinkService 实现类JcppDownlinkServiceImpl

已实现方法

  • sendLoginAck():发送登录应答
  • sendStartChargeAck():发送启动充电鉴权应答
  • sendTransactionRecordAck():发送交易记录应答
  • sendQueryPricingAck():发送计费模板查询应答
  • sendVerifyPricingAck():发送计费模板校验应答
  • sendRemoteStart():发送远程启动充电命令
  • sendRemoteStop():发送远程停止充电命令

状态 已完成核心下行命令


2.4 远程充电控制

2.4.1 远程充电服务

服务接口IJcppRemoteChargeService 实现类JcppRemoteChargeServiceImpl ControllerJcppRemoteChargeController(路径:/jcpp/remote

功能

  • 远程启动充电(POST /jcpp/remote/start
    • 验证充电桩和枪状态
    • 验证会员余额
    • 创建充电订单
    • 发送远程启动命令到 JCPP
  • 远程停止充电(POST /jcpp/remote/stop
    • 查询订单状态
    • 发送远程停止命令到 JCPP

状态 已完成


2.5 工具类

2.5.1 计费模板转换器

类名PricingModelConverter 功能

  • 将系统计费模板(PileBillingTemplate)转换为 JCPP 格式(JcppPricingModel
  • 支持分时电价和服务费配置
  • 单元测试:PricingModelConverterTest

状态 已完成并测试


三、已修复的问题

3.1 消息反序列化错误(第一次修复)

问题描述 消费 JCPP 发来的消息时报错:

Caused by: com.fasterxml.jackson.databind.exc.MismatchedInputException:
Cannot deserialize value of type `java.lang.String` from Object value (token `JsonToken.START_OBJECT`)

原因分析 JCPP 发送的消息中,data 字段是一个 JSON 字符串,而不是 JSON 对象。但在 JcppUplinkMessage 中定义为 Object 类型,导致 Jackson 反序列化时尝试将 JSON 字符串解析为对象,从而失败。

解决方案

  1. 修改 JcppUplinkMessage.data 字段类型从 Object 改为 String
  2. 修改所有消费者中的数据解析逻辑:
    • 原代码:JSONObject data = JSON.parseObject(JSON.toJSONString(uplinkMessage.getData()));
    • 修改后:JSONObject data = JSON.parseObject(uplinkMessage.getData());

修复时间2025-12-30 状态 已修复


3.2 RabbitMQ 消息转换器冲突(第二次修复)

问题描述 修复第一个问题后,仍然出现反序列化错误:

Caused by: com.fasterxml.jackson.databind.exc.MismatchedInputException:
Cannot deserialize value of type `java.lang.String` from Object value (token `JsonToken.START_OBJECT`)
 at [Source: (String)"{"pileCode":"20231212000010","messageType":"LOGIN","data":"{\n  \"messageIdMSB\": ...

原因分析 消费者方法签名是 public void handleLogin(String message),但 RabbitMQ 配置了 Jackson2JsonMessageConverter。当 RabbitMQ 接收到 JSON 对象消息时:

  1. Jackson2JsonMessageConverter 尝试将 JSON 对象反序列化为 String 类型
  2. Jackson 无法将 JSON 对象转换为 String导致 MismatchedInputException

根本原因

  • 消费者期望接收 String 类型参数
  • RabbitMQ 配置了 Jackson2JsonMessageConverter,会自动进行 JSON 反序列化
  • 两者冲突导致反序列化失败

解决方案 将所有消费者的方法参数从 String message 改为 JcppUplinkMessage uplinkMessage,让 Jackson 自动反序列化为对象。

修改内容

  1. 修改所有消费者的方法签名:

    • 原代码:public void handleXxx(String message)
    • 修改后:public void handleXxx(JcppUplinkMessage uplinkMessage)
  2. 简化消息解析逻辑:

    • 删除:JcppUplinkMessage uplinkMessage = JSON.parseObject(message, JcppUplinkMessage.class);
    • 直接使用:uplinkMessage 参数
  3. 优化日志输出:

    • 原代码:log.info("收到 JCPP 登录消息: {}", message);
    • 修改后:log.info("收到 JCPP 登录消息: pileCode={}, messageType={}", uplinkMessage.getPileCode(), uplinkMessage.getMessageType());

修改文件

  • JcppLoginConsumer.java:修改方法签名和日志
  • JcppHeartbeatConsumer.java:修改方法签名和日志
  • JcppStartChargeConsumer.java:修改方法签名和日志
  • JcppRealTimeDataConsumer.java:修改方法签名和日志
  • JcppTransactionConsumer.java:修改方法签名和日志
  • JcppGunStatusConsumer.java:修改方法签名和日志
  • JcppPricingConsumer.java:修改方法签名和日志
  • JcppRemoteResultConsumer.java:修改方法签名和日志
  • JcppSessionCloseConsumer.java:修改方法签名和日志

优势

  1. 代码更简洁:不需要手动解析 JSON 字符串
  2. 性能更好:减少一次 JSON 解析操作
  3. 类型安全:编译时就能发现类型错误
  4. 符合 Spring AMQP 最佳实践:利用消息转换器自动反序列化

修复时间2025-12-30 状态 已修复


四、待完成功能

4.1 订单结算流程 ⚠️

优先级:高 描述JcppTransactionConsumer 中,交易记录处理完成后需要触发订单结算流程,包括:

  • 计算实际费用(电费 + 服务费)
  • 扣除会员余额或赠金
  • 执行分账(商户、平台、站点等)
  • 发放积分奖励
  • 生成清算账单

待实现

// TODO: 触发结算流程
// orderBasicInfoService.realTimeOrderSplit(order.getId());

相关类

  • OrderBasicInfoServiceImpl.realTimeOrderSplit()
  • StationSplitConfigService
  • OrderSplitRecordService
  • PointsRewardProducer

4.2 远程启动失败退款 ⚠️

优先级:中 描述JcppRemoteResultConsumer 中,如果远程启动失败且用户已预付费,需要触发退款流程。

待实现

// TODO: 如果已预付费,触发退款流程

相关类

  • MemberWalletInfoService
  • MemberWalletLogService
  • 第三方支付退款接口(微信、支付宝)

4.3 会话关闭异常订单处理 ⚠️

优先级:中 描述JcppSessionCloseConsumer 中,当充电桩会话关闭时,需要查询是否有正在充电的订单,如果有则标记为异常结束。

待实现

// TODO: 查询是否有正在充电的订单,如果有则标记为异常
// 可以调用 OrderBasicInfoService 查询并处理

相关类

  • OrderBasicInfoService
  • OrderAbnormalRecordService

4.4 充电桩在线状态管理 ⚠️

优先级:低 描述 目前在登录和会话关闭时更新充电桩在线状态的代码已注释,需要确认业务逻辑后启用。

待确认

  • 是否需要实时更新充电桩在线状态到数据库?
  • 还是仅通过 Redis 心跳判断在线状态?

相关代码

  • JcppLoginConsumer.java:56-59(已注释)
  • JcppSessionCloseConsumer.java:59(已注释)

4.5 枪故障信息持久化 ⚠️

优先级:低 描述JcppGunStatusConsumer 中,当接收到枪故障信息时,目前仅记录日志,需要将故障信息保存到数据库或发送告警。

待实现

// TODO: 可以将故障信息保存到数据库或发送告警

相关表

  • pile_connector_info(可能需要添加故障信息字段)
  • 或创建新表 pile_connector_fault_record

4.6 消息服务接口 ⚠️

优先级:低 描述 IJcppMessageServiceJcppMessageServiceImpl 已创建但功能未完善,可能用于消息查询、重发等功能。

待实现

  • 消息历史查询
  • 消息重发机制
  • 消息统计分析

五、测试情况

5.1 单元测试

  • PricingModelConverterTest:计费模板转换器测试
  • 其他消费者单元测试(待补充)

5.2 集成测试

  • 充电桩登录流程测试
  • 刷卡启动充电流程测试
  • 远程启动充电流程测试
  • 充电过程实时数据测试
  • 充电完成结算流程测试
  • 异常场景测试(网络断开、超时等)

六、部署配置

6.1 RabbitMQ 配置

配置文件application-{env}.yml

spring:
  rabbitmq:
    host: ${RABBITMQ_HOST}
    port: 5672
    username: ${RABBITMQ_USERNAME}
    password: ${RABBITMQ_PASSWORD}
    virtual-host: /
    listener:
      simple:
        acknowledge-mode: auto  # 自动应答模式
        concurrency: 1
        max-concurrency: 20
        prefetch: 1
        retry:
          enabled: true
          max-attempts: 3
          initial-interval: 1s
          max-interval: 10s
          multiplier: 2.0

6.2 JCPP 配置

配置类JcppConfig

@Configuration
public class JcppConfig {
    // JCPP 平台相关配置
    // 如API 地址、认证信息等
}

七、注意事项

7.1 消息幂等性

  • 所有消费者都需要做幂等性检查,避免重复处理
  • 目前 JcppTransactionConsumer 已实现幂等性检查
  • 其他消费者需要根据业务场景补充

7.2 事务管理

  • 涉及数据库操作的消费者需要添加 @Transactional 注解
  • 目前 JcppStartChargeConsumerJcppTransactionConsumerJcppRemoteResultConsumer 已添加

7.3 异常处理

  • 所有消费者都需要捕获异常并记录日志
  • 避免异常导致消息丢失或重复消费

7.4 性能优化

  • 实时数据和心跳消息使用 Redis 缓存,避免频繁写数据库
  • 考虑使用批量更新减少数据库压力

八、后续计划

8.1 短期计划1-2 周)

  1. 完成订单结算流程对接
  2. 实现远程启动失败退款功能
  3. 完善会话关闭异常订单处理
  4. 补充单元测试和集成测试

8.2 中期计划1 个月)

  1. 优化消息处理性能
  2. 完善监控和告警机制
  3. 补充故障信息持久化功能
  4. 完善消息服务接口

8.3 长期计划3 个月)

  1. 支持更多 JCPP 协议功能
  2. 实现消息重发和补偿机制
  3. 优化数据统计和报表功能
  4. 完善运维工具和管理界面

九、联系人

  • 开发负责人:待补充
  • 测试负责人:待补充
  • 运维负责人:待补充

十、变更记录

日期 版本 修改人 修改内容
2025-12-30 v1.0 Claude 创建文档,记录当前对接进度
2025-12-30 v1.1 Claude 修复消息反序列化错误(第一次)
2025-12-30 v1.2 Claude 修复 RabbitMQ 消息转换器冲突(第二次)
2025-12-30 v2.1 Claude 实现 JSON 消息分区消费方案

十一、JSON 消息分区消费方案v2.1 新增)

11.1 方案概述

为了确保同一充电桩的消息按顺序处理,实现了基于 JSON 格式的分区消费方案:

  • 消息格式:保持原有 JSON 格式,无需 Protobuf
  • 分区策略:基于 pileCode 的 Hash 分区
  • 分区数量10 个分区(可配置)
  • Hash 算法MurmurHash3_128与 JCPP 保持一致)
  • 消费模式:每个分区单线程消费,保证顺序

11.2 核心组件

11.2.1 分区计算器

类名JcppPartitionCalculator 功能

  • 使用 MurmurHash3_128 算法计算分区
  • 根据 pileCode 计算分区编号0 到 partitionCount-1
  • 提供队列名称生成方法

核心代码

public static int getPartition(String pileCode) {
    long hash = Hashing.murmur3_128()
            .hashString(pileCode, StandardCharsets.UTF_8)
            .asLong();
    return Math.abs((int) (hash % partitionCount));
}

11.2.2 分区队列配置

类名JcppPartitionQueueConfig 功能

  • 创建 10 个分区队列(jcpp.uplink.partition.0 ~ jcpp.uplink.partition.9
  • 创建 Topic Exchangejcpp.uplink.exchange
  • 绑定队列到 Exchangerouting key: jcpp.uplink.#

11.2.3 JSON 消息消费者

类名JcppJsonPartitionConsumer 功能

  • 监听所有分区队列
  • 单线程消费(concurrency=1)保证顺序
  • 自动反序列化 JSON 消息为 JcppUplinkMessage 对象
  • 验证分区是否正确
  • 调用消息处理器处理消息

11.2.4 消息处理器

接口IJcppJsonMessageHandler 实现类JcppJsonMessageHandlerImpl 功能

  • 根据消息类型分发处理
  • 整合原有所有消费者的处理逻辑
  • 处理登录、心跳、枪状态、充电进度、交易记录、刷卡启动、计费查询、会话关闭、远程操作结果等消息
  • 复用原有业务逻辑(PileBasicInfoServiceOrderBasicInfoService 等)

11.3 技术栈

  • Guava33.0.0-jre提供 MurmurHash3_128
  • Spring AMQP2.5.14
  • RabbitMQ5672
  • FastJSON22.0.23

11.4 配置说明

配置文件application-{env}.yml

# JCPP 配置
jcpp:
  rabbitmq:
    # 分区数量(与 JCPP 保持一致)
    partition-count: 10
    # Exchange 名称
    exchange: jcpp.uplink.exchange
    # 队列前缀
    queue-prefix: jcpp.uplink.partition

11.5 消息格式

{
  "pileCode": "20231212000010",
  "messageType": "LOGIN",
  "data": "{\"pileCode\":\"20231212000010\",\"remoteAddress\":\"192.168.1.100\"}"
}

字段说明

  • pileCode:充电桩编码(用于分区计算)
  • messageType:消息类型
  • data具体消息内容JSON 字符串)

11.6 优势

  1. 消息顺序保证:同一充电桩的消息按顺序处理
  2. 简单易用:保持原有 JSON 格式,无需 Protobuf
  3. 统一处理:整合所有消息类型的处理逻辑到一个处理器
  4. 易于调试JSON 格式可读性强,便于排查问题
  5. 可扩展性:支持动态调整分区数量

11.7 与原有方案的对比

特性 原有方案(无分区) 新方案(分区消费)
消息格式 JSON 字符串 JSON 字符串
消息顺序 不保证 保证同一 pileCode 顺序
分区消费 不支持 支持
并发处理 多消费者并发 每个分区单线程
代码复杂度 多个独立消费者 统一消息处理器
易于调试 较难 较易

11.8 部署步骤

  1. 更新配置:在 application-{env}.yml 中添加 JCPP 配置
  2. 启动应用:应用会自动创建 10 个分区队列
  3. JCPP 端配置:确保 JCPP 使用相同的 Hash 算法和分区数量
  4. 验证测试:发送测试消息,验证分区路由和消息处理

11.9 注意事项

  1. 分区数量:必须与 JCPP 保持一致,建议在系统初始化时确定,后续不要修改
  2. Hash 算法:必须使用 MurmurHash3_128与 JCPP 保持一致
  3. 消息顺序:每个分区单线程消费(concurrency=1),不要修改此配置
  4. 异常处理JSON 解析失败会抛出异常,建议配置死信队列
  5. 性能监控:监控队列堆积、消费延迟、错误率等指标

11.10 相关文档

详细使用说明请参考:JCPP JSON 消息分区消费方案


十、变更记录