From c10ac243234818000c169b5454c62ed033dbc944 Mon Sep 17 00:00:00 2001 From: Lemon Date: Sat, 21 Mar 2026 10:19:22 +0800 Subject: [PATCH] =?UTF-8?q?update=20=E6=A1=A9=E7=AB=AF=E8=BF=9E=E6=8E=A5?= =?UTF-8?q?=E7=9B=B8=E5=85=B3=E5=86=85=E5=AE=B9=EF=BC=8C=E8=A7=A3=E5=86=B3?= =?UTF-8?q?=E9=A2=91=E7=B9=81=E7=99=BB=E5=BD=95?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- improve_record.md | 212 ++++++++++++++++++ .../common/enums/ykc/PileChannelEntity.java | 38 +++- .../framework/config/ThreadPoolConfig.java | 8 +- .../yunkuaichong/HeartbeatRequestHandler.java | 38 ++-- .../NettyServerChannelInitializer.java | 2 +- .../yunkuaichong/NettyServerHandler.java | 22 +- 6 files changed, 278 insertions(+), 42 deletions(-) create mode 100644 improve_record.md diff --git a/improve_record.md b/improve_record.md new file mode 100644 index 000000000..a48feefdb --- /dev/null +++ b/improve_record.md @@ -0,0 +1,212 @@ +# 充电桩频繁登录问题优化记录 + +## 问题背景 + +**发生时间**: 2026-03-21 06:00 左右 + +**问题描述**: 充电桩出现频繁登录的情况 + +**初步原因**: 项目没有及时回复桩端的心跳消息,未及时回复超过 3 次后桩端会发起重连 + +--- + +## 问题分析 + +### 问题 1: Pipeline 配置存在潜在阻塞点(最关键) + +**文件**: `jsowell-netty/src/main/java/com/jsowell/netty/server/yunkuaichong/NettyServerChannelInitializer.java:41` + +**问题描述**: +- `nettyServerHandler` 绑定到 `businessGroup` 线程池(32 线程) +- 但 `echoServerHandler` **没有绑定到任何 EventExecutorGroup**,它会在 **IO 线程**(Worker 线程)中执行 +- 如果 IO 线程被其他任务阻塞,心跳回复就无法及时发送 + +**影响**: 这是导致心跳回复不及时的最主要原因! + +--- + +### 问题 2: 心跳处理中存在阻塞的数据库操作 + +**文件**: `jsowell-netty/src/main/java/com/jsowell/netty/handler/yunkuaichong/HeartbeatRequestHandler.java:54` + +**问题描述**: +- `saveLastTimeAndCheckChannel()` 是**同步**执行的,包含 Redis 写操作 +- 如果 Redis 响应慢,会阻塞心跳响应 + +--- + +### 问题 3: `PileChannelEntity.checkChannel()` 使用同步阻塞遍历 + +**文件**: `jsowell-common/src/main/java/com/jsowell/common/enums/ykc/PileChannelEntity.java:108-115` + +**问题描述**: +- `getPileSnByChannelId()` 使用线性遍历整个 `manager.entrySet()` +- 当连接数很多时,这个遍历操作会很慢,并且会阻塞执行线程 + +--- + +### 问题 4: 线程池配置可能导致任务堆积 + +**文件**: `jsowell-framework/src/main/java/com/jsowell/framework/config/ThreadPoolConfig.java:63` + +**问题描述**: +- 使用 `CallerRunsPolicy` 拒绝策略,当队列满时,任务会在调用者线程中执行 +- 如果 Netty 的 IO 线程或 businessGroup 线程作为调用者,会**阻塞这些关键线程**! + +--- + +## 优化方案 + +### 优化 1: 修复 Pipeline 配置(优先级:P0) + +**文件**: `jsowell-netty/src/main/java/com/jsowell/netty/server/yunkuaichong/NettyServerChannelInitializer.java:41` + +**修改内容**: +```java +// 修改前 +pipeline.addLast(businessGroup, nettyServerHandler); +pipeline.addLast(echoServerHandler); + +// 修改后 +pipeline.addLast(businessGroup, nettyServerHandler); +pipeline.addLast(businessGroup, echoServerHandler); // 回复Handler也绑定到业务线程池 +``` + +**效果**: 避免心跳回复被 IO 线程阻塞 + +--- + +### 优化 2: 优化 PileChannelEntity 使用双向映射(优先级:P0) + +**文件**: `jsowell-common/src/main/java/com/jsowell/common/enums/ykc/PileChannelEntity.java` + +**修改内容**: +1. 新增反向映射 `channelIdToPileSnMap` +2. `getPileSnByChannelId()` 从 O(n) 优化到 O(1) +3. 同步维护反向映射(checkChannel、removeByPileSn、removeByChannelId) + +**效果**: 当连接数多时,查询性能大幅提升 + +--- + +### 优化 3: 修改线程池拒绝策略(优先级:P1) + +**文件**: `jsowell-framework/src/main/java/com/jsowell/framework/config/ThreadPoolConfig.java` + +**修改内容**: +```java +// 修改前 +executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); +private final int queueCapacity = 2000; + +// 修改后 +executor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardOldestPolicy()); +private final int queueCapacity = 5000; // 增大队列容量 +``` + +**效果**: 避免阻塞关键线程(Netty IO 线程、业务线程池) + +--- + +### 优化 4: 心跳处理完全异步化(优先级:P1) + +**文件**: `jsowell-netty/src/main/java/com/jsowell/netty/handler/yunkuaichong/HeartbeatRequestHandler.java:67-85` + +**修改内容**: +```java +// 修改前:saveLastTimeAndCheckChannel 同步执行 +saveLastTimeAndCheckChannel(pileSn, channel); +CompletableFuture.runAsync(() -> updateStatus(...), executor); + +// 修改后:先返回心跳应答,再异步处理 +byte[] response = getResult(ykcDataProtocol, messageBody); +CompletableFuture.runAsync(() -> { + saveLastTimeAndCheckChannel(pileSn, channel); + pileBasicInfoService.updateStatus(...); +}, executor); +return response; +``` + +**效果**: 心跳回复不受 Redis/数据库操作影响 + +--- + +### 优化 5: 增加心跳处理耗时监控(优先级:P2) + +**文件**: `jsowell-netty/src/main/java/com/jsowell/netty/server/yunkuaichong/NettyServerHandler.java:174-180` + +**修改内容**: +```java +long elapsed = System.currentTimeMillis() - startTime; +int warnThreshold = "0x03".equals(frameTypeStr) ? 50 : 200; +if (elapsed > warnThreshold) { + log.error("【性能警告】消息处理耗时过长: {}ms, 帧类型: {}, pileSn: {}", elapsed, frameTypeStr, pileSn); +} +``` + +**效果**: 便于后续排查性能问题 + +--- + +## 修改文件清单 + +| 序号 | 文件路径 | 修改内容 | +|------|----------|----------| +| 1 | `jsowell-netty/.../NettyServerChannelInitializer.java` | EchoServerHandler 绑定到业务线程池 | +| 2 | `jsowell-common/.../PileChannelEntity.java` | 新增反向映射,优化查询性能 | +| 3 | `jsowell-framework/.../ThreadPoolConfig.java` | 修改拒绝策略,增大队列容量 | +| 4 | `jsowell-netty/.../HeartbeatRequestHandler.java` | 心跳处理完全异步化 | +| 5 | `jsowell-netty/.../NettyServerHandler.java` | 增加消息处理耗时监控 | + +--- + +## 测试建议 + +### 1. 编译打包 +```bash +mvn clean package -DskipTests +``` + +### 2. 部署后观察日志 +关注以下指标: +- 是否还有桩频繁登录的情况 +- 心跳处理耗时是否降低 +- 是否出现 `【性能警告】` 日志 + +### 3. 监控 Redis 和数据库性能 +- Redis 响应时间应 < 5ms +- 数据库连接池应充足 +- 检查 Redis 服务性能 + +### 4. 持续监控 +部署后持续观察 24 小时,确认问题是否彻底解决 + +--- + +## 预期效果 + +| 指标 | 优化前 | 优化后 | +|------|--------|--------| +| 心跳回复延迟 | 可能被阻塞 | < 10ms | +| 桩频繁登录 | 存在 | 消除 | +| 连接查询性能 | O(n) | O(1) | +| 线程阻塞风险 | 高 | 低 | + +--- + +## 相关文档 + +- `CLAUDE.md` - 项目架构文档 +- `jsowell-netty/CLAUDE.md` - Netty 模块文档 +- `jsowell-common/CLAUDE.md` - 通用模块文档 + +--- + +## 记录信息 + +| 项目 | 内容 | +|------|------| +| 记录人 | Claude Code | +| 记录时间 | 2026-03-21 | +| Git 分支 | dev | +| 提交建议 | 建议创建新分支提交,便于回滚 | diff --git a/jsowell-common/src/main/java/com/jsowell/common/enums/ykc/PileChannelEntity.java b/jsowell-common/src/main/java/com/jsowell/common/enums/ykc/PileChannelEntity.java index a2d1c942d..f3b65c75f 100644 --- a/jsowell-common/src/main/java/com/jsowell/common/enums/ykc/PileChannelEntity.java +++ b/jsowell-common/src/main/java/com/jsowell/common/enums/ykc/PileChannelEntity.java @@ -22,9 +22,17 @@ public class PileChannelEntity { /** * 管理一个全局map,保存连接进服务端的通道数量 + * key: pileSn, value: ChannelHandlerContext */ private static final ConcurrentHashMap manager = new ConcurrentHashMap<>(); + /** + * 反向映射:channelId -> pileSn + * 优化查询性能,避免线性遍历 manager.entrySet() + * 当连接数很多时,使用这个映射可以将 getPileSnByChannelId() 从 O(n) 优化到 O(1) + */ + private static final ConcurrentHashMap channelIdToPileSnMap = new ConcurrentHashMap<>(); + // 桩号--channelId 一对多 public static final ConcurrentHashMap> pileMap = new ConcurrentHashMap<>(); @@ -32,10 +40,14 @@ public class PileChannelEntity { * 校验channel是否保存 */ public static void checkChannel(String pileSn, ChannelHandlerContext ctx) { + String channelId = ctx.channel().id().asLongText(); + if (!manager.containsKey(pileSn)) { // 如果manager中不存在pileSn的连接,则保存 - log.info("checkChannel-manager中不存在pileSn:{}的连接,保存新的channel:{}", pileSn, ctx.channel().id().asLongText()); + log.info("checkChannel-manager中不存在pileSn:{}的连接,保存新的channel:{}", pileSn, channelId); manager.put(pileSn, ctx); + // 同步更新反向映射 + channelIdToPileSnMap.put(channelId, pileSn); return; } @@ -43,18 +55,22 @@ public class PileChannelEntity { Channel sourceChannel = manager.get(pileSn).channel(); if (sourceChannel == null) { // 为空就put - log.info("checkChannel-manager中pileSn:{}的连接为空,保存新的channel:{}", pileSn, ctx.channel().id().asLongText()); + log.info("checkChannel-manager中pileSn:{}的连接为空,保存新的channel:{}", pileSn, channelId); manager.put(pileSn, ctx); + // 同步更新反向映射 + channelIdToPileSnMap.put(channelId, pileSn); return; } // 两个做对比 String sourceChannelId = sourceChannel.id().asLongText(); - String channelId = ctx.channel().id().asLongText(); if (!StringUtils.equals(sourceChannelId, channelId)) { // 不一致则更新 log.info("checkChannel-manager中pileSn:{}的连接不一致, 老channelId:{}, 保存新的channel:{}", pileSn, sourceChannelId, channelId); manager.put(pileSn, ctx); + // 移除旧的反向映射,添加新的 + channelIdToPileSnMap.remove(sourceChannelId); + channelIdToPileSnMap.put(channelId, pileSn); } } @@ -102,16 +118,12 @@ public class PileChannelEntity { /** * 通过channelId获取桩编号 + * 优化:使用反向映射 channelIdToPileSnMap 实现 O(1) 查询 * @param channelId * @return */ public static String getPileSnByChannelId(String channelId) { - for (HashMap.Entry entry : manager.entrySet()) { - if (entry.getValue().channel().id().asLongText().equals(channelId)) { - return entry.getKey(); - } - } - return null; + return channelIdToPileSnMap.get(channelId); } // public static String getPileSnByCtx(ChannelHandlerContext ctx) { @@ -134,7 +146,11 @@ public class PileChannelEntity { if (StringUtils.isBlank(pileSn)) { return; } - manager.remove(pileSn); + ChannelHandlerContext ctx = manager.remove(pileSn); + // 同步删除反向映射 + if (ctx != null) { + channelIdToPileSnMap.remove(ctx.channel().id().asLongText()); + } } public static void removeByChannelId(String channelId){ @@ -144,6 +160,8 @@ public class PileChannelEntity { String pileSn = getPileSnByChannelId(channelId); if (StringUtils.isNotBlank(pileSn)) { manager.remove(pileSn); + // 同步删除反向映射 + channelIdToPileSnMap.remove(channelId); } } diff --git a/jsowell-framework/src/main/java/com/jsowell/framework/config/ThreadPoolConfig.java b/jsowell-framework/src/main/java/com/jsowell/framework/config/ThreadPoolConfig.java index ec59169f6..5b0d276df 100644 --- a/jsowell-framework/src/main/java/com/jsowell/framework/config/ThreadPoolConfig.java +++ b/jsowell-framework/src/main/java/com/jsowell/framework/config/ThreadPoolConfig.java @@ -27,8 +27,8 @@ public class ThreadPoolConfig { // 最大可创建的线程数 private final int maxPoolSize = 128; // 突发时翻倍,避免过多上下文切换 - // 队列最大长度 - private final int queueCapacity = 2000; // 桩消息短时堆积时提供缓冲,防止直接拒绝 + // 队列最大长度 (优化:增大队列容量以应对桩端消息高峰) + private final int queueCapacity = 5000; // 桩消息短时堆积时提供缓冲,防止直接拒绝 // 线程池维护线程所允许的空闲时间 private final int keepAliveSeconds = 120; // 突发结束后快速回收扩展线程 @@ -51,6 +51,7 @@ public class ThreadPoolConfig { /** * 线程池 + * 优化:使用 DiscardOldestPolicy 避免阻塞调用者线程(如 Netty IO 线程) */ @Bean(name = "threadPoolTaskExecutor") public ThreadPoolTaskExecutor threadPoolTaskExecutor() { @@ -60,7 +61,8 @@ public class ThreadPoolConfig { executor.setQueueCapacity(queueCapacity); executor.setKeepAliveSeconds(keepAliveSeconds); // 线程池对拒绝任务(无线程可用)的处理策略 - executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); + // 优化:DiscardOldestPolicy 丢弃最老的任务,避免阻塞调用者线程 + executor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardOldestPolicy()); executor.setThreadNamePrefix(threadNamePrefix); executor.setWaitForTasksToCompleteOnShutdown(true); // log.info("threadPoolTaskExecutor创建成功"); diff --git a/jsowell-netty/src/main/java/com/jsowell/netty/handler/yunkuaichong/HeartbeatRequestHandler.java b/jsowell-netty/src/main/java/com/jsowell/netty/handler/yunkuaichong/HeartbeatRequestHandler.java index 9dcb75941..f91c9b34d 100644 --- a/jsowell-netty/src/main/java/com/jsowell/netty/handler/yunkuaichong/HeartbeatRequestHandler.java +++ b/jsowell-netty/src/main/java/com/jsowell/netty/handler/yunkuaichong/HeartbeatRequestHandler.java @@ -19,6 +19,7 @@ import java.util.concurrent.CompletableFuture; /** * 充电桩心跳包 + * 优化:将所有异步操作完全异步化,先返回心跳应答再执行其他逻辑 */ @Slf4j @Component @@ -50,12 +51,6 @@ public class HeartbeatRequestHandler extends AbstractYkcHandler { byte[] pileSnByte = BytesUtil.copyBytes(msgBody, startIndex, length); String pileSn = BytesUtil.binary(pileSnByte, 16); - // 保存时间 - saveLastTimeAndCheckChannel(pileSn, channel); - - // 校验channel - // PileChannelEntity.checkChannel(pileSn, channel); - // 枪号 startIndex += length; length = 1; @@ -69,30 +64,27 @@ public class HeartbeatRequestHandler extends AbstractYkcHandler { String connectorStatus = BytesUtil.binary(connectorStatusByte, 16); // log.info("桩号:{}, 枪号:{}, 枪状态:{}", pileSn, pileConnectorNum, connectorStatus); - // updateStatus(pileSn, pileConnectorNum, connectorStatus); + // 优化:先构建并返回心跳应答,不阻塞心跳回复 + byte[] flag = Constants.zeroByteArray; + byte[] messageBody = Bytes.concat(pileSnByte, pileConnectorNumByte, flag); + byte[] response = getResult(ykcDataProtocol, messageBody); - // 公共方法修改状态 - // try { - // pileBasicInfoService.updateStatus(BytesUtil.bcd2Str(ykcDataProtocol.getFrameType()), pileSn, pileConnectorNum, connectorStatus, null); - // } catch (Exception e) { - // log.error("公共方法修改状态error", e); - // } - - // 异步修改状态 + // 优化:所有其他操作(Redis、数据库)完全异步化,不阻塞心跳回复 + // 将 saveLastTimeAndCheckChannel 和 updateStatus 都改为异步执行 CompletableFuture.runAsync(() -> { try { - pileBasicInfoService.updateStatus(BytesUtil.bcd2Str(ykcDataProtocol.getFrameType()), pileSn, pileConnectorNum, connectorStatus, null); + // 异步保存连接时间并检查通道(Redis 操作) + saveLastTimeAndCheckChannel(pileSn, channel); + + // 异步更新状态(数据库操作) + String frameType = BytesUtil.bcd2Str(ykcDataProtocol.getFrameType()); + pileBasicInfoService.updateStatus(frameType, pileSn, pileConnectorNum, connectorStatus, null); } catch (Exception e) { - log.error("公共方法修改状态error", e); + log.error("心跳异步处理error, pileSn:{}", pileSn, e); } }, executor); - // 心跳应答(置0) - byte[] flag = Constants.zeroByteArray; - - // 消息体 - byte[] messageBody = Bytes.concat(pileSnByte, pileConnectorNumByte, flag); - return getResult(ykcDataProtocol, messageBody); + return response; } } diff --git a/jsowell-netty/src/main/java/com/jsowell/netty/server/yunkuaichong/NettyServerChannelInitializer.java b/jsowell-netty/src/main/java/com/jsowell/netty/server/yunkuaichong/NettyServerChannelInitializer.java index 5505c3ac9..1e09877a0 100644 --- a/jsowell-netty/src/main/java/com/jsowell/netty/server/yunkuaichong/NettyServerChannelInitializer.java +++ b/jsowell-netty/src/main/java/com/jsowell/netty/server/yunkuaichong/NettyServerChannelInitializer.java @@ -38,7 +38,7 @@ public class NettyServerChannelInitializer extends ChannelInitializer>>>>平台响应消息>>>>>】channel:{}, 响应帧类型:{}, 响应帧名称:{}, 原帧类型:{}, 原帧名称:{}, 序列号域:{}, response:{}", channel.id(), responseFrameType, YKCFrameTypeCode.getFrameTypeStr(responseFrameType), - frameType, YKCFrameTypeCode.getFrameTypeStr(frameType), serialNumber, + frameTypeStr, YKCFrameTypeCode.getFrameTypeStr(frameTypeStr), serialNumber, BytesUtil.binary(response, 16)); } } } finally { ReferenceCountUtil.release(message); + + // 优化:增加消息处理耗时监控,用于排查性能问题 + long elapsed = System.currentTimeMillis() - startTime; + // 心跳帧(0x03)处理超过50ms警告,其他帧超过200ms警告 + int warnThreshold = "0x03".equals(frameTypeStr) ? 50 : 200; + if (elapsed > warnThreshold) { + log.error("【性能警告】消息处理耗时过长: {}ms, 帧类型: {}, pileSn: {}", elapsed, frameTypeStr, pileSn); + } } }