session 本身有状态,无需传递无状态参数进来

This commit is contained in:
三丙
2024-10-11 11:51:32 +08:00
parent 278bb9e69e
commit 3ec2f97933
2 changed files with 17 additions and 15 deletions

View File

@@ -121,16 +121,18 @@ public class TcpChannelHandler<T> extends SimpleChannelInboundHandler<ProtocolUp
}
}
protected void onDownlink(ChannelHandlerContext ctx, DownlinkRestMessage downlinkMsg) throws DownlinkException {
protected void onDownlink(DownlinkRestMessage downlinkMsg) throws DownlinkException {
protocolMessageProcessor.downlinkHandle(new SessionToHandlerMsg(downlinkMsg, tcpSession), downlinkMsgStats);
}
protected void writeAndFlush(ChannelHandlerContext ctx, ByteBuf... byteBufList) {
protected void writeAndFlush(ByteBuf... byteBufList) {
if (byteBufList == null || byteBufList.length == 0) {
return;
}
ChannelHandlerContext ctx = tcpSession.getCtx();
if (ctx.isRemoved()) {
tcpSession.close(SessionCloseReason.INACTIVE);
@@ -151,10 +153,10 @@ public class TcpChannelHandler<T> extends SimpleChannelInboundHandler<ProtocolUp
continue;
}
logDownlinkStart(ctx, byteBuf.readableBytes(), () -> ByteBufUtil.hexDump(byteBuf));
logDownlinkStart(byteBuf.readableBytes(), () -> ByteBufUtil.hexDump(byteBuf));
ctx.writeAndFlush(Unpooled.wrappedBuffer(byteBuf))
.addListener(channelFuture -> logDownlinkUnsuccessful(ctx, channelFuture));
.addListener(this::logDownlinkUnsuccessful);
downlinkMsgStats.incrementSuccessful();
@@ -169,21 +171,21 @@ public class TcpChannelHandler<T> extends SimpleChannelInboundHandler<ProtocolUp
}
private void logDownlinkStart(ChannelHandlerContext ctx, int payloadSize, Supplier<String> logTransform) {
private void logDownlinkStart(int payloadSize, Supplier<String> logTransform) {
downlinkTrafficCounter.add(payloadSize);
if (log.isDebugEnabled()) {
log.debug("[{}]{}{} 开始发送下行报文:{}", protocolName, ctx.channel(), tcpSession, logTransform.get());
log.debug("[{}]{} 开始发送下行报文:{}", protocolName, tcpSession, logTransform.get());
}
}
private void logDownlinkUnsuccessful(ChannelHandlerContext ctx, Future<? super Void> channelFuture) {
private void logDownlinkUnsuccessful(Future<? super Void> channelFuture) {
downlinkTimer.record(Duration.ofMillis(System.currentTimeMillis() - TracerContextUtil.getCurrentTracer().getTracerTs()));
if (channelFuture.isDone() && !channelFuture.isSuccess()) {
log.info("[{}]{}{} 下行报文发送未成功", protocolName, ctx.channel(), tcpSession);
log.info("[{}]{} 下行报文发送未成功", protocolName, tcpSession);
}
}

View File

@@ -16,7 +16,7 @@ import sanbing.jcpp.protocol.listener.tcp.enums.SequenceNumberLength;
import java.net.SocketAddress;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
/**
* 设备会话
@@ -32,9 +32,9 @@ public class TcpSession extends ProtocolSession {
private ChannelHandlerContext ctx;
private final BiConsumer<ChannelHandlerContext, DownlinkRestMessage> sendDownlinkConsumer;
private final Consumer<DownlinkRestMessage> sendDownlinkConsumer;
private final BiConsumer<ChannelHandlerContext, ByteBuf> writeAndFlushConsumer;
private final Consumer<ByteBuf> writeAndFlushConsumer;
private final AtomicInteger sequenceNumber = new AtomicInteger(0);
@@ -64,8 +64,8 @@ public class TcpSession extends ProtocolSession {
}
public TcpSession(String protocolName,
BiConsumer<ChannelHandlerContext, DownlinkRestMessage> sendDownlinkConsumer,
BiConsumer<ChannelHandlerContext, ByteBuf> writeAndFlushConsumer) {
Consumer<DownlinkRestMessage> sendDownlinkConsumer,
Consumer<ByteBuf> writeAndFlushConsumer) {
super(protocolName);
this.sendDownlinkConsumer = sendDownlinkConsumer;
this.writeAndFlushConsumer = writeAndFlushConsumer;
@@ -73,7 +73,7 @@ public class TcpSession extends ProtocolSession {
@Override
public void onDownlink(DownlinkRestMessage downlinkMsg) {
sendDownlinkConsumer.accept(ctx, downlinkMsg);
sendDownlinkConsumer.accept(downlinkMsg);
}
@Override
@@ -85,6 +85,6 @@ public class TcpSession extends ProtocolSession {
}
public void writeAndFlush(ByteBuf byteBuf) {
writeAndFlushConsumer.accept(ctx, byteBuf);
writeAndFlushConsumer.accept(byteBuf);
}
}