diff --git a/jsowell-netty/src/main/java/com/jsowell/netty/decoder/YouDianDecoder.java b/jsowell-netty/src/main/java/com/jsowell/netty/decoder/YouDianDecoder.java new file mode 100644 index 000000000..4bea0677d --- /dev/null +++ b/jsowell-netty/src/main/java/com/jsowell/netty/decoder/YouDianDecoder.java @@ -0,0 +1,138 @@ +package com.jsowell.netty.decoder; + +import com.jsowell.common.constant.Constants; +import io.netty.buffer.ByteBuf; +import io.netty.channel.ChannelHandlerContext; +import io.netty.handler.codec.ByteToMessageDecoder; +import lombok.extern.slf4j.Slf4j; + +import java.nio.charset.StandardCharsets; +import java.util.List; + +@Slf4j +public class YouDianDecoder extends ByteToMessageDecoder { + private static final int HEADER_LENGTH_DNY = 3; // "DNY" 包头的长度 + private static final int HEADER_LENGTH_68 = 1; // 68 包头的长度 + + // 构造函数,初始化起始标志 + public YouDianDecoder() {} + + @Override + protected void decode(ChannelHandlerContext ctx, ByteBuf buffer, List out) throws Exception { + // log.info("StartAndLengthFieldFrameDecoder.decode"); + // 记录包头开始的index + int beginReader; + + // 循环查找包头 + while (true) { + if (buffer.readableBytes() < Math.min(HEADER_LENGTH_DNY, HEADER_LENGTH_68)) { + return; // 数据长度不足,等待更多数据 + } + + // 获取包头开始的index + beginReader = buffer.readerIndex(); + buffer.markReaderIndex(); + + // 判断是否为DNY包头或68包头 + if (isStartOfDnyHeader(buffer, beginReader) || isStartOf68Header(buffer, beginReader)) { + break; // 读到了协议的开始标志,结束while循环 + } + + // 未读到包头,略过一个字节 + buffer.resetReaderIndex(); + buffer.readByte(); + } + + // 检查包头是否是 "DNY" + if (buffer.readableBytes() >= HEADER_LENGTH_DNY) { + byte[] headerBytes = new byte[HEADER_LENGTH_DNY]; + buffer.getBytes(beginReader, headerBytes, 0, HEADER_LENGTH_DNY); + String header = new String(headerBytes, StandardCharsets.UTF_8); + // log.info("检查包头是否是DNY, header:{}", header); + if (Constants.EBIKE_HEADER.equals(header)) { + // 处理 DNY 协议 + decodeDnyMessage(buffer, out, beginReader); + return; + } + } + + // 检查包头是否是 68 协议 + if (buffer.readableBytes() >= HEADER_LENGTH_68) { + if (buffer.getUnsignedByte(beginReader) == 0x68) { + // 处理 68 协议 + decode68Message(buffer, out, beginReader); + return; + } + } + + // 未知协议,还原读指针 + buffer.resetReaderIndex(); + } + + // 判断是否为DNY包头 + private boolean isStartOfDnyHeader(ByteBuf buffer, int beginReader) { + if (buffer.readableBytes() >= HEADER_LENGTH_DNY) { + byte[] headerBytes = new byte[HEADER_LENGTH_DNY]; + buffer.getBytes(beginReader, headerBytes, 0, HEADER_LENGTH_DNY); + String header = new String(headerBytes, StandardCharsets.UTF_8); + return Constants.EBIKE_HEADER.equals(header); + } + return false; + } + + // 判断是否为68包头 + private boolean isStartOf68Header(ByteBuf buffer, int beginReader) { + if (buffer.readableBytes() >= HEADER_LENGTH_68) { + return buffer.getUnsignedByte(beginReader) == 0x68; + } + return false; + } + + // 处理68协议消息 + private void decode68Message(ByteBuf buffer, List out, int beginReader) { + // 检查剩余数据是否足够 + if (buffer.readableBytes() < HEADER_LENGTH_68 + 1 + 2) { + buffer.readerIndex(beginReader); + return; + } + + // 获取消息长度 + int length = buffer.getUnsignedByte(beginReader + HEADER_LENGTH_68); + // 检查剩余数据是否足够 + if (buffer.readableBytes() < HEADER_LENGTH_68 + 1 + length + 2) { + buffer.readerIndex(beginReader); + return; + } + + // 读取 data 数据 最后+2是帧校验域长度 + ByteBuf frame = buffer.retainedSlice(beginReader, HEADER_LENGTH_68 + 1 + length + 2); + buffer.readerIndex(beginReader + HEADER_LENGTH_68 + 1 + length + 2); + out.add(frame); + } + + // 处理DNY协议消息 + private void decodeDnyMessage(ByteBuf buffer, List out, int beginReader) { + // 检查剩余数据是否足够 + if (buffer.readableBytes() < HEADER_LENGTH_DNY + 1) { + buffer.readerIndex(beginReader); + return; + } + + // 获取消息长度 + int length = buffer.getUnsignedByte(beginReader + HEADER_LENGTH_DNY); + // log.info("获取消息长度, length:{}", length); + // 检查剩余数据是否足够 + if (buffer.readableBytes() < HEADER_LENGTH_DNY + 1 + length) { + buffer.readerIndex(beginReader); + return; + } + + // 读取 data 数据 + ByteBuf frame = buffer.retainedSlice(beginReader, HEADER_LENGTH_DNY + length + 2); + buffer.readerIndex(beginReader + HEADER_LENGTH_DNY + length + 2); + + + out.add(frame); + } + +} diff --git a/jsowell-netty/src/main/java/com/jsowell/netty/decoder/YunKuaiChongDecoder.java b/jsowell-netty/src/main/java/com/jsowell/netty/decoder/YunKuaiChongDecoder.java new file mode 100644 index 000000000..cf74d6e7b --- /dev/null +++ b/jsowell-netty/src/main/java/com/jsowell/netty/decoder/YunKuaiChongDecoder.java @@ -0,0 +1,138 @@ +package com.jsowell.netty.decoder; + +import com.jsowell.common.constant.Constants; +import io.netty.buffer.ByteBuf; +import io.netty.channel.ChannelHandlerContext; +import io.netty.handler.codec.ByteToMessageDecoder; +import lombok.extern.slf4j.Slf4j; + +import java.nio.charset.StandardCharsets; +import java.util.List; + +@Slf4j +public class YunKuaiChongDecoder extends ByteToMessageDecoder { + private static final int HEADER_LENGTH_DNY = 3; // "DNY" 包头的长度 + private static final int HEADER_LENGTH_68 = 1; // 68 包头的长度 + + // 构造函数,初始化起始标志 + public YunKuaiChongDecoder() {} + + @Override + protected void decode(ChannelHandlerContext ctx, ByteBuf buffer, List out) throws Exception { + // log.info("StartAndLengthFieldFrameDecoder.decode"); + // 记录包头开始的index + int beginReader; + + // 循环查找包头 + while (true) { + if (buffer.readableBytes() < Math.min(HEADER_LENGTH_DNY, HEADER_LENGTH_68)) { + return; // 数据长度不足,等待更多数据 + } + + // 获取包头开始的index + beginReader = buffer.readerIndex(); + buffer.markReaderIndex(); + + // 判断是否为DNY包头或68包头 + if (isStartOfDnyHeader(buffer, beginReader) || isStartOf68Header(buffer, beginReader)) { + break; // 读到了协议的开始标志,结束while循环 + } + + // 未读到包头,略过一个字节 + buffer.resetReaderIndex(); + buffer.readByte(); + } + + // 检查包头是否是 "DNY" + if (buffer.readableBytes() >= HEADER_LENGTH_DNY) { + byte[] headerBytes = new byte[HEADER_LENGTH_DNY]; + buffer.getBytes(beginReader, headerBytes, 0, HEADER_LENGTH_DNY); + String header = new String(headerBytes, StandardCharsets.UTF_8); + // log.info("检查包头是否是DNY, header:{}", header); + if (Constants.EBIKE_HEADER.equals(header)) { + // 处理 DNY 协议 + decodeDnyMessage(buffer, out, beginReader); + return; + } + } + + // 检查包头是否是 68 协议 + if (buffer.readableBytes() >= HEADER_LENGTH_68) { + if (buffer.getUnsignedByte(beginReader) == 0x68) { + // 处理 68 协议 + decode68Message(buffer, out, beginReader); + return; + } + } + + // 未知协议,还原读指针 + buffer.resetReaderIndex(); + } + + // 判断是否为DNY包头 + private boolean isStartOfDnyHeader(ByteBuf buffer, int beginReader) { + if (buffer.readableBytes() >= HEADER_LENGTH_DNY) { + byte[] headerBytes = new byte[HEADER_LENGTH_DNY]; + buffer.getBytes(beginReader, headerBytes, 0, HEADER_LENGTH_DNY); + String header = new String(headerBytes, StandardCharsets.UTF_8); + return Constants.EBIKE_HEADER.equals(header); + } + return false; + } + + // 判断是否为68包头 + private boolean isStartOf68Header(ByteBuf buffer, int beginReader) { + if (buffer.readableBytes() >= HEADER_LENGTH_68) { + return buffer.getUnsignedByte(beginReader) == 0x68; + } + return false; + } + + // 处理68协议消息 + private void decode68Message(ByteBuf buffer, List out, int beginReader) { + // 检查剩余数据是否足够 + if (buffer.readableBytes() < HEADER_LENGTH_68 + 1 + 2) { + buffer.readerIndex(beginReader); + return; + } + + // 获取消息长度 + int length = buffer.getUnsignedByte(beginReader + HEADER_LENGTH_68); + // 检查剩余数据是否足够 + if (buffer.readableBytes() < HEADER_LENGTH_68 + 1 + length + 2) { + buffer.readerIndex(beginReader); + return; + } + + // 读取 data 数据 最后+2是帧校验域长度 + ByteBuf frame = buffer.retainedSlice(beginReader, HEADER_LENGTH_68 + 1 + length + 2); + buffer.readerIndex(beginReader + HEADER_LENGTH_68 + 1 + length + 2); + out.add(frame); + } + + // 处理DNY协议消息 + private void decodeDnyMessage(ByteBuf buffer, List out, int beginReader) { + // 检查剩余数据是否足够 + if (buffer.readableBytes() < HEADER_LENGTH_DNY + 1) { + buffer.readerIndex(beginReader); + return; + } + + // 获取消息长度 + int length = buffer.getUnsignedByte(beginReader + HEADER_LENGTH_DNY); + // log.info("获取消息长度, length:{}", length); + // 检查剩余数据是否足够 + if (buffer.readableBytes() < HEADER_LENGTH_DNY + 1 + length) { + buffer.readerIndex(beginReader); + return; + } + + // 读取 data 数据 + ByteBuf frame = buffer.retainedSlice(beginReader, HEADER_LENGTH_DNY + length + 2); + buffer.readerIndex(beginReader + HEADER_LENGTH_DNY + length + 2); + + + out.add(frame); + } + +} diff --git a/jsowell-netty/src/main/java/com/jsowell/netty/server/electricbicycles/ElectricBicyclesServerChannelInitializer.java b/jsowell-netty/src/main/java/com/jsowell/netty/server/electricbicycles/ElectricBicyclesServerChannelInitializer.java index 4e4cb6edc..c58d70bd5 100644 --- a/jsowell-netty/src/main/java/com/jsowell/netty/server/electricbicycles/ElectricBicyclesServerChannelInitializer.java +++ b/jsowell-netty/src/main/java/com/jsowell/netty/server/electricbicycles/ElectricBicyclesServerChannelInitializer.java @@ -1,6 +1,6 @@ package com.jsowell.netty.server.electricbicycles; -import com.jsowell.netty.decoder.StartAndLengthFieldFrameDecoder; +import com.jsowell.netty.decoder.YouDianDecoder; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelPipeline; import io.netty.channel.socket.SocketChannel; @@ -20,7 +20,7 @@ public class ElectricBicyclesServerChannelInitializer extends ChannelInitializer @Override protected void initChannel(SocketChannel channel) throws Exception { ChannelPipeline pipeline = channel.pipeline(); - pipeline.addLast("frameDecoder", new StartAndLengthFieldFrameDecoder()); + pipeline.addLast("frameDecoder", new YouDianDecoder()); // pipeline.addLast("decoder", new MessageDecode()); // pipeline.addLast("encoder", new MessageEncode()); pipeline.addLast("decoder", new ByteArrayDecoder()); diff --git a/jsowell-netty/src/main/java/com/jsowell/netty/server/yunkuaichong/NettyServerChannelInitializer.java b/jsowell-netty/src/main/java/com/jsowell/netty/server/yunkuaichong/NettyServerChannelInitializer.java index b343fde1b..c2237067a 100644 --- a/jsowell-netty/src/main/java/com/jsowell/netty/server/yunkuaichong/NettyServerChannelInitializer.java +++ b/jsowell-netty/src/main/java/com/jsowell/netty/server/yunkuaichong/NettyServerChannelInitializer.java @@ -1,6 +1,6 @@ package com.jsowell.netty.server.yunkuaichong; -import com.jsowell.netty.decoder.StartAndLengthFieldFrameDecoder; +import com.jsowell.netty.decoder.YunKuaiChongDecoder; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelPipeline; import io.netty.channel.socket.SocketChannel; @@ -21,7 +21,7 @@ public class NettyServerChannelInitializer extends ChannelInitializer