同步充电桩数据

This commit is contained in:
Guoqs
2025-12-30 15:59:34 +08:00
parent 3f42441869
commit ee7a3425d0
38 changed files with 4663 additions and 131 deletions

View File

@@ -0,0 +1,93 @@
package com.jsowell.pile.domain;
import com.fasterxml.jackson.annotation.JsonFormat;
import com.jsowell.common.annotation.Excel;
import com.jsowell.common.core.domain.BaseEntity;
import lombok.Data;
import lombok.EqualsAndHashCode;
import java.util.Date;
/**
* JCPP 充电桩同步记录对象 jcpp_sync_record
*
* @author jsowell
*/
@Data
@EqualsAndHashCode(callSuper = true)
public class JcppSyncRecord extends BaseEntity {
private static final long serialVersionUID = 1L;
/**
* 主键
*/
private Long id;
/**
* 同步类型FULL-全量INCREMENTAL-增量)
*/
@Excel(name = "同步类型")
private String syncType;
/**
* 同步状态RUNNING-进行中SUCCESS-成功FAILED-失败)
*/
@Excel(name = "同步状态")
private String syncStatus;
/**
* 开始时间
*/
@JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss")
@Excel(name = "开始时间", width = 30, dateFormat = "yyyy-MM-dd HH:mm:ss")
private Date startTime;
/**
* 结束时间
*/
@JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss")
@Excel(name = "结束时间", width = 30, dateFormat = "yyyy-MM-dd HH:mm:ss")
private Date endTime;
/**
* 总充电桩数
*/
@Excel(name = "总充电桩数")
private Integer totalPiles;
/**
* 成功充电桩数
*/
@Excel(name = "成功充电桩数")
private Integer successPiles;
/**
* 失败充电桩数
*/
@Excel(name = "失败充电桩数")
private Integer failedPiles;
/**
* 总充电枪数
*/
@Excel(name = "总充电枪数")
private Integer totalGuns;
/**
* 成功充电枪数
*/
@Excel(name = "成功充电枪数")
private Integer successGuns;
/**
* 失败充电枪数
*/
@Excel(name = "失败充电枪数")
private Integer failedGuns;
/**
* 错误信息
*/
private String errorMessage;
}

View File

@@ -0,0 +1,82 @@
package com.jsowell.pile.jcpp.config;
import com.jsowell.pile.jcpp.constant.JcppConstants;
import com.jsowell.pile.jcpp.util.JcppPartitionCalculator;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import javax.annotation.PostConstruct;
import java.util.ArrayList;
import java.util.List;
/**
* JCPP Protobuf 消息分区队列配置
* 实现基于 messageKey 的分区消费,确保同一充电桩的消息顺序处理
*
* @author jsowell
*/
@Slf4j
@Configuration
public class JcppPartitionQueueConfig {
@Value("${jcpp.rabbitmq.partition-count:10}")
private int partitionCount;
@PostConstruct
public void init() {
// 设置分区数量到计算器
JcppPartitionCalculator.setPartitionCount(partitionCount);
log.info("JCPP 分区队列配置初始化完成,分区数量: {}", partitionCount);
}
/**
* 创建分区队列数组
* 每个分区一个队列,用于顺序消费
*/
@Bean
public Queue[] jcppPartitionQueues() {
Queue[] queues = new Queue[partitionCount];
for (int i = 0; i < partitionCount; i++) {
String queueName = JcppPartitionCalculator.getQueueName(i);
queues[i] = new Queue(queueName, true, false, false);
log.info("创建 JCPP 分区队列: {}", queueName);
}
return queues;
}
/**
* 绑定分区队列到 Exchange
* 每个分区队列绑定所有消息类型jcpp.uplink.#
* 实际分区由 JCPP 在发送消息时通过 header 指定
*
* 注意:复用 JcppRabbitConfig 中定义的 jcppUplinkExchange Bean
*/
@Bean
public Binding[] jcppPartitionBindings(
TopicExchange jcppUplinkExchange,
Queue[] jcppPartitionQueues) {
List<Binding> bindings = new ArrayList<>();
for (int i = 0; i < partitionCount; i++) {
// 每个分区队列绑定所有消息类型
Binding binding = BindingBuilder
.bind(jcppPartitionQueues[i])
.to(jcppUplinkExchange)
.with("jcpp.uplink.#");
bindings.add(binding);
log.info("绑定分区队列 {} 到 Exchange: {}",
jcppPartitionQueues[i].getName(),
jcppUplinkExchange.getName());
}
return bindings.toArray(new Binding[0]);
}
}

View File

@@ -50,26 +50,31 @@ public class JcppGunStatusConsumer {
}
@RabbitListener(queues = JcppConstants.QUEUE_GUN_STATUS)
public void handleGunStatus(String message) {
public void handleGunStatus(JcppUplinkMessage uplinkMessage) {
try {
log.info("收到 JCPP 枪状态消息: {}", message);
// 解析消息
JcppUplinkMessage uplinkMessage = JSON.parseObject(message, JcppUplinkMessage.class);
// 验证消息
if (uplinkMessage == null || uplinkMessage.getData() == null) {
log.warn("枪状态消息格式错误: {}", message);
log.warn("枪状态消息格式错误");
return;
}
// 从 data 中获取信息
JSONObject data = JSON.parseObject(JSON.toJSONString(uplinkMessage.getData()));
String pileCode = data.getString("pileCode");
// 从 uplinkMessage 中获取 pileCode
String pileCode = uplinkMessage.getPileCode();
if (pileCode == null || pileCode.isEmpty()) {
log.warn("枪状态消息缺少 pileCode");
return;
}
log.info("收到 JCPP 枪状态消息: pileCode={}, messageType={}", pileCode, uplinkMessage.getMessageType());
// 从 data 中获取其他信息
JSONObject data = JSON.parseObject(uplinkMessage.getData());
String gunNo = data.getString("gunNo");
String gunRunStatus = data.getString("gunRunStatus");
JSONArray faultMessages = data.getJSONArray("faultMessages");
if (pileCode == null || gunNo == null) {
log.warn("枪状态消息缺少必要字段: {}", message);
if (gunNo == null) {
log.warn("枪状态消息缺少 gunNo");
return;
}
@@ -94,7 +99,8 @@ public class JcppGunStatusConsumer {
}
} catch (Exception e) {
log.error("处理 JCPP 枪状态消息异常: message={}", message, e);
log.error("处理 JCPP 枪状态消息异常: pileCode={}",
uplinkMessage != null ? uplinkMessage.getPileCode() : "unknown", e);
}
}
}

View File

@@ -28,20 +28,18 @@ public class JcppHeartbeatConsumer {
private StringRedisTemplate stringRedisTemplate;
@RabbitListener(queues = JcppConstants.QUEUE_HEARTBEAT)
public void handleHeartbeat(String message) {
public void handleHeartbeat(JcppUplinkMessage uplinkMessage) {
try {
// 解析消息
JcppUplinkMessage uplinkMessage = JSON.parseObject(message, JcppUplinkMessage.class);
// 验证消息
if (uplinkMessage == null || uplinkMessage.getData() == null) {
log.debug("心跳消息格式错误: {}", message);
log.debug("心跳消息格式错误");
return;
}
// 从 data 中获取 pileCode
JSONObject data = JSON.parseObject(JSON.toJSONString(uplinkMessage.getData()));
String pileCode = data.getString("pileCode");
// 从 uplinkMessage 中获取 pileCode
String pileCode = uplinkMessage.getPileCode();
if (pileCode == null || pileCode.isEmpty()) {
log.debug("心跳消息缺少 pileCode: {}", message);
log.debug("心跳消息缺少 pileCode");
return;
}
@@ -53,7 +51,8 @@ public class JcppHeartbeatConsumer {
log.debug("收到充电桩心跳: pileCode={}", pileCode);
} catch (Exception e) {
log.error("处理 JCPP 心跳消息异常: message={}", message, e);
log.error("处理 JCPP 心跳消息异常: pileCode={}",
uplinkMessage != null ? uplinkMessage.getPileCode() : "unknown", e);
}
}
}

View File

@@ -0,0 +1,90 @@
package com.jsowell.pile.jcpp.consumer;
import com.alibaba.fastjson2.JSON;
import com.jsowell.pile.jcpp.dto.JcppUplinkMessage;
import com.jsowell.pile.jcpp.service.IJcppJsonMessageHandler;
import com.jsowell.pile.jcpp.util.JcppPartitionCalculator;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;
/**
* JCPP JSON 消息消费者 - 分区消费
* 每个分区单线程消费,保证同一充电桩的消息顺序处理
*
* 注意:此消费者默认禁用,需要在 JCPP 端配置好分区路由并创建队列后再启用
* 启用方法:取消 @Component 注解的注释
*
* @author jsowell
*/
@Slf4j
// @Component // 暂时禁用,等 JCPP 端配置好分区路由并创建队列后再启用
public class JcppJsonPartitionConsumer {
@Autowired
private IJcppJsonMessageHandler messageHandler;
@RabbitListener(
queues = {
"jcpp.uplink.partition.0",
"jcpp.uplink.partition.1",
"jcpp.uplink.partition.2",
"jcpp.uplink.partition.3",
"jcpp.uplink.partition.4",
"jcpp.uplink.partition.5",
"jcpp.uplink.partition.6",
"jcpp.uplink.partition.7",
"jcpp.uplink.partition.8",
"jcpp.uplink.partition.9"
},
concurrency = "1" // 每个队列单线程消费保证顺序
)
public void consumeMessage(JcppUplinkMessage uplinkMessage,
@Header(AmqpHeaders.RECEIVED_ROUTING_KEY) String routingKey,
@Header(AmqpHeaders.CONSUMER_QUEUE) String queueName) {
// 从队列名称提取分区编号
int partition = extractPartitionFromQueue(queueName);
try {
String messageKey = uplinkMessage.getPileCode();
String messageType = uplinkMessage.getMessageType();
log.info("[分区{}] 收到消息: pileCode={}, messageType={}, routingKey={}",
partition, messageKey, messageType, routingKey);
// 验证分区是否正确
int expectedPartition = JcppPartitionCalculator.getPartition(messageKey);
if (expectedPartition != partition) {
log.warn("[分区{}] 消息分区不匹配: pileCode={}, 期望分区={}, 实际分区={}",
partition, messageKey, expectedPartition, partition);
}
// 处理消息
messageHandler.handleUplinkMessage(uplinkMessage);
log.debug("[分区{}] 消息处理完成: pileCode={}", partition, messageKey);
} catch (Exception e) {
log.error("[分区{}] 消息处理失败: pileCode={}",
partition, uplinkMessage != null ? uplinkMessage.getPileCode() : "unknown", e);
throw new RuntimeException("消息处理失败", e);
}
}
/**
* 从队列名称提取分区编号
* 队列名称格式: jcpp.uplink.partition.{partition}
*/
private int extractPartitionFromQueue(String queueName) {
try {
String[] parts = queueName.split("\\.");
return Integer.parseInt(parts[parts.length - 1]);
} catch (Exception e) {
log.error("无法从队列名称提取分区编号: {}", queueName, e);
return 0;
}
}
}

View File

@@ -28,25 +28,26 @@ public class JcppLoginConsumer {
private IJcppDownlinkService jcppDownlinkService;
@RabbitListener(queues = JcppConstants.QUEUE_LOGIN)
public void handleLogin(String message) {
public void handleLogin(JcppUplinkMessage uplinkMessage) {
try {
log.info("收到 JCPP 登录消息: {}", message);
// 解析消息
JcppUplinkMessage uplinkMessage = JSON.parseObject(message, JcppUplinkMessage.class);
// 验证消息
if (uplinkMessage == null || uplinkMessage.getData() == null) {
log.warn("登录消息格式错误: {}", message);
log.warn("登录消息格式错误");
return;
}
// 从 data 中获取 pileCode
JSONObject data = JSON.parseObject(JSON.toJSONString(uplinkMessage.getData()));
String pileCode = data.getString("pileCode");
// 从 uplinkMessage 中获取 pileCode
String pileCode = uplinkMessage.getPileCode();
if (pileCode == null || pileCode.isEmpty()) {
log.warn("登录消息缺少 pileCode: {}", message);
log.warn("登录消息缺少 pileCode");
return;
}
log.info("收到 JCPP 登录消息: pileCode={}, messageType={}", pileCode, uplinkMessage.getMessageType());
// 解析 data
JSONObject data = JSON.parseObject(uplinkMessage.getData());
// 查询充电桩是否存在
PileBasicInfo pileInfo = pileBasicInfoService.selectPileBasicInfoBySN(pileCode);
boolean exists = pileInfo != null;
@@ -66,7 +67,8 @@ public class JcppLoginConsumer {
jcppDownlinkService.sendLoginAck(pileCode, exists);
} catch (Exception e) {
log.error("处理 JCPP 登录消息异常: message={}", message, e);
log.error("处理 JCPP 登录消息异常: pileCode={}",
uplinkMessage != null ? uplinkMessage.getPileCode() : "unknown", e);
}
}
}

View File

@@ -36,26 +36,28 @@ public class JcppPricingConsumer {
private IJcppDownlinkService jcppDownlinkService;
@RabbitListener(queues = JcppConstants.QUEUE_PRICING)
public void handlePricing(String message) {
public void handlePricing(JcppUplinkMessage uplinkMessage) {
try {
log.info("收到 JCPP 计费消息: {}", message);
// 解析消息
JcppUplinkMessage uplinkMessage = JSON.parseObject(message, JcppUplinkMessage.class);
// 验证消息
if (uplinkMessage == null || uplinkMessage.getData() == null) {
log.warn("计费消息格式错误: {}", message);
log.warn("计费消息格式错误");
return;
}
// 从 uplinkMessage 中获取 pileCode 和 messageType
String pileCode = uplinkMessage.getPileCode();
String messageType = uplinkMessage.getMessageType();
JSONObject data = JSON.parseObject(JSON.toJSONString(uplinkMessage.getData()));
String pileCode = data.getString("pileCode");
if (pileCode == null || pileCode.isEmpty()) {
log.warn("计费消息缺少 pileCode: {}", message);
log.warn("计费消息缺少 pileCode");
return;
}
log.info("收到 JCPP 计费消息: pileCode={}, messageType={}", pileCode, messageType);
// 解析 data
JSONObject data = JSON.parseObject(uplinkMessage.getData());
// 根据消息类型处理
if (JcppConstants.MessageType.QUERY_PRICING.equals(messageType)) {
handleQueryPricing(pileCode);
@@ -67,7 +69,8 @@ public class JcppPricingConsumer {
}
} catch (Exception e) {
log.error("处理 JCPP 计费消息异常: message={}", message, e);
log.error("处理 JCPP 计费消息异常: pileCode={}",
uplinkMessage != null ? uplinkMessage.getPileCode() : "unknown", e);
}
}

View File

@@ -39,18 +39,23 @@ public class JcppRealTimeDataConsumer {
private StringRedisTemplate stringRedisTemplate;
@RabbitListener(queues = JcppConstants.QUEUE_REAL_TIME_DATA)
public void handleRealTimeData(String message) {
public void handleRealTimeData(JcppUplinkMessage uplinkMessage) {
try {
// 解析消息
JcppUplinkMessage uplinkMessage = JSON.parseObject(message, JcppUplinkMessage.class);
// 验证消息
if (uplinkMessage == null || uplinkMessage.getData() == null) {
log.debug("实时数据消息格式错误: {}", message);
log.debug("实时数据消息格式错误");
return;
}
// 从 data 中获取信息
JSONObject data = JSON.parseObject(JSON.toJSONString(uplinkMessage.getData()));
String pileCode = data.getString("pileCode");
// 从 uplinkMessage 中获取 pileCode
String pileCode = uplinkMessage.getPileCode();
if (pileCode == null || pileCode.isEmpty()) {
log.debug("实时数据消息缺少 pileCode");
return;
}
// 从 data 中获取其他信息
JSONObject data = JSON.parseObject(uplinkMessage.getData());
String gunNo = data.getString("gunNo");
String tradeNo = data.getString("tradeNo");
String outputVoltage = data.getString("outputVoltage");
@@ -61,7 +66,7 @@ public class JcppRealTimeDataConsumer {
String totalChargingCostYuan = data.getString("totalChargingCostYuan");
if (tradeNo == null || tradeNo.isEmpty()) {
log.debug("实时数据消息缺少 tradeNo: {}", message);
log.debug("实时数据消息缺少 tradeNo");
return;
}
@@ -101,7 +106,8 @@ public class JcppRealTimeDataConsumer {
log.debug("处理实时数据: tradeNo={}, soc={}, energy={}kWh", tradeNo, soc, totalChargingEnergyKWh);
} catch (Exception e) {
log.error("处理 JCPP 实时数据消息异常: message={}", message, e);
log.error("处理 JCPP 实时数据消息异常: pileCode={}",
uplinkMessage != null ? uplinkMessage.getPileCode() : "unknown", e);
}
}
}

View File

@@ -30,19 +30,19 @@ public class JcppRemoteResultConsumer {
@Transactional(rollbackFor = Exception.class)
@RabbitListener(queues = JcppConstants.QUEUE_REMOTE_RESULT)
public void handleRemoteResult(String message) {
public void handleRemoteResult(JcppUplinkMessage uplinkMessage) {
try {
log.info("收到 JCPP 远程操作结果消息: {}", message);
log.info("收到 JCPP 远程操作结果消息: pileCode={}, messageType={}",
uplinkMessage.getPileCode(), uplinkMessage.getMessageType());
// 解析消息
JcppUplinkMessage uplinkMessage = JSON.parseObject(message, JcppUplinkMessage.class);
// 验证消息
if (uplinkMessage == null || uplinkMessage.getData() == null) {
log.warn("远程操作结果消息格式错误: {}", message);
log.warn("远程操作结果消息格式错误");
return;
}
String messageType = uplinkMessage.getMessageType();
JSONObject data = JSON.parseObject(JSON.toJSONString(uplinkMessage.getData()));
JSONObject data = JSON.parseObject(uplinkMessage.getData());
// 根据消息类型处理
if (JcppConstants.MessageType.REMOTE_START_RESULT.equals(messageType)) {
@@ -54,7 +54,8 @@ public class JcppRemoteResultConsumer {
}
} catch (Exception e) {
log.error("处理 JCPP 远程操作结果消息异常: message={}", message, e);
log.error("处理 JCPP 远程操作结果消息异常: pileCode={}",
uplinkMessage != null ? uplinkMessage.getPileCode() : "unknown", e);
}
}
@@ -62,12 +63,14 @@ public class JcppRemoteResultConsumer {
* 处理远程启动结果
*/
private void handleRemoteStartResult(JSONObject data) {
String pileCode = data.getString("pileCode");
String gunNo = data.getString("gunNo");
String tradeNo = data.getString("tradeNo");
Boolean success = data.getBoolean("success");
String failReason = data.getString("failReason");
// 从 data 中获取 pileCode 和 gunNo如果有
String pileCode = data.getString("pileCode");
String gunNo = data.getString("gunNo");
if (tradeNo == null || tradeNo.isEmpty()) {
log.warn("远程启动结果缺少 tradeNo");
return;
@@ -110,11 +113,13 @@ public class JcppRemoteResultConsumer {
* 处理远程停止结果
*/
private void handleRemoteStopResult(JSONObject data) {
String pileCode = data.getString("pileCode");
String gunNo = data.getString("gunNo");
Boolean success = data.getBoolean("success");
String failReason = data.getString("failReason");
// 从 data 中获取 pileCode 和 gunNo如果有
String pileCode = data.getString("pileCode");
String gunNo = data.getString("gunNo");
if (Boolean.TRUE.equals(success)) {
log.info("远程停止成功: pileCode={}, gunNo={}", pileCode, gunNo);
} else {

View File

@@ -28,27 +28,27 @@ public class JcppSessionCloseConsumer {
private PileConnectorInfoService pileConnectorInfoService;
@RabbitListener(queues = JcppConstants.QUEUE_SESSION_CLOSE)
public void handleSessionClose(String message) {
public void handleSessionClose(JcppUplinkMessage uplinkMessage) {
try {
log.info("收到 JCPP 会话关闭消息: {}", message);
// 解析消息
JcppUplinkMessage uplinkMessage = JSON.parseObject(message, JcppUplinkMessage.class);
// 验证消息
if (uplinkMessage == null || uplinkMessage.getData() == null) {
log.warn("会话关闭消息格式错误: {}", message);
log.warn("会话关闭消息格式错误");
return;
}
// 从 data 中获取信息
JSONObject data = JSON.parseObject(JSON.toJSONString(uplinkMessage.getData()));
String pileCode = data.getString("pileCode");
String reason = data.getString("reason");
// 从 uplinkMessage 中获取 pileCode
String pileCode = uplinkMessage.getPileCode();
if (pileCode == null || pileCode.isEmpty()) {
log.warn("会话关闭消息缺少 pileCode: {}", message);
log.warn("会话关闭消息缺少 pileCode");
return;
}
log.info("收到 JCPP 会话关闭消息: pileCode={}, messageType={}", pileCode, uplinkMessage.getMessageType());
// 从 data 中获取其他信息
JSONObject data = JSON.parseObject(uplinkMessage.getData());
String reason = data.getString("reason");
log.info("充电桩会话关闭: pileCode={}, reason={}", pileCode, reason);
// 1. 更新充电桩离线状态
@@ -70,7 +70,8 @@ public class JcppSessionCloseConsumer {
// 可以调用 OrderBasicInfoService 查询并处理
} catch (Exception e) {
log.error("处理 JCPP 会话关闭消息异常: message={}", message, e);
log.error("处理 JCPP 会话关闭消息异常: pileCode={}",
uplinkMessage != null ? uplinkMessage.getPileCode() : "unknown", e);
}
}
}

View File

@@ -54,28 +54,33 @@ public class JcppStartChargeConsumer {
@Transactional(rollbackFor = Exception.class)
@RabbitListener(queues = JcppConstants.QUEUE_START_CHARGE)
public void handleStartCharge(String message) {
public void handleStartCharge(JcppUplinkMessage uplinkMessage) {
try {
log.info("收到 JCPP 启动充电消息: {}", message);
// 解析消息
JcppUplinkMessage uplinkMessage = JSON.parseObject(message, JcppUplinkMessage.class);
// 验证消息
if (uplinkMessage == null || uplinkMessage.getData() == null) {
log.warn("启动充电消息格式错误: {}", message);
log.warn("启动充电消息格式错误");
return;
}
// 从 data 中获取信息
JSONObject data = JSON.parseObject(JSON.toJSONString(uplinkMessage.getData()));
String pileCode = data.getString("pileCode");
// 从 uplinkMessage 中获取 pileCode
String pileCode = uplinkMessage.getPileCode();
if (pileCode == null || pileCode.isEmpty()) {
log.warn("启动充电消息缺少 pileCode");
return;
}
log.info("收到 JCPP 启动充电消息: pileCode={}, messageType={}", pileCode, uplinkMessage.getMessageType());
// 从 data 中获取其他信息
JSONObject data = JSON.parseObject(uplinkMessage.getData());
String gunNo = data.getString("gunNo");
String startType = data.getString("startType");
String cardNo = data.getString("cardNo");
Boolean needPassword = data.getBoolean("needPassword");
String password = data.getString("password");
if (pileCode == null || gunNo == null) {
log.warn("启动充电消息缺少必要字段: {}", message);
if (gunNo == null) {
log.warn("启动充电消息缺少 gunNo");
return;
}
@@ -89,7 +94,8 @@ public class JcppStartChargeConsumer {
}
} catch (Exception e) {
log.error("处理 JCPP 启动充电消息异常: message={}", message, e);
log.error("处理 JCPP 启动充电消息异常: pileCode={}",
uplinkMessage != null ? uplinkMessage.getPileCode() : "unknown", e);
}
}

View File

@@ -38,20 +38,25 @@ public class JcppTransactionConsumer {
@Transactional(rollbackFor = Exception.class)
@RabbitListener(queues = JcppConstants.QUEUE_TRANSACTION)
public void handleTransaction(String message) {
public void handleTransaction(JcppUplinkMessage uplinkMessage) {
try {
log.info("收到 JCPP 交易记录消息: {}", message);
// 解析消息
JcppUplinkMessage uplinkMessage = JSON.parseObject(message, JcppUplinkMessage.class);
// 验证消息
if (uplinkMessage == null || uplinkMessage.getData() == null) {
log.warn("交易记录消息格式错误: {}", message);
log.warn("交易记录消息格式错误");
return;
}
// 从 data 中获取信息
JSONObject data = JSON.parseObject(JSON.toJSONString(uplinkMessage.getData()));
String pileCode = data.getString("pileCode");
// 从 uplinkMessage 中获取 pileCode
String pileCode = uplinkMessage.getPileCode();
if (pileCode == null || pileCode.isEmpty()) {
log.warn("交易记录消息缺少 pileCode");
return;
}
log.info("收到 JCPP 交易记录消息: pileCode={}, messageType={}", pileCode, uplinkMessage.getMessageType());
// 从 data 中获取其他信息
JSONObject data = JSON.parseObject(uplinkMessage.getData());
String gunNo = data.getString("gunNo");
String tradeNo = data.getString("tradeNo");
Long startTs = data.getLong("startTs");
@@ -62,7 +67,7 @@ public class JcppTransactionConsumer {
JSONObject detail = data.getJSONObject("detail");
if (tradeNo == null || tradeNo.isEmpty()) {
log.warn("交易记录消息缺少 tradeNo: {}", message);
log.warn("交易记录消息缺少 tradeNo");
return;
}
@@ -122,7 +127,8 @@ public class JcppTransactionConsumer {
// orderBasicInfoService.realTimeOrderSplit(order.getId());
} catch (Exception e) {
log.error("处理 JCPP 交易记录消息异常: message={}", message, e);
log.error("处理 JCPP 交易记录消息异常: pileCode={}",
uplinkMessage != null ? uplinkMessage.getPileCode() : "unknown", e);
}
}
}

View File

@@ -52,7 +52,7 @@ public class JcppUplinkMessage implements Serializable {
private Long timestamp;
/**
* 具体消息内容(根据 messageType 不同,结构不同)
* 具体消息内容(JSON 字符串格式,根据 messageType 不同,结构不同)
*/
private Object data;
private String data;
}

View File

@@ -0,0 +1,43 @@
package com.jsowell.pile.jcpp.dto.sync;
import com.alibaba.fastjson2.JSONObject;
import lombok.Data;
import java.io.Serializable;
/**
* JCPP 充电枪同步数据传输对象
*
* @author jsowell
*/
@Data
public class JcppGunSyncDTO implements Serializable {
private static final long serialVersionUID = 1L;
/**
* 充电枪编码(对应 Web 的 pile_connector_code
*/
private String gunCode;
/**
* 充电枪名称(对应 Web 的 name
*/
private String gunName;
/**
* 枪号(从 gunCode 提取最后 2 位)
*/
private String gunNo;
/**
* 所属充电桩编码(对应 Web 的 pile_sn
*/
private String pileCode;
/**
* 附加信息JSON 格式)
* 包含webGunId, status, parkNo 等
*/
private JSONObject additionalInfo;
}

View File

@@ -0,0 +1,58 @@
package com.jsowell.pile.jcpp.dto.sync;
import com.alibaba.fastjson2.JSONObject;
import lombok.Data;
import java.io.Serializable;
/**
* JCPP 充电桩同步数据传输对象
*
* @author jsowell
*/
@Data
public class JcppPileSyncDTO implements Serializable {
private static final long serialVersionUID = 1L;
/**
* 充电桩编码(对应 Web 的 sn
*/
private String pileCode;
/**
* 充电桩名称(对应 Web 的 name
*/
private String pileName;
/**
* 软件协议(对应 Web 的 software_protocol
*/
private String protocol;
/**
* 品牌
*/
private String brand;
/**
* 型号
*/
private String model;
/**
* 制造商
*/
private String manufacturer;
/**
* 类型OPERATION-运营桩, PERSONAL-个人桩
*/
private String type;
/**
* 附加信息JSON 格式)
* 包含webPileId, webStationId, businessType, secretKey, longitude, latitude, iccid 等
*/
private JSONObject additionalInfo;
}

View File

@@ -0,0 +1,38 @@
package com.jsowell.pile.jcpp.dto.sync;
import lombok.Data;
import java.io.Serializable;
import java.util.Date;
import java.util.List;
/**
* JCPP 同步请求
*
* @author jsowell
*/
@Data
public class JcppSyncRequest implements Serializable {
private static final long serialVersionUID = 1L;
/**
* 同步类型FULL-全量, INCREMENTAL-增量
*/
private String syncType;
/**
* 上次同步时间(增量同步时使用)
*/
private Date lastSyncTime;
/**
* 充电桩列表
*/
private List<JcppPileSyncDTO> piles;
/**
* 充电枪列表
*/
private List<JcppGunSyncDTO> guns;
}

View File

@@ -0,0 +1,139 @@
package com.jsowell.pile.jcpp.dto.sync;
import lombok.Data;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
/**
* JCPP 同步响应
*
* @author jsowell
*/
@Data
public class JcppSyncResponse implements Serializable {
private static final long serialVersionUID = 1L;
/**
* 是否成功
*/
private Boolean success;
/**
* 消息
*/
private String message;
/**
* 总充电桩数
*/
private Integer totalPiles;
/**
* 成功充电桩数
*/
private Integer successPiles;
/**
* 失败充电桩数
*/
private Integer failedPiles;
/**
* 总充电枪数
*/
private Integer totalGuns;
/**
* 成功充电枪数
*/
private Integer successGuns;
/**
* 失败充电枪数
*/
private Integer failedGuns;
/**
* 同步结果列表
*/
private List<JcppSyncResult> results;
/**
* 错误信息列表
*/
private List<String> errors;
/**
* 构建响应
*/
public static JcppSyncResponse build(List<JcppSyncResult> pileResults, List<JcppSyncResult> gunResults) {
JcppSyncResponse response = new JcppSyncResponse();
// 统计充电桩结果
int totalPiles = pileResults != null ? pileResults.size() : 0;
int successPiles = 0;
int failedPiles = 0;
if (pileResults != null) {
for (JcppSyncResult result : pileResults) {
if (Boolean.TRUE.equals(result.getSuccess())) {
successPiles++;
} else {
failedPiles++;
}
}
}
// 统计充电枪结果
int totalGuns = gunResults != null ? gunResults.size() : 0;
int successGuns = 0;
int failedGuns = 0;
if (gunResults != null) {
for (JcppSyncResult result : gunResults) {
if (Boolean.TRUE.equals(result.getSuccess())) {
successGuns++;
} else {
failedGuns++;
}
}
}
// 设置统计信息
response.setTotalPiles(totalPiles);
response.setSuccessPiles(successPiles);
response.setFailedPiles(failedPiles);
response.setTotalGuns(totalGuns);
response.setSuccessGuns(successGuns);
response.setFailedGuns(failedGuns);
// 合并结果
List<JcppSyncResult> allResults = new ArrayList<>();
if (pileResults != null) {
allResults.addAll(pileResults);
}
if (gunResults != null) {
allResults.addAll(gunResults);
}
response.setResults(allResults);
// 收集错误信息
List<String> errors = new ArrayList<>();
for (JcppSyncResult result : allResults) {
if (Boolean.FALSE.equals(result.getSuccess())) {
errors.add(result.getCode() + ": " + result.getMessage());
}
}
response.setErrors(errors);
// 判断整体是否成功
boolean overallSuccess = (failedPiles == 0 && failedGuns == 0);
response.setSuccess(overallSuccess);
response.setMessage(overallSuccess ? "同步成功" : "同步部分失败");
return response;
}
}

View File

@@ -0,0 +1,59 @@
package com.jsowell.pile.jcpp.dto.sync;
import lombok.Data;
import java.io.Serializable;
/**
* JCPP 单个同步结果
*
* @author jsowell
*/
@Data
public class JcppSyncResult implements Serializable {
private static final long serialVersionUID = 1L;
/**
* 充电桩编码或充电枪编码
*/
private String code;
/**
* JCPP 返回的 IDUUID
*/
private String id;
/**
* 是否成功
*/
private Boolean success;
/**
* 消息
*/
private String message;
/**
* 创建成功结果
*/
public static JcppSyncResult success(String code, String id, String message) {
JcppSyncResult result = new JcppSyncResult();
result.setCode(code);
result.setId(id);
result.setSuccess(true);
result.setMessage(message);
return result;
}
/**
* 创建失败结果
*/
public static JcppSyncResult fail(String code, String message) {
JcppSyncResult result = new JcppSyncResult();
result.setCode(code);
result.setSuccess(false);
result.setMessage(message);
return result;
}
}

View File

@@ -0,0 +1,19 @@
package com.jsowell.pile.jcpp.service;
import com.jsowell.pile.jcpp.dto.JcppUplinkMessage;
/**
* JCPP JSON 消息处理器接口
* 处理从 JCPP 接收到的各种上行消息JSON 格式)
*
* @author jsowell
*/
public interface IJcppJsonMessageHandler {
/**
* 处理上行消息
*
* @param message JSON 上行消息
*/
void handleUplinkMessage(JcppUplinkMessage message);
}

View File

@@ -0,0 +1,36 @@
package com.jsowell.pile.jcpp.service;
import com.jsowell.pile.jcpp.dto.sync.JcppSyncResponse;
import java.util.Date;
/**
* JCPP 充电桩同步服务接口
*
* @author jsowell
*/
public interface IJcppPileSyncService {
/**
* 全量同步充电桩数据到 JCPP
*
* @return 同步结果
*/
JcppSyncResponse syncAllPiles();
/**
* 增量同步充电桩数据到 JCPP
*
* @param lastSyncTime 上次同步时间(可选,如果为 null 则查询最后一次成功的同步记录)
* @return 同步结果
*/
JcppSyncResponse syncIncrementalPiles(Date lastSyncTime);
/**
* 同步单个充电桩
*
* @param pileSn 充电桩编号
* @return 是否成功
*/
boolean syncSinglePile(String pileSn);
}

View File

@@ -0,0 +1,645 @@
package com.jsowell.pile.jcpp.service.impl;
import com.alibaba.fastjson2.JSON;
import com.alibaba.fastjson2.JSONObject;
import com.jsowell.common.enums.ykc.StartTypeEnum;
import com.jsowell.common.util.StringUtils;
import com.jsowell.common.util.id.IdUtils;
import com.jsowell.pile.domain.*;
import com.jsowell.pile.jcpp.constant.JcppConstants;
import com.jsowell.pile.jcpp.dto.JcppPricingModel;
import com.jsowell.pile.jcpp.dto.JcppUplinkMessage;
import com.jsowell.pile.jcpp.service.IJcppDownlinkService;
import com.jsowell.pile.jcpp.service.IJcppJsonMessageHandler;
import com.jsowell.pile.jcpp.util.PricingModelConverter;
import com.jsowell.pile.service.*;
import com.jsowell.pile.vo.web.BillingTemplateVO;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import java.math.BigDecimal;
import java.util.Date;
import java.util.concurrent.TimeUnit;
/**
* JCPP JSON 消息处理器实现
* 整合原有各个消费者的处理逻辑,支持分区消费
*
* @author jsowell
*/
@Slf4j
@Service
public class JcppJsonMessageHandlerImpl implements IJcppJsonMessageHandler {
private static final String HEARTBEAT_KEY_PREFIX = "jcpp:heartbeat:";
private static final long HEARTBEAT_EXPIRE_SECONDS = 180L;
private static final String REALTIME_DATA_KEY_PREFIX = "jcpp:realtime:";
private static final long REALTIME_DATA_EXPIRE_SECONDS = 300L;
@Autowired
private PileBasicInfoService pileBasicInfoService;
@Autowired
private PileConnectorInfoService pileConnectorInfoService;
@Autowired
private PileAuthCardService pileAuthCardService;
@Autowired
private MemberBasicInfoService memberBasicInfoService;
@Autowired
private MemberWalletInfoService memberWalletInfoService;
@Autowired
private PileStationWhitelistService pileStationWhitelistService;
@Autowired
private OrderBasicInfoService orderBasicInfoService;
@Autowired
private PileBillingTemplateService pileBillingTemplateService;
@Autowired
private IJcppDownlinkService jcppDownlinkService;
@Autowired
private StringRedisTemplate stringRedisTemplate;
@Override
public void handleUplinkMessage(JcppUplinkMessage uplinkMessage) {
String messageType = uplinkMessage.getMessageType();
String pileCode = uplinkMessage.getPileCode();
log.debug("处理上行消息: pileCode={}, messageType={}", pileCode, messageType);
// 根据消息类型分发处理
switch (messageType) {
case JcppConstants.MessageType.LOGIN:
handleLogin(uplinkMessage);
break;
case JcppConstants.MessageType.HEARTBEAT:
handleHeartbeat(uplinkMessage);
break;
case JcppConstants.MessageType.GUN_STATUS:
handleGunStatus(uplinkMessage);
break;
case JcppConstants.MessageType.REAL_TIME_DATA:
handleRealTimeData(uplinkMessage);
break;
case JcppConstants.MessageType.TRANSACTION_RECORD:
handleTransactionRecord(uplinkMessage);
break;
case JcppConstants.MessageType.START_CHARGE:
handleStartCharge(uplinkMessage);
break;
case JcppConstants.MessageType.QUERY_PRICING:
handleQueryPricing(uplinkMessage);
break;
case JcppConstants.MessageType.VERIFY_PRICING:
handleVerifyPricing(uplinkMessage);
break;
case JcppConstants.MessageType.SESSION_CLOSE:
handleSessionClose(uplinkMessage);
break;
case JcppConstants.MessageType.REMOTE_START_RESULT:
case JcppConstants.MessageType.REMOTE_STOP_RESULT:
handleRemoteResult(uplinkMessage);
break;
default:
log.warn("未知的消息类型: messageType={}, pileCode={}", messageType, pileCode);
}
}
/**
* 处理登录请求
*/
private void handleLogin(JcppUplinkMessage uplinkMessage) {
String pileCode = uplinkMessage.getPileCode();
JSONObject data = JSON.parseObject(uplinkMessage.getData());
log.info("处理登录请求: pileCode={}", pileCode);
// 查询充电桩是否存在
PileBasicInfo pileInfo = pileBasicInfoService.selectPileBasicInfoBySN(pileCode);
boolean exists = pileInfo != null;
if (exists) {
log.info("充电桩登录成功: pileCode={}", pileCode);
} else {
log.warn("充电桩不存在: pileCode={}", pileCode);
}
// 发送登录应答
jcppDownlinkService.sendLoginAck(pileCode, exists);
}
/**
* 处理心跳请求
*/
private void handleHeartbeat(JcppUplinkMessage uplinkMessage) {
String pileCode = uplinkMessage.getPileCode();
// 更新最后活跃时间到 Redis
String key = HEARTBEAT_KEY_PREFIX + pileCode;
stringRedisTemplate.opsForValue().set(key, String.valueOf(System.currentTimeMillis()),
HEARTBEAT_EXPIRE_SECONDS, TimeUnit.SECONDS);
log.debug("收到充电桩心跳: pileCode={}", pileCode);
}
/**
* 处理枪状态上报
*/
private void handleGunStatus(JcppUplinkMessage uplinkMessage) {
String pileCode = uplinkMessage.getPileCode();
JSONObject data = JSON.parseObject(uplinkMessage.getData());
String gunNo = data.getString("gunNo");
String gunRunStatus = data.getString("gunRunStatus");
log.info("处理枪状态: pileCode={}, gunNo={}, status={}", pileCode, gunNo, gunRunStatus);
// 映射状态
String systemStatus = mapGunStatus(gunRunStatus);
// 更新枪状态
String pileConnectorCode = pileCode + gunNo;
int result = pileConnectorInfoService.updateConnectorStatus(pileConnectorCode, systemStatus);
if (result > 0) {
log.info("更新枪状态成功: pileConnectorCode={}, status={}", pileConnectorCode, systemStatus);
} else {
log.warn("更新枪状态失败: pileConnectorCode={}", pileConnectorCode);
}
// 记录故障信息
if (data.containsKey("faultMessages") && data.getJSONArray("faultMessages") != null) {
log.warn("充电枪故障: pileConnectorCode={}, faults={}", pileConnectorCode, data.getJSONArray("faultMessages"));
}
}
/**
* 处理充电进度数据
*/
private void handleRealTimeData(JcppUplinkMessage uplinkMessage) {
String pileCode = uplinkMessage.getPileCode();
JSONObject data = JSON.parseObject(uplinkMessage.getData());
String gunNo = data.getString("gunNo");
String tradeNo = data.getString("tradeNo");
if (tradeNo == null || tradeNo.isEmpty()) {
log.debug("实时数据消息缺少 tradeNo");
return;
}
// 将实时数据缓存到 Redis
String key = REALTIME_DATA_KEY_PREFIX + tradeNo;
stringRedisTemplate.opsForValue().set(key, JSON.toJSONString(data),
REALTIME_DATA_EXPIRE_SECONDS, TimeUnit.SECONDS);
// 根据 tradeNo 查询订单
OrderBasicInfo order = orderBasicInfoService.getOrderInfoByTransactionCode(tradeNo);
if (order != null && "1".equals(order.getOrderStatus())) {
// 更新订单实时数据
OrderBasicInfo updateOrder = new OrderBasicInfo();
updateOrder.setId(order.getId());
String totalChargingCostYuan = data.getString("totalChargingCostYuan");
if (totalChargingCostYuan != null && !totalChargingCostYuan.isEmpty()) {
updateOrder.setOrderAmount(new BigDecimal(totalChargingCostYuan));
}
orderBasicInfoService.updateOrderBasicInfo(updateOrder);
}
// 更新枪状态为充电中
String pileConnectorCode = pileCode + gunNo;
pileConnectorInfoService.updateConnectorStatus(pileConnectorCode, "3");
log.debug("处理充电进度: tradeNo={}, pileCode={}", tradeNo, pileCode);
}
/**
* 处理交易记录
*/
@Transactional(rollbackFor = Exception.class)
private void handleTransactionRecord(JcppUplinkMessage uplinkMessage) {
String pileCode = uplinkMessage.getPileCode();
JSONObject data = JSON.parseObject(uplinkMessage.getData());
String gunNo = data.getString("gunNo");
String tradeNo = data.getString("tradeNo");
if (tradeNo == null || tradeNo.isEmpty()) {
log.warn("交易记录消息缺少 tradeNo");
return;
}
log.info("处理交易记录: tradeNo={}, pileCode={}, gunNo={}", tradeNo, pileCode, gunNo);
// 根据 tradeNo 查询订单
OrderBasicInfo order = orderBasicInfoService.getOrderInfoByTransactionCode(tradeNo);
if (order == null) {
log.warn("订单不存在: tradeNo={}", tradeNo);
jcppDownlinkService.sendTransactionRecordAck(pileCode, tradeNo, false);
return;
}
// 幂等性检查
if ("2".equals(order.getOrderStatus())) {
log.info("订单已处理,跳过: tradeNo={}", tradeNo);
jcppDownlinkService.sendTransactionRecordAck(pileCode, tradeNo, true);
return;
}
// 更新订单信息
OrderBasicInfo updateOrder = new OrderBasicInfo();
updateOrder.setId(order.getId());
updateOrder.setOrderStatus("2"); // 充电完成
Long startTs = data.getLong("startTs");
Long endTs = data.getLong("endTs");
if (startTs != null && startTs > 0) {
updateOrder.setChargeStartTime(new Date(startTs));
}
if (endTs != null && endTs > 0) {
updateOrder.setChargeEndTime(new Date(endTs));
}
String totalAmountYuan = data.getString("totalAmountYuan");
if (totalAmountYuan != null && !totalAmountYuan.isEmpty()) {
updateOrder.setOrderAmount(new BigDecimal(totalAmountYuan));
}
String stopReason = data.getString("stopReason");
if (stopReason != null && !stopReason.isEmpty()) {
updateOrder.setReason(stopReason);
}
orderBasicInfoService.updateOrderBasicInfo(updateOrder);
log.info("更新订单完成: tradeNo={}", tradeNo);
// 更新枪状态为空闲
String pileConnectorCode = pileCode + gunNo;
pileConnectorInfoService.updateConnectorStatus(pileConnectorCode, "1");
// 发送交易记录应答
jcppDownlinkService.sendTransactionRecordAck(pileCode, tradeNo, true);
// TODO: 触发结算流程
// orderBasicInfoService.realTimeOrderSplit(order.getId());
}
/**
* 处理启动充电请求(刷卡)
*/
@Transactional(rollbackFor = Exception.class)
private void handleStartCharge(JcppUplinkMessage uplinkMessage) {
String pileCode = uplinkMessage.getPileCode();
JSONObject data = JSON.parseObject(uplinkMessage.getData());
String gunNo = data.getString("gunNo");
String startType = data.getString("startType");
String cardNo = data.getString("cardNo");
if (pileCode == null || gunNo == null) {
log.warn("启动充电消息缺少必要字段");
return;
}
log.info("处理启动充电请求: pileCode={}, gunNo={}, cardNo={}", pileCode, gunNo, cardNo);
// 处理刷卡启动
if ("CARD".equals(startType)) {
handleCardStartCharge(pileCode, gunNo, cardNo);
} else {
log.warn("不支持的启动类型: {}", startType);
jcppDownlinkService.sendStartChargeAck(pileCode, gunNo, null, cardNo, null,
false, "不支持的启动类型");
}
}
/**
* 处理刷卡启动充电
*/
private void handleCardStartCharge(String pileCode, String gunNo, String cardNo) {
String failReason = null;
String tradeNo = null;
String limitYuan = null;
boolean authSuccess = false;
try {
// 1. 查询充电桩信息
PileBasicInfo pileInfo = pileBasicInfoService.selectPileBasicInfoBySN(pileCode);
if (pileInfo == null) {
failReason = "充电桩不存在";
sendStartChargeAck(pileCode, gunNo, tradeNo, cardNo, limitYuan, false, failReason);
return;
}
// 检查充电桩状态
if (pileInfo.getDelFlag() != null && StringUtils.equals(pileInfo.getDelFlag(), "1")) {
failReason = "充电桩已停用";
sendStartChargeAck(pileCode, gunNo, tradeNo, cardNo, limitYuan, false, failReason);
return;
}
// 2. 查询授权卡信息
PileAuthCard authCard = pileAuthCardService.selectCardInfoByLogicCard(cardNo);
if (authCard == null) {
failReason = "账户不存在";
sendStartChargeAck(pileCode, gunNo, tradeNo, cardNo, limitYuan, false, failReason);
return;
}
// 检查卡状态
if (authCard.getStatus() != null && !StringUtils.equals(authCard.getStatus(), "1")) {
failReason = "账户已冻结";
sendStartChargeAck(pileCode, gunNo, tradeNo, cardNo, limitYuan, false, failReason);
return;
}
// 3. 查询会员信息和钱包余额
String memberId = authCard.getMemberId();
BigDecimal balance = BigDecimal.ZERO;
if (memberId != null) {
MemberWalletInfo walletInfo = memberWalletInfoService.selectByMemberId(memberId, String.valueOf(pileInfo.getMerchantId()));
if (walletInfo != null) {
balance = walletInfo.getPrincipalBalance();
if (walletInfo.getGiftBalance() != null) {
balance = balance.add(walletInfo.getGiftBalance());
}
}
}
// 4. 检查白名单
boolean isWhitelist = checkWhitelist(pileInfo.getStationId(), cardNo, memberId);
// 5. 验证余额(非白名单用户需要检查余额)
BigDecimal minAmount = new BigDecimal("1.00");
if (!isWhitelist && balance.compareTo(minAmount) < 0) {
failReason = "余额不足";
sendStartChargeAck(pileCode, gunNo, tradeNo, cardNo, limitYuan, false, failReason);
return;
}
// 6. 生成交易流水号
tradeNo = IdUtils.fastSimpleUUID();
limitYuan = balance.toString();
// 7. 创建充电订单
OrderBasicInfo order = new OrderBasicInfo();
order.setOrderCode(tradeNo);
order.setPileSn(pileCode);
order.setConnectorCode(pileCode + gunNo);
order.setMemberId(memberId);
order.setStationId(String.valueOf(pileInfo.getStationId()));
order.setMerchantId(String.valueOf(pileInfo.getMerchantId()));
order.setOrderStatus("0"); // 待支付/启动中
order.setPayMode(String.valueOf(isWhitelist ? 3 : 1)); // 3-白名单支付, 1-余额支付
order.setCreateTime(new Date());
order.setStartType(StartTypeEnum.NOW.getValue());
orderBasicInfoService.insert(order);
log.info("创建充电订单: tradeNo={}, pileCode={}, gunNo={}", tradeNo, pileCode, gunNo);
authSuccess = true;
} catch (Exception e) {
log.error("处理刷卡启动充电异常: pileCode={}, gunNo={}, cardNo={}", pileCode, gunNo, cardNo, e);
failReason = "系统错误";
}
// 发送鉴权结果
sendStartChargeAck(pileCode, gunNo, tradeNo, cardNo, limitYuan, authSuccess, failReason);
}
/**
* 检查白名单
*/
private boolean checkWhitelist(Long stationId, String cardNo, String memberId) {
try {
if (stationId == null) {
return false;
}
PileStationWhitelist pileStationWhitelist = pileStationWhitelistService.queryWhitelistByMemberId(String.valueOf(stationId), memberId);
return pileStationWhitelist != null;
} catch (Exception e) {
log.error("检查白名单异常: stationId={}", stationId, e);
}
return false;
}
/**
* 发送启动充电应答
*/
private void sendStartChargeAck(String pileCode, String gunNo, String tradeNo, String cardNo,
String limitYuan, boolean authSuccess, String failReason) {
jcppDownlinkService.sendStartChargeAck(pileCode, gunNo, tradeNo, cardNo, limitYuan,
authSuccess, failReason);
if (authSuccess) {
log.info("刷卡鉴权成功: pileCode={}, gunNo={}, cardNo={}, tradeNo={}",
pileCode, gunNo, cardNo, tradeNo);
} else {
log.warn("刷卡鉴权失败: pileCode={}, gunNo={}, cardNo={}, reason={}",
pileCode, gunNo, cardNo, failReason);
}
}
/**
* 处理查询计费模板请求
*/
private void handleQueryPricing(JcppUplinkMessage uplinkMessage) {
String pileCode = uplinkMessage.getPileCode();
log.info("处理查询计费模板请求: pileCode={}", pileCode);
try {
// 根据 pileCode 查询充电桩
PileBasicInfo pileInfo = pileBasicInfoService.selectPileBasicInfoBySN(pileCode);
if (pileInfo == null) {
log.warn("充电桩不存在: pileCode={}", pileCode);
jcppDownlinkService.sendQueryPricingAck(pileCode, null, null);
return;
}
// 获取关联的计费模板
BillingTemplateVO billingTemplateVO = pileBillingTemplateService.selectBillingTemplateDetailByPileSn(pileCode);
if (billingTemplateVO == null || billingTemplateVO.getTemplateId() == null) {
log.warn("充电桩未配置计费模板: pileCode={}", pileCode);
jcppDownlinkService.sendQueryPricingAck(pileCode, null, null);
return;
}
Long billingTemplateId = Long.parseLong(billingTemplateVO.getTemplateId());
// 查询计费模板
PileBillingTemplate template = pileBillingTemplateService.selectPileBillingTemplateById(billingTemplateId);
if (template == null) {
log.warn("计费模板不存在: pileCode={}, billingTemplateId={}", pileCode, billingTemplateId);
jcppDownlinkService.sendQueryPricingAck(pileCode, null, null);
return;
}
// 转换为 JCPP 格式
JcppPricingModel pricingModel = PricingModelConverter.convert(template);
// 发送应答
jcppDownlinkService.sendQueryPricingAck(pileCode, billingTemplateId, pricingModel);
log.info("发送计费模板查询应答: pileCode={}, pricingId={}", pileCode, billingTemplateId);
} catch (Exception e) {
log.error("处理查询计费模板异常: pileCode=", pileCode, e);
jcppDownlinkService.sendQueryPricingAck(pileCode, null, null);
}
}
/**
* 处理校验计费模板请求
*/
private void handleVerifyPricing(JcppUplinkMessage uplinkMessage) {
String pileCode = uplinkMessage.getPileCode();
JSONObject data = JSON.parseObject(uplinkMessage.getData());
Long pricingId = data.getLong("pricingId");
log.info("处理校验计费模板请求: pileCode={}, pricingId={}", pileCode, pricingId);
try {
boolean success = false;
if (pricingId != null) {
PileBillingTemplate template = pileBillingTemplateService.selectPileBillingTemplateById(pricingId);
success = template != null;
}
// 发送应答
jcppDownlinkService.sendVerifyPricingAck(pileCode, success, pricingId);
log.info("发送计费模板校验应答: pileCode={}, pricingId={}, success={}", pileCode, pricingId, success);
} catch (Exception e) {
log.error("处理校验计费模板异常: pileCode={}, pricingId={}", pileCode, pricingId, e);
jcppDownlinkService.sendVerifyPricingAck(pileCode, false, pricingId);
}
}
/**
* 处理会话关闭事件
*/
private void handleSessionClose(JcppUplinkMessage uplinkMessage) {
String pileCode = uplinkMessage.getPileCode();
JSONObject data = JSON.parseObject(uplinkMessage.getData());
String reason = data.getString("reason");
log.info("处理会话关闭: pileCode={}, reason={}", pileCode, reason);
// 更新所有枪状态为离线
int result = pileConnectorInfoService.updateConnectorStatusByPileSn(pileCode, "0");
log.info("更新枪状态为离线: pileCode={}, affectedRows={}", pileCode, result);
// TODO: 查询是否有正在充电的订单,如果有则标记为异常
}
/**
* 处理远程操作结果
*/
@Transactional(rollbackFor = Exception.class)
private void handleRemoteResult(JcppUplinkMessage uplinkMessage) {
String messageType = uplinkMessage.getMessageType();
JSONObject data = JSON.parseObject(uplinkMessage.getData());
if (JcppConstants.MessageType.REMOTE_START_RESULT.equals(messageType)) {
handleRemoteStartResult(data);
} else if (JcppConstants.MessageType.REMOTE_STOP_RESULT.equals(messageType)) {
handleRemoteStopResult(data);
}
}
/**
* 处理远程启动结果
*/
private void handleRemoteStartResult(JSONObject data) {
String tradeNo = data.getString("tradeNo");
Boolean success = data.getBoolean("success");
String failReason = data.getString("failReason");
// 从 data 中获取 pileCode 和 gunNo如果有
String pileCode = data.getString("pileCode");
String gunNo = data.getString("gunNo");
if (tradeNo == null || tradeNo.isEmpty()) {
log.warn("远程启动结果缺少 tradeNo");
return;
}
OrderBasicInfo order = orderBasicInfoService.getOrderInfoByTransactionCode(tradeNo);
if (order == null) {
log.warn("订单不存在: tradeNo={}", tradeNo);
return;
}
if (Boolean.TRUE.equals(success)) {
// 启动成功
OrderBasicInfo updateOrder = new OrderBasicInfo();
updateOrder.setId(order.getId());
updateOrder.setOrderStatus("1"); // 充电中
orderBasicInfoService.updateOrderBasicInfo(updateOrder);
String pileConnectorCode = pileCode + gunNo;
pileConnectorInfoService.updateConnectorStatus(pileConnectorCode, "3");
log.info("远程启动成功: tradeNo={}", tradeNo);
} else {
// 启动失败
OrderBasicInfo updateOrder = new OrderBasicInfo();
updateOrder.setId(order.getId());
updateOrder.setOrderStatus("3"); // 已取消
updateOrder.setReason("启动失败: " + failReason);
orderBasicInfoService.updateOrderBasicInfo(updateOrder);
log.warn("远程启动失败: tradeNo={}, reason={}", tradeNo, failReason);
// TODO: 如果已预付费,触发退款流程
}
}
/**
* 处理远程停止结果
*/
private void handleRemoteStopResult(JSONObject data) {
Boolean success = data.getBoolean("success");
String failReason = data.getString("failReason");
// 从 data 中获取 pileCode 和 gunNo如果有
String pileCode = data.getString("pileCode");
String gunNo = data.getString("gunNo");
if (Boolean.TRUE.equals(success)) {
log.info("远程停止成功: pileCode={}, gunNo={}", pileCode, gunNo);
} else {
log.warn("远程停止失败: pileCode=, gunNo={}, reason={}", pileCode, gunNo, failReason);
}
}
/**
* 映射枪状态JCPP 状态 -> 系统状态
*/
private String mapGunStatus(String jcppStatus) {
if (jcppStatus == null) {
return "0";
}
switch (jcppStatus) {
case "IDLE":
return "1"; // 空闲
case "INSERTED":
return "2"; // 占用(未充电)
case "CHARGING":
return "3"; // 占用(充电中)
case "CHARGE_COMPLETE":
return "2"; // 占用(未充电)- 充电完成但未拔枪
case "FAULT":
return "255"; // 故障
case "UNKNOWN":
default:
return "0"; // 离网
}
}
}

View File

@@ -0,0 +1,465 @@
package com.jsowell.pile.jcpp.service.impl;
import com.alibaba.fastjson2.JSON;
import com.alibaba.fastjson2.JSONObject;
import com.jsowell.common.util.StringUtils;
import com.jsowell.pile.domain.JcppSyncRecord;
import com.jsowell.pile.domain.PileBasicInfo;
import com.jsowell.pile.domain.PileConnectorInfo;
import com.jsowell.pile.jcpp.dto.sync.*;
import com.jsowell.pile.jcpp.service.IJcppPileSyncService;
import com.jsowell.pile.mapper.JcppSyncRecordMapper;
import com.jsowell.pile.service.PileBasicInfoService;
import com.jsowell.pile.service.PileConnectorInfoService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.http.*;
import org.springframework.stereotype.Service;
import org.springframework.web.client.RestTemplate;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
/**
* JCPP 充电桩同步服务实现
*
* @author jsowell
*/
@Slf4j
@Service
public class JcppPileSyncServiceImpl implements IJcppPileSyncService {
@Autowired
private PileBasicInfoService pileBasicInfoService;
@Autowired
private PileConnectorInfoService pileConnectorInfoService;
@Autowired
private JcppSyncRecordMapper jcppSyncRecordMapper;
@Autowired
private RestTemplate restTemplate;
@Value("${jcpp.sync.api-url:http://localhost:8080/api/sync}")
private String jcppApiUrl;
@Value("${jcpp.sync.batch-size:100}")
private int batchSize;
@Value("${jcpp.sync.timeout:60000}")
private int timeout;
/**
* 全量同步充电桩数据到 JCPP
*/
@Override
public JcppSyncResponse syncAllPiles() {
log.info("开始全量同步充电桩数据到 JCPP");
// 创建同步记录
JcppSyncRecord record = createSyncRecord("FULL");
try {
// 1. 查询所有充电桩(未删除的)
PileBasicInfo queryPile = new PileBasicInfo();
queryPile.setDelFlag("0");
List<PileBasicInfo> pileList = pileBasicInfoService.selectPileBasicInfoList(queryPile);
log.info("查询到 {} 个充电桩", pileList.size());
// 2. 查询所有充电枪(未删除的)
PileConnectorInfo queryGun = new PileConnectorInfo();
queryGun.setDelFlag("0");
List<PileConnectorInfo> gunList = pileConnectorInfoService.selectPileConnectorInfoList(queryGun);
log.info("查询到 {} 个充电枪", gunList.size());
// 3. 转换数据格式
List<JcppPileSyncDTO> pileDTOs = convertPilesToDTO(pileList);
List<JcppGunSyncDTO> gunDTOs = convertGunsToDTO(gunList);
// 4. 调用 JCPP 同步接口
JcppSyncResponse response = callJcppSyncApi(pileDTOs, gunDTOs);
// 5. 更新同步记录
updateSyncRecord(record, response, "SUCCESS");
log.info("全量同步完成: 充电桩 {}/{}, 充电枪 {}/{}",
response.getSuccessPiles(), response.getTotalPiles(),
response.getSuccessGuns(), response.getTotalGuns());
return response;
} catch (Exception e) {
log.error("全量同步失败", e);
updateSyncRecord(record, null, "FAILED", e.getMessage());
throw new RuntimeException("全量同步失败: " + e.getMessage(), e);
}
}
/**
* 增量同步充电桩数据到 JCPP
*/
@Override
public JcppSyncResponse syncIncrementalPiles(Date lastSyncTime) {
log.info("开始增量同步充电桩数据到 JCPP");
// 如果未指定上次同步时间,查询最后一次成功的同步记录
if (lastSyncTime == null) {
JcppSyncRecord lastRecord = jcppSyncRecordMapper.selectLastSuccessRecord("INCREMENTAL");
if (lastRecord != null) {
lastSyncTime = lastRecord.getStartTime();
log.info("使用最后一次成功同步时间: {}", lastSyncTime);
} else {
// 如果没有历史记录,使用全量同步
log.warn("未找到历史同步记录,改为全量同步");
return syncAllPiles();
}
}
// 创建同步记录
JcppSyncRecord record = createSyncRecord("INCREMENTAL");
try {
// 1. 查询更新时间大于 lastSyncTime 的充电桩
// 注意:这里暂时使用全量查询,然后在内存中过滤
// TODO: 后续可以在 Mapper 中添加按 updateTime 查询的方法以提升性能
PileBasicInfo queryPile = new PileBasicInfo();
queryPile.setDelFlag("0");
List<PileBasicInfo> allPiles = pileBasicInfoService.selectPileBasicInfoList(queryPile);
// 过滤出更新时间大于 lastSyncTime 的充电桩
final Date finalLastSyncTime = lastSyncTime;
List<PileBasicInfo> pileList = allPiles.stream()
.filter(pile -> pile.getUpdateTime() != null && pile.getUpdateTime().after(finalLastSyncTime))
.collect(java.util.stream.Collectors.toList());
log.info("查询到 {} 个更新的充电桩", pileList.size());
// 2. 查询更新时间大于 lastSyncTime 的充电枪
PileConnectorInfo queryGun = new PileConnectorInfo();
queryGun.setDelFlag("0");
List<PileConnectorInfo> allGuns = pileConnectorInfoService.selectPileConnectorInfoList(queryGun);
// 过滤出更新时间大于 lastSyncTime 的充电枪
List<PileConnectorInfo> gunList = allGuns.stream()
.filter(gun -> gun.getUpdateTime() != null && gun.getUpdateTime().after(finalLastSyncTime))
.collect(java.util.stream.Collectors.toList());
log.info("查询到 {} 个更新的充电枪", gunList.size());
// 3. 转换数据格式
List<JcppPileSyncDTO> pileDTOs = convertPilesToDTO(pileList);
List<JcppGunSyncDTO> gunDTOs = convertGunsToDTO(gunList);
// 4. 调用 JCPP 同步接口
JcppSyncResponse response = callJcppSyncApi(pileDTOs, gunDTOs);
// 5. 更新同步记录
updateSyncRecord(record, response, "SUCCESS");
log.info("增量同步完成: 充电桩 {}/{}, 充电枪 {}/{}",
response.getSuccessPiles(), response.getTotalPiles(),
response.getSuccessGuns(), response.getTotalGuns());
return response;
} catch (Exception e) {
log.error("增量同步失败", e);
updateSyncRecord(record, null, "FAILED", e.getMessage());
throw new RuntimeException("增量同步失败: " + e.getMessage(), e);
}
}
/**
* 同步单个充电桩
*/
@Override
public boolean syncSinglePile(String pileSn) {
log.info("开始同步单个充电桩: {}", pileSn);
try {
// 1. 查询充电桩
PileBasicInfo pile = pileBasicInfoService.selectPileBasicInfoBySN(pileSn);
if (pile == null) {
log.warn("充电桩不存在: {}", pileSn);
return false;
}
// 2. 查询该充电桩的所有充电枪
PileConnectorInfo queryGun = new PileConnectorInfo();
queryGun.setPileSn(pileSn);
queryGun.setDelFlag("0");
List<PileConnectorInfo> gunList = pileConnectorInfoService.selectPileConnectorInfoList(queryGun);
// 3. 转换数据格式
List<JcppPileSyncDTO> pileDTOs = convertPilesToDTO(List.of(pile));
List<JcppGunSyncDTO> gunDTOs = convertGunsToDTO(gunList);
// 4. 调用 JCPP 同步接口
JcppSyncResponse response = callJcppSyncApi(pileDTOs, gunDTOs);
log.info("单个充电桩同步完成: {}, 结果: {}", pileSn, response.getSuccess());
return response.getSuccess();
} catch (Exception e) {
log.error("同步单个充电桩失败: {}", pileSn, e);
return false;
}
}
/**
* 转换充电桩数据为 DTO
*/
private List<JcppPileSyncDTO> convertPilesToDTO(List<PileBasicInfo> pileList) {
List<JcppPileSyncDTO> dtoList = new ArrayList<>();
for (PileBasicInfo pile : pileList) {
JcppPileSyncDTO dto = new JcppPileSyncDTO();
// 基本字段
dto.setPileCode(pile.getSn());
dto.setPileName(pile.getName());
dto.setProtocol(pile.getSoftwareProtocol());
// 品牌、型号、制造商(可为空)
dto.setBrand(null); // Web 项目中没有这些字段
dto.setModel(null);
dto.setManufacturer(null);
// 类型映射1-运营桩 → OPERATION, 2-个人桩 → PERSONAL
String type = "OPERATION"; // 默认运营桩
if ("2".equals(pile.getBusinessType())) {
type = "PERSONAL";
}
dto.setType(type);
// 构建附加信息
JSONObject additionalInfo = new JSONObject();
additionalInfo.put("webPileId", pile.getId());
additionalInfo.put("webStationId", pile.getStationId());
additionalInfo.put("businessType", pile.getBusinessType());
additionalInfo.put("secretKey", pile.getSecretKey());
additionalInfo.put("longitude", pile.getLongitude());
additionalInfo.put("latitude", pile.getLatitude());
additionalInfo.put("iccid", pile.getIccid());
additionalInfo.put("merchantId", pile.getMerchantId());
additionalInfo.put("vinFlag", pile.getVinFlag());
dto.setAdditionalInfo(additionalInfo);
dtoList.add(dto);
}
return dtoList;
}
/**
* 转换充电枪数据为 DTO
*/
private List<JcppGunSyncDTO> convertGunsToDTO(List<PileConnectorInfo> gunList) {
List<JcppGunSyncDTO> dtoList = new ArrayList<>();
for (PileConnectorInfo gun : gunList) {
JcppGunSyncDTO dto = new JcppGunSyncDTO();
// 基本字段
dto.setGunCode(gun.getPileConnectorCode());
dto.setGunName(gun.getName());
dto.setPileCode(gun.getPileSn());
// 提取枪号(最后 2 位)
String gunNo = extractGunNo(gun.getPileConnectorCode());
dto.setGunNo(gunNo);
// 构建附加信息
JSONObject additionalInfo = new JSONObject();
additionalInfo.put("webGunId", gun.getId());
additionalInfo.put("status", gun.getStatus());
additionalInfo.put("parkNo", gun.getParkNo());
dto.setAdditionalInfo(additionalInfo);
dtoList.add(dto);
}
return dtoList;
}
/**
* 从充电枪编码中提取枪号(最后 2 位)
*/
private String extractGunNo(String gunCode) {
if (StringUtils.isEmpty(gunCode) || gunCode.length() < 2) {
return "01"; // 默认值
}
return gunCode.substring(gunCode.length() - 2);
}
/**
* 调用 JCPP 同步接口
*/
private JcppSyncResponse callJcppSyncApi(List<JcppPileSyncDTO> pileDTOs, List<JcppGunSyncDTO> gunDTOs) {
List<JcppSyncResult> pileResults = new ArrayList<>();
List<JcppSyncResult> gunResults = new ArrayList<>();
try {
// 1. 同步充电桩
if (pileDTOs != null && !pileDTOs.isEmpty()) {
pileResults = syncPilesToJcpp(pileDTOs);
}
// 2. 同步充电枪
if (gunDTOs != null && !gunDTOs.isEmpty()) {
gunResults = syncGunsToJcpp(gunDTOs);
}
// 3. 构建响应
return JcppSyncResponse.build(pileResults, gunResults);
} catch (Exception e) {
log.error("调用 JCPP 同步接口失败", e);
throw new RuntimeException("调用 JCPP 同步接口失败: " + e.getMessage(), e);
}
}
/**
* 同步充电桩到 JCPP
*/
private List<JcppSyncResult> syncPilesToJcpp(List<JcppPileSyncDTO> pileDTOs) {
String url = jcppApiUrl + "/piles";
// 构建请求体
JSONObject requestBody = new JSONObject();
requestBody.put("piles", pileDTOs);
// 设置请求头
HttpHeaders headers = new HttpHeaders();
headers.setContentType(MediaType.APPLICATION_JSON);
HttpEntity<String> entity = new HttpEntity<>(requestBody.toJSONString(), headers);
try {
// 发送请求
ResponseEntity<String> response = restTemplate.postForEntity(url, entity, String.class);
if (response.getStatusCode() == HttpStatus.OK) {
// 解析响应
JSONObject responseBody = JSON.parseObject(response.getBody());
List<JcppSyncResult> results = responseBody.getList("results", JcppSyncResult.class);
return results != null ? results : new ArrayList<>();
} else {
log.error("JCPP 充电桩同步接口返回错误: {}", response.getStatusCode());
// 返回失败结果
List<JcppSyncResult> results = new ArrayList<>();
for (JcppPileSyncDTO dto : pileDTOs) {
results.add(JcppSyncResult.fail(dto.getPileCode(), "接口返回错误: " + response.getStatusCode()));
}
return results;
}
} catch (Exception e) {
log.error("调用 JCPP 充电桩同步接口异常", e);
// 返回失败结果
List<JcppSyncResult> results = new ArrayList<>();
for (JcppPileSyncDTO dto : pileDTOs) {
results.add(JcppSyncResult.fail(dto.getPileCode(), "接口调用异常: " + e.getMessage()));
}
return results;
}
}
/**
* 同步充电枪到 JCPP
*/
private List<JcppSyncResult> syncGunsToJcpp(List<JcppGunSyncDTO> gunDTOs) {
String url = jcppApiUrl + "/guns";
// 构建请求体
JSONObject requestBody = new JSONObject();
requestBody.put("guns", gunDTOs);
// 设置请求头
HttpHeaders headers = new HttpHeaders();
headers.setContentType(MediaType.APPLICATION_JSON);
HttpEntity<String> entity = new HttpEntity<>(requestBody.toJSONString(), headers);
try {
// 发送请求
ResponseEntity<String> response = restTemplate.postForEntity(url, entity, String.class);
if (response.getStatusCode() == HttpStatus.OK) {
// 解析响应
JSONObject responseBody = JSON.parseObject(response.getBody());
List<JcppSyncResult> results = responseBody.getList("results", JcppSyncResult.class);
return results != null ? results : new ArrayList<>();
} else {
log.error("JCPP 充电枪同步接口返回错误: {}", response.getStatusCode());
// 返回失败结果
List<JcppSyncResult> results = new ArrayList<>();
for (JcppGunSyncDTO dto : gunDTOs) {
results.add(JcppSyncResult.fail(dto.getGunCode(), "接口返回错误: " + response.getStatusCode()));
}
return results;
}
} catch (Exception e) {
log.error("调用 JCPP 充电枪同步接口异常", e);
// 返回失败结果
List<JcppSyncResult> results = new ArrayList<>();
for (JcppGunSyncDTO dto : gunDTOs) {
results.add(JcppSyncResult.fail(dto.getGunCode(), "接口调用异常: " + e.getMessage()));
}
return results;
}
}
/**
* 创建同步记录
*/
private JcppSyncRecord createSyncRecord(String syncType) {
JcppSyncRecord record = new JcppSyncRecord();
record.setSyncType(syncType);
record.setSyncStatus("RUNNING");
record.setStartTime(new Date());
jcppSyncRecordMapper.insertJcppSyncRecord(record);
return record;
}
/**
* 更新同步记录(成功)
*/
private void updateSyncRecord(JcppSyncRecord record, JcppSyncResponse response, String status) {
updateSyncRecord(record, response, status, null);
}
/**
* 更新同步记录
*/
private void updateSyncRecord(JcppSyncRecord record, JcppSyncResponse response, String status, String errorMessage) {
record.setSyncStatus(status);
record.setEndTime(new Date());
if (response != null) {
record.setTotalPiles(response.getTotalPiles());
record.setSuccessPiles(response.getSuccessPiles());
record.setFailedPiles(response.getFailedPiles());
record.setTotalGuns(response.getTotalGuns());
record.setSuccessGuns(response.getSuccessGuns());
record.setFailedGuns(response.getFailedGuns());
if (response.getErrors() != null && !response.getErrors().isEmpty()) {
record.setErrorMessage(String.join("; ", response.getErrors()));
}
}
if (errorMessage != null) {
record.setErrorMessage(errorMessage);
}
jcppSyncRecordMapper.updateJcppSyncRecord(record);
}
}

View File

@@ -0,0 +1,97 @@
package com.jsowell.pile.jcpp.util;
import com.google.common.hash.Hashing;
import lombok.extern.slf4j.Slf4j;
import java.nio.charset.StandardCharsets;
/**
* JCPP 消息分区计算器
* 使用 MurmurHash3_128 算法计算消息分区,与 JCPP 保持一致
*
* @author jsowell
*/
@Slf4j
public class JcppPartitionCalculator {
/**
* 默认分区数量
*/
private static final int DEFAULT_PARTITION_COUNT = 10;
/**
* 分区数量(可配置)
*/
private static int partitionCount = DEFAULT_PARTITION_COUNT;
/**
* 设置分区数量
*
* @param count 分区数量
*/
public static void setPartitionCount(int count) {
if (count <= 0) {
throw new IllegalArgumentException("分区数量必须大于 0");
}
partitionCount = count;
log.info("JCPP 消息分区数量设置为: {}", partitionCount);
}
/**
* 获取当前分区数量
*
* @return 分区数量
*/
public static int getPartitionCount() {
return partitionCount;
}
/**
* 根据消息键计算分区编号
* 使用 MurmurHash3_128 算法(与 JCPP 一致)
*
* @param messageKey 消息键(通常是 pileCode
* @return 分区编号0 到 partitionCount-1
*/
public static int getPartition(String messageKey) {
if (messageKey == null || messageKey.isEmpty()) {
log.warn("消息键为空,使用默认分区 0");
return 0;
}
// 使用 MurmurHash3_128 算法计算 hash 值
long hash = Hashing.murmur3_128()
.hashString(messageKey, StandardCharsets.UTF_8)
.asLong();
// 取绝对值并对分区数取模
int partition = Math.abs((int) (hash % partitionCount));
log.debug("消息键: {}, hash: {}, 分区: {}", messageKey, hash, partition);
return partition;
}
/**
* 获取指定分区的队列名称
*
* @param partition 分区编号
* @return 队列名称
*/
public static String getQueueName(int partition) {
return "jcpp.uplink.partition." + partition;
}
/**
* 获取所有分区的队列名称数组
*
* @return 队列名称数组
*/
public static String[] getAllQueueNames() {
String[] queueNames = new String[partitionCount];
for (int i = 0; i < partitionCount; i++) {
queueNames[i] = getQueueName(i);
}
return queueNames;
}
}

View File

@@ -0,0 +1,69 @@
package com.jsowell.pile.mapper;
import com.jsowell.pile.domain.JcppSyncRecord;
import java.util.List;
/**
* JCPP 充电桩同步记录 Mapper 接口
*
* @author jsowell
*/
public interface JcppSyncRecordMapper {
/**
* 查询 JCPP 充电桩同步记录
*
* @param id JCPP 充电桩同步记录主键
* @return JCPP 充电桩同步记录
*/
JcppSyncRecord selectJcppSyncRecordById(Long id);
/**
* 查询 JCPP 充电桩同步记录列表
*
* @param jcppSyncRecord JCPP 充电桩同步记录
* @return JCPP 充电桩同步记录集合
*/
List<JcppSyncRecord> selectJcppSyncRecordList(JcppSyncRecord jcppSyncRecord);
/**
* 新增 JCPP 充电桩同步记录
*
* @param jcppSyncRecord JCPP 充电桩同步记录
* @return 结果
*/
int insertJcppSyncRecord(JcppSyncRecord jcppSyncRecord);
/**
* 修改 JCPP 充电桩同步记录
*
* @param jcppSyncRecord JCPP 充电桩同步记录
* @return 结果
*/
int updateJcppSyncRecord(JcppSyncRecord jcppSyncRecord);
/**
* 删除 JCPP 充电桩同步记录
*
* @param id JCPP 充电桩同步记录主键
* @return 结果
*/
int deleteJcppSyncRecordById(Long id);
/**
* 批量删除 JCPP 充电桩同步记录
*
* @param ids 需要删除的数据主键集合
* @return 结果
*/
int deleteJcppSyncRecordByIds(Long[] ids);
/**
* 查询最后一次成功的同步记录
*
* @param syncType 同步类型
* @return 同步记录
*/
JcppSyncRecord selectLastSuccessRecord(String syncType);
}

View File

@@ -0,0 +1,113 @@
<?xml version="1.0" encoding="UTF-8" ?>
<!DOCTYPE mapper
PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN"
"http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="com.jsowell.pile.mapper.JcppSyncRecordMapper">
<resultMap type="com.jsowell.pile.domain.JcppSyncRecord" id="JcppSyncRecordResult">
<result property="id" column="id" />
<result property="syncType" column="sync_type" />
<result property="syncStatus" column="sync_status" />
<result property="startTime" column="start_time" />
<result property="endTime" column="end_time" />
<result property="totalPiles" column="total_piles" />
<result property="successPiles" column="success_piles" />
<result property="failedPiles" column="failed_piles" />
<result property="totalGuns" column="total_guns" />
<result property="successGuns" column="success_guns" />
<result property="failedGuns" column="failed_guns" />
<result property="errorMessage" column="error_message" />
<result property="createBy" column="create_by" />
<result property="createTime" column="create_time" />
</resultMap>
<sql id="selectJcppSyncRecordVo">
select id, sync_type, sync_status, start_time, end_time, total_piles, success_piles, failed_piles, total_guns, success_guns, failed_guns, error_message, create_by, create_time from jcpp_sync_record
</sql>
<select id="selectJcppSyncRecordList" parameterType="com.jsowell.pile.domain.JcppSyncRecord" resultMap="JcppSyncRecordResult">
<include refid="selectJcppSyncRecordVo"/>
<where>
<if test="syncType != null and syncType != ''"> and sync_type = #{syncType}</if>
<if test="syncStatus != null and syncStatus != ''"> and sync_status = #{syncStatus}</if>
<if test="startTime != null "> and start_time &gt;= #{startTime}</if>
<if test="endTime != null "> and end_time &lt;= #{endTime}</if>
</where>
order by start_time desc
</select>
<select id="selectJcppSyncRecordById" parameterType="Long" resultMap="JcppSyncRecordResult">
<include refid="selectJcppSyncRecordVo"/>
where id = #{id}
</select>
<select id="selectLastSuccessRecord" parameterType="String" resultMap="JcppSyncRecordResult">
<include refid="selectJcppSyncRecordVo"/>
where sync_type = #{syncType} and sync_status = 'SUCCESS'
order by start_time desc
limit 1
</select>
<insert id="insertJcppSyncRecord" parameterType="com.jsowell.pile.domain.JcppSyncRecord" useGeneratedKeys="true" keyProperty="id">
insert into jcpp_sync_record
<trim prefix="(" suffix=")" suffixOverrides=",">
<if test="syncType != null and syncType != ''">sync_type,</if>
<if test="syncStatus != null and syncStatus != ''">sync_status,</if>
<if test="startTime != null">start_time,</if>
<if test="endTime != null">end_time,</if>
<if test="totalPiles != null">total_piles,</if>
<if test="successPiles != null">success_piles,</if>
<if test="failedPiles != null">failed_piles,</if>
<if test="totalGuns != null">total_guns,</if>
<if test="successGuns != null">success_guns,</if>
<if test="failedGuns != null">failed_guns,</if>
<if test="errorMessage != null">error_message,</if>
<if test="createBy != null">create_by,</if>
<if test="createTime != null">create_time,</if>
</trim>
<trim prefix="values (" suffix=")" suffixOverrides=",">
<if test="syncType != null and syncType != ''">#{syncType},</if>
<if test="syncStatus != null and syncStatus != ''">#{syncStatus},</if>
<if test="startTime != null">#{startTime},</if>
<if test="endTime != null">#{endTime},</if>
<if test="totalPiles != null">#{totalPiles},</if>
<if test="successPiles != null">#{successPiles},</if>
<if test="failedPiles != null">#{failedPiles},</if>
<if test="totalGuns != null">#{totalGuns},</if>
<if test="successGuns != null">#{successGuns},</if>
<if test="failedGuns != null">#{failedGuns},</if>
<if test="errorMessage != null">#{errorMessage},</if>
<if test="createBy != null">#{createBy},</if>
<if test="createTime != null">#{createTime},</if>
</trim>
</insert>
<update id="updateJcppSyncRecord" parameterType="com.jsowell.pile.domain.JcppSyncRecord">
update jcpp_sync_record
<trim prefix="SET" suffixOverrides=",">
<if test="syncType != null and syncType != ''">sync_type = #{syncType},</if>
<if test="syncStatus != null and syncStatus != ''">sync_status = #{syncStatus},</if>
<if test="startTime != null">start_time = #{startTime},</if>
<if test="endTime != null">end_time = #{endTime},</if>
<if test="totalPiles != null">total_piles = #{totalPiles},</if>
<if test="successPiles != null">success_piles = #{successPiles},</if>
<if test="failedPiles != null">failed_piles = #{failedPiles},</if>
<if test="totalGuns != null">total_guns = #{totalGuns},</if>
<if test="successGuns != null">success_guns = #{successGuns},</if>
<if test="failedGuns != null">failed_guns = #{failedGuns},</if>
<if test="errorMessage != null">error_message = #{errorMessage},</if>
</trim>
where id = #{id}
</update>
<delete id="deleteJcppSyncRecordById" parameterType="Long">
delete from jcpp_sync_record where id = #{id}
</delete>
<delete id="deleteJcppSyncRecordByIds" parameterType="String">
delete from jcpp_sync_record where id in
<foreach item="id" collection="array" open="(" separator="," close=")">
#{id}
</foreach>
</delete>
</mapper>