ChannelHandlerParameter 上浮

This commit is contained in:
三丙
2024-10-10 11:44:27 +08:00
parent 02c399d8dd
commit 278bb9e69e
4 changed files with 22 additions and 24 deletions

View File

@@ -19,6 +19,8 @@ import io.netty.handler.timeout.IdleStateHandler;
import io.netty.util.concurrent.GlobalEventExecutor; import io.netty.util.concurrent.GlobalEventExecutor;
import lombok.RequiredArgsConstructor; import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import sanbing.jcpp.protocol.cfg.TcpCfg;
import sanbing.jcpp.protocol.cfg.TcpHandlerCfg;
import sanbing.jcpp.protocol.cfg.enums.TcpHandlerType; import sanbing.jcpp.protocol.cfg.enums.TcpHandlerType;
import sanbing.jcpp.protocol.listener.tcp.TcpChannelHandler; import sanbing.jcpp.protocol.listener.tcp.TcpChannelHandler;
import sanbing.jcpp.protocol.listener.tcp.configs.BinaryHandlerConfiguration; import sanbing.jcpp.protocol.listener.tcp.configs.BinaryHandlerConfiguration;
@@ -59,22 +61,23 @@ public abstract class ChannelHandlerInitializer<C extends Channel> extends Chann
super.channelUnregistered(ctx); super.channelUnregistered(ctx);
} }
public static ChannelHandlerInitializer<SocketChannel> createTcpChannelHandler(ChannelHandlerParameter parameter) { public static ChannelHandlerInitializer<SocketChannel> createTcpChannelHandler(TcpCfg tcpCfg, ChannelHandlerParameter parameter) {
TcpHandlerType type = parameter.handlerCfg().getType(); TcpHandlerCfg tcpCfgHandler = tcpCfg.getHandler();
TcpHandlerType type = tcpCfgHandler.getType();
return switch (type) { return switch (type) {
case TEXT -> new ChannelHandlerInitializer<>() { case TEXT -> new ChannelHandlerInitializer<>() {
@Override @Override
protected void initChannel(SocketChannel socketChannel) { protected void initChannel(SocketChannel socketChannel) {
TextHandlerConfiguration textHandlerConfig = (TextHandlerConfiguration) parameter.handlerCfg().getConfiguration(TEXT); TextHandlerConfiguration textHandlerConfig = (TextHandlerConfiguration) tcpCfgHandler.getConfiguration(TEXT);
ByteBuf[] delimiters = SYSTEM_LINE_SEPARATOR.equals(textHandlerConfig.getMessageSeparator()) ByteBuf[] delimiters = SYSTEM_LINE_SEPARATOR.equals(textHandlerConfig.getMessageSeparator())
? Delimiters.lineDelimiter() : Delimiters.nulDelimiter(); ? Delimiters.lineDelimiter() : Delimiters.nulDelimiter();
DelimiterBasedFrameDecoder framer = new DelimiterBasedFrameDecoder(textHandlerConfig.getMaxFrameLength(), DelimiterBasedFrameDecoder framer = new DelimiterBasedFrameDecoder(textHandlerConfig.getMaxFrameLength(),
textHandlerConfig.isStripDelimiter(), delimiters); textHandlerConfig.isStripDelimiter(), delimiters);
socketChannel.pipeline() socketChannel.pipeline()
.addLast("tracerHandler", new TracerHandler()) .addLast("tracerHandler", new TracerHandler())
.addLast("connectionLimitHandler", new ConnectionLimitHandler(parameter.protocolName(), parameter.handlerCfg().getMaxConnections(), CHANNEL_GROUP, parameter.connectionsGauge())) .addLast("connectionLimitHandler", new ConnectionLimitHandler(parameter.protocolName(), tcpCfgHandler.getMaxConnections(), CHANNEL_GROUP, parameter.connectionsGauge()))
.addLast("idleStateHandler", new IdleStateHandler(parameter.handlerCfg().getIdleTimeoutSeconds(), 0, 0)) .addLast("idleStateHandler", new IdleStateHandler(tcpCfgHandler.getIdleTimeoutSeconds(), 0, 0))
.addLast("idleEventHandler", new IdleEventHandler(parameter.protocolName())) .addLast("idleEventHandler", new IdleEventHandler(parameter.protocolName()))
.addLast("framer", framer) .addLast("framer", framer)
.addLast("tcpTextDecoder", new TcpMsgDecoder<>(parameter.protocolName(), msg -> TcpMsgDecoder.toString(msg, textHandlerConfig.getCharsetName()))) .addLast("tcpTextDecoder", new TcpMsgDecoder<>(parameter.protocolName(), msg -> TcpMsgDecoder.toString(msg, textHandlerConfig.getCharsetName())))
@@ -87,9 +90,9 @@ public abstract class ChannelHandlerInitializer<C extends Channel> extends Chann
protected void initChannel(SocketChannel socketChannel) { protected void initChannel(SocketChannel socketChannel) {
socketChannel.pipeline() socketChannel.pipeline()
.addLast("tracerHandler", new TracerHandler()) .addLast("tracerHandler", new TracerHandler())
.addLast("connectionLimitHandler", new ConnectionLimitHandler(parameter.protocolName(), parameter.handlerCfg().getMaxConnections(), CHANNEL_GROUP, parameter.connectionsGauge())) .addLast("connectionLimitHandler", new ConnectionLimitHandler(parameter.protocolName(), tcpCfgHandler.getMaxConnections(), CHANNEL_GROUP, parameter.connectionsGauge()))
.addLast("idleStateHandler", .addLast("idleStateHandler",
new IdleStateHandler(parameter.handlerCfg().getIdleTimeoutSeconds(), 0, 0)) new IdleStateHandler(tcpCfgHandler.getIdleTimeoutSeconds(), 0, 0))
.addLast("idleEventHandler", new IdleEventHandler(parameter.protocolName())) .addLast("idleEventHandler", new IdleEventHandler(parameter.protocolName()))
.addLast("datagramToJsonDecoder", new JsonObjectDecoder()) .addLast("datagramToJsonDecoder", new JsonObjectDecoder())
.addLast("tcpJsonDecoder", new TcpMsgDecoder<>(parameter.protocolName(), TcpMsgDecoder::toJson)) .addLast("tcpJsonDecoder", new TcpMsgDecoder<>(parameter.protocolName(), TcpMsgDecoder::toJson))
@@ -99,15 +102,15 @@ public abstract class ChannelHandlerInitializer<C extends Channel> extends Chann
case BINARY -> new ChannelHandlerInitializer<>() { case BINARY -> new ChannelHandlerInitializer<>() {
@Override @Override
protected void initChannel(SocketChannel socketChannel) { protected void initChannel(SocketChannel socketChannel) {
BinaryHandlerConfiguration binaryHandlerConfig = (BinaryHandlerConfiguration) parameter.handlerCfg().getConfiguration(BINARY); BinaryHandlerConfiguration binaryHandlerConfig = (BinaryHandlerConfiguration) tcpCfgHandler.getConfiguration(BINARY);
ByteOrder byteOrder = LITTLE_ENDIAN_BYTE_ORDER.equalsIgnoreCase(binaryHandlerConfig.getByteOrder()) ByteOrder byteOrder = LITTLE_ENDIAN_BYTE_ORDER.equalsIgnoreCase(binaryHandlerConfig.getByteOrder())
? ByteOrder.LITTLE_ENDIAN : ByteOrder.BIG_ENDIAN; ? ByteOrder.LITTLE_ENDIAN : ByteOrder.BIG_ENDIAN;
socketChannel.pipeline() socketChannel.pipeline()
.addLast("tracerHandler", new TracerHandler()) .addLast("tracerHandler", new TracerHandler())
.addLast("connectionLimitHandler", new ConnectionLimitHandler(parameter.protocolName(), parameter.handlerCfg().getMaxConnections(), CHANNEL_GROUP, parameter.connectionsGauge())) .addLast("connectionLimitHandler", new ConnectionLimitHandler(parameter.protocolName(), tcpCfgHandler.getMaxConnections(), CHANNEL_GROUP, parameter.connectionsGauge()))
.addLast("idleStateHandler", new IdleStateHandler(parameter.handlerCfg().getIdleTimeoutSeconds(), 0, 0)) .addLast("idleStateHandler", new IdleStateHandler(tcpCfgHandler.getIdleTimeoutSeconds(), 0, 0))
.addLast("idleEventHandler", new IdleEventHandler(parameter.protocolName())); .addLast("idleEventHandler", new IdleEventHandler(parameter.protocolName()));
if (LengthFieldBasedFrameDecoder.class.isAssignableFrom(binaryHandlerConfig.getDecoder())) { if (LengthFieldBasedFrameDecoder.class.isAssignableFrom(binaryHandlerConfig.getDecoder())) {
@@ -134,9 +137,7 @@ public abstract class ChannelHandlerInitializer<C extends Channel> extends Chann
.addLast("tcpByteHandler", new TcpChannelHandler<>(parameter)); .addLast("tcpByteHandler", new TcpChannelHandler<>(parameter));
} }
}; };
case null -> throw new IllegalArgumentException("Unknown: " + parameter.handlerCfg()); case null -> throw new IllegalArgumentException("Unknown: " + tcpCfgHandler);
}; };
} }
} }

View File

@@ -8,12 +8,10 @@ import io.micrometer.core.instrument.Timer;
import sanbing.jcpp.infrastructure.stats.DefaultCounter; import sanbing.jcpp.infrastructure.stats.DefaultCounter;
import sanbing.jcpp.infrastructure.stats.MessagesStats; import sanbing.jcpp.infrastructure.stats.MessagesStats;
import sanbing.jcpp.protocol.ProtocolMessageProcessor; import sanbing.jcpp.protocol.ProtocolMessageProcessor;
import sanbing.jcpp.protocol.cfg.TcpHandlerCfg;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
public record ChannelHandlerParameter(String protocolName, public record ChannelHandlerParameter(String protocolName,
TcpHandlerCfg handlerCfg,
ProtocolMessageProcessor protocolMessageProcessor, ProtocolMessageProcessor protocolMessageProcessor,
AtomicInteger connectionsGauge, AtomicInteger connectionsGauge,
MessagesStats uplinkMsgStats, MessagesStats uplinkMsgStats,

View File

@@ -32,6 +32,8 @@ public abstract class Listener {
protected DefaultCounter downlinkTrafficCounter; protected DefaultCounter downlinkTrafficCounter;
protected Timer downlinkTimer; protected Timer downlinkTimer;
protected final ChannelHandlerParameter parameter;
protected Listener(String protocolName, ProtocolMessageProcessor protocolMessageProcessor, StatsFactory statsFactory) { protected Listener(String protocolName, ProtocolMessageProcessor protocolMessageProcessor, StatsFactory statsFactory) {
this.protocolName = protocolName; this.protocolName = protocolName;
this.protocolMessageProcessor = protocolMessageProcessor; this.protocolMessageProcessor = protocolMessageProcessor;
@@ -42,6 +44,8 @@ public abstract class Listener {
this.uplinkTrafficCounter = statsFactory.createDefaultCounter("listenerUplinkTraffic", "protocol", protocolName); this.uplinkTrafficCounter = statsFactory.createDefaultCounter("listenerUplinkTraffic", "protocol", protocolName);
this.downlinkTrafficCounter = statsFactory.createDefaultCounter("listenerDownlinkTraffic", "protocol", protocolName); this.downlinkTrafficCounter = statsFactory.createDefaultCounter("listenerDownlinkTraffic", "protocol", protocolName);
this.downlinkTimer = statsFactory.createTimer("listenerDownlink", "protocol", protocolName); this.downlinkTimer = statsFactory.createTimer("listenerDownlink", "protocol", protocolName);
this.parameter = new ChannelHandlerParameter(protocolName, protocolMessageProcessor, connectionsGauge, uplinkMsgStats, downlinkMsgStats, uplinkTrafficCounter, downlinkTrafficCounter, downlinkTimer);
} }
public abstract Health health(); public abstract Health health();

View File

@@ -22,7 +22,6 @@ import sanbing.jcpp.infrastructure.util.async.JCPPThreadFactory;
import sanbing.jcpp.protocol.ProtocolMessageProcessor; import sanbing.jcpp.protocol.ProtocolMessageProcessor;
import sanbing.jcpp.protocol.cfg.TcpCfg; import sanbing.jcpp.protocol.cfg.TcpCfg;
import sanbing.jcpp.protocol.listener.ChannelHandlerInitializer; import sanbing.jcpp.protocol.listener.ChannelHandlerInitializer;
import sanbing.jcpp.protocol.listener.ChannelHandlerParameter;
import sanbing.jcpp.protocol.listener.Listener; import sanbing.jcpp.protocol.listener.Listener;
/** /**
@@ -35,21 +34,17 @@ public class TcpListener extends Listener {
private EventLoopGroup bossGroup; private EventLoopGroup bossGroup;
private EventLoopGroup workerGroup; private EventLoopGroup workerGroup;
private final ChannelHandlerParameter parameter;
public TcpListener(String protocolName, TcpCfg tcpCfg, ProtocolMessageProcessor protocolMessageProcessor, StatsFactory statsFactory) throws InterruptedException { public TcpListener(String protocolName, TcpCfg tcpCfg, ProtocolMessageProcessor protocolMessageProcessor, StatsFactory statsFactory) throws InterruptedException {
super(protocolName, protocolMessageProcessor, statsFactory); super(protocolName, protocolMessageProcessor, statsFactory);
parameter = new ChannelHandlerParameter(protocolName, tcpCfg.getHandler(), protocolMessageProcessor, connectionsGauge, uplinkMsgStats, downlinkMsgStats, uplinkTrafficCounter, downlinkTrafficCounter, downlinkTimer); tcpServerBootstrap(tcpCfg);
tcpServerBootstrap(tcpCfg, getProtocolName());
} }
private void tcpServerBootstrap(TcpCfg tcpCfg, String protocolName) throws InterruptedException { private void tcpServerBootstrap(TcpCfg tcpCfg) throws InterruptedException {
bossGroup = new NioEventLoopGroup(tcpCfg.getBossGroupThreadCount(), JCPPThreadFactory.forName("tcp-boss")); bossGroup = new NioEventLoopGroup(tcpCfg.getBossGroupThreadCount(), JCPPThreadFactory.forName("tcp-boss"));
workerGroup = new NioEventLoopGroup(tcpCfg.getWorkerGroupThreadCount(), JCPPThreadFactory.forName("tcp-worker")); workerGroup = new NioEventLoopGroup(tcpCfg.getWorkerGroupThreadCount(), JCPPThreadFactory.forName("tcp-worker"));
ChannelHandlerInitializer<SocketChannel> channelHandler = ChannelHandlerInitializer.createTcpChannelHandler(parameter); ChannelHandlerInitializer<SocketChannel> channelHandler = ChannelHandlerInitializer.createTcpChannelHandler(tcpCfg, parameter);
ServerBootstrap server = new ServerBootstrap() ServerBootstrap server = new ServerBootstrap()
.group(bossGroup, workerGroup) .group(bossGroup, workerGroup)
@@ -63,7 +58,7 @@ public class TcpListener extends Listener {
.handler(new LoggingHandler(LogLevel.INFO)) .handler(new LoggingHandler(LogLevel.INFO))
.childHandler(channelHandler); .childHandler(channelHandler);
serverChannel = server.bind(tcpCfg.getBindAddress(), tcpCfg.getBindPort()).sync().channel(); serverChannel = server.bind(tcpCfg.getBindAddress(), tcpCfg.getBindPort()).sync().channel();
log.info("Tcp server [{}] started, BindAddress:[{}], BindPort: [{}]", protocolName, tcpCfg.getBindAddress(), tcpCfg.getBindPort()); log.info("Tcp server [{}] started, BindAddress:[{}], BindPort: [{}]", getProtocolName(), tcpCfg.getBindAddress(), tcpCfg.getBindPort());
} }