同步充电桩数据到JCPP

This commit is contained in:
Guoqs
2026-01-06 10:12:49 +08:00
parent 30ec68abb1
commit 8f9cbc6cf8
6 changed files with 258 additions and 26 deletions

View File

@@ -5,6 +5,14 @@ jsowell:
# JCPP 配置
jcpp:
# 连接超时时间(毫秒)
connect-timeout: 10000
# 读取超时时间(毫秒)
timeout: 30000
# 同步接口连接超时时间(毫秒)
sync-connect-timeout: 10000
# 同步接口读取超时时间(毫秒)- 批量同步需要更长时间
sync-timeout: 120000
rabbitmq:
# 分区数量(与 JCPP 保持一致)
partition-count: 10
@@ -15,8 +23,8 @@ jcpp:
sync:
# JCPP 同步接口地址
api-url: http://localhost:8180/api/sync
# 批量同步大小
batch-size: 100
# 批量同步大小(每批充电桩数量)
batch-size: 500
# 超时时间(毫秒)
timeout: 60000
# 是否启用自动增量同步

View File

@@ -84,6 +84,9 @@ public class CacheConstants {
// 查询桩型号信息
public static final String GET_PILE_MODEL_INFO_BY_MODEL_ID = "get_pile_model_info_by_model_id:";
// 根据ID查询桩型号信息
public static final String PILE_MODEL_INFO_BY_ID = "pile_model_info_by_id:";
// 地锁数据
public static final String GROUND_LOCK_DATA = "ground_lock_data:";

View File

@@ -5,6 +5,7 @@ import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import org.springframework.http.client.SimpleClientHttpRequestFactory;
import org.springframework.web.client.RestTemplate;
@@ -22,7 +23,7 @@ public class JcppConfig {
/**
* JCPP 服务地址
*/
private String url = "http://localhost:8080";
private String url = "http://localhost:8180";
/**
* 下行接口路径
@@ -44,6 +45,16 @@ public class JcppConfig {
*/
private int connectTimeout = 3000;
/**
* 同步接口超时时间(毫秒)- 批量同步需要更长时间
*/
private int syncTimeout = 120000;
/**
* 同步接口连接超时时间(毫秒)
*/
private int syncConnectTimeout = 10000;
/**
* 是否启用 JCPP 对接
*/
@@ -74,8 +85,9 @@ public class JcppConfig {
}
/**
* 创建 JCPP 专用的 RestTemplate
* 创建 JCPP 专用的 RestTemplate(默认)
*/
@Primary
@Bean("jcppRestTemplate")
public RestTemplate jcppRestTemplate() {
SimpleClientHttpRequestFactory factory = new SimpleClientHttpRequestFactory();
@@ -86,4 +98,18 @@ public class JcppConfig {
url, timeout, connectTimeout);
return restTemplate;
}
/**
* 创建 JCPP 同步专用的 RestTemplate超时时间更长
*/
@Bean("jcppSyncRestTemplate")
public RestTemplate jcppSyncRestTemplate() {
SimpleClientHttpRequestFactory factory = new SimpleClientHttpRequestFactory();
factory.setConnectTimeout(syncConnectTimeout);
factory.setReadTimeout(syncTimeout);
RestTemplate restTemplate = new RestTemplate(factory);
log.info("JCPP 同步 RestTemplate 初始化完成, syncTimeout: {}ms, syncConnectTimeout: {}ms",
syncTimeout, syncConnectTimeout);
return restTemplate;
}
}

View File

@@ -56,4 +56,8 @@ public class JcppSyncResult implements Serializable {
result.setMessage(message);
return result;
}
public Boolean isSuccess() {
return success;
}
}

View File

@@ -17,6 +17,7 @@ import com.jsowell.pile.service.PileConnectorInfoService;
import com.jsowell.pile.service.PileModelInfoService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.http.*;
import org.springframework.stereotype.Service;
@@ -48,6 +49,7 @@ public class JcppPileSyncServiceImpl implements IJcppPileSyncService {
private IJcppAuthService jcppAuthService;
@Autowired
@Qualifier("jcppSyncRestTemplate")
private RestTemplate restTemplate;
@Autowired
@@ -56,18 +58,18 @@ public class JcppPileSyncServiceImpl implements IJcppPileSyncService {
@Value("${jcpp.sync.api-url:http://localhost:8180/api/sync}")
private String jcppApiUrl;
@Value("${jcpp.sync.batch-size:100}")
@Value("${jcpp.sync.batch-size:500}")
private int batchSize;
@Value("${jcpp.sync.timeout:60000}")
private int timeout;
/**
* 全量同步充电桩数据到 JCPP
* 全量同步充电桩数据到 JCPP(批量处理)
*/
@Override
public JcppSyncResponse syncAllPiles() {
log.info("开始全量同步充电桩数据到 JCPP");
log.info("开始全量同步充电桩数据到 JCPP,批量大小: {}", batchSize);
// 创建同步记录
JcppSyncRecord record = createSyncRecord("FULL");
@@ -78,7 +80,7 @@ public class JcppPileSyncServiceImpl implements IJcppPileSyncService {
queryPile.setDelFlag("0");
List<PileBasicInfo> pileList = pileBasicInfoService.selectPileBasicInfoList(queryPile);
log.info("查询到 {} 个充电桩", pileList.size());
log.info("查询到 {} 个充电桩,准备分批同步", pileList.size());
// 2. 查询所有充电枪(未删除的)
PileConnectorInfo queryGun = new PileConnectorInfo();
@@ -87,19 +89,86 @@ public class JcppPileSyncServiceImpl implements IJcppPileSyncService {
log.info("查询到 {} 个充电枪", gunList.size());
// 3. 转换数据格式
List<JcppPileSyncDTO> pileDTOs = convertPilesToDTO(pileList);
List<JcppGunSyncDTO> gunDTOs = convertGunsToDTO(gunList);
// 3. 按批次分割充电桩列表
List<List<PileBasicInfo>> pileBatches = Lists.partition(pileList, batchSize);
int totalBatches = pileBatches.size();
log.info("充电桩分为 {} 批,每批最多 {} 台", totalBatches, batchSize);
// 4. 调用 JCPP 同步接口
JcppSyncResponse response = callJcppSyncApi(pileDTOs, gunDTOs);
// 4. 汇总所有批次的同步结果
List<JcppSyncResult> allPileResults = new ArrayList<>();
List<JcppSyncResult> allGunResults = new ArrayList<>();
// 5. 更新同步记录
// 5. 逐批处理充电桩
for (int i = 0; i < pileBatches.size(); i++) {
List<PileBasicInfo> batchPiles = pileBatches.get(i);
int batchNo = i + 1;
log.info("开始处理第 {}/{} 批充电桩,本批数量: {}", batchNo, totalBatches, batchPiles.size());
try {
// 5.1 转换充电桩数据
List<JcppPileSyncDTO> pileDTOs = convertPilesToDTO(batchPiles);
// 5.2 先同步充电桩(充电枪依赖充电桩,必须先同步充电桩)
log.info("第 {} 批:开始同步充电桩...", batchNo);
List<JcppSyncResult> batchPileResults = syncPilesToJcpp(pileDTOs);
allPileResults.addAll(batchPileResults);
// 统计充电桩同步结果
long batchPileSuccess = batchPileResults.stream().filter(JcppSyncResult::isSuccess).count();
long batchPileFailed = batchPiles.size() - batchPileSuccess;
log.info("第 {} 批:充电桩同步完成 {}/{} (成功/总数)", batchNo, batchPileSuccess, batchPiles.size());
// 5.3 查找本批充电桩对应的充电枪(只同步充电桩成功的枪)
List<String> successPileSns = batchPileResults.stream()
.filter(JcppSyncResult::isSuccess)
.map(JcppSyncResult::getCode)
.collect(java.util.stream.Collectors.toList());
List<PileConnectorInfo> batchGuns = gunList.stream()
.filter(gun -> successPileSns.contains(gun.getPileSn()))
.collect(java.util.stream.Collectors.toList());
log.info("第 {} 批:找到 {} 个充电枪(对应 {} 个成功的充电桩)", batchNo, batchGuns.size(), successPileSns.size());
// 5.4 同步充电枪(只同步充电桩成功的枪)
List<JcppSyncResult> batchGunResults = new ArrayList<>();
if (!batchGuns.isEmpty()) {
List<JcppGunSyncDTO> gunDTOs = convertGunsToDTO(batchGuns);
log.info("第 {} 批:开始同步充电枪...", batchNo);
batchGunResults = syncGunsToJcpp(gunDTOs);
allGunResults.addAll(batchGunResults);
long batchGunSuccess = batchGunResults.stream().filter(JcppSyncResult::isSuccess).count();
log.info("第 {} 批:充电枪同步完成 {}/{} (成功/总数)", batchNo, batchGunSuccess, batchGuns.size());
} else {
log.warn("第 {} 批:没有需要同步的充电枪(充电桩全部失败)", batchNo);
}
// 5.5 统计本批总体结果
long batchGunSuccess = batchGunResults.stream().filter(JcppSyncResult::isSuccess).count();
log.info("第 {}/{} 批同步完成: 充电桩 {}/{}, 充电枪 {}/{}",
batchNo, totalBatches,
batchPileSuccess, batchPiles.size(),
batchGunSuccess, batchGuns.size());
} catch (Exception e) {
log.error("第 {} 批同步失败", batchNo, e);
// 记录失败,但继续处理下一批
for (PileBasicInfo pile : batchPiles) {
allPileResults.add(JcppSyncResult.fail(pile.getSn(), "批次同步异常: " + e.getMessage()));
}
}
}
// 6. 构建最终响应
JcppSyncResponse response = JcppSyncResponse.build(allPileResults, allGunResults);
// 7. 更新同步记录
updateSyncRecord(record, response, "SUCCESS");
log.info("全量同步完成: 充电桩 {}/{}, 充电枪 {}/{}",
response.getSuccessPiles(), response.getTotalPiles(),
response.getSuccessGuns(), response.getTotalGuns());
return response;
@@ -302,8 +371,8 @@ public class JcppPileSyncServiceImpl implements IJcppPileSyncService {
String gunName = gun.getName();
if (StringUtils.isEmpty(gunName)) {
String gunNo = extractGunNo(gun.getPileConnectorCode());
gunName = "充电枪" + gunNo;
log.warn("充电枪名称为空,使用默认名称: {} (gunCode: {})", gunName, gun.getPileConnectorCode());
gunName = gun.getPileSn() + "" + gunNo + "号枪";
// log.warn("充电枪名称为空,使用默认名称: {} (gunCode: {})", gunName, gun.getPileConnectorCode());
}
dto.setGunName(gunName);
@@ -395,12 +464,34 @@ public class JcppPileSyncServiceImpl implements IJcppPileSyncService {
HttpEntity<String> entity = new HttpEntity<>(requestBody.toJSONString(), headers);
// 发送请求
log.info("调用 JCPP 充电桩同步接口: {}, 数量: {}", url, pileDTOs.size());
ResponseEntity<String> response = restTemplate.postForEntity(url, entity, String.class);
// 打印响应状态和内容
log.info("JCPP 充电桩同步接口响应 - 状态码: {}, 响应体: {}",
response.getStatusCode(), response.getBody());
if (response.getStatusCode() == HttpStatus.OK) {
// 解析响应
JSONObject responseBody = JSON.parseObject(response.getBody());
List<JcppSyncResult> results = responseBody.getList("results", JcppSyncResult.class);
// JCPP 响应格式:{ "success": true, "data": { "results": [...] } }
// 需要从 data 中获取 results
JSONObject data = responseBody.getJSONObject("data");
List<JcppSyncResult> results = null;
if (data != null) {
results = data.getList("results", JcppSyncResult.class);
} else {
log.warn("JCPP 充电桩同步响应中没有 data 字段");
results = new ArrayList<>();
}
// 统计结果
long successCount = results != null ? results.stream().filter(JcppSyncResult::isSuccess).count() : 0;
long failCount = results != null ? results.size() - successCount : 0;
log.info("JCPP 充电桩同步结果 - 成功: {}, 失败: {}", successCount, failCount);
return results != null ? results : new ArrayList<>();
} else if (response.getStatusCode() == HttpStatus.UNAUTHORIZED) {
// token 过期,清除缓存并重试一次
@@ -409,7 +500,8 @@ public class JcppPileSyncServiceImpl implements IJcppPileSyncService {
// 递归调用重试(只重试一次)
return retrySyncPilesToJcpp(pileDTOs);
} else {
log.error("JCPP 充电桩同步接口返回错误: {}", response.getStatusCode());
log.error("JCPP 充电桩同步接口返回错误 - 状态码: {}, 响应体: {}",
response.getStatusCode(), response.getBody());
// 返回失败结果
List<JcppSyncResult> results = new ArrayList<>();
for (JcppPileSyncDTO dto : pileDTOs) {
@@ -464,7 +556,18 @@ public class JcppPileSyncServiceImpl implements IJcppPileSyncService {
if (response.getStatusCode() == HttpStatus.OK) {
// 解析响应
JSONObject responseBody = JSON.parseObject(response.getBody());
List<JcppSyncResult> results = responseBody.getList("results", JcppSyncResult.class);
// JCPP 响应格式:{ "success": true, "data": { "results": [...] } }
JSONObject data = responseBody.getJSONObject("data");
List<JcppSyncResult> results = null;
if (data != null) {
results = data.getList("results", JcppSyncResult.class);
} else {
log.warn("重试JCPP 充电桩同步响应中没有 data 字段");
results = new ArrayList<>();
}
return results != null ? results : new ArrayList<>();
} else {
log.error("JCPP 充电桩同步接口返回错误: {}", response.getStatusCode());
@@ -517,12 +620,34 @@ public class JcppPileSyncServiceImpl implements IJcppPileSyncService {
HttpEntity<String> entity = new HttpEntity<>(requestBody.toJSONString(), headers);
// 发送请求
log.info("调用 JCPP 充电枪同步接口: {}, 数量: {}", url, gunDTOs.size());
ResponseEntity<String> response = restTemplate.postForEntity(url, entity, String.class);
// 打印响应状态和内容
log.info("JCPP 充电枪同步接口响应 - 状态码: {}, 响应体: {}",
response.getStatusCode(), response.getBody());
if (response.getStatusCode() == HttpStatus.OK) {
// 解析响应
JSONObject responseBody = JSON.parseObject(response.getBody());
List<JcppSyncResult> results = responseBody.getList("results", JcppSyncResult.class);
// JCPP 响应格式:{ "success": true, "data": { "results": [...] } }
// 需要从 data 中获取 results
JSONObject data = responseBody.getJSONObject("data");
List<JcppSyncResult> results = null;
if (data != null) {
results = data.getList("results", JcppSyncResult.class);
} else {
log.warn("JCPP 充电枪同步响应中没有 data 字段");
results = new ArrayList<>();
}
// 统计结果
long successCount = results != null ? results.stream().filter(JcppSyncResult::isSuccess).count() : 0;
long failCount = results != null ? results.size() - successCount : 0;
log.info("JCPP 充电枪同步结果 - 成功: {}, 失败: {}", successCount, failCount);
return results != null ? results : new ArrayList<>();
} else if (response.getStatusCode() == HttpStatus.UNAUTHORIZED) {
// token 过期,清除缓存并重试一次
@@ -531,7 +656,8 @@ public class JcppPileSyncServiceImpl implements IJcppPileSyncService {
// 递归调用重试(只重试一次)
return retrySyncGunsToJcpp(gunDTOs);
} else {
log.error("JCPP 充电枪同步接口返回错误: {}", response.getStatusCode());
log.error("JCPP 充电枪同步接口返回错误 - 状态码: {}, 响应体: {}",
response.getStatusCode(), response.getBody());
// 返回失败结果
List<JcppSyncResult> results = new ArrayList<>();
for (JcppGunSyncDTO dto : gunDTOs) {
@@ -586,7 +712,18 @@ public class JcppPileSyncServiceImpl implements IJcppPileSyncService {
if (response.getStatusCode() == HttpStatus.OK) {
// 解析响应
JSONObject responseBody = JSON.parseObject(response.getBody());
List<JcppSyncResult> results = responseBody.getList("results", JcppSyncResult.class);
// JCPP 响应格式:{ "success": true, "data": { "results": [...] } }
JSONObject data = responseBody.getJSONObject("data");
List<JcppSyncResult> results = null;
if (data != null) {
results = data.getList("results", JcppSyncResult.class);
} else {
log.warn("重试JCPP 充电枪同步响应中没有 data 字段");
results = new ArrayList<>();
}
return results != null ? results : new ArrayList<>();
} else {
log.error("JCPP 充电枪同步接口返回错误: {}", response.getStatusCode());
@@ -643,11 +780,39 @@ public class JcppPileSyncServiceImpl implements IJcppPileSyncService {
record.setFailedGuns(response.getFailedGuns());
if (response.getErrors() != null && !response.getErrors().isEmpty()) {
record.setErrorMessage(String.join("; ", response.getErrors()));
// 限制错误信息长度,避免超过 TEXT 字段限制65535字节
// 只保存前100条错误信息并限制总长度不超过60000字符
List<String> errors = response.getErrors();
int maxErrors = Math.min(100, errors.size());
StringBuilder errorBuilder = new StringBuilder();
for (int i = 0; i < maxErrors; i++) {
String error = errors.get(i);
// 如果添加这条错误后会超过限制,则停止
if (errorBuilder.length() + error.length() + 2 > 60000) {
errorBuilder.append("... (还有 ").append(errors.size() - i).append(" 条错误信息被省略)");
break;
}
if (i > 0) {
errorBuilder.append("; ");
}
errorBuilder.append(error);
}
// 如果还有更多错误但没有超长,也添加提示
if (maxErrors < errors.size() && errorBuilder.length() < 60000) {
errorBuilder.append("; ... (还有 ").append(errors.size() - maxErrors).append(" 条错误信息被省略)");
}
record.setErrorMessage(errorBuilder.toString());
}
}
if (errorMessage != null) {
// 限制单个错误消息的长度
if (errorMessage.length() > 60000) {
errorMessage = errorMessage.substring(0, 60000) + "... (错误信息过长已截断)";
}
record.setErrorMessage(errorMessage);
}

View File

@@ -30,14 +30,32 @@ public class PileModelInfoServiceImpl implements PileModelInfoService {
private RedisCache redisCache;
/**
* 查询充电桩型号信息
* 查询充电桩型号信息(带缓存)
*
* @param id 充电桩型号信息主键
* @return 充电桩型号信息
*/
@Override
public PileModelInfo selectPileModelInfoById(Long id) {
return pileModelInfoMapper.selectPileModelInfoById(id);
if (id == null) {
return null;
}
// 1. 尝试从缓存获取
String redisKey = CacheConstants.PILE_MODEL_INFO_BY_ID + id;
PileModelInfo modelInfo = redisCache.getCacheObject(redisKey);
// 2. 缓存未命中,从数据库查询
if (modelInfo == null) {
modelInfo = pileModelInfoMapper.selectPileModelInfoById(id);
// 3. 查询结果存入缓存1天有效期
if (modelInfo != null) {
redisCache.setCacheObject(redisKey, modelInfo, CacheConstants.cache_expire_time_1d);
}
}
return modelInfo;
}
/**
@@ -135,10 +153,18 @@ public class PileModelInfoServiceImpl implements PileModelInfoService {
return modelInfoVO;
}
/**
* 清除缓存
*
* @param modelIds 型号ID数组
*/
private void cleanCache(Long[] modelIds) {
List<String> redisKeyList = Lists.newArrayList();
for (Long modelId : modelIds) {
// 清除 getPileModelInfoByModelId 的缓存
redisKeyList.add(CacheConstants.GET_PILE_MODEL_INFO_BY_MODEL_ID + modelId);
// 清除 selectPileModelInfoById 的缓存
redisKeyList.add(CacheConstants.PILE_MODEL_INFO_BY_ID + modelId);
}
redisCache.deleteObject(redisKeyList);
}