diff --git a/jsowell-netty/src/main/java/com/jsowell/netty/decoder/ChargingPileDecoder.java b/jsowell-netty/src/main/java/com/jsowell/netty/decoder/ChargingPileDecoder.java deleted file mode 100644 index f46b481c0..000000000 --- a/jsowell-netty/src/main/java/com/jsowell/netty/decoder/ChargingPileDecoder.java +++ /dev/null @@ -1,88 +0,0 @@ -package com.jsowell.netty.decoder; - -import com.jsowell.netty.domain.ChargingPileMessage; -import io.netty.buffer.ByteBuf; -import io.netty.channel.ChannelHandlerContext; -import io.netty.handler.codec.ByteToMessageDecoder; -import lombok.extern.slf4j.Slf4j; - -import java.util.List; - -@Slf4j -public class ChargingPileDecoder extends ByteToMessageDecoder { - - private static final byte[] FRAME_HEADER = {'D', 'N', 'Y'}; // 包头为"DNY" - private static final int MIN_FRAME_LENGTH = 14; // 最小帧长度:包头(3)+长度(2)+物理ID(4)+消息ID(2)+命令(1)+校验(2) - - @Override - protected void decode(ChannelHandlerContext ctx, ByteBuf in, List out) throws Exception { - while (in.readableBytes() >= MIN_FRAME_LENGTH) { - in.markReaderIndex(); - - // 检查包头 - byte[] header = new byte[3]; - in.readBytes(header); - if (!isValidHeader(header)) { - in.resetReaderIndex(); - return; - } - - // 读取长度 - short length = in.readShort(); - if (in.readableBytes() < length - 5) { // 5 = 包头(3) + 长度(2) - in.resetReaderIndex(); - return; - } - - // 读取物理ID - int physicalId = in.readInt(); - log.info("physicalId:{}", physicalId); - - // 读取消息ID - short messageId = in.readShort(); - log.info("messageId:{}", messageId); - - // 读取命令 - byte command = in.readByte(); - log.info("command:{}", command); - - // 读取数据 - int dataLength = length - 13; // 13 = 包头(3) + 长度(2) + 物理ID(4) + 消息ID(2) + 命令(1) + 校验(2) - byte[] data = new byte[dataLength]; - in.readBytes(data); - log.info("data:{}", data); - - // 读取校验和 - short checksum = in.readShort(); - log.info("checksum:{}", checksum); - - // 验证校验和 - short calculatedChecksum = calculateChecksum(in, length); - log.info("calculatedChecksum:{}", calculatedChecksum); - if (checksum != calculatedChecksum) { - log.info("校验和错误,丢弃此帧"); - continue; - } - - // 创建消息对象并添加到输出列表 - ChargingPileMessage message = new ChargingPileMessage(physicalId, messageId, command, data); - log.info("ChargingPileMessage:{}", message.toString()); - out.add(message); - } - } - - private boolean isValidHeader(byte[] header) { - log.info("isValidHeader header:{}", header); - return header[0] == FRAME_HEADER[0] && header[1] == FRAME_HEADER[1] && header[2] == FRAME_HEADER[2]; - } - - private short calculateChecksum(ByteBuf buf, int length) { - log.info("calculateChecksum ByteBuf:{}, length:{}", buf.toString(), length); - short sum = 0; - for (int i = 0; i < length - 2; i++) { - sum += buf.getByte(buf.readerIndex() - length + i) & 0xFF; - } - return sum; - } -} - diff --git a/jsowell-netty/src/main/java/com/jsowell/netty/decoder/MessageDecode.java b/jsowell-netty/src/main/java/com/jsowell/netty/decoder/MessageDecode.java index 9ce5010b7..2f0654eae 100644 --- a/jsowell-netty/src/main/java/com/jsowell/netty/decoder/MessageDecode.java +++ b/jsowell-netty/src/main/java/com/jsowell/netty/decoder/MessageDecode.java @@ -1,47 +1,77 @@ package com.jsowell.netty.decoder; -import com.jsowell.common.protocol.Message; -import com.jsowell.common.protocol.MessageConstant; -import com.jsowell.common.util.bean.SerializationUtil; +import com.alibaba.fastjson2.JSON; +import com.jsowell.netty.domain.ChargingPileMessage; import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.ByteToMessageDecoder; +import lombok.extern.slf4j.Slf4j; import java.util.List; +@Slf4j public class MessageDecode extends ByteToMessageDecoder { - - @Override - protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List list) throws Exception { - - // 由于数据包的前4个字节用于记录总数据大小,如果数据不够4个字节,不进行读 - if(byteBuf.readableBytes() < 4) { + protected void decode(ChannelHandlerContext ctx, ByteBuf in, List out) throws Exception { + log.info("MessageDecode.decode"); + // 检查是否有足够的字节可以读取 + if (in.readableBytes() < 14) { // 最小长度(包头3 + 长度2 + 物理ID4 + 消息ID2 + 命令1 + 校验2) return; } - - // 标记开始读的位置 - byteBuf.markReaderIndex(); - - // 前四个字节记录了数据大小 - int dataSize = byteBuf.readInt(); - - // 查看剩余可读字节是否足够,如果不是,重置读取位置,等待下一次解析 - if(byteBuf.readableBytes() < dataSize) { - byteBuf.resetReaderIndex(); - return; - } - - // 读取消息类型 - byte messageType = byteBuf.readByte(); - // 读取数据, 数组大小需要剔除1个字节的消息类型 - byte[] data = new byte[dataSize -1]; - - byteBuf.readBytes(data); - - Message message = SerializationUtil.deserialize(MessageConstant.getMessageClass(messageType), data); - - list.add(message); + + // 读取所有可读字节 + byte[] bytes = new byte[in.readableBytes()]; + in.readBytes(bytes); + + // 解析字节数组 + ChargingPileMessage message = ChargingPileMessage.parseMessage(bytes); + out.add(message); } + + + // @Override + // protected void decode(ChannelHandlerContext ctx, ByteBuf in, List out) throws Exception { + // // 检查是否至少有最小长度的字节可以读取 + // if (in.readableBytes() < 14) { // 最小长度(包头3 + 长度2 + 物理ID4 + 消息ID2 + 命令1 + 校验2) + // return; + // } + // + // // 标记读指针位置 + // in.markReaderIndex(); + // + // // 读取包头 + // byte[] headerBytes = new byte[3]; + // in.readBytes(headerBytes); + // String header = new String(headerBytes); + // + // // 读取长度 + // int length = in.readUnsignedShort(); + // + // // 检查剩余字节数是否足够读取整个包 + // if (in.readableBytes() < length - 5) { // 减去包头和长度本身占的字节数 + // in.resetReaderIndex(); + // return; + // } + // + // // 读取物理ID + // int physicalId = in.readInt(); + // + // // 读取消息ID + // int messageId = in.readUnsignedShort(); + // + // // 读取命令 + // byte command = in.readByte(); + // + // // 读取数据 + // byte[] data = new byte[length - 12]; // 减去固定字段的字节数 + // in.readBytes(data); + // + // // 读取校验 + // int checksum = in.readUnsignedShort(); + // + // // 创建ChargingPileMessage对象并添加到输出列表 + // ChargingPileMessage message = new ChargingPileMessage(header, length, physicalId, messageId, command, data, checksum); + // out.add(message); + // } } \ No newline at end of file diff --git a/jsowell-netty/src/main/java/com/jsowell/netty/decoder/StartAndLengthFieldFrameDecoder.java b/jsowell-netty/src/main/java/com/jsowell/netty/decoder/StartAndLengthFieldFrameDecoder.java index 9300fb909..a99f10481 100644 --- a/jsowell-netty/src/main/java/com/jsowell/netty/decoder/StartAndLengthFieldFrameDecoder.java +++ b/jsowell-netty/src/main/java/com/jsowell/netty/decoder/StartAndLengthFieldFrameDecoder.java @@ -17,6 +17,7 @@ public class StartAndLengthFieldFrameDecoder extends ByteToMessageDecoder { public StartAndLengthFieldFrameDecoder() {} protected void decode(ChannelHandlerContext ctx, ByteBuf buffer, List out) throws Exception { + log.info("StartAndLengthFieldFrameDecoder.decode"); // 记录包头开始的index int beginReader; diff --git a/jsowell-netty/src/main/java/com/jsowell/netty/domain/ChargingPileMessage.java b/jsowell-netty/src/main/java/com/jsowell/netty/domain/ChargingPileMessage.java index 0a3f88a28..14c610a19 100644 --- a/jsowell-netty/src/main/java/com/jsowell/netty/domain/ChargingPileMessage.java +++ b/jsowell-netty/src/main/java/com/jsowell/netty/domain/ChargingPileMessage.java @@ -1,35 +1,40 @@ package com.jsowell.netty.domain; -import org.apache.commons.lang3.builder.ToStringBuilder; -import org.apache.commons.lang3.builder.ToStringStyle; +import com.jsowell.common.util.BytesUtil; +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import java.util.Arrays; + +@Slf4j +@Data +@NoArgsConstructor +@AllArgsConstructor +@Builder public class ChargingPileMessage { - private int physicalId; - private short messageId; - private byte command; - private byte[] data; + private String header; // 包头 (3字节) + private int length; // 长度 (2字节) + private int physicalId; // 物理ID (4字节) + private int messageId; // 消息ID (2字节) + private byte command; // 命令 (1字节) + private byte[] data; // 数据 (n字节) + private int checksum; // 校验 (2字节) - public ChargingPileMessage(int physicalId, short messageId, byte command, byte[] data) { - this.physicalId = physicalId; - this.messageId = messageId; - this.command = command; - this.data = data; + // 从字节数组解析消息 + public static ChargingPileMessage parseMessage(byte[] messageBytes) { + log.info("parseMessage:{}", BytesUtil.binary(messageBytes, 16)); + String header = new String(Arrays.copyOfRange(messageBytes, 0, 3)); + int length = ((messageBytes[3] & 0xFF) << 8) | (messageBytes[4] & 0xFF); + int physicalId = ((messageBytes[5] & 0xFF) << 24) | ((messageBytes[6] & 0xFF) << 16) | + ((messageBytes[7] & 0xFF) << 8) | (messageBytes[8] & 0xFF); + int messageId = ((messageBytes[9] & 0xFF) << 8) | (messageBytes[10] & 0xFF); + byte command = messageBytes[11]; + byte[] data = Arrays.copyOfRange(messageBytes, 12, messageBytes.length - 2); + int checksum = ((messageBytes[messageBytes.length - 2] & 0xFF) << 8) | (messageBytes[messageBytes.length - 1] & 0xFF); + + return new ChargingPileMessage(header, length, physicalId, messageId, command, data, checksum); } - - // Getters - public int getPhysicalId() { return physicalId; } - public short getMessageId() { return messageId; } - public byte getCommand() { return command; } - public byte[] getData() { return data; } - - - @Override - public String toString() { - return new ToStringBuilder(this, ToStringStyle.JSON_STYLE) - .append("physicalId", physicalId) - .append("messageId", messageId) - .append("command", command) - .append("data", data) - .toString(); - } -} \ No newline at end of file +} diff --git a/jsowell-netty/src/main/java/com/jsowell/netty/server/electricbicycles/ChargingPileHandler.java b/jsowell-netty/src/main/java/com/jsowell/netty/server/electricbicycles/ChargingPileHandler.java index 6c57c7b71..82632c374 100644 --- a/jsowell-netty/src/main/java/com/jsowell/netty/server/electricbicycles/ChargingPileHandler.java +++ b/jsowell-netty/src/main/java/com/jsowell/netty/server/electricbicycles/ChargingPileHandler.java @@ -102,17 +102,17 @@ public class ChargingPileHandler extends ChannelInboundHandlerAdapter { private void handleTimeRequest(ChannelHandlerContext ctx, ChargingPileMessage message) { // 处理时间请求 - long currentTime = Instant.now().getEpochSecond(); - byte[] timeBytes = ByteBuffer.allocate(4).order(ByteOrder.LITTLE_ENDIAN).putInt((int)currentTime).array(); - - ChargingPileMessage response = new ChargingPileMessage( - message.getPhysicalId(), - message.getMessageId(), - (byte) 0x12, - timeBytes - ); - - ctx.writeAndFlush(response); + // long currentTime = Instant.now().getEpochSecond(); + // byte[] timeBytes = ByteBuffer.allocate(4).order(ByteOrder.LITTLE_ENDIAN).putInt((int)currentTime).array(); + // + // ChargingPileMessage response = new ChargingPileMessage( + // message.getPhysicalId(), + // message.getMessageId(), + // (byte) 0x12, + // timeBytes + // ); + // + // ctx.writeAndFlush(response); } private void handleFirmwareUpgradeRequest(ChannelHandlerContext ctx, ChargingPileMessage message) { @@ -171,13 +171,13 @@ public class ChargingPileHandler extends ChannelInboundHandlerAdapter { } private void sendSimpleResponse(ChannelHandlerContext ctx, ChargingPileMessage originalMessage, byte command, byte result) { - ChargingPileMessage response = new ChargingPileMessage( - originalMessage.getPhysicalId(), - originalMessage.getMessageId(), - command, - new byte[]{result} - ); - ctx.writeAndFlush(response); + // ChargingPileMessage response = new ChargingPileMessage( + // originalMessage.getPhysicalId(), + // originalMessage.getMessageId(), + // command, + // new byte[]{result} + // ); + // ctx.writeAndFlush(response); } @Override diff --git a/jsowell-netty/src/main/java/com/jsowell/netty/server/electricbicycles/ElectricBicyclesServerChannelInitializer.java b/jsowell-netty/src/main/java/com/jsowell/netty/server/electricbicycles/ElectricBicyclesServerChannelInitializer.java index 0736646a0..9f4d35f63 100644 --- a/jsowell-netty/src/main/java/com/jsowell/netty/server/electricbicycles/ElectricBicyclesServerChannelInitializer.java +++ b/jsowell-netty/src/main/java/com/jsowell/netty/server/electricbicycles/ElectricBicyclesServerChannelInitializer.java @@ -1,12 +1,11 @@ package com.jsowell.netty.server.electricbicycles; -import com.jsowell.netty.decoder.ChargingPileDecoder; import com.jsowell.netty.decoder.MessageDecode; +import com.jsowell.netty.decoder.MessageEncode; import com.jsowell.netty.decoder.StartAndLengthFieldFrameDecoder; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelPipeline; import io.netty.channel.socket.SocketChannel; -import io.netty.handler.codec.bytes.ByteArrayDecoder; import io.netty.handler.timeout.IdleStateHandler; import org.springframework.stereotype.Component; @@ -27,7 +26,7 @@ public class ElectricBicyclesServerChannelInitializer extends ChannelInitializer ChannelPipeline pipeline = channel.pipeline(); pipeline.addLast("frameDecoder", new StartAndLengthFieldFrameDecoder()); pipeline.addLast("decoder", new MessageDecode()); - pipeline.addLast("encoder", new MessageDecode()); + pipeline.addLast("encoder", new MessageEncode()); //读超时时间设置为10s,0表示不监控 pipeline.addLast(new IdleStateHandler(60, 0, 0, TimeUnit.SECONDS)); pipeline.addLast("handler", chargingPileHandler);