update 桩端连接相关内容,解决频繁登录

This commit is contained in:
Lemon
2026-03-21 10:19:22 +08:00
parent c9d7f505b6
commit c10ac24323
6 changed files with 278 additions and 42 deletions

212
improve_record.md Normal file
View File

@@ -0,0 +1,212 @@
# 充电桩频繁登录问题优化记录
## 问题背景
**发生时间**: 2026-03-21 06:00 左右
**问题描述**: 充电桩出现频繁登录的情况
**初步原因**: 项目没有及时回复桩端的心跳消息,未及时回复超过 3 次后桩端会发起重连
---
## 问题分析
### 问题 1: Pipeline 配置存在潜在阻塞点(最关键)
**文件**: `jsowell-netty/src/main/java/com/jsowell/netty/server/yunkuaichong/NettyServerChannelInitializer.java:41`
**问题描述**:
- `nettyServerHandler` 绑定到 `businessGroup` 线程池32 线程)
-`echoServerHandler` **没有绑定到任何 EventExecutorGroup**,它会在 **IO 线程**Worker 线程)中执行
- 如果 IO 线程被其他任务阻塞,心跳回复就无法及时发送
**影响**: 这是导致心跳回复不及时的最主要原因!
---
### 问题 2: 心跳处理中存在阻塞的数据库操作
**文件**: `jsowell-netty/src/main/java/com/jsowell/netty/handler/yunkuaichong/HeartbeatRequestHandler.java:54`
**问题描述**:
- `saveLastTimeAndCheckChannel()` 是**同步**执行的,包含 Redis 写操作
- 如果 Redis 响应慢,会阻塞心跳响应
---
### 问题 3: `PileChannelEntity.checkChannel()` 使用同步阻塞遍历
**文件**: `jsowell-common/src/main/java/com/jsowell/common/enums/ykc/PileChannelEntity.java:108-115`
**问题描述**:
- `getPileSnByChannelId()` 使用线性遍历整个 `manager.entrySet()`
- 当连接数很多时,这个遍历操作会很慢,并且会阻塞执行线程
---
### 问题 4: 线程池配置可能导致任务堆积
**文件**: `jsowell-framework/src/main/java/com/jsowell/framework/config/ThreadPoolConfig.java:63`
**问题描述**:
- 使用 `CallerRunsPolicy` 拒绝策略,当队列满时,任务会在调用者线程中执行
- 如果 Netty 的 IO 线程或 businessGroup 线程作为调用者,会**阻塞这些关键线程**
---
## 优化方案
### 优化 1: 修复 Pipeline 配置优先级P0
**文件**: `jsowell-netty/src/main/java/com/jsowell/netty/server/yunkuaichong/NettyServerChannelInitializer.java:41`
**修改内容**:
```java
// 修改前
pipeline.addLast(businessGroup, nettyServerHandler);
pipeline.addLast(echoServerHandler);
// 修改后
pipeline.addLast(businessGroup, nettyServerHandler);
pipeline.addLast(businessGroup, echoServerHandler); // 回复Handler也绑定到业务线程池
```
**效果**: 避免心跳回复被 IO 线程阻塞
---
### 优化 2: 优化 PileChannelEntity 使用双向映射优先级P0
**文件**: `jsowell-common/src/main/java/com/jsowell/common/enums/ykc/PileChannelEntity.java`
**修改内容**:
1. 新增反向映射 `channelIdToPileSnMap`
2. `getPileSnByChannelId()` 从 O(n) 优化到 O(1)
3. 同步维护反向映射checkChannel、removeByPileSn、removeByChannelId
**效果**: 当连接数多时,查询性能大幅提升
---
### 优化 3: 修改线程池拒绝策略优先级P1
**文件**: `jsowell-framework/src/main/java/com/jsowell/framework/config/ThreadPoolConfig.java`
**修改内容**:
```java
// 修改前
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
private final int queueCapacity = 2000;
// 修改后
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardOldestPolicy());
private final int queueCapacity = 5000; // 增大队列容量
```
**效果**: 避免阻塞关键线程Netty IO 线程、业务线程池)
---
### 优化 4: 心跳处理完全异步化优先级P1
**文件**: `jsowell-netty/src/main/java/com/jsowell/netty/handler/yunkuaichong/HeartbeatRequestHandler.java:67-85`
**修改内容**:
```java
// 修改前saveLastTimeAndCheckChannel 同步执行
saveLastTimeAndCheckChannel(pileSn, channel);
CompletableFuture.runAsync(() -> updateStatus(...), executor);
// 修改后:先返回心跳应答,再异步处理
byte[] response = getResult(ykcDataProtocol, messageBody);
CompletableFuture.runAsync(() -> {
saveLastTimeAndCheckChannel(pileSn, channel);
pileBasicInfoService.updateStatus(...);
}, executor);
return response;
```
**效果**: 心跳回复不受 Redis/数据库操作影响
---
### 优化 5: 增加心跳处理耗时监控优先级P2
**文件**: `jsowell-netty/src/main/java/com/jsowell/netty/server/yunkuaichong/NettyServerHandler.java:174-180`
**修改内容**:
```java
long elapsed = System.currentTimeMillis() - startTime;
int warnThreshold = "0x03".equals(frameTypeStr) ? 50 : 200;
if (elapsed > warnThreshold) {
log.error("【性能警告】消息处理耗时过长: {}ms, 帧类型: {}, pileSn: {}", elapsed, frameTypeStr, pileSn);
}
```
**效果**: 便于后续排查性能问题
---
## 修改文件清单
| 序号 | 文件路径 | 修改内容 |
|------|----------|----------|
| 1 | `jsowell-netty/.../NettyServerChannelInitializer.java` | EchoServerHandler 绑定到业务线程池 |
| 2 | `jsowell-common/.../PileChannelEntity.java` | 新增反向映射,优化查询性能 |
| 3 | `jsowell-framework/.../ThreadPoolConfig.java` | 修改拒绝策略,增大队列容量 |
| 4 | `jsowell-netty/.../HeartbeatRequestHandler.java` | 心跳处理完全异步化 |
| 5 | `jsowell-netty/.../NettyServerHandler.java` | 增加消息处理耗时监控 |
---
## 测试建议
### 1. 编译打包
```bash
mvn clean package -DskipTests
```
### 2. 部署后观察日志
关注以下指标:
- 是否还有桩频繁登录的情况
- 心跳处理耗时是否降低
- 是否出现 `【性能警告】` 日志
### 3. 监控 Redis 和数据库性能
- Redis 响应时间应 < 5ms
- 数据库连接池应充足
- 检查 Redis 服务性能
### 4. 持续监控
部署后持续观察 24 小时,确认问题是否彻底解决
---
## 预期效果
| 指标 | 优化前 | 优化后 |
|------|--------|--------|
| 心跳回复延迟 | 可能被阻塞 | < 10ms |
| 桩频繁登录 | 存在 | 消除 |
| 连接查询性能 | O(n) | O(1) |
| 线程阻塞风险 | 高 | 低 |
---
## 相关文档
- `CLAUDE.md` - 项目架构文档
- `jsowell-netty/CLAUDE.md` - Netty 模块文档
- `jsowell-common/CLAUDE.md` - 通用模块文档
---
## 记录信息
| 项目 | 内容 |
|------|------|
| 记录人 | Claude Code |
| 记录时间 | 2026-03-21 |
| Git 分支 | dev |
| 提交建议 | 建议创建新分支提交,便于回滚 |

View File

@@ -22,9 +22,17 @@ public class PileChannelEntity {
/** /**
* 管理一个全局map保存连接进服务端的通道数量 * 管理一个全局map保存连接进服务端的通道数量
* key: pileSn, value: ChannelHandlerContext
*/ */
private static final ConcurrentHashMap<String, ChannelHandlerContext> manager = new ConcurrentHashMap<>(); private static final ConcurrentHashMap<String, ChannelHandlerContext> manager = new ConcurrentHashMap<>();
/**
* 反向映射channelId -> pileSn
* 优化查询性能,避免线性遍历 manager.entrySet()
* 当连接数很多时,使用这个映射可以将 getPileSnByChannelId() 从 O(n) 优化到 O(1)
*/
private static final ConcurrentHashMap<String, String> channelIdToPileSnMap = new ConcurrentHashMap<>();
// 桩号--channelId 一对多 // 桩号--channelId 一对多
public static final ConcurrentHashMap<String, List<ChannelHandlerContext>> pileMap = new ConcurrentHashMap<>(); public static final ConcurrentHashMap<String, List<ChannelHandlerContext>> pileMap = new ConcurrentHashMap<>();
@@ -32,10 +40,14 @@ public class PileChannelEntity {
* 校验channel是否保存 * 校验channel是否保存
*/ */
public static void checkChannel(String pileSn, ChannelHandlerContext ctx) { public static void checkChannel(String pileSn, ChannelHandlerContext ctx) {
String channelId = ctx.channel().id().asLongText();
if (!manager.containsKey(pileSn)) { if (!manager.containsKey(pileSn)) {
// 如果manager中不存在pileSn的连接则保存 // 如果manager中不存在pileSn的连接则保存
log.info("checkChannel-manager中不存在pileSn:{}的连接,保存新的channel:{}", pileSn, ctx.channel().id().asLongText()); log.info("checkChannel-manager中不存在pileSn:{}的连接,保存新的channel:{}", pileSn, channelId);
manager.put(pileSn, ctx); manager.put(pileSn, ctx);
// 同步更新反向映射
channelIdToPileSnMap.put(channelId, pileSn);
return; return;
} }
@@ -43,18 +55,22 @@ public class PileChannelEntity {
Channel sourceChannel = manager.get(pileSn).channel(); Channel sourceChannel = manager.get(pileSn).channel();
if (sourceChannel == null) { if (sourceChannel == null) {
// 为空就put // 为空就put
log.info("checkChannel-manager中pileSn:{}的连接为空,保存新的channel:{}", pileSn, ctx.channel().id().asLongText()); log.info("checkChannel-manager中pileSn:{}的连接为空,保存新的channel:{}", pileSn, channelId);
manager.put(pileSn, ctx); manager.put(pileSn, ctx);
// 同步更新反向映射
channelIdToPileSnMap.put(channelId, pileSn);
return; return;
} }
// 两个做对比 // 两个做对比
String sourceChannelId = sourceChannel.id().asLongText(); String sourceChannelId = sourceChannel.id().asLongText();
String channelId = ctx.channel().id().asLongText();
if (!StringUtils.equals(sourceChannelId, channelId)) { if (!StringUtils.equals(sourceChannelId, channelId)) {
// 不一致则更新 // 不一致则更新
log.info("checkChannel-manager中pileSn:{}的连接不一致, 老channelId:{}, 保存新的channel:{}", pileSn, sourceChannelId, channelId); log.info("checkChannel-manager中pileSn:{}的连接不一致, 老channelId:{}, 保存新的channel:{}", pileSn, sourceChannelId, channelId);
manager.put(pileSn, ctx); manager.put(pileSn, ctx);
// 移除旧的反向映射,添加新的
channelIdToPileSnMap.remove(sourceChannelId);
channelIdToPileSnMap.put(channelId, pileSn);
} }
} }
@@ -102,16 +118,12 @@ public class PileChannelEntity {
/** /**
* 通过channelId获取桩编号 * 通过channelId获取桩编号
* 优化:使用反向映射 channelIdToPileSnMap 实现 O(1) 查询
* @param channelId * @param channelId
* @return * @return
*/ */
public static String getPileSnByChannelId(String channelId) { public static String getPileSnByChannelId(String channelId) {
for (HashMap.Entry<String, ChannelHandlerContext> entry : manager.entrySet()) { return channelIdToPileSnMap.get(channelId);
if (entry.getValue().channel().id().asLongText().equals(channelId)) {
return entry.getKey();
}
}
return null;
} }
// public static String getPileSnByCtx(ChannelHandlerContext ctx) { // public static String getPileSnByCtx(ChannelHandlerContext ctx) {
@@ -134,7 +146,11 @@ public class PileChannelEntity {
if (StringUtils.isBlank(pileSn)) { if (StringUtils.isBlank(pileSn)) {
return; return;
} }
manager.remove(pileSn); ChannelHandlerContext ctx = manager.remove(pileSn);
// 同步删除反向映射
if (ctx != null) {
channelIdToPileSnMap.remove(ctx.channel().id().asLongText());
}
} }
public static void removeByChannelId(String channelId){ public static void removeByChannelId(String channelId){
@@ -144,6 +160,8 @@ public class PileChannelEntity {
String pileSn = getPileSnByChannelId(channelId); String pileSn = getPileSnByChannelId(channelId);
if (StringUtils.isNotBlank(pileSn)) { if (StringUtils.isNotBlank(pileSn)) {
manager.remove(pileSn); manager.remove(pileSn);
// 同步删除反向映射
channelIdToPileSnMap.remove(channelId);
} }
} }

View File

@@ -27,8 +27,8 @@ public class ThreadPoolConfig {
// 最大可创建的线程数 // 最大可创建的线程数
private final int maxPoolSize = 128; // 突发时翻倍,避免过多上下文切换 private final int maxPoolSize = 128; // 突发时翻倍,避免过多上下文切换
// 队列最大长度 // 队列最大长度 (优化:增大队列容量以应对桩端消息高峰)
private final int queueCapacity = 2000; // 桩消息短时堆积时提供缓冲,防止直接拒绝 private final int queueCapacity = 5000; // 桩消息短时堆积时提供缓冲,防止直接拒绝
// 线程池维护线程所允许的空闲时间 // 线程池维护线程所允许的空闲时间
private final int keepAliveSeconds = 120; // 突发结束后快速回收扩展线程 private final int keepAliveSeconds = 120; // 突发结束后快速回收扩展线程
@@ -51,6 +51,7 @@ public class ThreadPoolConfig {
/** /**
* 线程池 * 线程池
* 优化:使用 DiscardOldestPolicy 避免阻塞调用者线程(如 Netty IO 线程)
*/ */
@Bean(name = "threadPoolTaskExecutor") @Bean(name = "threadPoolTaskExecutor")
public ThreadPoolTaskExecutor threadPoolTaskExecutor() { public ThreadPoolTaskExecutor threadPoolTaskExecutor() {
@@ -60,7 +61,8 @@ public class ThreadPoolConfig {
executor.setQueueCapacity(queueCapacity); executor.setQueueCapacity(queueCapacity);
executor.setKeepAliveSeconds(keepAliveSeconds); executor.setKeepAliveSeconds(keepAliveSeconds);
// 线程池对拒绝任务(无线程可用)的处理策略 // 线程池对拒绝任务(无线程可用)的处理策略
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); // 优化DiscardOldestPolicy 丢弃最老的任务,避免阻塞调用者线程
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardOldestPolicy());
executor.setThreadNamePrefix(threadNamePrefix); executor.setThreadNamePrefix(threadNamePrefix);
executor.setWaitForTasksToCompleteOnShutdown(true); executor.setWaitForTasksToCompleteOnShutdown(true);
// log.info("threadPoolTaskExecutor创建成功"); // log.info("threadPoolTaskExecutor创建成功");

View File

@@ -19,6 +19,7 @@ import java.util.concurrent.CompletableFuture;
/** /**
* 充电桩心跳包 * 充电桩心跳包
* 优化:将所有异步操作完全异步化,先返回心跳应答再执行其他逻辑
*/ */
@Slf4j @Slf4j
@Component @Component
@@ -50,12 +51,6 @@ public class HeartbeatRequestHandler extends AbstractYkcHandler {
byte[] pileSnByte = BytesUtil.copyBytes(msgBody, startIndex, length); byte[] pileSnByte = BytesUtil.copyBytes(msgBody, startIndex, length);
String pileSn = BytesUtil.binary(pileSnByte, 16); String pileSn = BytesUtil.binary(pileSnByte, 16);
// 保存时间
saveLastTimeAndCheckChannel(pileSn, channel);
// 校验channel
// PileChannelEntity.checkChannel(pileSn, channel);
// 枪号 // 枪号
startIndex += length; startIndex += length;
length = 1; length = 1;
@@ -69,30 +64,27 @@ public class HeartbeatRequestHandler extends AbstractYkcHandler {
String connectorStatus = BytesUtil.binary(connectorStatusByte, 16); String connectorStatus = BytesUtil.binary(connectorStatusByte, 16);
// log.info("桩号:{}, 枪号:{}, 枪状态:{}", pileSn, pileConnectorNum, connectorStatus); // log.info("桩号:{}, 枪号:{}, 枪状态:{}", pileSn, pileConnectorNum, connectorStatus);
// updateStatus(pileSn, pileConnectorNum, connectorStatus); // 优化:先构建并返回心跳应答,不阻塞心跳回复
byte[] flag = Constants.zeroByteArray;
byte[] messageBody = Bytes.concat(pileSnByte, pileConnectorNumByte, flag);
byte[] response = getResult(ykcDataProtocol, messageBody);
// 公共方法修改状态 // 优化所有其他操作Redis、数据库完全异步化不阻塞心跳回复
// try { // 将 saveLastTimeAndCheckChannel 和 updateStatus 都改为异步执行
// pileBasicInfoService.updateStatus(BytesUtil.bcd2Str(ykcDataProtocol.getFrameType()), pileSn, pileConnectorNum, connectorStatus, null);
// } catch (Exception e) {
// log.error("公共方法修改状态error", e);
// }
// 异步修改状态
CompletableFuture.runAsync(() -> { CompletableFuture.runAsync(() -> {
try { try {
pileBasicInfoService.updateStatus(BytesUtil.bcd2Str(ykcDataProtocol.getFrameType()), pileSn, pileConnectorNum, connectorStatus, null); // 异步保存连接时间并检查通道Redis 操作)
saveLastTimeAndCheckChannel(pileSn, channel);
// 异步更新状态(数据库操作)
String frameType = BytesUtil.bcd2Str(ykcDataProtocol.getFrameType());
pileBasicInfoService.updateStatus(frameType, pileSn, pileConnectorNum, connectorStatus, null);
} catch (Exception e) { } catch (Exception e) {
log.error("公共方法修改状态error", e); log.error("心跳异步处理error, pileSn:{}", pileSn, e);
} }
}, executor); }, executor);
// 心跳应答置0 return response;
byte[] flag = Constants.zeroByteArray;
// 消息体
byte[] messageBody = Bytes.concat(pileSnByte, pileConnectorNumByte, flag);
return getResult(ykcDataProtocol, messageBody);
} }
} }

View File

@@ -38,7 +38,7 @@ public class NettyServerChannelInitializer extends ChannelInitializer<SocketChan
pipeline.addLast(new IdleStateHandler(30, 0, 0, TimeUnit.SECONDS)); pipeline.addLast(new IdleStateHandler(30, 0, 0, TimeUnit.SECONDS));
// pipeline.addLast("handler", nettyServerHandler); // pipeline.addLast("handler", nettyServerHandler);
pipeline.addLast(businessGroup, nettyServerHandler); // 消息先进入业务线程池 pipeline.addLast(businessGroup, nettyServerHandler); // 消息先进入业务线程池
pipeline.addLast(echoServerHandler); pipeline.addLast(businessGroup, echoServerHandler); // 回复Handler也绑定到业务线程池避免阻塞IO线程
} }
} }

View File

@@ -132,19 +132,23 @@ public class NettyServerHandler extends ChannelInboundHandlerAdapter {
@Override @Override
public void channelRead(ChannelHandlerContext ctx, Object message) throws Exception { public void channelRead(ChannelHandlerContext ctx, Object message) throws Exception {
long startTime = System.currentTimeMillis();
String pileSn = "";
String frameTypeStr = "";
try { try {
YKCDataProtocol ykcDataProtocol = (YKCDataProtocol) message; YKCDataProtocol ykcDataProtocol = (YKCDataProtocol) message;
// 获取帧类型 // 获取帧类型
byte[] frameTypeBytes = ykcDataProtocol.getFrameType(); byte[] frameTypeBytes = ykcDataProtocol.getFrameType();
String frameType = YKCUtils.frameType2Str(frameTypeBytes); frameTypeStr = YKCUtils.frameType2Str(frameTypeBytes);
// 获取序列号域 // 获取序列号域
int serialNumber = BytesUtil.bytesToIntLittle(ykcDataProtocol.getSerialNumber()); int serialNumber = BytesUtil.bytesToIntLittle(ykcDataProtocol.getSerialNumber());
// 获取channel // 获取channel
Channel channel = ctx.channel(); Channel channel = ctx.channel();
// 心跳包0x03日志太多造成日志文件过大改为不打印 // 心跳包0x03日志太多造成日志文件过大改为不打印
if (!CollectionUtils.containsAny(notPrintFrameTypeList, frameType)) { if (!CollectionUtils.containsAny(notPrintFrameTypeList, frameTypeStr)) {
log.info("【<<<<<平台收到消息<<<<<】channel:{}, 帧类型:{}, 帧名称:{}, 序列号域:{}, 报文:{}", log.info("【<<<<<平台收到消息<<<<<】channel:{}, 帧类型:{}, 帧名称:{}, 序列号域:{}, 报文:{}",
channel.id(), frameType, YKCFrameTypeCode.getFrameTypeStr(frameType), serialNumber, channel.id(), frameTypeStr, YKCFrameTypeCode.getFrameTypeStr(frameTypeStr), serialNumber,
BytesUtil.binary(ykcDataProtocol.getBytes(), 16)); BytesUtil.binary(ykcDataProtocol.getBytes(), 16));
} }
// 处理数据 // 处理数据
@@ -154,18 +158,26 @@ public class NettyServerHandler extends ChannelInboundHandlerAdapter {
ByteBuf buffer = ctx.alloc().buffer().writeBytes(response); ByteBuf buffer = ctx.alloc().buffer().writeBytes(response);
// this.channelWrite(channel.id(), buffer); // this.channelWrite(channel.id(), buffer);
super.channelRead(ctx, buffer); super.channelRead(ctx, buffer);
if (!CollectionUtils.containsAny(notPrintFrameTypeList, frameType)) { if (!CollectionUtils.containsAny(notPrintFrameTypeList, frameTypeStr)) {
// 应答帧类型 // 应答帧类型
byte[] responseFrameTypeBytes = YKCFrameTypeCode.PlatformAnswersRelation.getResponseFrameTypeBytes(frameTypeBytes); byte[] responseFrameTypeBytes = YKCFrameTypeCode.PlatformAnswersRelation.getResponseFrameTypeBytes(frameTypeBytes);
String responseFrameType = YKCUtils.frameType2Str(responseFrameTypeBytes); String responseFrameType = YKCUtils.frameType2Str(responseFrameTypeBytes);
log.info("【>>>>>平台响应消息>>>>>】channel:{}, 响应帧类型:{}, 响应帧名称:{}, 原帧类型:{}, 原帧名称:{}, 序列号域:{}, response:{}", log.info("【>>>>>平台响应消息>>>>>】channel:{}, 响应帧类型:{}, 响应帧名称:{}, 原帧类型:{}, 原帧名称:{}, 序列号域:{}, response:{}",
channel.id(), responseFrameType, YKCFrameTypeCode.getFrameTypeStr(responseFrameType), channel.id(), responseFrameType, YKCFrameTypeCode.getFrameTypeStr(responseFrameType),
frameType, YKCFrameTypeCode.getFrameTypeStr(frameType), serialNumber, frameTypeStr, YKCFrameTypeCode.getFrameTypeStr(frameTypeStr), serialNumber,
BytesUtil.binary(response, 16)); BytesUtil.binary(response, 16));
} }
} }
} finally { } finally {
ReferenceCountUtil.release(message); ReferenceCountUtil.release(message);
// 优化:增加消息处理耗时监控,用于排查性能问题
long elapsed = System.currentTimeMillis() - startTime;
// 心跳帧(0x03)处理超过50ms警告其他帧超过200ms警告
int warnThreshold = "0x03".equals(frameTypeStr) ? 50 : 200;
if (elapsed > warnThreshold) {
log.error("【性能警告】消息处理耗时过长: {}ms, 帧类型: {}, pileSn: {}", elapsed, frameTypeStr, pileSn);
}
} }
} }