From c9bfa8ace8e5df003e552a86668a14ca4b6411bd Mon Sep 17 00:00:00 2001 From: Guoqs <123@jsowell.com> Date: Sat, 28 Dec 2024 13:44:57 +0800 Subject: [PATCH] =?UTF-8?q?=E6=89=8B=E5=8A=A8=E9=87=8A=E6=94=BE=E5=AF=B9?= =?UTF-8?q?=E8=B1=A1?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../netty/decoder/YkcProtocolDecoder.java | 1 + .../netty/decoder/YouDianProtocolDecoder.java | 112 +++++++------- .../netty/decoder/YunKuaiChongDecoder.java | 140 +++++++++--------- .../yunkuaichong/NettyServerHandler.java | 72 +++++---- 4 files changed, 173 insertions(+), 152 deletions(-) diff --git a/jsowell-netty/src/main/java/com/jsowell/netty/decoder/YkcProtocolDecoder.java b/jsowell-netty/src/main/java/com/jsowell/netty/decoder/YkcProtocolDecoder.java index b34136d2f..15dc980f0 100644 --- a/jsowell-netty/src/main/java/com/jsowell/netty/decoder/YkcProtocolDecoder.java +++ b/jsowell-netty/src/main/java/com/jsowell/netty/decoder/YkcProtocolDecoder.java @@ -10,6 +10,7 @@ import lombok.extern.slf4j.Slf4j; import java.util.List; +@Deprecated @Slf4j public class YkcProtocolDecoder extends ByteToMessageDecoder { diff --git a/jsowell-netty/src/main/java/com/jsowell/netty/decoder/YouDianProtocolDecoder.java b/jsowell-netty/src/main/java/com/jsowell/netty/decoder/YouDianProtocolDecoder.java index c38ebef1f..c0e1f7422 100644 --- a/jsowell-netty/src/main/java/com/jsowell/netty/decoder/YouDianProtocolDecoder.java +++ b/jsowell-netty/src/main/java/com/jsowell/netty/decoder/YouDianProtocolDecoder.java @@ -4,6 +4,7 @@ import com.jsowell.common.constant.Constants; import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.ByteToMessageDecoder; +import io.netty.util.ReferenceCountUtil; import lombok.extern.slf4j.Slf4j; import java.nio.charset.StandardCharsets; @@ -37,7 +38,7 @@ public class YouDianProtocolDecoder extends ByteToMessageDecoder { buffer.markReaderIndex(); // 判断是否为DNY包头或68包头 - if (isStartOfDnyHeader(buffer, beginReader) || isStartOf68Header(buffer, beginReader)) { + if (isStartOfDnyHeader(buffer, beginReader)) { break; // 读到了协议的开始标志,结束while循环 } @@ -60,13 +61,13 @@ public class YouDianProtocolDecoder extends ByteToMessageDecoder { } // 检查包头是否是 68 协议 - if (buffer.readableBytes() >= HEADER_LENGTH_68) { - if (buffer.getUnsignedByte(beginReader) == 0x68) { - // 处理 68 协议 - decode68Message(buffer, out, beginReader); - return; - } - } + // if (buffer.readableBytes() >= HEADER_LENGTH_68) { + // if (buffer.getUnsignedByte(beginReader) == 0x68) { + // // 处理 68 协议 + // decode68Message(buffer, out, beginReader); + // return; + // } + // } // 未知协议,还原读指针 buffer.resetReaderIndex(); @@ -84,58 +85,61 @@ public class YouDianProtocolDecoder extends ByteToMessageDecoder { } // 判断是否为68包头 - private boolean isStartOf68Header(ByteBuf buffer, int beginReader) { - if (buffer.readableBytes() >= HEADER_LENGTH_68) { - return buffer.getUnsignedByte(beginReader) == 0x68; - } - return false; - } + // private boolean isStartOf68Header(ByteBuf buffer, int beginReader) { + // if (buffer.readableBytes() >= HEADER_LENGTH_68) { + // return buffer.getUnsignedByte(beginReader) == 0x68; + // } + // return false; + // } // 处理68协议消息 - private void decode68Message(ByteBuf buffer, List out, int beginReader) { - // 检查剩余数据是否足够 - if (buffer.readableBytes() < HEADER_LENGTH_68 + 1 + 2) { - buffer.readerIndex(beginReader); - return; - } - - // 获取消息长度 - int length = buffer.getUnsignedByte(beginReader + HEADER_LENGTH_68); - // 检查剩余数据是否足够 - if (buffer.readableBytes() < HEADER_LENGTH_68 + 1 + length + 2) { - buffer.readerIndex(beginReader); - return; - } - - // 读取 data 数据 最后+2是帧校验域长度 - ByteBuf frame = buffer.retainedSlice(beginReader, HEADER_LENGTH_68 + 1 + length + 2); - buffer.readerIndex(beginReader + HEADER_LENGTH_68 + 1 + length + 2); - out.add(frame); - } + // private void decode68Message(ByteBuf buffer, List out, int beginReader) { + // // 检查剩余数据是否足够 + // if (buffer.readableBytes() < HEADER_LENGTH_68 + 1 + 2) { + // buffer.readerIndex(beginReader); + // return; + // } + // + // // 获取消息长度 + // int length = buffer.getUnsignedByte(beginReader + HEADER_LENGTH_68); + // // 检查剩余数据是否足够 + // if (buffer.readableBytes() < HEADER_LENGTH_68 + 1 + length + 2) { + // buffer.readerIndex(beginReader); + // return; + // } + // + // // 读取 data 数据 最后+2是帧校验域长度 + // ByteBuf frame = buffer.retainedSlice(beginReader, HEADER_LENGTH_68 + 1 + length + 2); + // buffer.readerIndex(beginReader + HEADER_LENGTH_68 + 1 + length + 2); + // out.add(frame); + // } // 处理DNY协议消息 private void decodeDnyMessage(ByteBuf buffer, List out, int beginReader) { - // 检查剩余数据是否足够 - if (buffer.readableBytes() < HEADER_LENGTH_DNY + 1) { - buffer.readerIndex(beginReader); - return; + ByteBuf frame = null; + try { + // 检查剩余数据是否足够 + if (buffer.readableBytes() < HEADER_LENGTH_DNY + 1) { + buffer.readerIndex(beginReader); + return; + } + // 获取消息长度 + int length = buffer.getUnsignedByte(beginReader + HEADER_LENGTH_DNY); + // log.info("获取消息长度, length:{}", length); + // 检查剩余数据是否足够 + if (buffer.readableBytes() < HEADER_LENGTH_DNY + 1 + length) { + buffer.readerIndex(beginReader); + return; + } + // 读取 data 数据 + frame = buffer.retainedSlice(beginReader, HEADER_LENGTH_DNY + length + 2); + buffer.readerIndex(beginReader + HEADER_LENGTH_DNY + length + 2); + out.add(frame); + } finally { + if (frame != null) { + ReferenceCountUtil.release(frame); + } } - - // 获取消息长度 - int length = buffer.getUnsignedByte(beginReader + HEADER_LENGTH_DNY); - // log.info("获取消息长度, length:{}", length); - // 检查剩余数据是否足够 - if (buffer.readableBytes() < HEADER_LENGTH_DNY + 1 + length) { - buffer.readerIndex(beginReader); - return; - } - - // 读取 data 数据 - ByteBuf frame = buffer.retainedSlice(beginReader, HEADER_LENGTH_DNY + length + 2); - buffer.readerIndex(beginReader + HEADER_LENGTH_DNY + length + 2); - - - out.add(frame); } } diff --git a/jsowell-netty/src/main/java/com/jsowell/netty/decoder/YunKuaiChongDecoder.java b/jsowell-netty/src/main/java/com/jsowell/netty/decoder/YunKuaiChongDecoder.java index 8841b1dd0..4d85dbc60 100644 --- a/jsowell-netty/src/main/java/com/jsowell/netty/decoder/YunKuaiChongDecoder.java +++ b/jsowell-netty/src/main/java/com/jsowell/netty/decoder/YunKuaiChongDecoder.java @@ -1,13 +1,12 @@ package com.jsowell.netty.decoder; -import com.jsowell.common.constant.Constants; import com.jsowell.common.core.domain.ykc.YKCDataProtocol; import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.ByteToMessageDecoder; +import io.netty.util.ReferenceCountUtil; import lombok.extern.slf4j.Slf4j; -import java.nio.charset.StandardCharsets; import java.util.List; @Slf4j @@ -35,7 +34,7 @@ public class YunKuaiChongDecoder extends ByteToMessageDecoder { buffer.markReaderIndex(); // 判断是否为DNY包头或68包头 - if (isStartOfDnyHeader(buffer, beginReader) || isStartOf68Header(buffer, beginReader)) { + if (isStartOf68Header(buffer, beginReader)) { break; // 读到了协议的开始标志,结束while循环 } @@ -45,17 +44,17 @@ public class YunKuaiChongDecoder extends ByteToMessageDecoder { } // 检查包头是否是 "DNY" - if (buffer.readableBytes() >= HEADER_LENGTH_DNY) { - byte[] headerBytes = new byte[HEADER_LENGTH_DNY]; - buffer.getBytes(beginReader, headerBytes, 0, HEADER_LENGTH_DNY); - String header = new String(headerBytes, StandardCharsets.UTF_8); - // log.info("检查包头是否是DNY, header:{}", header); - if (Constants.EBIKE_HEADER.equals(header)) { - // 处理 DNY 协议 - decodeDnyMessage(buffer, out, beginReader); - return; - } - } + // if (buffer.readableBytes() >= HEADER_LENGTH_DNY) { + // byte[] headerBytes = new byte[HEADER_LENGTH_DNY]; + // buffer.getBytes(beginReader, headerBytes, 0, HEADER_LENGTH_DNY); + // String header = new String(headerBytes, StandardCharsets.UTF_8); + // // log.info("检查包头是否是DNY, header:{}", header); + // if (Constants.EBIKE_HEADER.equals(header)) { + // // 处理 DNY 协议 + // decodeDnyMessage(buffer, out, beginReader); + // return; + // } + // } // 检查包头是否是 68 协议 if (buffer.readableBytes() >= HEADER_LENGTH_68) { @@ -71,15 +70,15 @@ public class YunKuaiChongDecoder extends ByteToMessageDecoder { } // 判断是否为DNY包头 - private boolean isStartOfDnyHeader(ByteBuf buffer, int beginReader) { - if (buffer.readableBytes() >= HEADER_LENGTH_DNY) { - byte[] headerBytes = new byte[HEADER_LENGTH_DNY]; - buffer.getBytes(beginReader, headerBytes, 0, HEADER_LENGTH_DNY); - String header = new String(headerBytes, StandardCharsets.UTF_8); - return Constants.EBIKE_HEADER.equals(header); - } - return false; - } + // private boolean isStartOfDnyHeader(ByteBuf buffer, int beginReader) { + // if (buffer.readableBytes() >= HEADER_LENGTH_DNY) { + // byte[] headerBytes = new byte[HEADER_LENGTH_DNY]; + // buffer.getBytes(beginReader, headerBytes, 0, HEADER_LENGTH_DNY); + // String header = new String(headerBytes, StandardCharsets.UTF_8); + // return Constants.EBIKE_HEADER.equals(header); + // } + // return false; + // } // 判断是否为68包头 private boolean isStartOf68Header(ByteBuf buffer, int beginReader) { @@ -91,54 +90,59 @@ public class YunKuaiChongDecoder extends ByteToMessageDecoder { // 处理68协议消息 private void decode68Message(ByteBuf buffer, List out, int beginReader) { - // 检查剩余数据是否足够 - if (buffer.readableBytes() < HEADER_LENGTH_68 + 1 + 2) { - buffer.readerIndex(beginReader); - return; + ByteBuf frame = null; + try { + // 检查剩余数据是否足够 + if (buffer.readableBytes() < HEADER_LENGTH_68 + 1 + 2) { + buffer.readerIndex(beginReader); + return; + } + + // 获取消息长度 + int length = buffer.getUnsignedByte(beginReader + HEADER_LENGTH_68); + // 检查剩余数据是否足够 + if (buffer.readableBytes() < HEADER_LENGTH_68 + 1 + length + 2) { + buffer.readerIndex(beginReader); + return; + } + + // 读取 data 数据 最后+2是帧校验域长度 + frame = buffer.retainedSlice(beginReader, HEADER_LENGTH_68 + 1 + length + 2); + buffer.readerIndex(beginReader + HEADER_LENGTH_68 + 1 + length + 2); + + // 转为YKCDataProtocol对象 + byte[] bytes = new byte[HEADER_LENGTH_68 + 1 + length + 2]; + frame.readBytes(bytes); + YKCDataProtocol ykcDataProtocol = new YKCDataProtocol(bytes); + out.add(ykcDataProtocol); + } finally { + if (frame != null) { + ReferenceCountUtil.release(frame); + } } - - // 获取消息长度 - int length = buffer.getUnsignedByte(beginReader + HEADER_LENGTH_68); - // 检查剩余数据是否足够 - if (buffer.readableBytes() < HEADER_LENGTH_68 + 1 + length + 2) { - buffer.readerIndex(beginReader); - return; - } - - // 读取 data 数据 最后+2是帧校验域长度 - ByteBuf frame = buffer.retainedSlice(beginReader, HEADER_LENGTH_68 + 1 + length + 2); - buffer.readerIndex(beginReader + HEADER_LENGTH_68 + 1 + length + 2); - - // 转为YKCDataProtocol对象 - byte[] bytes = new byte[HEADER_LENGTH_68 + 1 + length + 2]; - frame.readBytes(bytes); - YKCDataProtocol ykcDataProtocol = new YKCDataProtocol(bytes); - out.add(ykcDataProtocol); } // 处理DNY协议消息 - private void decodeDnyMessage(ByteBuf buffer, List out, int beginReader) { - // 检查剩余数据是否足够 - if (buffer.readableBytes() < HEADER_LENGTH_DNY + 1) { - buffer.readerIndex(beginReader); - return; - } - - // 获取消息长度 - int length = buffer.getUnsignedByte(beginReader + HEADER_LENGTH_DNY); - // log.info("获取消息长度, length:{}", length); - // 检查剩余数据是否足够 - if (buffer.readableBytes() < HEADER_LENGTH_DNY + 1 + length) { - buffer.readerIndex(beginReader); - return; - } - - // 读取 data 数据 - ByteBuf frame = buffer.retainedSlice(beginReader, HEADER_LENGTH_DNY + length + 2); - buffer.readerIndex(beginReader + HEADER_LENGTH_DNY + length + 2); - - - out.add(frame); - } + // private void decodeDnyMessage(ByteBuf buffer, List out, int beginReader) { + // // 检查剩余数据是否足够 + // if (buffer.readableBytes() < HEADER_LENGTH_DNY + 1) { + // buffer.readerIndex(beginReader); + // return; + // } + // + // // 获取消息长度 + // int length = buffer.getUnsignedByte(beginReader + HEADER_LENGTH_DNY); + // // log.info("获取消息长度, length:{}", length); + // // 检查剩余数据是否足够 + // if (buffer.readableBytes() < HEADER_LENGTH_DNY + 1 + length) { + // buffer.readerIndex(beginReader); + // return; + // } + // + // // 读取 data 数据 + // ByteBuf frame = buffer.retainedSlice(beginReader, HEADER_LENGTH_DNY + length + 2); + // buffer.readerIndex(beginReader + HEADER_LENGTH_DNY + length + 2); + // out.add(frame); + // } } diff --git a/jsowell-netty/src/main/java/com/jsowell/netty/server/yunkuaichong/NettyServerHandler.java b/jsowell-netty/src/main/java/com/jsowell/netty/server/yunkuaichong/NettyServerHandler.java index 18689933a..a049504ee 100644 --- a/jsowell-netty/src/main/java/com/jsowell/netty/server/yunkuaichong/NettyServerHandler.java +++ b/jsowell-netty/src/main/java/com/jsowell/netty/server/yunkuaichong/NettyServerHandler.java @@ -231,25 +231,31 @@ public class NettyServerHandler extends ChannelInboundHandlerAdapter { @Override public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { - String socketString = ctx.channel().remoteAddress().toString(); - ChannelId channelId = ctx.channel().id(); - String pileSn = PileChannelEntity.getPileSnByChannelId(channelId.asLongText()); - if (evt instanceof IdleStateEvent) { // 超时事件 - IdleStateEvent event = (IdleStateEvent) evt; - boolean flag = false; - if (event.state() == IdleState.READER_IDLE) { // 读 - flag = true; - // log.error("Client-IP:【{}】, channelId:【{}】, pileSn:【{}】, READER_IDLE 读超时", socketString, channelId, pileSn); - } else if (event.state() == IdleState.WRITER_IDLE) { // 写 - flag = true; - // log.error("Client-IP:【{}】, channelId:【{}】, pileSn:【{}】, WRITER_IDLE 写超时", socketString, channelId, pileSn); - } else if (event.state() == IdleState.ALL_IDLE) { // 全部 - flag = true; - // log.error("Client-IP:【{}】, channelId:【{}】, pileSn:【{}】, ALL_IDLE 总超时", socketString, channelId, pileSn); + try { + String socketString = ctx.channel().remoteAddress().toString(); + ChannelId channelId = ctx.channel().id(); + String pileSn = PileChannelEntity.getPileSnByChannelId(channelId.asLongText()); + if (evt instanceof IdleStateEvent) { // 超时事件 + IdleStateEvent event = (IdleStateEvent) evt; + boolean flag = false; + if (event.state() == IdleState.READER_IDLE) { // 读 + flag = true; + // log.error("Client-IP:【{}】, channelId:【{}】, pileSn:【{}】, READER_IDLE 读超时", socketString, channelId, pileSn); + } else if (event.state() == IdleState.WRITER_IDLE) { // 写 + flag = true; + // log.error("Client-IP:【{}】, channelId:【{}】, pileSn:【{}】, WRITER_IDLE 写超时", socketString, channelId, pileSn); + } else if (event.state() == IdleState.ALL_IDLE) { // 全部 + flag = true; + // log.error("Client-IP:【{}】, channelId:【{}】, pileSn:【{}】, ALL_IDLE 总超时", socketString, channelId, pileSn); + } + if (flag) { + ctx.channel().close(); + // close(channelId, pileSn); + } } - if (flag) { - ctx.channel().close(); - // close(channelId, pileSn); + } finally { + if (evt instanceof ByteBuf) { + ReferenceCountUtil.release(evt); } } } @@ -259,20 +265,26 @@ public class NettyServerHandler extends ChannelInboundHandlerAdapter { */ @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { - ChannelId channelId = ctx.channel().id(); - String channelIdShortText = channelId.asShortText(); - String pileSn = PileChannelEntity.getPileSnByChannelId(channelIdShortText); - log.error("发生异常 channelId:{}, pileSn:{}", channelIdShortText, pileSn, cause); - cause.printStackTrace(); - // 如果桩连到平台,在1分钟内没有发送数据过来,会报ReadTimeoutException异常 - if (cause instanceof ReadTimeoutException) { - if (log.isTraceEnabled()) { - log.trace("Connection timeout 【{}】", ctx.channel().remoteAddress()); + try { + + ChannelId channelId = ctx.channel().id(); + String channelIdShortText = channelId.asShortText(); + String pileSn = PileChannelEntity.getPileSnByChannelId(channelIdShortText); + log.error("发生异常 channelId:{}, pileSn:{}", channelIdShortText, pileSn, cause); + cause.printStackTrace(); + // 如果桩连到平台,在1分钟内没有发送数据过来,会报ReadTimeoutException异常 + if (cause instanceof ReadTimeoutException) { + if (log.isTraceEnabled()) { + log.trace("Connection timeout 【{}】", ctx.channel().remoteAddress()); + } + log.error("【{}】发生了错误, pileSn:【{}】此连接被关闭, 此时连通数量: {}", channelId, pileSn, CHANNEL_MAP.size()); + ctx.channel().close(); + } + } finally { + if (ctx.channel().isActive()) { + ctx.close(); } - log.error("【{}】发生了错误, pileSn:【{}】此连接被关闭, 此时连通数量: {}", channelId, pileSn, CHANNEL_MAP.size()); - ctx.channel().close(); } - // close(channelId, pileSn); }