diff --git a/jcpp-protocol-api/src/main/java/sanbing/jcpp/protocol/listener/Listener.java b/jcpp-protocol-api/src/main/java/sanbing/jcpp/protocol/listener/Listener.java index b438b6f..d3a93da 100644 --- a/jcpp-protocol-api/src/main/java/sanbing/jcpp/protocol/listener/Listener.java +++ b/jcpp-protocol-api/src/main/java/sanbing/jcpp/protocol/listener/Listener.java @@ -10,6 +10,7 @@ import org.springframework.boot.actuate.health.Health; import sanbing.jcpp.infrastructure.stats.DefaultCounter; import sanbing.jcpp.infrastructure.stats.MessagesStats; import sanbing.jcpp.infrastructure.stats.StatsFactory; +import sanbing.jcpp.protocol.ProtocolMessageProcessor; import java.util.concurrent.atomic.AtomicInteger; @@ -21,6 +22,9 @@ public abstract class Listener { @Getter private final String protocolName; + @Getter + private final ProtocolMessageProcessor protocolMessageProcessor; + protected AtomicInteger connectionsGauge = new AtomicInteger(); protected MessagesStats uplinkMsgStats; protected MessagesStats downlinkMsgStats; @@ -28,8 +32,9 @@ public abstract class Listener { protected DefaultCounter downlinkTrafficCounter; protected Timer downlinkTimer; - public Listener(String protocolName, StatsFactory statsFactory) { + protected Listener(String protocolName, ProtocolMessageProcessor protocolMessageProcessor, StatsFactory statsFactory) { this.protocolName = protocolName; + this.protocolMessageProcessor = protocolMessageProcessor; statsFactory.createGauge("openConnections", connectionsGauge, "protocol", protocolName); this.uplinkMsgStats = statsFactory.createMessagesStats("listenerUplinkMessage", "protocol", protocolName); diff --git a/jcpp-protocol-api/src/main/java/sanbing/jcpp/protocol/listener/tcp/TcpListener.java b/jcpp-protocol-api/src/main/java/sanbing/jcpp/protocol/listener/tcp/TcpListener.java index 121e295..4024de0 100644 --- a/jcpp-protocol-api/src/main/java/sanbing/jcpp/protocol/listener/tcp/TcpListener.java +++ b/jcpp-protocol-api/src/main/java/sanbing/jcpp/protocol/listener/tcp/TcpListener.java @@ -38,7 +38,7 @@ public class TcpListener extends Listener { private final ChannelHandlerParameter parameter; public TcpListener(String protocolName, TcpCfg tcpCfg, ProtocolMessageProcessor protocolMessageProcessor, StatsFactory statsFactory) throws InterruptedException { - super(protocolName, statsFactory); + super(protocolName, protocolMessageProcessor, statsFactory); parameter = new ChannelHandlerParameter(protocolName, tcpCfg.getHandler(), protocolMessageProcessor, connectionsGauge, uplinkMsgStats, downlinkMsgStats, uplinkTrafficCounter, downlinkTrafficCounter, downlinkTimer);