From 278bb9e69eb95c6f62559de89f1baf98e21308fc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E4=B8=89=E4=B8=99?= <10604541+sanbing-os@user.noreply.gitee.com> Date: Thu, 10 Oct 2024 11:44:27 +0800 Subject: [PATCH] =?UTF-8?q?ChannelHandlerParameter=20=E4=B8=8A=E6=B5=AE?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../listener/ChannelHandlerInitializer.java | 27 ++++++++++--------- .../listener/ChannelHandlerParameter.java | 2 -- .../jcpp/protocol/listener/Listener.java | 4 +++ .../protocol/listener/tcp/TcpListener.java | 13 +++------ 4 files changed, 22 insertions(+), 24 deletions(-) diff --git a/jcpp-protocol-api/src/main/java/sanbing/jcpp/protocol/listener/ChannelHandlerInitializer.java b/jcpp-protocol-api/src/main/java/sanbing/jcpp/protocol/listener/ChannelHandlerInitializer.java index e2c1f62..0b9c892 100644 --- a/jcpp-protocol-api/src/main/java/sanbing/jcpp/protocol/listener/ChannelHandlerInitializer.java +++ b/jcpp-protocol-api/src/main/java/sanbing/jcpp/protocol/listener/ChannelHandlerInitializer.java @@ -19,6 +19,8 @@ import io.netty.handler.timeout.IdleStateHandler; import io.netty.util.concurrent.GlobalEventExecutor; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; +import sanbing.jcpp.protocol.cfg.TcpCfg; +import sanbing.jcpp.protocol.cfg.TcpHandlerCfg; import sanbing.jcpp.protocol.cfg.enums.TcpHandlerType; import sanbing.jcpp.protocol.listener.tcp.TcpChannelHandler; import sanbing.jcpp.protocol.listener.tcp.configs.BinaryHandlerConfiguration; @@ -59,22 +61,23 @@ public abstract class ChannelHandlerInitializer extends Chann super.channelUnregistered(ctx); } - public static ChannelHandlerInitializer createTcpChannelHandler(ChannelHandlerParameter parameter) { - TcpHandlerType type = parameter.handlerCfg().getType(); + public static ChannelHandlerInitializer createTcpChannelHandler(TcpCfg tcpCfg, ChannelHandlerParameter parameter) { + TcpHandlerCfg tcpCfgHandler = tcpCfg.getHandler(); + TcpHandlerType type = tcpCfgHandler.getType(); return switch (type) { case TEXT -> new ChannelHandlerInitializer<>() { @Override protected void initChannel(SocketChannel socketChannel) { - TextHandlerConfiguration textHandlerConfig = (TextHandlerConfiguration) parameter.handlerCfg().getConfiguration(TEXT); + TextHandlerConfiguration textHandlerConfig = (TextHandlerConfiguration) tcpCfgHandler.getConfiguration(TEXT); ByteBuf[] delimiters = SYSTEM_LINE_SEPARATOR.equals(textHandlerConfig.getMessageSeparator()) ? Delimiters.lineDelimiter() : Delimiters.nulDelimiter(); DelimiterBasedFrameDecoder framer = new DelimiterBasedFrameDecoder(textHandlerConfig.getMaxFrameLength(), textHandlerConfig.isStripDelimiter(), delimiters); socketChannel.pipeline() .addLast("tracerHandler", new TracerHandler()) - .addLast("connectionLimitHandler", new ConnectionLimitHandler(parameter.protocolName(), parameter.handlerCfg().getMaxConnections(), CHANNEL_GROUP, parameter.connectionsGauge())) - .addLast("idleStateHandler", new IdleStateHandler(parameter.handlerCfg().getIdleTimeoutSeconds(), 0, 0)) + .addLast("connectionLimitHandler", new ConnectionLimitHandler(parameter.protocolName(), tcpCfgHandler.getMaxConnections(), CHANNEL_GROUP, parameter.connectionsGauge())) + .addLast("idleStateHandler", new IdleStateHandler(tcpCfgHandler.getIdleTimeoutSeconds(), 0, 0)) .addLast("idleEventHandler", new IdleEventHandler(parameter.protocolName())) .addLast("framer", framer) .addLast("tcpTextDecoder", new TcpMsgDecoder<>(parameter.protocolName(), msg -> TcpMsgDecoder.toString(msg, textHandlerConfig.getCharsetName()))) @@ -87,9 +90,9 @@ public abstract class ChannelHandlerInitializer extends Chann protected void initChannel(SocketChannel socketChannel) { socketChannel.pipeline() .addLast("tracerHandler", new TracerHandler()) - .addLast("connectionLimitHandler", new ConnectionLimitHandler(parameter.protocolName(), parameter.handlerCfg().getMaxConnections(), CHANNEL_GROUP, parameter.connectionsGauge())) + .addLast("connectionLimitHandler", new ConnectionLimitHandler(parameter.protocolName(), tcpCfgHandler.getMaxConnections(), CHANNEL_GROUP, parameter.connectionsGauge())) .addLast("idleStateHandler", - new IdleStateHandler(parameter.handlerCfg().getIdleTimeoutSeconds(), 0, 0)) + new IdleStateHandler(tcpCfgHandler.getIdleTimeoutSeconds(), 0, 0)) .addLast("idleEventHandler", new IdleEventHandler(parameter.protocolName())) .addLast("datagramToJsonDecoder", new JsonObjectDecoder()) .addLast("tcpJsonDecoder", new TcpMsgDecoder<>(parameter.protocolName(), TcpMsgDecoder::toJson)) @@ -99,15 +102,15 @@ public abstract class ChannelHandlerInitializer extends Chann case BINARY -> new ChannelHandlerInitializer<>() { @Override protected void initChannel(SocketChannel socketChannel) { - BinaryHandlerConfiguration binaryHandlerConfig = (BinaryHandlerConfiguration) parameter.handlerCfg().getConfiguration(BINARY); + BinaryHandlerConfiguration binaryHandlerConfig = (BinaryHandlerConfiguration) tcpCfgHandler.getConfiguration(BINARY); ByteOrder byteOrder = LITTLE_ENDIAN_BYTE_ORDER.equalsIgnoreCase(binaryHandlerConfig.getByteOrder()) ? ByteOrder.LITTLE_ENDIAN : ByteOrder.BIG_ENDIAN; socketChannel.pipeline() .addLast("tracerHandler", new TracerHandler()) - .addLast("connectionLimitHandler", new ConnectionLimitHandler(parameter.protocolName(), parameter.handlerCfg().getMaxConnections(), CHANNEL_GROUP, parameter.connectionsGauge())) - .addLast("idleStateHandler", new IdleStateHandler(parameter.handlerCfg().getIdleTimeoutSeconds(), 0, 0)) + .addLast("connectionLimitHandler", new ConnectionLimitHandler(parameter.protocolName(), tcpCfgHandler.getMaxConnections(), CHANNEL_GROUP, parameter.connectionsGauge())) + .addLast("idleStateHandler", new IdleStateHandler(tcpCfgHandler.getIdleTimeoutSeconds(), 0, 0)) .addLast("idleEventHandler", new IdleEventHandler(parameter.protocolName())); if (LengthFieldBasedFrameDecoder.class.isAssignableFrom(binaryHandlerConfig.getDecoder())) { @@ -134,9 +137,7 @@ public abstract class ChannelHandlerInitializer extends Chann .addLast("tcpByteHandler", new TcpChannelHandler<>(parameter)); } }; - case null -> throw new IllegalArgumentException("Unknown: " + parameter.handlerCfg()); + case null -> throw new IllegalArgumentException("Unknown: " + tcpCfgHandler); }; } - - } diff --git a/jcpp-protocol-api/src/main/java/sanbing/jcpp/protocol/listener/ChannelHandlerParameter.java b/jcpp-protocol-api/src/main/java/sanbing/jcpp/protocol/listener/ChannelHandlerParameter.java index dc5509d..b215156 100644 --- a/jcpp-protocol-api/src/main/java/sanbing/jcpp/protocol/listener/ChannelHandlerParameter.java +++ b/jcpp-protocol-api/src/main/java/sanbing/jcpp/protocol/listener/ChannelHandlerParameter.java @@ -8,12 +8,10 @@ import io.micrometer.core.instrument.Timer; import sanbing.jcpp.infrastructure.stats.DefaultCounter; import sanbing.jcpp.infrastructure.stats.MessagesStats; import sanbing.jcpp.protocol.ProtocolMessageProcessor; -import sanbing.jcpp.protocol.cfg.TcpHandlerCfg; import java.util.concurrent.atomic.AtomicInteger; public record ChannelHandlerParameter(String protocolName, - TcpHandlerCfg handlerCfg, ProtocolMessageProcessor protocolMessageProcessor, AtomicInteger connectionsGauge, MessagesStats uplinkMsgStats, diff --git a/jcpp-protocol-api/src/main/java/sanbing/jcpp/protocol/listener/Listener.java b/jcpp-protocol-api/src/main/java/sanbing/jcpp/protocol/listener/Listener.java index d3a93da..918f5df 100644 --- a/jcpp-protocol-api/src/main/java/sanbing/jcpp/protocol/listener/Listener.java +++ b/jcpp-protocol-api/src/main/java/sanbing/jcpp/protocol/listener/Listener.java @@ -32,6 +32,8 @@ public abstract class Listener { protected DefaultCounter downlinkTrafficCounter; protected Timer downlinkTimer; + protected final ChannelHandlerParameter parameter; + protected Listener(String protocolName, ProtocolMessageProcessor protocolMessageProcessor, StatsFactory statsFactory) { this.protocolName = protocolName; this.protocolMessageProcessor = protocolMessageProcessor; @@ -42,6 +44,8 @@ public abstract class Listener { this.uplinkTrafficCounter = statsFactory.createDefaultCounter("listenerUplinkTraffic", "protocol", protocolName); this.downlinkTrafficCounter = statsFactory.createDefaultCounter("listenerDownlinkTraffic", "protocol", protocolName); this.downlinkTimer = statsFactory.createTimer("listenerDownlink", "protocol", protocolName); + + this.parameter = new ChannelHandlerParameter(protocolName, protocolMessageProcessor, connectionsGauge, uplinkMsgStats, downlinkMsgStats, uplinkTrafficCounter, downlinkTrafficCounter, downlinkTimer); } public abstract Health health(); diff --git a/jcpp-protocol-api/src/main/java/sanbing/jcpp/protocol/listener/tcp/TcpListener.java b/jcpp-protocol-api/src/main/java/sanbing/jcpp/protocol/listener/tcp/TcpListener.java index 4024de0..91a6cb4 100644 --- a/jcpp-protocol-api/src/main/java/sanbing/jcpp/protocol/listener/tcp/TcpListener.java +++ b/jcpp-protocol-api/src/main/java/sanbing/jcpp/protocol/listener/tcp/TcpListener.java @@ -22,7 +22,6 @@ import sanbing.jcpp.infrastructure.util.async.JCPPThreadFactory; import sanbing.jcpp.protocol.ProtocolMessageProcessor; import sanbing.jcpp.protocol.cfg.TcpCfg; import sanbing.jcpp.protocol.listener.ChannelHandlerInitializer; -import sanbing.jcpp.protocol.listener.ChannelHandlerParameter; import sanbing.jcpp.protocol.listener.Listener; /** @@ -35,21 +34,17 @@ public class TcpListener extends Listener { private EventLoopGroup bossGroup; private EventLoopGroup workerGroup; - private final ChannelHandlerParameter parameter; - public TcpListener(String protocolName, TcpCfg tcpCfg, ProtocolMessageProcessor protocolMessageProcessor, StatsFactory statsFactory) throws InterruptedException { super(protocolName, protocolMessageProcessor, statsFactory); - parameter = new ChannelHandlerParameter(protocolName, tcpCfg.getHandler(), protocolMessageProcessor, connectionsGauge, uplinkMsgStats, downlinkMsgStats, uplinkTrafficCounter, downlinkTrafficCounter, downlinkTimer); - - tcpServerBootstrap(tcpCfg, getProtocolName()); + tcpServerBootstrap(tcpCfg); } - private void tcpServerBootstrap(TcpCfg tcpCfg, String protocolName) throws InterruptedException { + private void tcpServerBootstrap(TcpCfg tcpCfg) throws InterruptedException { bossGroup = new NioEventLoopGroup(tcpCfg.getBossGroupThreadCount(), JCPPThreadFactory.forName("tcp-boss")); workerGroup = new NioEventLoopGroup(tcpCfg.getWorkerGroupThreadCount(), JCPPThreadFactory.forName("tcp-worker")); - ChannelHandlerInitializer channelHandler = ChannelHandlerInitializer.createTcpChannelHandler(parameter); + ChannelHandlerInitializer channelHandler = ChannelHandlerInitializer.createTcpChannelHandler(tcpCfg, parameter); ServerBootstrap server = new ServerBootstrap() .group(bossGroup, workerGroup) @@ -63,7 +58,7 @@ public class TcpListener extends Listener { .handler(new LoggingHandler(LogLevel.INFO)) .childHandler(channelHandler); serverChannel = server.bind(tcpCfg.getBindAddress(), tcpCfg.getBindPort()).sync().channel(); - log.info("Tcp server [{}] started, BindAddress:[{}], BindPort: [{}]", protocolName, tcpCfg.getBindAddress(), tcpCfg.getBindPort()); + log.info("Tcp server [{}] started, BindAddress:[{}], BindPort: [{}]", getProtocolName(), tcpCfg.getBindAddress(), tcpCfg.getBindPort()); }