From 8f9cbc6cf89581ca86c3a514604a164b2c53f12b Mon Sep 17 00:00:00 2001 From: Guoqs <123@jsowell.com> Date: Tue, 6 Jan 2026 10:12:49 +0800 Subject: [PATCH] =?UTF-8?q?=E5=90=8C=E6=AD=A5=E5=85=85=E7=94=B5=E6=A1=A9?= =?UTF-8?q?=E6=95=B0=E6=8D=AE=E5=88=B0JCPP?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../src/main/resources/application-sit.yml | 12 +- .../common/constant/CacheConstants.java | 3 + .../jsowell/pile/jcpp/config/JcppConfig.java | 30 ++- .../pile/jcpp/dto/sync/JcppSyncResult.java | 4 + .../service/impl/JcppPileSyncServiceImpl.java | 205 ++++++++++++++++-- .../impl/PileModelInfoServiceImpl.java | 30 ++- 6 files changed, 258 insertions(+), 26 deletions(-) diff --git a/jsowell-admin/src/main/resources/application-sit.yml b/jsowell-admin/src/main/resources/application-sit.yml index 66b20a69a..d3c704add 100644 --- a/jsowell-admin/src/main/resources/application-sit.yml +++ b/jsowell-admin/src/main/resources/application-sit.yml @@ -5,6 +5,14 @@ jsowell: # JCPP 配置 jcpp: + # 连接超时时间(毫秒) + connect-timeout: 10000 + # 读取超时时间(毫秒) + timeout: 30000 + # 同步接口连接超时时间(毫秒) + sync-connect-timeout: 10000 + # 同步接口读取超时时间(毫秒)- 批量同步需要更长时间 + sync-timeout: 120000 rabbitmq: # 分区数量(与 JCPP 保持一致) partition-count: 10 @@ -15,8 +23,8 @@ jcpp: sync: # JCPP 同步接口地址 api-url: http://localhost:8180/api/sync - # 批量同步大小 - batch-size: 100 + # 批量同步大小(每批充电桩数量) + batch-size: 500 # 超时时间(毫秒) timeout: 60000 # 是否启用自动增量同步 diff --git a/jsowell-common/src/main/java/com/jsowell/common/constant/CacheConstants.java b/jsowell-common/src/main/java/com/jsowell/common/constant/CacheConstants.java index f7412bf96..44809f557 100644 --- a/jsowell-common/src/main/java/com/jsowell/common/constant/CacheConstants.java +++ b/jsowell-common/src/main/java/com/jsowell/common/constant/CacheConstants.java @@ -84,6 +84,9 @@ public class CacheConstants { // 查询桩型号信息 public static final String GET_PILE_MODEL_INFO_BY_MODEL_ID = "get_pile_model_info_by_model_id:"; + // 根据ID查询桩型号信息 + public static final String PILE_MODEL_INFO_BY_ID = "pile_model_info_by_id:"; + // 地锁数据 public static final String GROUND_LOCK_DATA = "ground_lock_data:"; diff --git a/jsowell-pile/src/main/java/com/jsowell/pile/jcpp/config/JcppConfig.java b/jsowell-pile/src/main/java/com/jsowell/pile/jcpp/config/JcppConfig.java index df777eccc..3202e065d 100644 --- a/jsowell-pile/src/main/java/com/jsowell/pile/jcpp/config/JcppConfig.java +++ b/jsowell-pile/src/main/java/com/jsowell/pile/jcpp/config/JcppConfig.java @@ -5,6 +5,7 @@ import lombok.extern.slf4j.Slf4j; import org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; +import org.springframework.context.annotation.Primary; import org.springframework.http.client.SimpleClientHttpRequestFactory; import org.springframework.web.client.RestTemplate; @@ -22,7 +23,7 @@ public class JcppConfig { /** * JCPP 服务地址 */ - private String url = "http://localhost:8080"; + private String url = "http://localhost:8180"; /** * 下行接口路径 @@ -44,6 +45,16 @@ public class JcppConfig { */ private int connectTimeout = 3000; + /** + * 同步接口超时时间(毫秒)- 批量同步需要更长时间 + */ + private int syncTimeout = 120000; + + /** + * 同步接口连接超时时间(毫秒) + */ + private int syncConnectTimeout = 10000; + /** * 是否启用 JCPP 对接 */ @@ -74,8 +85,9 @@ public class JcppConfig { } /** - * 创建 JCPP 专用的 RestTemplate + * 创建 JCPP 专用的 RestTemplate(默认) */ + @Primary @Bean("jcppRestTemplate") public RestTemplate jcppRestTemplate() { SimpleClientHttpRequestFactory factory = new SimpleClientHttpRequestFactory(); @@ -86,4 +98,18 @@ public class JcppConfig { url, timeout, connectTimeout); return restTemplate; } + + /** + * 创建 JCPP 同步专用的 RestTemplate(超时时间更长) + */ + @Bean("jcppSyncRestTemplate") + public RestTemplate jcppSyncRestTemplate() { + SimpleClientHttpRequestFactory factory = new SimpleClientHttpRequestFactory(); + factory.setConnectTimeout(syncConnectTimeout); + factory.setReadTimeout(syncTimeout); + RestTemplate restTemplate = new RestTemplate(factory); + log.info("JCPP 同步 RestTemplate 初始化完成, syncTimeout: {}ms, syncConnectTimeout: {}ms", + syncTimeout, syncConnectTimeout); + return restTemplate; + } } diff --git a/jsowell-pile/src/main/java/com/jsowell/pile/jcpp/dto/sync/JcppSyncResult.java b/jsowell-pile/src/main/java/com/jsowell/pile/jcpp/dto/sync/JcppSyncResult.java index 6675e102a..2eef0091f 100644 --- a/jsowell-pile/src/main/java/com/jsowell/pile/jcpp/dto/sync/JcppSyncResult.java +++ b/jsowell-pile/src/main/java/com/jsowell/pile/jcpp/dto/sync/JcppSyncResult.java @@ -56,4 +56,8 @@ public class JcppSyncResult implements Serializable { result.setMessage(message); return result; } + + public Boolean isSuccess() { + return success; + } } diff --git a/jsowell-pile/src/main/java/com/jsowell/pile/jcpp/service/impl/JcppPileSyncServiceImpl.java b/jsowell-pile/src/main/java/com/jsowell/pile/jcpp/service/impl/JcppPileSyncServiceImpl.java index f93ac7332..645a6340b 100644 --- a/jsowell-pile/src/main/java/com/jsowell/pile/jcpp/service/impl/JcppPileSyncServiceImpl.java +++ b/jsowell-pile/src/main/java/com/jsowell/pile/jcpp/service/impl/JcppPileSyncServiceImpl.java @@ -17,6 +17,7 @@ import com.jsowell.pile.service.PileConnectorInfoService; import com.jsowell.pile.service.PileModelInfoService; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.beans.factory.annotation.Value; import org.springframework.http.*; import org.springframework.stereotype.Service; @@ -48,6 +49,7 @@ public class JcppPileSyncServiceImpl implements IJcppPileSyncService { private IJcppAuthService jcppAuthService; @Autowired + @Qualifier("jcppSyncRestTemplate") private RestTemplate restTemplate; @Autowired @@ -56,18 +58,18 @@ public class JcppPileSyncServiceImpl implements IJcppPileSyncService { @Value("${jcpp.sync.api-url:http://localhost:8180/api/sync}") private String jcppApiUrl; - @Value("${jcpp.sync.batch-size:100}") + @Value("${jcpp.sync.batch-size:500}") private int batchSize; @Value("${jcpp.sync.timeout:60000}") private int timeout; /** - * 全量同步充电桩数据到 JCPP + * 全量同步充电桩数据到 JCPP(批量处理) */ @Override public JcppSyncResponse syncAllPiles() { - log.info("开始全量同步充电桩数据到 JCPP"); + log.info("开始全量同步充电桩数据到 JCPP,批量大小: {}", batchSize); // 创建同步记录 JcppSyncRecord record = createSyncRecord("FULL"); @@ -78,7 +80,7 @@ public class JcppPileSyncServiceImpl implements IJcppPileSyncService { queryPile.setDelFlag("0"); List pileList = pileBasicInfoService.selectPileBasicInfoList(queryPile); - log.info("查询到 {} 个充电桩", pileList.size()); + log.info("查询到 {} 个充电桩,准备分批同步", pileList.size()); // 2. 查询所有充电枪(未删除的) PileConnectorInfo queryGun = new PileConnectorInfo(); @@ -87,19 +89,86 @@ public class JcppPileSyncServiceImpl implements IJcppPileSyncService { log.info("查询到 {} 个充电枪", gunList.size()); - // 3. 转换数据格式 - List pileDTOs = convertPilesToDTO(pileList); - List gunDTOs = convertGunsToDTO(gunList); + // 3. 按批次分割充电桩列表 + List> pileBatches = Lists.partition(pileList, batchSize); + int totalBatches = pileBatches.size(); + log.info("充电桩分为 {} 批,每批最多 {} 台", totalBatches, batchSize); - // 4. 调用 JCPP 同步接口 - JcppSyncResponse response = callJcppSyncApi(pileDTOs, gunDTOs); + // 4. 汇总所有批次的同步结果 + List allPileResults = new ArrayList<>(); + List allGunResults = new ArrayList<>(); - // 5. 更新同步记录 + // 5. 逐批处理充电桩 + for (int i = 0; i < pileBatches.size(); i++) { + List batchPiles = pileBatches.get(i); + int batchNo = i + 1; + + log.info("开始处理第 {}/{} 批充电桩,本批数量: {}", batchNo, totalBatches, batchPiles.size()); + + try { + // 5.1 转换充电桩数据 + List pileDTOs = convertPilesToDTO(batchPiles); + + // 5.2 先同步充电桩(充电枪依赖充电桩,必须先同步充电桩) + log.info("第 {} 批:开始同步充电桩...", batchNo); + List batchPileResults = syncPilesToJcpp(pileDTOs); + allPileResults.addAll(batchPileResults); + + // 统计充电桩同步结果 + long batchPileSuccess = batchPileResults.stream().filter(JcppSyncResult::isSuccess).count(); + long batchPileFailed = batchPiles.size() - batchPileSuccess; + log.info("第 {} 批:充电桩同步完成 {}/{} (成功/总数)", batchNo, batchPileSuccess, batchPiles.size()); + + // 5.3 查找本批充电桩对应的充电枪(只同步充电桩成功的枪) + List successPileSns = batchPileResults.stream() + .filter(JcppSyncResult::isSuccess) + .map(JcppSyncResult::getCode) + .collect(java.util.stream.Collectors.toList()); + + List batchGuns = gunList.stream() + .filter(gun -> successPileSns.contains(gun.getPileSn())) + .collect(java.util.stream.Collectors.toList()); + + log.info("第 {} 批:找到 {} 个充电枪(对应 {} 个成功的充电桩)", batchNo, batchGuns.size(), successPileSns.size()); + + // 5.4 同步充电枪(只同步充电桩成功的枪) + List batchGunResults = new ArrayList<>(); + if (!batchGuns.isEmpty()) { + List gunDTOs = convertGunsToDTO(batchGuns); + log.info("第 {} 批:开始同步充电枪...", batchNo); + batchGunResults = syncGunsToJcpp(gunDTOs); + allGunResults.addAll(batchGunResults); + + long batchGunSuccess = batchGunResults.stream().filter(JcppSyncResult::isSuccess).count(); + log.info("第 {} 批:充电枪同步完成 {}/{} (成功/总数)", batchNo, batchGunSuccess, batchGuns.size()); + } else { + log.warn("第 {} 批:没有需要同步的充电枪(充电桩全部失败)", batchNo); + } + + // 5.5 统计本批总体结果 + long batchGunSuccess = batchGunResults.stream().filter(JcppSyncResult::isSuccess).count(); + log.info("第 {}/{} 批同步完成: 充电桩 {}/{}, 充电枪 {}/{}", + batchNo, totalBatches, + batchPileSuccess, batchPiles.size(), + batchGunSuccess, batchGuns.size()); + + } catch (Exception e) { + log.error("第 {} 批同步失败", batchNo, e); + // 记录失败,但继续处理下一批 + for (PileBasicInfo pile : batchPiles) { + allPileResults.add(JcppSyncResult.fail(pile.getSn(), "批次同步异常: " + e.getMessage())); + } + } + } + + // 6. 构建最终响应 + JcppSyncResponse response = JcppSyncResponse.build(allPileResults, allGunResults); + + // 7. 更新同步记录 updateSyncRecord(record, response, "SUCCESS"); log.info("全量同步完成: 充电桩 {}/{}, 充电枪 {}/{}", response.getSuccessPiles(), response.getTotalPiles(), - response.getSuccessGuns(), response.getTotalGuns()); return response; @@ -302,8 +371,8 @@ public class JcppPileSyncServiceImpl implements IJcppPileSyncService { String gunName = gun.getName(); if (StringUtils.isEmpty(gunName)) { String gunNo = extractGunNo(gun.getPileConnectorCode()); - gunName = "充电枪" + gunNo; - log.warn("充电枪名称为空,使用默认名称: {} (gunCode: {})", gunName, gun.getPileConnectorCode()); + gunName = gun.getPileSn() + "的" + gunNo + "号枪"; + // log.warn("充电枪名称为空,使用默认名称: {} (gunCode: {})", gunName, gun.getPileConnectorCode()); } dto.setGunName(gunName); @@ -395,12 +464,34 @@ public class JcppPileSyncServiceImpl implements IJcppPileSyncService { HttpEntity entity = new HttpEntity<>(requestBody.toJSONString(), headers); // 发送请求 + log.info("调用 JCPP 充电桩同步接口: {}, 数量: {}", url, pileDTOs.size()); ResponseEntity response = restTemplate.postForEntity(url, entity, String.class); + // 打印响应状态和内容 + log.info("JCPP 充电桩同步接口响应 - 状态码: {}, 响应体: {}", + response.getStatusCode(), response.getBody()); + if (response.getStatusCode() == HttpStatus.OK) { // 解析响应 JSONObject responseBody = JSON.parseObject(response.getBody()); - List results = responseBody.getList("results", JcppSyncResult.class); + + // JCPP 响应格式:{ "success": true, "data": { "results": [...] } } + // 需要从 data 中获取 results + JSONObject data = responseBody.getJSONObject("data"); + List results = null; + + if (data != null) { + results = data.getList("results", JcppSyncResult.class); + } else { + log.warn("JCPP 充电桩同步响应中没有 data 字段"); + results = new ArrayList<>(); + } + + // 统计结果 + long successCount = results != null ? results.stream().filter(JcppSyncResult::isSuccess).count() : 0; + long failCount = results != null ? results.size() - successCount : 0; + log.info("JCPP 充电桩同步结果 - 成功: {}, 失败: {}", successCount, failCount); + return results != null ? results : new ArrayList<>(); } else if (response.getStatusCode() == HttpStatus.UNAUTHORIZED) { // token 过期,清除缓存并重试一次 @@ -409,7 +500,8 @@ public class JcppPileSyncServiceImpl implements IJcppPileSyncService { // 递归调用重试(只重试一次) return retrySyncPilesToJcpp(pileDTOs); } else { - log.error("JCPP 充电桩同步接口返回错误: {}", response.getStatusCode()); + log.error("JCPP 充电桩同步接口返回错误 - 状态码: {}, 响应体: {}", + response.getStatusCode(), response.getBody()); // 返回失败结果 List results = new ArrayList<>(); for (JcppPileSyncDTO dto : pileDTOs) { @@ -464,7 +556,18 @@ public class JcppPileSyncServiceImpl implements IJcppPileSyncService { if (response.getStatusCode() == HttpStatus.OK) { // 解析响应 JSONObject responseBody = JSON.parseObject(response.getBody()); - List results = responseBody.getList("results", JcppSyncResult.class); + + // JCPP 响应格式:{ "success": true, "data": { "results": [...] } } + JSONObject data = responseBody.getJSONObject("data"); + List results = null; + + if (data != null) { + results = data.getList("results", JcppSyncResult.class); + } else { + log.warn("重试:JCPP 充电桩同步响应中没有 data 字段"); + results = new ArrayList<>(); + } + return results != null ? results : new ArrayList<>(); } else { log.error("JCPP 充电桩同步接口返回错误: {}", response.getStatusCode()); @@ -517,12 +620,34 @@ public class JcppPileSyncServiceImpl implements IJcppPileSyncService { HttpEntity entity = new HttpEntity<>(requestBody.toJSONString(), headers); // 发送请求 + log.info("调用 JCPP 充电枪同步接口: {}, 数量: {}", url, gunDTOs.size()); ResponseEntity response = restTemplate.postForEntity(url, entity, String.class); + // 打印响应状态和内容 + log.info("JCPP 充电枪同步接口响应 - 状态码: {}, 响应体: {}", + response.getStatusCode(), response.getBody()); + if (response.getStatusCode() == HttpStatus.OK) { // 解析响应 JSONObject responseBody = JSON.parseObject(response.getBody()); - List results = responseBody.getList("results", JcppSyncResult.class); + + // JCPP 响应格式:{ "success": true, "data": { "results": [...] } } + // 需要从 data 中获取 results + JSONObject data = responseBody.getJSONObject("data"); + List results = null; + + if (data != null) { + results = data.getList("results", JcppSyncResult.class); + } else { + log.warn("JCPP 充电枪同步响应中没有 data 字段"); + results = new ArrayList<>(); + } + + // 统计结果 + long successCount = results != null ? results.stream().filter(JcppSyncResult::isSuccess).count() : 0; + long failCount = results != null ? results.size() - successCount : 0; + log.info("JCPP 充电枪同步结果 - 成功: {}, 失败: {}", successCount, failCount); + return results != null ? results : new ArrayList<>(); } else if (response.getStatusCode() == HttpStatus.UNAUTHORIZED) { // token 过期,清除缓存并重试一次 @@ -531,7 +656,8 @@ public class JcppPileSyncServiceImpl implements IJcppPileSyncService { // 递归调用重试(只重试一次) return retrySyncGunsToJcpp(gunDTOs); } else { - log.error("JCPP 充电枪同步接口返回错误: {}", response.getStatusCode()); + log.error("JCPP 充电枪同步接口返回错误 - 状态码: {}, 响应体: {}", + response.getStatusCode(), response.getBody()); // 返回失败结果 List results = new ArrayList<>(); for (JcppGunSyncDTO dto : gunDTOs) { @@ -586,7 +712,18 @@ public class JcppPileSyncServiceImpl implements IJcppPileSyncService { if (response.getStatusCode() == HttpStatus.OK) { // 解析响应 JSONObject responseBody = JSON.parseObject(response.getBody()); - List results = responseBody.getList("results", JcppSyncResult.class); + + // JCPP 响应格式:{ "success": true, "data": { "results": [...] } } + JSONObject data = responseBody.getJSONObject("data"); + List results = null; + + if (data != null) { + results = data.getList("results", JcppSyncResult.class); + } else { + log.warn("重试:JCPP 充电枪同步响应中没有 data 字段"); + results = new ArrayList<>(); + } + return results != null ? results : new ArrayList<>(); } else { log.error("JCPP 充电枪同步接口返回错误: {}", response.getStatusCode()); @@ -643,11 +780,39 @@ public class JcppPileSyncServiceImpl implements IJcppPileSyncService { record.setFailedGuns(response.getFailedGuns()); if (response.getErrors() != null && !response.getErrors().isEmpty()) { - record.setErrorMessage(String.join("; ", response.getErrors())); + // 限制错误信息长度,避免超过 TEXT 字段限制(65535字节) + // 只保存前100条错误信息,并限制总长度不超过60000字符 + List errors = response.getErrors(); + int maxErrors = Math.min(100, errors.size()); + StringBuilder errorBuilder = new StringBuilder(); + + for (int i = 0; i < maxErrors; i++) { + String error = errors.get(i); + // 如果添加这条错误后会超过限制,则停止 + if (errorBuilder.length() + error.length() + 2 > 60000) { + errorBuilder.append("... (还有 ").append(errors.size() - i).append(" 条错误信息被省略)"); + break; + } + if (i > 0) { + errorBuilder.append("; "); + } + errorBuilder.append(error); + } + + // 如果还有更多错误但没有超长,也添加提示 + if (maxErrors < errors.size() && errorBuilder.length() < 60000) { + errorBuilder.append("; ... (还有 ").append(errors.size() - maxErrors).append(" 条错误信息被省略)"); + } + + record.setErrorMessage(errorBuilder.toString()); } } if (errorMessage != null) { + // 限制单个错误消息的长度 + if (errorMessage.length() > 60000) { + errorMessage = errorMessage.substring(0, 60000) + "... (错误信息过长已截断)"; + } record.setErrorMessage(errorMessage); } diff --git a/jsowell-pile/src/main/java/com/jsowell/pile/service/impl/PileModelInfoServiceImpl.java b/jsowell-pile/src/main/java/com/jsowell/pile/service/impl/PileModelInfoServiceImpl.java index 6940245e0..f16a217ff 100644 --- a/jsowell-pile/src/main/java/com/jsowell/pile/service/impl/PileModelInfoServiceImpl.java +++ b/jsowell-pile/src/main/java/com/jsowell/pile/service/impl/PileModelInfoServiceImpl.java @@ -30,14 +30,32 @@ public class PileModelInfoServiceImpl implements PileModelInfoService { private RedisCache redisCache; /** - * 查询充电桩型号信息 + * 查询充电桩型号信息(带缓存) * * @param id 充电桩型号信息主键 * @return 充电桩型号信息 */ @Override public PileModelInfo selectPileModelInfoById(Long id) { - return pileModelInfoMapper.selectPileModelInfoById(id); + if (id == null) { + return null; + } + + // 1. 尝试从缓存获取 + String redisKey = CacheConstants.PILE_MODEL_INFO_BY_ID + id; + PileModelInfo modelInfo = redisCache.getCacheObject(redisKey); + + // 2. 缓存未命中,从数据库查询 + if (modelInfo == null) { + modelInfo = pileModelInfoMapper.selectPileModelInfoById(id); + + // 3. 查询结果存入缓存(1天有效期) + if (modelInfo != null) { + redisCache.setCacheObject(redisKey, modelInfo, CacheConstants.cache_expire_time_1d); + } + } + + return modelInfo; } /** @@ -135,10 +153,18 @@ public class PileModelInfoServiceImpl implements PileModelInfoService { return modelInfoVO; } + /** + * 清除缓存 + * + * @param modelIds 型号ID数组 + */ private void cleanCache(Long[] modelIds) { List redisKeyList = Lists.newArrayList(); for (Long modelId : modelIds) { + // 清除 getPileModelInfoByModelId 的缓存 redisKeyList.add(CacheConstants.GET_PILE_MODEL_INFO_BY_MODEL_ID + modelId); + // 清除 selectPileModelInfoById 的缓存 + redisKeyList.add(CacheConstants.PILE_MODEL_INFO_BY_ID + modelId); } redisCache.deleteObject(redisKeyList); }