去除流量指标

This commit is contained in:
三丙
2025-01-18 16:14:20 +08:00
parent da52d2b57e
commit 71c277993e
3 changed files with 3 additions and 20 deletions

View File

@@ -5,7 +5,6 @@
package sanbing.jcpp.protocol.listener; package sanbing.jcpp.protocol.listener;
import io.micrometer.core.instrument.Timer; import io.micrometer.core.instrument.Timer;
import sanbing.jcpp.infrastructure.stats.DefaultCounter;
import sanbing.jcpp.infrastructure.stats.MessagesStats; import sanbing.jcpp.infrastructure.stats.MessagesStats;
import sanbing.jcpp.protocol.ProtocolMessageProcessor; import sanbing.jcpp.protocol.ProtocolMessageProcessor;
@@ -16,7 +15,5 @@ public record ChannelHandlerParameter(String protocolName,
AtomicInteger connectionsGauge, AtomicInteger connectionsGauge,
MessagesStats uplinkMsgStats, MessagesStats uplinkMsgStats,
MessagesStats downlinkMsgStats, MessagesStats downlinkMsgStats,
DefaultCounter uplinkTrafficCounter,
DefaultCounter downlinkTrafficCounter,
Timer downlinkTimer) { Timer downlinkTimer) {
} }

View File

@@ -7,7 +7,6 @@ package sanbing.jcpp.protocol.listener;
import io.micrometer.core.instrument.Timer; import io.micrometer.core.instrument.Timer;
import lombok.Getter; import lombok.Getter;
import org.springframework.boot.actuate.health.Health; import org.springframework.boot.actuate.health.Health;
import sanbing.jcpp.infrastructure.stats.DefaultCounter;
import sanbing.jcpp.infrastructure.stats.MessagesStats; import sanbing.jcpp.infrastructure.stats.MessagesStats;
import sanbing.jcpp.infrastructure.stats.StatsFactory; import sanbing.jcpp.infrastructure.stats.StatsFactory;
import sanbing.jcpp.protocol.ProtocolMessageProcessor; import sanbing.jcpp.protocol.ProtocolMessageProcessor;
@@ -28,8 +27,6 @@ public abstract class Listener {
protected AtomicInteger connectionsGauge = new AtomicInteger(); protected AtomicInteger connectionsGauge = new AtomicInteger();
protected MessagesStats uplinkMsgStats; protected MessagesStats uplinkMsgStats;
protected MessagesStats downlinkMsgStats; protected MessagesStats downlinkMsgStats;
protected DefaultCounter uplinkTrafficCounter;
protected DefaultCounter downlinkTrafficCounter;
protected Timer downlinkTimer; protected Timer downlinkTimer;
protected final ChannelHandlerParameter parameter; protected final ChannelHandlerParameter parameter;
@@ -41,11 +38,9 @@ public abstract class Listener {
statsFactory.createGauge("openConnections", connectionsGauge, "protocol", protocolName); statsFactory.createGauge("openConnections", connectionsGauge, "protocol", protocolName);
this.uplinkMsgStats = statsFactory.createMessagesStats("listenerUplinkMessage", "protocol", protocolName); this.uplinkMsgStats = statsFactory.createMessagesStats("listenerUplinkMessage", "protocol", protocolName);
this.downlinkMsgStats = statsFactory.createMessagesStats("listenerDownlinkMessage", "protocol", protocolName); this.downlinkMsgStats = statsFactory.createMessagesStats("listenerDownlinkMessage", "protocol", protocolName);
this.uplinkTrafficCounter = statsFactory.createDefaultCounter("listenerUplinkTraffic", "protocol", protocolName);
this.downlinkTrafficCounter = statsFactory.createDefaultCounter("listenerDownlinkTraffic", "protocol", protocolName);
this.downlinkTimer = statsFactory.createTimer("listenerDownlink", "protocol", protocolName); this.downlinkTimer = statsFactory.createTimer("listenerDownlink", "protocol", protocolName);
this.parameter = new ChannelHandlerParameter(protocolName, protocolMessageProcessor, connectionsGauge, uplinkMsgStats, downlinkMsgStats, uplinkTrafficCounter, downlinkTrafficCounter, downlinkTimer); this.parameter = new ChannelHandlerParameter(protocolName, protocolMessageProcessor, connectionsGauge, uplinkMsgStats, downlinkMsgStats, downlinkTimer);
} }
public abstract Health health(); public abstract Health health();

View File

@@ -14,7 +14,6 @@ import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.util.concurrent.Future; import io.netty.util.concurrent.Future;
import lombok.SneakyThrows; import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import sanbing.jcpp.infrastructure.stats.DefaultCounter;
import sanbing.jcpp.infrastructure.stats.MessagesStats; import sanbing.jcpp.infrastructure.stats.MessagesStats;
import sanbing.jcpp.infrastructure.util.exception.DownlinkException; import sanbing.jcpp.infrastructure.util.exception.DownlinkException;
import sanbing.jcpp.infrastructure.util.jackson.JacksonUtil; import sanbing.jcpp.infrastructure.util.jackson.JacksonUtil;
@@ -38,9 +37,7 @@ public class TcpChannelHandler<T> extends SimpleChannelInboundHandler<ProtocolUp
private final ProtocolMessageProcessor protocolMessageProcessor; private final ProtocolMessageProcessor protocolMessageProcessor;
private final MessagesStats uplinkMsgStats; private final MessagesStats uplinkMsgStats;
private final DefaultCounter uplinkTrafficCounter;
private final MessagesStats downlinkMsgStats; private final MessagesStats downlinkMsgStats;
private final DefaultCounter downlinkTrafficCounter;
private final Timer downlinkTimer; private final Timer downlinkTimer;
private final TcpSession tcpSession; private final TcpSession tcpSession;
@@ -51,9 +48,7 @@ public class TcpChannelHandler<T> extends SimpleChannelInboundHandler<ProtocolUp
this.protocolMessageProcessor = parameter.protocolMessageProcessor(); this.protocolMessageProcessor = parameter.protocolMessageProcessor();
this.uplinkMsgStats = parameter.uplinkMsgStats(); this.uplinkMsgStats = parameter.uplinkMsgStats();
this.uplinkTrafficCounter = parameter.uplinkTrafficCounter();
this.downlinkMsgStats = parameter.downlinkMsgStats(); this.downlinkMsgStats = parameter.downlinkMsgStats();
this.downlinkTrafficCounter = parameter.downlinkTrafficCounter();
this.downlinkTimer = parameter.downlinkTimer(); this.downlinkTimer = parameter.downlinkTimer();
tcpSession = new TcpSession(protocolName, this::onDownlink, this::writeAndFlush); tcpSession = new TcpSession(protocolName, this::onDownlink, this::writeAndFlush);
@@ -69,8 +64,6 @@ public class TcpChannelHandler<T> extends SimpleChannelInboundHandler<ProtocolUp
uplinkMsgStats.incrementTotal(); uplinkMsgStats.incrementTotal();
uplinkTrafficCounter.add(msg.size());
tcpSession.setLastActivityTime(LocalDateTime.now()); tcpSession.setLastActivityTime(LocalDateTime.now());
if (tcpSession.getAddress() == null) { if (tcpSession.getAddress() == null) {
@@ -153,7 +146,7 @@ public class TcpChannelHandler<T> extends SimpleChannelInboundHandler<ProtocolUp
continue; continue;
} }
logDownlinkStart(byteBuf.readableBytes(), () -> ByteBufUtil.hexDump(byteBuf)); logDownlinkStart(() -> ByteBufUtil.hexDump(byteBuf));
ctx.writeAndFlush(Unpooled.wrappedBuffer(byteBuf)) ctx.writeAndFlush(Unpooled.wrappedBuffer(byteBuf))
.addListener(this::logDownlinkUnsuccessful); .addListener(this::logDownlinkUnsuccessful);
@@ -171,9 +164,7 @@ public class TcpChannelHandler<T> extends SimpleChannelInboundHandler<ProtocolUp
} }
private void logDownlinkStart(int payloadSize, Supplier<String> logTransform) { private void logDownlinkStart(Supplier<String> logTransform) {
downlinkTrafficCounter.add(payloadSize);
if (log.isDebugEnabled()) { if (log.isDebugEnabled()) {
log.debug("[{}]{} 开始发送下行报文:{}", protocolName, tcpSession, logTransform.get()); log.debug("[{}]{} 开始发送下行报文:{}", protocolName, tcpSession, logTransform.get());