ProtocolMessageProcessor 应作为必须项

This commit is contained in:
三丙
2024-10-10 11:19:39 +08:00
parent 344e23d80e
commit 02c399d8dd
2 changed files with 7 additions and 2 deletions

View File

@@ -10,6 +10,7 @@ import org.springframework.boot.actuate.health.Health;
import sanbing.jcpp.infrastructure.stats.DefaultCounter; import sanbing.jcpp.infrastructure.stats.DefaultCounter;
import sanbing.jcpp.infrastructure.stats.MessagesStats; import sanbing.jcpp.infrastructure.stats.MessagesStats;
import sanbing.jcpp.infrastructure.stats.StatsFactory; import sanbing.jcpp.infrastructure.stats.StatsFactory;
import sanbing.jcpp.protocol.ProtocolMessageProcessor;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
@@ -21,6 +22,9 @@ public abstract class Listener {
@Getter @Getter
private final String protocolName; private final String protocolName;
@Getter
private final ProtocolMessageProcessor protocolMessageProcessor;
protected AtomicInteger connectionsGauge = new AtomicInteger(); protected AtomicInteger connectionsGauge = new AtomicInteger();
protected MessagesStats uplinkMsgStats; protected MessagesStats uplinkMsgStats;
protected MessagesStats downlinkMsgStats; protected MessagesStats downlinkMsgStats;
@@ -28,8 +32,9 @@ public abstract class Listener {
protected DefaultCounter downlinkTrafficCounter; protected DefaultCounter downlinkTrafficCounter;
protected Timer downlinkTimer; protected Timer downlinkTimer;
public Listener(String protocolName, StatsFactory statsFactory) { protected Listener(String protocolName, ProtocolMessageProcessor protocolMessageProcessor, StatsFactory statsFactory) {
this.protocolName = protocolName; this.protocolName = protocolName;
this.protocolMessageProcessor = protocolMessageProcessor;
statsFactory.createGauge("openConnections", connectionsGauge, "protocol", protocolName); statsFactory.createGauge("openConnections", connectionsGauge, "protocol", protocolName);
this.uplinkMsgStats = statsFactory.createMessagesStats("listenerUplinkMessage", "protocol", protocolName); this.uplinkMsgStats = statsFactory.createMessagesStats("listenerUplinkMessage", "protocol", protocolName);

View File

@@ -38,7 +38,7 @@ public class TcpListener extends Listener {
private final ChannelHandlerParameter parameter; private final ChannelHandlerParameter parameter;
public TcpListener(String protocolName, TcpCfg tcpCfg, ProtocolMessageProcessor protocolMessageProcessor, StatsFactory statsFactory) throws InterruptedException { public TcpListener(String protocolName, TcpCfg tcpCfg, ProtocolMessageProcessor protocolMessageProcessor, StatsFactory statsFactory) throws InterruptedException {
super(protocolName, statsFactory); super(protocolName, protocolMessageProcessor, statsFactory);
parameter = new ChannelHandlerParameter(protocolName, tcpCfg.getHandler(), protocolMessageProcessor, connectionsGauge, uplinkMsgStats, downlinkMsgStats, uplinkTrafficCounter, downlinkTrafficCounter, downlinkTimer); parameter = new ChannelHandlerParameter(protocolName, tcpCfg.getHandler(), protocolMessageProcessor, connectionsGauge, uplinkMsgStats, downlinkMsgStats, uplinkTrafficCounter, downlinkTrafficCounter, downlinkTimer);