update netty引入业务线程池

This commit is contained in:
Lemon
2025-07-22 14:23:50 +08:00
parent 57487e5892
commit 0e7cd0b33c
4 changed files with 43 additions and 8 deletions

View File

@@ -16,6 +16,8 @@ import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import io.netty.handler.timeout.IdleStateHandler;
import io.netty.util.ResourceLeakDetector;
import io.netty.util.concurrent.DefaultEventExecutorGroup;
import io.netty.util.concurrent.EventExecutorGroup;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.CommandLineRunner;
import org.springframework.stereotype.Component;

View File

@@ -0,0 +1,31 @@
package com.jsowell.netty.server.yunkuaichong;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
/**
* 回复消息Handler
*
* @author Lemon
* @Date 2025/7/22 14:00:03
*/
@ChannelHandler.Sharable
@Slf4j
@Component
public class EchoServerHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
if (msg == null || msg == "") {
log.info("服务端响应空的消息");
return;
}
//将客户端的信息直接返回写入ctx
ctx.write(msg);
//刷新缓存区
ctx.flush();
}
}

View File

@@ -6,6 +6,8 @@ import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.bytes.ByteArrayDecoder;
import io.netty.handler.timeout.IdleStateHandler;
import io.netty.util.concurrent.DefaultEventExecutorGroup;
import io.netty.util.concurrent.EventExecutorGroup;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
@@ -14,8 +16,8 @@ import java.util.concurrent.TimeUnit;
@Component
public class NettyServerChannelInitializer extends ChannelInitializer<SocketChannel> {
@Resource
NettyServerHandler nettyServerHandler;
// 引入业务线程池
final EventExecutorGroup businessGroup = new DefaultEventExecutorGroup(16);
@Override
protected void initChannel(SocketChannel channel) throws Exception {
@@ -28,7 +30,9 @@ public class NettyServerChannelInitializer extends ChannelInitializer<SocketChan
pipeline.addLast("encoder", new ByteArrayDecoder());
// 读超时时间设置为30s0表示不监控
pipeline.addLast(new IdleStateHandler(30, 0, 0, TimeUnit.SECONDS));
pipeline.addLast("handler", nettyServerHandler);
// pipeline.addLast("handler", nettyServerHandler);
pipeline.addLast(businessGroup, new NettyServerHandler()); // 消息先进入业务线程池
pipeline.addLast(new EchoServerHandler());
}
}

View File

@@ -148,11 +148,9 @@ public class NettyServerHandler extends ChannelInboundHandlerAdapter {
if (Objects.nonNull(response)) {
// 响应客户端
ByteBuf buffer = ctx.alloc().buffer().writeBytes(response);
this.channelWrite(channel.id(), buffer);
// 获取桩号
// String pileSn = PileChannelEntity.getPileSnByChannelId(ctx.channel().id().asLongText());
// 批量响应客户端
// this.channelWriteBatch(pileSn, buffer);
// this.channelWrite(channel.id(), buffer);
super.channelRead(ctx, buffer);
if (!CollectionUtils.containsAny(notPrintFrameTypeList, frameType)) {
// 应答帧类型
byte[] responseFrameTypeBytes = YKCFrameTypeCode.PlatformAnswersRelation.getResponseFrameTypeBytes(frameTypeBytes);