diff --git a/jsowell-netty/src/main/java/com/jsowell/netty/decoder/ChargingPileDecoder.java b/jsowell-netty/src/main/java/com/jsowell/netty/decoder/ChargingPileDecoder.java index 7f4146081..492c9a3e7 100644 --- a/jsowell-netty/src/main/java/com/jsowell/netty/decoder/ChargingPileDecoder.java +++ b/jsowell-netty/src/main/java/com/jsowell/netty/decoder/ChargingPileDecoder.java @@ -4,9 +4,11 @@ import com.jsowell.netty.domain.ChargingPileMessage; import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.ByteToMessageDecoder; +import lombok.extern.slf4j.Slf4j; import java.util.List; +@Slf4j public class ChargingPileDecoder extends ByteToMessageDecoder { private static final byte[] FRAME_HEADER = {'D', 'N', 'Y'}; // 包头为"DNY" @@ -52,7 +54,7 @@ public class ChargingPileDecoder extends ByteToMessageDecoder { // 验证校验和 short calculatedChecksum = calculateChecksum(in, length); if (checksum != calculatedChecksum) { - System.out.println("校验和错误,丢弃此帧"); + log.info("校验和错误,丢弃此帧"); continue; } @@ -63,10 +65,12 @@ public class ChargingPileDecoder extends ByteToMessageDecoder { } private boolean isValidHeader(byte[] header) { + log.info("isValidHeader header:{}", header); return header[0] == FRAME_HEADER[0] && header[1] == FRAME_HEADER[1] && header[2] == FRAME_HEADER[2]; } private short calculateChecksum(ByteBuf buf, int length) { + log.info("calculateChecksum ByteBuf:{}, length:{}", buf.toString(), length); short sum = 0; for (int i = 0; i < length - 2; i++) { sum += buf.getByte(buf.readerIndex() - length + i) & 0xFF; diff --git a/jsowell-netty/src/main/java/com/jsowell/netty/decoder/StartAndLengthFieldFrameDecoder.java b/jsowell-netty/src/main/java/com/jsowell/netty/decoder/StartAndLengthFieldFrameDecoder.java index 5cf54c4c3..7a6fffb7d 100644 --- a/jsowell-netty/src/main/java/com/jsowell/netty/decoder/StartAndLengthFieldFrameDecoder.java +++ b/jsowell-netty/src/main/java/com/jsowell/netty/decoder/StartAndLengthFieldFrameDecoder.java @@ -16,77 +16,6 @@ public class StartAndLengthFieldFrameDecoder extends ByteToMessageDecoder { // 构造函数,初始化起始标志 public StartAndLengthFieldFrameDecoder() {} - // 起始标志 - // private int HEAD_DATA; - - - // public StartAndLengthFieldFrameDecoder(int HEAD_DATA) { - // this.HEAD_DATA = HEAD_DATA; - // } - - /** - *
-	 * 协议开始的标准head_data,int类型,占据1个字节.
-	 * 表示数据的长度contentLength,int类型,占据1个字节.
-	 * 
- */ - // public final int BASE_LENGTH = 1 + 1; - - // @Override - // protected void decode2(ChannelHandlerContext ctx, ByteBuf buffer, List out) throws Exception { - // // 可读长度必须大于基本长度 - // if (buffer.readableBytes() <= BASE_LENGTH) { - // log.warn("可读字节数:{}小于基础长度:{}", buffer.readableBytes(), BASE_LENGTH); - // return; - // } - // - // // 记录包头开始的index - // int beginReader; - // - // while (true) { - // // 获取包头开始的index - // beginReader = buffer.readerIndex(); - // // log.info("包头开始的index:{}", beginReader); - // // 标记包头开始的index - // buffer.markReaderIndex(); - // // 读到了协议的开始标志,结束while循环 - // if (buffer.getUnsignedByte(beginReader) == HEAD_DATA) { - // // log.info("读到了协议的开始标志,结束while循环 byte:{}, HEAD_DATA:{}", buffer.getUnsignedByte(beginReader), HEAD_DATA); - // break; - // } - // - // // 未读到包头,略过一个字节 - // // 每次略过,一个字节,去读取,包头信息的开始标记 - // buffer.resetReaderIndex(); - // buffer.readByte(); - // - // // 当略过,一个字节之后, - // // 数据包的长度,又变得不满足 - // // 此时,应该结束。等待后面的数据到达 - // if (buffer.readableBytes() < BASE_LENGTH) { - // log.debug("数据包的长度不满足 readableBytes:{}, BASE_LENGTH:{}", buffer.readableBytes(), BASE_LENGTH); - // return; - // } - // } - // - // // 消息的长度 - // int length = buffer.getUnsignedByte(beginReader + 1); - // // 判断请求数据包数据是否到齐 - // if (buffer.readableBytes() < length + 4) { - // // log.info("请求数据包数据没有到齐,还原读指针 readableBytes:{}, 消息的长度:{}", buffer.readableBytes(), length); - // // 还原读指针 - // buffer.readerIndex(beginReader); - // return; - // } - // - // // 读取data数据 - // byte[] data = new byte[length + 4]; - // buffer.readBytes(data); - // ByteBuf frame = buffer.retainedSlice(beginReader, length + 4); - // buffer.readerIndex(beginReader + length + 4); - // out.add(frame); - // } - protected void decode(ChannelHandlerContext ctx, ByteBuf buffer, List out) throws Exception { // 记录包头开始的index int beginReader; @@ -101,11 +30,6 @@ public class StartAndLengthFieldFrameDecoder extends ByteToMessageDecoder { beginReader = buffer.readerIndex(); buffer.markReaderIndex(); - // 读到了协议的开始标志,结束while循环 - // if (buffer.getUnsignedByte(beginReader) == HEAD_DATA) { - // break; - // } - // 判断是否为DNY包头或68包头 if (isStartOfDnyHeader(buffer, beginReader) || isStartOf68Header(buffer, beginReader)) { break; // 读到了协议的开始标志,结束while循环 diff --git a/jsowell-netty/src/main/java/com/jsowell/netty/decoder/StartAndLengthFieldFrameDecoder2.java b/jsowell-netty/src/main/java/com/jsowell/netty/decoder/StartAndLengthFieldFrameDecoder2.java new file mode 100644 index 000000000..a6b0d5977 --- /dev/null +++ b/jsowell-netty/src/main/java/com/jsowell/netty/decoder/StartAndLengthFieldFrameDecoder2.java @@ -0,0 +1,117 @@ +package com.jsowell.netty.decoder; + +import com.jsowell.netty.domain.DnyMessage; +import com.jsowell.netty.domain.Message68; +import io.netty.buffer.ByteBuf; +import io.netty.channel.ChannelHandlerContext; +import io.netty.handler.codec.ByteToMessageDecoder; +import lombok.extern.slf4j.Slf4j; + +import java.util.List; + +@Slf4j +public class StartAndLengthFieldFrameDecoder2 extends ByteToMessageDecoder { + private static final int HEADER_LENGTH_DNY = 3; // "DNY" 包头的长度 + private static final int HEADER_LENGTH_68 = 1; // 68 包头的长度 + private static final int MAX_FRAME_LENGTH = 1024; // 最大帧长度,可以根据实际需求调整 + private static final byte[] DNY_HEADER = {'D', 'N', 'Y'}; + + @Override + protected void decode(ChannelHandlerContext ctx, ByteBuf buffer, List out) throws Exception { + while (buffer.readableBytes() > 0) { + int headerIndex = findHeaderIndex(buffer); + if (headerIndex == -1) { + // 没有找到有效的包头,丢弃所有数据 + buffer.skipBytes(buffer.readableBytes()); + return; + } + + buffer.readerIndex(headerIndex); + + if (isDnyHeader(buffer)) { + if (decodeDnyMessage(buffer, out)) { + return; + } + } else if (is68Header(buffer)) { + if (decode68Message(buffer, out)) { + return; + } + } else { + // 未知协议,跳过一个字节 + buffer.skipBytes(1); + } + } + } + + private int findHeaderIndex(ByteBuf buffer) { + int dnyIndex = buffer.indexOf(buffer.readerIndex(), buffer.writerIndex(), DNY_HEADER[0]); + int index68 = buffer.indexOf(buffer.readerIndex(), buffer.writerIndex(), (byte) 0x68); + + if (dnyIndex == -1 && index68 == -1) { + return -1; + } else if (dnyIndex == -1) { + return index68; + } else if (index68 == -1) { + return dnyIndex; + } else { + return Math.min(dnyIndex, index68); + } + } + + private boolean isDnyHeader(ByteBuf buffer) { + return buffer.readableBytes() >= HEADER_LENGTH_DNY && + buffer.getByte(buffer.readerIndex()) == 'D' && + buffer.getByte(buffer.readerIndex() + 1) == 'N' && + buffer.getByte(buffer.readerIndex() + 2) == 'Y'; + } + + private boolean is68Header(ByteBuf buffer) { + return buffer.readableBytes() >= HEADER_LENGTH_68 && + buffer.getByte(buffer.readerIndex()) == 0x68; + } + + private boolean decodeDnyMessage(ByteBuf buffer, List out) { + if (buffer.readableBytes() < HEADER_LENGTH_DNY + 2) { + return false; // 数据不足,等待更多数据 + } + + int lengthFieldIndex = buffer.readerIndex() + HEADER_LENGTH_DNY; + int length = buffer.getUnsignedShort(lengthFieldIndex); + + if (length > MAX_FRAME_LENGTH) { + log.warn("DNY frame length exceeds maximum allowed: {}", length); + buffer.skipBytes(HEADER_LENGTH_DNY + 2); + return false; + } + + if (buffer.readableBytes() < HEADER_LENGTH_DNY + 2 + length) { + return false; // 数据不足,等待更多数据 + } + + ByteBuf frame = buffer.readRetainedSlice(HEADER_LENGTH_DNY + 2 + length); + out.add(new DnyMessage(frame)); + return true; + } + + private boolean decode68Message(ByteBuf buffer, List out) { + if (buffer.readableBytes() < HEADER_LENGTH_68 + 1) { + return false; // 数据不足,等待更多数据 + } + + int length = buffer.getUnsignedByte(buffer.readerIndex() + HEADER_LENGTH_68); + + if (length > MAX_FRAME_LENGTH) { + log.warn("68 frame length exceeds maximum allowed: {}", length); + buffer.skipBytes(HEADER_LENGTH_68 + 1); + return false; + } + + if (buffer.readableBytes() < HEADER_LENGTH_68 + 1 + length) { + return false; // 数据不足,等待更多数据 + } + + ByteBuf frame = buffer.readRetainedSlice(HEADER_LENGTH_68 + 1 + length); + out.add(new Message68(frame)); + return true; + } +} diff --git a/jsowell-netty/src/main/java/com/jsowell/netty/domain/ChargingPileMessage.java b/jsowell-netty/src/main/java/com/jsowell/netty/domain/ChargingPileMessage.java index 35f8bd530..0a3f88a28 100644 --- a/jsowell-netty/src/main/java/com/jsowell/netty/domain/ChargingPileMessage.java +++ b/jsowell-netty/src/main/java/com/jsowell/netty/domain/ChargingPileMessage.java @@ -1,5 +1,8 @@ package com.jsowell.netty.domain; +import org.apache.commons.lang3.builder.ToStringBuilder; +import org.apache.commons.lang3.builder.ToStringStyle; + public class ChargingPileMessage { private int physicalId; private short messageId; @@ -18,4 +21,15 @@ public class ChargingPileMessage { public short getMessageId() { return messageId; } public byte getCommand() { return command; } public byte[] getData() { return data; } + + + @Override + public String toString() { + return new ToStringBuilder(this, ToStringStyle.JSON_STYLE) + .append("physicalId", physicalId) + .append("messageId", messageId) + .append("command", command) + .append("data", data) + .toString(); + } } \ No newline at end of file diff --git a/jsowell-netty/src/main/java/com/jsowell/netty/domain/DnyMessage.java b/jsowell-netty/src/main/java/com/jsowell/netty/domain/DnyMessage.java new file mode 100644 index 000000000..8a030e58f --- /dev/null +++ b/jsowell-netty/src/main/java/com/jsowell/netty/domain/DnyMessage.java @@ -0,0 +1,15 @@ +package com.jsowell.netty.domain; + +import io.netty.buffer.ByteBuf; + +public class DnyMessage { + private final ByteBuf content; + + public DnyMessage(ByteBuf content) { + this.content = content; + } + + public ByteBuf getContent() { + return content; + } +} \ No newline at end of file diff --git a/jsowell-netty/src/main/java/com/jsowell/netty/domain/Message68.java b/jsowell-netty/src/main/java/com/jsowell/netty/domain/Message68.java new file mode 100644 index 000000000..927b8f3e1 --- /dev/null +++ b/jsowell-netty/src/main/java/com/jsowell/netty/domain/Message68.java @@ -0,0 +1,15 @@ +package com.jsowell.netty.domain; + +import io.netty.buffer.ByteBuf; + +public class Message68 { + private final ByteBuf content; + + public Message68(ByteBuf content) { + this.content = content; + } + + public ByteBuf getContent() { + return content; + } +} \ No newline at end of file diff --git a/jsowell-netty/src/main/java/com/jsowell/netty/server/electricbicycles/ChargingPileHandler.java b/jsowell-netty/src/main/java/com/jsowell/netty/server/electricbicycles/ChargingPileHandler.java index 70d88c184..6b5c56052 100644 --- a/jsowell-netty/src/main/java/com/jsowell/netty/server/electricbicycles/ChargingPileHandler.java +++ b/jsowell-netty/src/main/java/com/jsowell/netty/server/electricbicycles/ChargingPileHandler.java @@ -3,21 +3,51 @@ package com.jsowell.netty.server.electricbicycles; import com.jsowell.netty.domain.ChargingPileMessage; import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelId; import io.netty.channel.ChannelInboundHandlerAdapter; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Component; +import java.net.InetSocketAddress; import java.time.Instant; import java.nio.ByteBuffer; import java.nio.ByteOrder; +import java.util.concurrent.ConcurrentHashMap; @ChannelHandler.Sharable @Slf4j @Component public class ChargingPileHandler extends ChannelInboundHandlerAdapter { + /** + * 管理一个全局map,保存连接进服务端的通道数量 + */ + private static final ConcurrentHashMap CHANNEL_MAP = new ConcurrentHashMap<>(); + + /** + * 有客户端连接服务器会触发此函数 + * 连接被建立并且准备进行通信时被调用 + */ + @Override + public void channelActive(ChannelHandlerContext ctx) { + InetSocketAddress insocket = (InetSocketAddress) ctx.channel().remoteAddress(); + String clientIp = insocket.getAddress().getHostAddress(); + int clientPort = insocket.getPort(); + //获取连接通道唯一标识 + ChannelId channelId = ctx.channel().id(); + //如果map中不包含此连接,就保存连接 + if (CHANNEL_MAP.containsKey(channelId)) { + log.info("Handler:{}, 客户端【{}】是连接状态,连接通道数量: {}", this.getClass().getSimpleName(), channelId, CHANNEL_MAP.size()); + } else { + //保存连接 + CHANNEL_MAP.put(channelId, ctx); + log.info("Handler:{}, 客户端【{}】, 连接netty服务器【IP:{}, PORT:{}】, 连接通道数量: {}", this.getClass().getSimpleName(), channelId, clientIp, clientPort, CHANNEL_MAP.size()); + } + } + @Override public void channelRead(ChannelHandlerContext ctx, Object msg) { + log.info("加载客户端报文=== channelId:{}, mag:{}", ctx.channel().id(), msg.toString()); if (!(msg instanceof ChargingPileMessage)) { return; } diff --git a/jsowell-netty/src/main/java/com/jsowell/netty/server/electricbicycles/ElectricBicyclesServerChannelInitializer.java b/jsowell-netty/src/main/java/com/jsowell/netty/server/electricbicycles/ElectricBicyclesServerChannelInitializer.java index 3cb096fae..2b19f9dfc 100644 --- a/jsowell-netty/src/main/java/com/jsowell/netty/server/electricbicycles/ElectricBicyclesServerChannelInitializer.java +++ b/jsowell-netty/src/main/java/com/jsowell/netty/server/electricbicycles/ElectricBicyclesServerChannelInitializer.java @@ -3,6 +3,7 @@ package com.jsowell.netty.server.electricbicycles; import com.jsowell.netty.decoder.ChargingPileDecoder; import com.jsowell.netty.decoder.ProtocolDnyDecoder; import com.jsowell.netty.decoder.StartAndLengthFieldFrameDecoder; +import com.jsowell.netty.decoder.StartAndLengthFieldFrameDecoder2; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelPipeline; import io.netty.channel.socket.SocketChannel; @@ -26,12 +27,12 @@ public class ElectricBicyclesServerChannelInitializer extends ChannelInitializer protected void initChannel(SocketChannel channel) throws Exception { ChannelPipeline pipeline = channel.pipeline(); // pipeline.addLast("frameDecoder",new CustomDecoder()); - pipeline.addLast("frameDecoder", new ChargingPileDecoder()); + pipeline.addLast("frameDecoder", new StartAndLengthFieldFrameDecoder()); pipeline.addLast("decoder", new ByteArrayDecoder()); pipeline.addLast("encoder", new ByteArrayDecoder()); //读超时时间设置为10s,0表示不监控 pipeline.addLast(new IdleStateHandler(60, 0, 0, TimeUnit.SECONDS)); - pipeline.addLast("handler", electricBicyclesServerHandler); + pipeline.addLast("handler", chargingPileHandler); } }