package com.jsowell.netty.decoder;

import com.jsowell.netty.domain.DnyMessage;
import com.jsowell.netty.domain.Message68;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;
import lombok.extern.slf4j.Slf4j;

import java.util.List;

/**
 * Frame decoder that splits an inbound byte stream into two kinds of frames,
 * each identified by a start marker followed by a length field:
 *
 * <ul>
 *   <li><b>DNY frame</b>: the 3 bytes {@code 'D','N','Y'}, a 2-byte unsigned length read with
 *       {@link ByteBuf#getUnsignedShort(int)}, then {@code length} further bytes.</li>
 *   <li><b>68 frame</b>: the single byte {@code 0x68}, a 1-byte unsigned length, then
 *       {@code length} further bytes.</li>
 * </ul>
 *
 * <p>Each emitted message wraps a retained slice covering the complete frame (start marker and
 * length field included). Bytes that precede a recognised start marker are discarded.
 *
 * <p>NOTE(review): {@code getUnsignedShort} uses the buffer's default (big-endian) byte order;
 * confirm this matches the DNY protocol specification.
 * NOTE(review): the slices handed to {@link DnyMessage} / {@link Message68} are retained, so a
 * downstream consumer is presumably responsible for releasing them; verify against the handlers.
 */
@Slf4j
public class StartAndLengthFieldFrameDecoder2 extends ByteToMessageDecoder {

    /** Length of the "DNY" start marker. */
    private static final int HEADER_LENGTH_DNY = 3;

    /** Size of the length field that follows the "DNY" start marker. */
    private static final int LENGTH_FIELD_SIZE_DNY = 2;

    /** Length of the 0x68 start marker. */
    private static final int HEADER_LENGTH_68 = 1;

    /** Size of the length field that follows the 0x68 start marker. */
    private static final int LENGTH_FIELD_SIZE_68 = 1;

    /** Start marker byte of a 68 frame. */
    private static final byte HEADER_68 = 0x68;

    /** Maximum accepted value of a frame's length field; adjust to actual requirements. */
    private static final int MAX_FRAME_LENGTH = 1024;

    private static final byte[] DNY_HEADER = {'D', 'N', 'Y'};

    /** Outcome of one attempt to decode a frame at the current reader index. */
    private enum DecodeStatus {
        /** A complete frame was consumed and added to the output list. */
        FRAME_DECODED,
        /** The frame is incomplete; nothing was consumed and more bytes are required. */
        NEED_MORE_DATA,
        /** Invalid bytes were skipped; scanning should continue from the new reader index. */
        SKIPPED
    }

    /**
     * Decodes at most one frame per invocation.
     *
     * <p>The method returns as soon as a frame has been emitted or as soon as the buffered bytes
     * are found to be an incomplete frame. It keeps looping only while it is making progress by
     * skipping invalid bytes, so it can never spin on a partially received frame.
     *
     * @param ctx    the channel handler context (unused)
     * @param buffer the cumulated inbound bytes
     * @param out    the list decoded messages are added to
     */
    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf buffer, List<Object> out) throws Exception {
        while (buffer.isReadable()) {
            int headerIndex = findHeaderIndex(buffer);
            if (headerIndex == -1) {
                // No candidate start marker anywhere in the buffer: discard everything.
                buffer.skipBytes(buffer.readableBytes());
                return;
            }
            // Drop any garbage that precedes the candidate start marker.
            buffer.readerIndex(headerIndex);

            DecodeStatus status;
            if (isDnyHeader(buffer)) {
                status = decodeDnyMessage(buffer, out);
            } else if (is68Header(buffer)) {
                status = decode68Message(buffer, out);
            } else if (isPartialDnyHeader(buffer)) {
                // Only "D" or "DN" has arrived so far; keep it until the rest shows up
                // instead of throwing away the beginning of a valid frame.
                status = DecodeStatus.NEED_MORE_DATA;
            } else {
                // A 'D' that does not start "DNY": skip that single byte and rescan.
                buffer.skipBytes(1);
                status = DecodeStatus.SKIPPED;
            }

            if (status != DecodeStatus.SKIPPED) {
                // Either a frame was produced or more input is needed; in both cases hand
                // control back to ByteToMessageDecoder.
                return;
            }
        }
    }

    /**
     * Finds the first position at or after the reader index holding either {@code 'D'}
     * (possible start of "DNY") or {@code 0x68}.
     *
     * @return the absolute buffer index of the earliest candidate, or {@code -1} if there is none
     */
    private int findHeaderIndex(ByteBuf buffer) {
        int dnyIndex = buffer.indexOf(buffer.readerIndex(), buffer.writerIndex(), DNY_HEADER[0]);
        int index68 = buffer.indexOf(buffer.readerIndex(), buffer.writerIndex(), HEADER_68);
        if (dnyIndex == -1) {
            return index68;
        }
        if (index68 == -1) {
            return dnyIndex;
        }
        return Math.min(dnyIndex, index68);
    }

    /** Returns whether the bytes at the reader index are the complete "DNY" start marker. */
    private boolean isDnyHeader(ByteBuf buffer) {
        if (buffer.readableBytes() < HEADER_LENGTH_DNY) {
            return false;
        }
        int start = buffer.readerIndex();
        for (int i = 0; i < HEADER_LENGTH_DNY; i++) {
            if (buffer.getByte(start + i) != DNY_HEADER[i]) {
                return false;
            }
        }
        return true;
    }

    /**
     * Returns whether fewer than {@link #HEADER_LENGTH_DNY} bytes are readable and all of them
     * match the beginning of the "DNY" start marker.
     */
    private boolean isPartialDnyHeader(ByteBuf buffer) {
        int readable = buffer.readableBytes();
        if (readable == 0 || readable >= HEADER_LENGTH_DNY) {
            return false;
        }
        int start = buffer.readerIndex();
        for (int i = 0; i < readable; i++) {
            if (buffer.getByte(start + i) != DNY_HEADER[i]) {
                return false;
            }
        }
        return true;
    }

    /** Returns whether the byte at the reader index is the 0x68 start marker. */
    private boolean is68Header(ByteBuf buffer) {
        return buffer.readableBytes() >= HEADER_LENGTH_68
                && buffer.getByte(buffer.readerIndex()) == HEADER_68;
    }

    /**
     * Tries to decode a DNY frame starting at the reader index, which must point at "DNY".
     *
     * @return {@link DecodeStatus#FRAME_DECODED} if a {@link DnyMessage} was added to {@code out};
     *         {@link DecodeStatus#NEED_MORE_DATA} if the frame is not fully buffered yet (nothing
     *         consumed); {@link DecodeStatus#SKIPPED} if the length field exceeded
     *         {@link #MAX_FRAME_LENGTH} and the start marker plus length field were dropped
     */
    private DecodeStatus decodeDnyMessage(ByteBuf buffer, List<Object> out) {
        int prefixLength = HEADER_LENGTH_DNY + LENGTH_FIELD_SIZE_DNY;
        if (buffer.readableBytes() < prefixLength) {
            return DecodeStatus.NEED_MORE_DATA;
        }

        int length = buffer.getUnsignedShort(buffer.readerIndex() + HEADER_LENGTH_DNY);
        if (length > MAX_FRAME_LENGTH) {
            log.warn("DNY frame length exceeds maximum allowed: {}", length);
            buffer.skipBytes(prefixLength);
            return DecodeStatus.SKIPPED;
        }

        int frameLength = prefixLength + length;
        if (buffer.readableBytes() < frameLength) {
            return DecodeStatus.NEED_MORE_DATA;
        }

        ByteBuf frame = buffer.readRetainedSlice(frameLength);
        out.add(new DnyMessage(frame));
        return DecodeStatus.FRAME_DECODED;
    }

    /**
     * Tries to decode a 68 frame starting at the reader index, which must point at 0x68.
     *
     * @return {@link DecodeStatus#FRAME_DECODED} if a {@link Message68} was added to {@code out};
     *         {@link DecodeStatus#NEED_MORE_DATA} if the frame is not fully buffered yet (nothing
     *         consumed); {@link DecodeStatus#SKIPPED} if the length field exceeded
     *         {@link #MAX_FRAME_LENGTH} and the start marker plus length field were dropped
     */
    private DecodeStatus decode68Message(ByteBuf buffer, List<Object> out) {
        int prefixLength = HEADER_LENGTH_68 + LENGTH_FIELD_SIZE_68;
        if (buffer.readableBytes() < prefixLength) {
            return DecodeStatus.NEED_MORE_DATA;
        }

        int length = buffer.getUnsignedByte(buffer.readerIndex() + HEADER_LENGTH_68);
        // A one-byte length is at most 255, so this only triggers if MAX_FRAME_LENGTH is lowered.
        if (length > MAX_FRAME_LENGTH) {
            log.warn("68 frame length exceeds maximum allowed: {}", length);
            buffer.skipBytes(prefixLength);
            return DecodeStatus.SKIPPED;
        }

        int frameLength = prefixLength + length;
        if (buffer.readableBytes() < frameLength) {
            return DecodeStatus.NEED_MORE_DATA;
        }

        ByteBuf frame = buffer.readRetainedSlice(frameLength);
        out.add(new Message68(frame));
        return DecodeStatus.FRAME_DECODED;
    }
}