diff --git a/jsowell-netty/src/main/java/com/jsowell/netty/decoder/ChargingPileDecoder.java b/jsowell-netty/src/main/java/com/jsowell/netty/decoder/ChargingPileDecoder.java new file mode 100644 index 000000000..7f4146081 --- /dev/null +++ b/jsowell-netty/src/main/java/com/jsowell/netty/decoder/ChargingPileDecoder.java @@ -0,0 +1,77 @@ +package com.jsowell.netty.decoder; + +import com.jsowell.netty.domain.ChargingPileMessage; +import io.netty.buffer.ByteBuf; +import io.netty.channel.ChannelHandlerContext; +import io.netty.handler.codec.ByteToMessageDecoder; + +import java.util.List; + +public class ChargingPileDecoder extends ByteToMessageDecoder { + + private static final byte[] FRAME_HEADER = {'D', 'N', 'Y'}; // 包头为"DNY" + private static final int MIN_FRAME_LENGTH = 14; // 最小帧长度:包头(3)+长度(2)+物理ID(4)+消息ID(2)+命令(1)+校验(2) + + @Override + protected void decode(ChannelHandlerContext ctx, ByteBuf in, List out) throws Exception { + while (in.readableBytes() >= MIN_FRAME_LENGTH) { + in.markReaderIndex(); + + // 检查包头 + byte[] header = new byte[3]; + in.readBytes(header); + if (!isValidHeader(header)) { + in.resetReaderIndex(); + return; + } + + // 读取长度 + short length = in.readShort(); + if (in.readableBytes() < length - 5) { // 5 = 包头(3) + 长度(2) + in.resetReaderIndex(); + return; + } + + // 读取物理ID + int physicalId = in.readInt(); + + // 读取消息ID + short messageId = in.readShort(); + + // 读取命令 + byte command = in.readByte(); + + // 读取数据 + int dataLength = length - 13; // 13 = 包头(3) + 长度(2) + 物理ID(4) + 消息ID(2) + 命令(1) + 校验(2) + byte[] data = new byte[dataLength]; + in.readBytes(data); + + // 读取校验和 + short checksum = in.readShort(); + + // 验证校验和 + short calculatedChecksum = calculateChecksum(in, length); + if (checksum != calculatedChecksum) { + System.out.println("校验和错误,丢弃此帧"); + continue; + } + + // 创建消息对象并添加到输出列表 + ChargingPileMessage message = new ChargingPileMessage(physicalId, messageId, command, data); + out.add(message); + } + } + + private boolean isValidHeader(byte[] header) { + return header[0] == FRAME_HEADER[0] && header[1] == FRAME_HEADER[1] && header[2] == FRAME_HEADER[2]; + } + + private short calculateChecksum(ByteBuf buf, int length) { + short sum = 0; + for (int i = 0; i < length - 2; i++) { + sum += buf.getByte(buf.readerIndex() - length + i) & 0xFF; + } + return sum; + } +} + diff --git a/jsowell-netty/src/main/java/com/jsowell/netty/domain/ChargingPileMessage.java b/jsowell-netty/src/main/java/com/jsowell/netty/domain/ChargingPileMessage.java new file mode 100644 index 000000000..35f8bd530 --- /dev/null +++ b/jsowell-netty/src/main/java/com/jsowell/netty/domain/ChargingPileMessage.java @@ -0,0 +1,21 @@ +package com.jsowell.netty.domain; + +public class ChargingPileMessage { + private int physicalId; + private short messageId; + private byte command; + private byte[] data; + + public ChargingPileMessage(int physicalId, short messageId, byte command, byte[] data) { + this.physicalId = physicalId; + this.messageId = messageId; + this.command = command; + this.data = data; + } + + // Getters + public int getPhysicalId() { return physicalId; } + public short getMessageId() { return messageId; } + public byte getCommand() { return command; } + public byte[] getData() { return data; } +} \ No newline at end of file diff --git a/jsowell-netty/src/main/java/com/jsowell/netty/server/NettyServerManager.java b/jsowell-netty/src/main/java/com/jsowell/netty/server/NettyServerManager.java index a12be555b..6b9c8eeda 100644 --- a/jsowell-netty/src/main/java/com/jsowell/netty/server/NettyServerManager.java +++ b/jsowell-netty/src/main/java/com/jsowell/netty/server/NettyServerManager.java @@ -36,7 +36,7 @@ public class NettyServerManager implements CommandLineRunner { public void run(String... args) throws Exception { startNettyServer(Constants.SOCKET_IP, 9011); startElectricBikeNettyServer(Constants.SOCKET_IP, 9012); - startMqttSever(Constants.SOCKET_IP, 1883); + // startMqttSever(Constants.SOCKET_IP, 1883); } public void startNettyServer(String host, int port) { diff --git a/jsowell-netty/src/main/java/com/jsowell/netty/server/electricbicycles/ChargingPileHandler.java b/jsowell-netty/src/main/java/com/jsowell/netty/server/electricbicycles/ChargingPileHandler.java new file mode 100644 index 000000000..70d88c184 --- /dev/null +++ b/jsowell-netty/src/main/java/com/jsowell/netty/server/electricbicycles/ChargingPileHandler.java @@ -0,0 +1,155 @@ +package com.jsowell.netty.server.electricbicycles; + +import com.jsowell.netty.domain.ChargingPileMessage; +import io.netty.channel.ChannelHandler; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInboundHandlerAdapter; +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Component; + +import java.time.Instant; +import java.nio.ByteBuffer; +import java.nio.ByteOrder; + +@ChannelHandler.Sharable +@Slf4j +@Component +public class ChargingPileHandler extends ChannelInboundHandlerAdapter { + + @Override + public void channelRead(ChannelHandlerContext ctx, Object msg) { + if (!(msg instanceof ChargingPileMessage)) { + return; + } + + ChargingPileMessage message = (ChargingPileMessage) msg; + byte command = message.getCommand(); + + switch (command) { + case 0x11: + handleHeartbeat(ctx, message); + break; + case 0x12: + handleTimeRequest(ctx, message); + break; + case 0x15: + handleFirmwareUpgradeRequest(ctx, message); + break; + case (byte) 0xFA: + handleFirmwareUpgradeResponse(ctx, message); + break; + case 0x31: + handleReboot(ctx, message); + break; + case 0x32: + handleCommunicationModuleReboot(ctx, message); + break; + case 0x33: + handleClearUpgradeData(ctx, message); + break; + case 0x34: + handleChangeIPAddress(ctx, message); + break; + case 0x35: + handleSubdeviceVersionUpload(ctx, message); + break; + case 0x3B: + handleFSKParameterRequest(ctx, message); + break; + default: + log.info("Unknown command: " + String.format("0x%02X", command)); + } + } + + private void handleHeartbeat(ChannelHandlerContext ctx, ChargingPileMessage message) { + // 处理心跳包,无需回复 + log.info("Received heartbeat from device ID: " + message.getPhysicalId()); + // 可以在这里更新设备状态、记录日志等 + } + + private void handleTimeRequest(ChannelHandlerContext ctx, ChargingPileMessage message) { + // 处理时间请求 + long currentTime = Instant.now().getEpochSecond(); + byte[] timeBytes = ByteBuffer.allocate(4).order(ByteOrder.LITTLE_ENDIAN).putInt((int)currentTime).array(); + + ChargingPileMessage response = new ChargingPileMessage( + message.getPhysicalId(), + message.getMessageId(), + (byte) 0x12, + timeBytes + ); + + ctx.writeAndFlush(response); + } + + private void handleFirmwareUpgradeRequest(ChannelHandlerContext ctx, ChargingPileMessage message) { + // 处理固件升级请求 + // 这里需要实现固件升级的逻辑,包括发送固件数据包等 + log.info("Firmware upgrade requested from device ID: " + message.getPhysicalId()); + // TODO: 实现固件升级逻辑 + } + + private void handleFirmwareUpgradeResponse(ChannelHandlerContext ctx, ChargingPileMessage message) { + // 处理固件升级响应 + log.info("Firmware upgrade response from device ID: " + message.getPhysicalId()); + // TODO: 根据响应继续发送下一个固件包或结束升级过程 + } + + private void handleReboot(ChannelHandlerContext ctx, ChargingPileMessage message) { + // 处理重启主机指令 + log.info("Reboot command received for device ID: " + message.getPhysicalId()); + // 发送成功响应 + sendSimpleResponse(ctx, message, (byte) 0x31, (byte) 0x00); + } + + private void handleCommunicationModuleReboot(ChannelHandlerContext ctx, ChargingPileMessage message) { + // 处理重启通信模块指令 + log.info("Communication module reboot command received for device ID: " + message.getPhysicalId()); + // 发送成功响应 + sendSimpleResponse(ctx, message, (byte) 0x32, (byte) 0x00); + } + + private void handleClearUpgradeData(ChannelHandlerContext ctx, ChargingPileMessage message) { + // 处理清空升级分机数据指令 + log.info("Clear upgrade data command received for device ID: " + message.getPhysicalId()); + // 发送成功响应 + sendSimpleResponse(ctx, message, (byte) 0x33, (byte) 0x00); + } + + private void handleChangeIPAddress(ChannelHandlerContext ctx, ChargingPileMessage message) { + // 处理更改IP地址指令 + log.info("Change IP address command received for device ID: " + message.getPhysicalId()); + // TODO: 实现IP地址更改逻辑 + // 发送成功响应 + sendSimpleResponse(ctx, message, (byte) 0x34, (byte) 0x00); + } + + private void handleSubdeviceVersionUpload(ChannelHandlerContext ctx, ChargingPileMessage message) { + // 处理上传分机版本号与设备类型 + log.info("Subdevice version upload received from device ID: " + message.getPhysicalId()); + // TODO: 处理上传的分机版本信息 + // 此命令不需要响应 + } + + private void handleFSKParameterRequest(ChannelHandlerContext ctx, ChargingPileMessage message) { + // 处理请求服务器FSK主机参数 + log.info("FSK parameter request received from device ID: " + message.getPhysicalId()); + // TODO: 实现发送FSK参数的逻辑(使用0x3A指令) + } + + private void sendSimpleResponse(ChannelHandlerContext ctx, ChargingPileMessage originalMessage, byte command, byte result) { + ChargingPileMessage response = new ChargingPileMessage( + originalMessage.getPhysicalId(), + originalMessage.getMessageId(), + command, + new byte[]{result} + ); + ctx.writeAndFlush(response); + } + + @Override + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { + cause.printStackTrace(); + ctx.close(); + } +} \ No newline at end of file diff --git a/jsowell-netty/src/main/java/com/jsowell/netty/server/electricbicycles/ElectricBicyclesServerChannelInitializer.java b/jsowell-netty/src/main/java/com/jsowell/netty/server/electricbicycles/ElectricBicyclesServerChannelInitializer.java index 935646b4a..3cb096fae 100644 --- a/jsowell-netty/src/main/java/com/jsowell/netty/server/electricbicycles/ElectricBicyclesServerChannelInitializer.java +++ b/jsowell-netty/src/main/java/com/jsowell/netty/server/electricbicycles/ElectricBicyclesServerChannelInitializer.java @@ -1,5 +1,6 @@ package com.jsowell.netty.server.electricbicycles; +import com.jsowell.netty.decoder.ChargingPileDecoder; import com.jsowell.netty.decoder.ProtocolDnyDecoder; import com.jsowell.netty.decoder.StartAndLengthFieldFrameDecoder; import io.netty.channel.ChannelInitializer; @@ -18,11 +19,14 @@ public class ElectricBicyclesServerChannelInitializer extends ChannelInitializer @Resource ElectricBicyclesServerHandler electricBicyclesServerHandler; + @Resource + ChargingPileHandler chargingPileHandler; + @Override protected void initChannel(SocketChannel channel) throws Exception { ChannelPipeline pipeline = channel.pipeline(); // pipeline.addLast("frameDecoder",new CustomDecoder()); - pipeline.addLast("frameDecoder", new ProtocolDnyDecoder()); + pipeline.addLast("frameDecoder", new ChargingPileDecoder()); pipeline.addLast("decoder", new ByteArrayDecoder()); pipeline.addLast("encoder", new ByteArrayDecoder()); //读超时时间设置为10s,0表示不监控