package com.jsowell.netty.decoder; import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.ByteToMessageDecoder; import lombok.extern.slf4j.Slf4j; import java.nio.charset.StandardCharsets; import java.util.List; @Slf4j public class StartAndLengthFieldFrameDecoder extends ByteToMessageDecoder { private static final int HEADER_LENGTH_DNY = 3; // "DNY" 包头的长度 private static final int HEADER_LENGTH_68 = 1; // 68 包头的长度 // 起始标志 // private int HEAD_DATA; // public StartAndLengthFieldFrameDecoder(int HEAD_DATA) { // this.HEAD_DATA = HEAD_DATA; // } /** *
	 * 协议开始的标准head_data,int类型,占据1个字节.
	 * 表示数据的长度contentLength,int类型,占据1个字节.
	 * 
*/ // public final int BASE_LENGTH = 1 + 1; // @Override // protected void decode2(ChannelHandlerContext ctx, ByteBuf buffer, List out) throws Exception { // // 可读长度必须大于基本长度 // if (buffer.readableBytes() <= BASE_LENGTH) { // log.warn("可读字节数:{}小于基础长度:{}", buffer.readableBytes(), BASE_LENGTH); // return; // } // // // 记录包头开始的index // int beginReader; // // while (true) { // // 获取包头开始的index // beginReader = buffer.readerIndex(); // // log.info("包头开始的index:{}", beginReader); // // 标记包头开始的index // buffer.markReaderIndex(); // // 读到了协议的开始标志,结束while循环 // if (buffer.getUnsignedByte(beginReader) == HEAD_DATA) { // // log.info("读到了协议的开始标志,结束while循环 byte:{}, HEAD_DATA:{}", buffer.getUnsignedByte(beginReader), HEAD_DATA); // break; // } // // // 未读到包头,略过一个字节 // // 每次略过,一个字节,去读取,包头信息的开始标记 // buffer.resetReaderIndex(); // buffer.readByte(); // // // 当略过,一个字节之后, // // 数据包的长度,又变得不满足 // // 此时,应该结束。等待后面的数据到达 // if (buffer.readableBytes() < BASE_LENGTH) { // log.debug("数据包的长度不满足 readableBytes:{}, BASE_LENGTH:{}", buffer.readableBytes(), BASE_LENGTH); // return; // } // } // // // 消息的长度 // int length = buffer.getUnsignedByte(beginReader + 1); // // 判断请求数据包数据是否到齐 // if (buffer.readableBytes() < length + 4) { // // log.info("请求数据包数据没有到齐,还原读指针 readableBytes:{}, 消息的长度:{}", buffer.readableBytes(), length); // // 还原读指针 // buffer.readerIndex(beginReader); // return; // } // // // 读取data数据 // byte[] data = new byte[length + 4]; // buffer.readBytes(data); // ByteBuf frame = buffer.retainedSlice(beginReader, length + 4); // buffer.readerIndex(beginReader + length + 4); // out.add(frame); // } protected void decode(ChannelHandlerContext ctx, ByteBuf buffer, List out) throws Exception { // 记录包头开始的index int beginReader; while (true) { if (buffer.readableBytes() < Math.min(HEADER_LENGTH_DNY, HEADER_LENGTH_68)) { return; // 数据长度不足,等待更多数据 } // 获取包头开始的index beginReader = buffer.readerIndex(); buffer.markReaderIndex(); // 读到了协议的开始标志,结束while循环 // if (buffer.getUnsignedByte(beginReader) == HEAD_DATA) { // break; // } // 读到了协议的开始标志,结束while循环 if (isStartOfDnyHeader(buffer, beginReader) || isStartOf68Header(buffer, beginReader)) { break; } // 未读到包头,略过一个字节 buffer.resetReaderIndex(); buffer.readByte(); } // 检查包头是否是 "DNY" if (buffer.readableBytes() >= HEADER_LENGTH_DNY) { byte[] headerBytes = new byte[HEADER_LENGTH_DNY]; buffer.getBytes(beginReader, headerBytes, 0, HEADER_LENGTH_DNY); String header = new String(headerBytes, StandardCharsets.UTF_8); if ("DNY".equals(header)) { // 处理 DNY 协议 decodeDnyMessage(buffer, out, beginReader); return; } } // 检查包头是否是 68 协议 if (buffer.readableBytes() >= HEADER_LENGTH_68) { if (buffer.getUnsignedByte(beginReader) == 0x68) { // 处理 68 协议 decode68Message(buffer, out, beginReader); return; } } // 未知协议,还原读指针 buffer.resetReaderIndex(); } private boolean isStartOfDnyHeader(ByteBuf buffer, int beginReader) { if (buffer.readableBytes() >= HEADER_LENGTH_DNY) { byte[] headerBytes = new byte[HEADER_LENGTH_DNY]; buffer.getBytes(beginReader, headerBytes, 0, HEADER_LENGTH_DNY); String header = new String(headerBytes, StandardCharsets.UTF_8); return "DNY".equals(header); } return false; } private boolean isStartOf68Header(ByteBuf buffer, int beginReader) { if (buffer.readableBytes() >= HEADER_LENGTH_68) { return buffer.getUnsignedByte(beginReader) == 0x68; } return false; } private void decode68Message(ByteBuf buffer, List out, int beginReader) { if (buffer.readableBytes() < HEADER_LENGTH_68 + 1) { buffer.readerIndex(beginReader); return; } // 消息的长度 int length = buffer.getUnsignedByte(beginReader + HEADER_LENGTH_68); if (buffer.readableBytes() < HEADER_LENGTH_68 + 1 + length) { buffer.readerIndex(beginReader); return; } // 读取 data 数据 ByteBuf frame = buffer.retainedSlice(beginReader, HEADER_LENGTH_68 + 1 + length); buffer.readerIndex(beginReader + HEADER_LENGTH_68 + 1 + length); out.add(frame); } private void decodeDnyMessage(ByteBuf buffer, List out, int beginReader) { if (buffer.readableBytes() < HEADER_LENGTH_DNY + 1) { buffer.readerIndex(beginReader); return; } // 消息的长度 int length = buffer.getUnsignedByte(beginReader + HEADER_LENGTH_DNY); if (buffer.readableBytes() < HEADER_LENGTH_DNY + 1 + length) { buffer.readerIndex(beginReader); return; } // 读取 data 数据 ByteBuf frame = buffer.retainedSlice(beginReader, HEADER_LENGTH_DNY + 1 + length); buffer.readerIndex(beginReader + HEADER_LENGTH_DNY + 1 + length); out.add(frame); } }