From 3ec2f97933d1ad75993ed9c603728dc191183a4f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E4=B8=89=E4=B8=99?= <10604541+sanbing-os@user.noreply.gitee.com> Date: Fri, 11 Oct 2024 11:51:32 +0800 Subject: [PATCH] =?UTF-8?q?session=20=E6=9C=AC=E8=BA=AB=E6=9C=89=E7=8A=B6?= =?UTF-8?q?=E6=80=81=EF=BC=8C=E6=97=A0=E9=9C=80=E4=BC=A0=E9=80=92=E6=97=A0?= =?UTF-8?q?=E7=8A=B6=E6=80=81=E5=8F=82=E6=95=B0=E8=BF=9B=E6=9D=A5?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../listener/tcp/TcpChannelHandler.java | 18 ++++++++++-------- .../jcpp/protocol/listener/tcp/TcpSession.java | 14 +++++++------- 2 files changed, 17 insertions(+), 15 deletions(-) diff --git a/jcpp-protocol-api/src/main/java/sanbing/jcpp/protocol/listener/tcp/TcpChannelHandler.java b/jcpp-protocol-api/src/main/java/sanbing/jcpp/protocol/listener/tcp/TcpChannelHandler.java index 03d5d24..c8819e1 100644 --- a/jcpp-protocol-api/src/main/java/sanbing/jcpp/protocol/listener/tcp/TcpChannelHandler.java +++ b/jcpp-protocol-api/src/main/java/sanbing/jcpp/protocol/listener/tcp/TcpChannelHandler.java @@ -121,16 +121,18 @@ public class TcpChannelHandler extends SimpleChannelInboundHandler extends SimpleChannelInboundHandler ByteBufUtil.hexDump(byteBuf)); + logDownlinkStart(byteBuf.readableBytes(), () -> ByteBufUtil.hexDump(byteBuf)); ctx.writeAndFlush(Unpooled.wrappedBuffer(byteBuf)) - .addListener(channelFuture -> logDownlinkUnsuccessful(ctx, channelFuture)); + .addListener(this::logDownlinkUnsuccessful); downlinkMsgStats.incrementSuccessful(); @@ -169,21 +171,21 @@ public class TcpChannelHandler extends SimpleChannelInboundHandler logTransform) { + private void logDownlinkStart(int payloadSize, Supplier logTransform) { downlinkTrafficCounter.add(payloadSize); if (log.isDebugEnabled()) { - log.debug("[{}]{}{} 开始发送下行报文:{}", protocolName, ctx.channel(), tcpSession, logTransform.get()); + log.debug("[{}]{} 开始发送下行报文:{}", protocolName, tcpSession, logTransform.get()); } } - private void logDownlinkUnsuccessful(ChannelHandlerContext ctx, Future channelFuture) { + private void logDownlinkUnsuccessful(Future channelFuture) { downlinkTimer.record(Duration.ofMillis(System.currentTimeMillis() - TracerContextUtil.getCurrentTracer().getTracerTs())); if (channelFuture.isDone() && !channelFuture.isSuccess()) { - log.info("[{}]{}{} 下行报文发送未成功", protocolName, ctx.channel(), tcpSession); + log.info("[{}]{} 下行报文发送未成功", protocolName, tcpSession); } } diff --git a/jcpp-protocol-api/src/main/java/sanbing/jcpp/protocol/listener/tcp/TcpSession.java b/jcpp-protocol-api/src/main/java/sanbing/jcpp/protocol/listener/tcp/TcpSession.java index 7071521..36b20ad 100644 --- a/jcpp-protocol-api/src/main/java/sanbing/jcpp/protocol/listener/tcp/TcpSession.java +++ b/jcpp-protocol-api/src/main/java/sanbing/jcpp/protocol/listener/tcp/TcpSession.java @@ -16,7 +16,7 @@ import sanbing.jcpp.protocol.listener.tcp.enums.SequenceNumberLength; import java.net.SocketAddress; import java.util.concurrent.atomic.AtomicInteger; -import java.util.function.BiConsumer; +import java.util.function.Consumer; /** * 设备会话 @@ -32,9 +32,9 @@ public class TcpSession extends ProtocolSession { private ChannelHandlerContext ctx; - private final BiConsumer sendDownlinkConsumer; + private final Consumer sendDownlinkConsumer; - private final BiConsumer writeAndFlushConsumer; + private final Consumer writeAndFlushConsumer; private final AtomicInteger sequenceNumber = new AtomicInteger(0); @@ -64,8 +64,8 @@ public class TcpSession extends ProtocolSession { } public TcpSession(String protocolName, - BiConsumer sendDownlinkConsumer, - BiConsumer writeAndFlushConsumer) { + Consumer sendDownlinkConsumer, + Consumer writeAndFlushConsumer) { super(protocolName); this.sendDownlinkConsumer = sendDownlinkConsumer; this.writeAndFlushConsumer = writeAndFlushConsumer; @@ -73,7 +73,7 @@ public class TcpSession extends ProtocolSession { @Override public void onDownlink(DownlinkRestMessage downlinkMsg) { - sendDownlinkConsumer.accept(ctx, downlinkMsg); + sendDownlinkConsumer.accept(downlinkMsg); } @Override @@ -85,6 +85,6 @@ public class TcpSession extends ProtocolSession { } public void writeAndFlush(ByteBuf byteBuf) { - writeAndFlushConsumer.accept(ctx, byteBuf); + writeAndFlushConsumer.accept(byteBuf); } }