diff --git a/jsowell-netty/src/main/java/com/jsowell/netty/decoder/ChargingPileDecoder.java b/jsowell-netty/src/main/java/com/jsowell/netty/decoder/ChargingPileDecoder.java new file mode 100644 index 000000000..f46b481c0 --- /dev/null +++ b/jsowell-netty/src/main/java/com/jsowell/netty/decoder/ChargingPileDecoder.java @@ -0,0 +1,88 @@ +package com.jsowell.netty.decoder; + +import com.jsowell.netty.domain.ChargingPileMessage; +import io.netty.buffer.ByteBuf; +import io.netty.channel.ChannelHandlerContext; +import io.netty.handler.codec.ByteToMessageDecoder; +import lombok.extern.slf4j.Slf4j; + +import java.util.List; + +@Slf4j +public class ChargingPileDecoder extends ByteToMessageDecoder { + + private static final byte[] FRAME_HEADER = {'D', 'N', 'Y'}; // 包头为"DNY" + private static final int MIN_FRAME_LENGTH = 14; // 最小帧长度:包头(3)+长度(2)+物理ID(4)+消息ID(2)+命令(1)+校验(2) + + @Override + protected void decode(ChannelHandlerContext ctx, ByteBuf in, List out) throws Exception { + while (in.readableBytes() >= MIN_FRAME_LENGTH) { + in.markReaderIndex(); + + // 检查包头 + byte[] header = new byte[3]; + in.readBytes(header); + if (!isValidHeader(header)) { + in.resetReaderIndex(); + return; + } + + // 读取长度 + short length = in.readShort(); + if (in.readableBytes() < length - 5) { // 5 = 包头(3) + 长度(2) + in.resetReaderIndex(); + return; + } + + // 读取物理ID + int physicalId = in.readInt(); + log.info("physicalId:{}", physicalId); + + // 读取消息ID + short messageId = in.readShort(); + log.info("messageId:{}", messageId); + + // 读取命令 + byte command = in.readByte(); + log.info("command:{}", command); + + // 读取数据 + int dataLength = length - 13; // 13 = 包头(3) + 长度(2) + 物理ID(4) + 消息ID(2) + 命令(1) + 校验(2) + byte[] data = new byte[dataLength]; + in.readBytes(data); + log.info("data:{}", data); + + // 读取校验和 + short checksum = in.readShort(); + log.info("checksum:{}", checksum); + + // 验证校验和 + short calculatedChecksum = calculateChecksum(in, length); + log.info("calculatedChecksum:{}", calculatedChecksum); + if (checksum != calculatedChecksum) { + log.info("校验和错误,丢弃此帧"); + continue; + } + + // 创建消息对象并添加到输出列表 + ChargingPileMessage message = new ChargingPileMessage(physicalId, messageId, command, data); + log.info("ChargingPileMessage:{}", message.toString()); + out.add(message); + } + } + + private boolean isValidHeader(byte[] header) { + log.info("isValidHeader header:{}", header); + return header[0] == FRAME_HEADER[0] && header[1] == FRAME_HEADER[1] && header[2] == FRAME_HEADER[2]; + } + + private short calculateChecksum(ByteBuf buf, int length) { + log.info("calculateChecksum ByteBuf:{}, length:{}", buf.toString(), length); + short sum = 0; + for (int i = 0; i < length - 2; i++) { + sum += buf.getByte(buf.readerIndex() - length + i) & 0xFF; + } + return sum; + } +} + diff --git a/jsowell-netty/src/main/java/com/jsowell/netty/decoder/ProtocolDnyDecoder.java b/jsowell-netty/src/main/java/com/jsowell/netty/decoder/ProtocolDnyDecoder.java new file mode 100644 index 000000000..0c627aab7 --- /dev/null +++ b/jsowell-netty/src/main/java/com/jsowell/netty/decoder/ProtocolDnyDecoder.java @@ -0,0 +1,117 @@ +package com.jsowell.netty.decoder; + + +import com.jsowell.netty.domain.ProtocolDnyMessage; +import io.netty.buffer.ByteBuf; +import io.netty.channel.ChannelHandlerContext; +import io.netty.handler.codec.ByteToMessageDecoder; +import lombok.extern.slf4j.Slf4j; + +import java.nio.ByteOrder; +import java.util.List; + +@Slf4j +public class ProtocolDnyDecoder extends ByteToMessageDecoder { + private static final int HEADER_LENGTH = 3; // 包头长度 + private static final int LENGTH_FIELD_LENGTH = 2; // 长度字段长度 + private static final int PHYSICAL_ID_LENGTH = 4; // 物理ID长度 + private static final int MESSAGE_ID_LENGTH = 2; // 消息ID长度 + private static final int COMMAND_LENGTH = 1; // 命令长度 + private static final int CHECKSUM_LENGTH = 2; // 校验字段长度 + + private static final int MIN_FRAME_LENGTH = HEADER_LENGTH + LENGTH_FIELD_LENGTH + PHYSICAL_ID_LENGTH + MESSAGE_ID_LENGTH + COMMAND_LENGTH + CHECKSUM_LENGTH; + + @Override + protected void decode(ChannelHandlerContext ctx, ByteBuf in, List out) throws Exception { + // 检查是否有足够的数据来读取最小帧长度 + if (in.readableBytes() < MIN_FRAME_LENGTH) { + return; + } + + // 标记当前读位置 + in.markReaderIndex(); + + // 检查包头 + byte[] header = new byte[HEADER_LENGTH]; + in.readBytes(header); + if (header[0] != 'D' || header[1] != 'N' || header[2] != 'Y') { + in.resetReaderIndex(); + throw new IllegalStateException("Invalid start bytes: " + new String(header)); + } + + // 读取长度字段 + in.order(ByteOrder.LITTLE_ENDIAN); // 确保使用小端模式 + int length = in.readUnsignedShort(); + if (in.readableBytes() < length) { + in.resetReaderIndex(); + return; + } + + // 读取物理ID + byte[] physicalId = new byte[PHYSICAL_ID_LENGTH]; + in.readBytes(physicalId); + + // 读取消息ID + int messageId = in.readUnsignedShort(); + + // 读取命令 + byte command = in.readByte(); + + // 读取数据 + int dataLength = length - (PHYSICAL_ID_LENGTH + MESSAGE_ID_LENGTH + COMMAND_LENGTH + CHECKSUM_LENGTH); + byte[] data = new byte[dataLength]; + in.readBytes(data); + + // 读取校验字段 + int checksum = in.readUnsignedShort(); + + // 计算校验值并验证 + if (!verifyChecksum(header, length, physicalId, messageId, command, data, checksum)) { + in.resetReaderIndex(); + throw new IllegalStateException("Invalid checksum"); + } + + // 创建协议消息对象 + ProtocolDnyMessage message = new ProtocolDnyMessage(header, length, physicalId, messageId, command, data, checksum); + + // 将解码后的消息添加到输出列表中 + out.add(message); + } + + // 校验帧校验域的函数 + private boolean verifyChecksum(byte[] header, int length, byte[] physicalId, int messageId, byte command, byte[] data, int checksum) { + // 这里需要实现校验逻辑,假设我们有一个累加和校验函数sumCheck + int calculatedChecksum = sumCheck(header, length, physicalId, messageId, command, data); + return calculatedChecksum == checksum; + } + + // 累加和校验函数,计算从包头到数据的累加和 + private int sumCheck(byte[] header, int length, byte[] physicalId, int messageId, byte command, byte[] data) { + int sum = 0; + + for (byte b : header) { + sum += b & 0xFF; + } + + sum += length & 0xFF; + sum += (length >> 8) & 0xFF; + + for (byte b : physicalId) { + sum += b & 0xFF; + } + + sum += messageId & 0xFF; + sum += (messageId >> 8) & 0xFF; + + sum += command & 0xFF; + + for (byte b : data) { + sum += b & 0xFF; + } + + return sum & 0xFFFF; // 返回16位结果 + } +} + + + diff --git a/jsowell-netty/src/main/java/com/jsowell/netty/decoder/StartAndLengthFieldFrameDecoder.java b/jsowell-netty/src/main/java/com/jsowell/netty/decoder/StartAndLengthFieldFrameDecoder.java index b32b9b6ff..7a6fffb7d 100644 --- a/jsowell-netty/src/main/java/com/jsowell/netty/decoder/StartAndLengthFieldFrameDecoder.java +++ b/jsowell-netty/src/main/java/com/jsowell/netty/decoder/StartAndLengthFieldFrameDecoder.java @@ -5,77 +5,127 @@ import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.ByteToMessageDecoder; import lombok.extern.slf4j.Slf4j; +import java.nio.charset.StandardCharsets; import java.util.List; @Slf4j public class StartAndLengthFieldFrameDecoder extends ByteToMessageDecoder { - // 起始标志 - private int HEAD_DATA; + private static final int HEADER_LENGTH_DNY = 3; // "DNY" 包头的长度 + private static final int HEADER_LENGTH_68 = 1; // 68 包头的长度 - public StartAndLengthFieldFrameDecoder(int HEAD_DATA) { - this.HEAD_DATA = HEAD_DATA; - } + // 构造函数,初始化起始标志 + public StartAndLengthFieldFrameDecoder() {} - /** - *
-	 * 协议开始的标准head_data,int类型,占据1个字节.
-	 * 表示数据的长度contentLength,int类型,占据1个字节.
-	 * 
- */ - public final int BASE_LENGTH = 1 + 1; - - @Override protected void decode(ChannelHandlerContext ctx, ByteBuf buffer, List out) throws Exception { - // 可读长度必须大于基本长度 - if (buffer.readableBytes() <= BASE_LENGTH) { - log.warn("可读字节数:{}小于基础长度:{}", buffer.readableBytes(), BASE_LENGTH); - return; - } - // 记录包头开始的index int beginReader; + // 循环查找包头 while (true) { + if (buffer.readableBytes() < Math.min(HEADER_LENGTH_DNY, HEADER_LENGTH_68)) { + return; // 数据长度不足,等待更多数据 + } + // 获取包头开始的index beginReader = buffer.readerIndex(); - // log.info("包头开始的index:{}", beginReader); - // 标记包头开始的index buffer.markReaderIndex(); - // 读到了协议的开始标志,结束while循环 - if (buffer.getUnsignedByte(beginReader) == HEAD_DATA) { - // log.info("读到了协议的开始标志,结束while循环 byte:{}, HEAD_DATA:{}", buffer.getUnsignedByte(beginReader), HEAD_DATA); - break; + + // 判断是否为DNY包头或68包头 + if (isStartOfDnyHeader(buffer, beginReader) || isStartOf68Header(buffer, beginReader)) { + break; // 读到了协议的开始标志,结束while循环 } // 未读到包头,略过一个字节 - // 每次略过,一个字节,去读取,包头信息的开始标记 buffer.resetReaderIndex(); buffer.readByte(); + } - // 当略过,一个字节之后, - // 数据包的长度,又变得不满足 - // 此时,应该结束。等待后面的数据到达 - if (buffer.readableBytes() < BASE_LENGTH) { - log.debug("数据包的长度不满足 readableBytes:{}, BASE_LENGTH:{}", buffer.readableBytes(), BASE_LENGTH); + // 检查包头是否是 "DNY" + if (buffer.readableBytes() >= HEADER_LENGTH_DNY) { + byte[] headerBytes = new byte[HEADER_LENGTH_DNY]; + buffer.getBytes(beginReader, headerBytes, 0, HEADER_LENGTH_DNY); + String header = new String(headerBytes, StandardCharsets.UTF_8); + + if ("DNY".equals(header)) { + // 处理 DNY 协议 + decodeDnyMessage(buffer, out, beginReader); return; } } - // 消息的长度 - int length = buffer.getUnsignedByte(beginReader + 1); - // 判断请求数据包数据是否到齐 - if (buffer.readableBytes() < length + 4) { - // log.info("请求数据包数据没有到齐,还原读指针 readableBytes:{}, 消息的长度:{}", buffer.readableBytes(), length); - // 还原读指针 + // 检查包头是否是 68 协议 + if (buffer.readableBytes() >= HEADER_LENGTH_68) { + if (buffer.getUnsignedByte(beginReader) == 0x68) { + // 处理 68 协议 + decode68Message(buffer, out, beginReader); + return; + } + } + + // 未知协议,还原读指针 + buffer.resetReaderIndex(); + } + + // 判断是否为DNY包头 + private boolean isStartOfDnyHeader(ByteBuf buffer, int beginReader) { + if (buffer.readableBytes() >= HEADER_LENGTH_DNY) { + byte[] headerBytes = new byte[HEADER_LENGTH_DNY]; + buffer.getBytes(beginReader, headerBytes, 0, HEADER_LENGTH_DNY); + String header = new String(headerBytes, StandardCharsets.UTF_8); + return "DNY".equals(header); + } + return false; + } + + // 判断是否为68包头 + private boolean isStartOf68Header(ByteBuf buffer, int beginReader) { + if (buffer.readableBytes() >= HEADER_LENGTH_68) { + return buffer.getUnsignedByte(beginReader) == 0x68; + } + return false; + } + + // 处理68协议消息 + private void decode68Message(ByteBuf buffer, List out, int beginReader) { + // 检查剩余数据是否足够 + if (buffer.readableBytes() < HEADER_LENGTH_68 + 1) { buffer.readerIndex(beginReader); return; } - // 读取data数据 - byte[] data = new byte[length + 4]; - buffer.readBytes(data); - ByteBuf frame = buffer.retainedSlice(beginReader, length + 4); - buffer.readerIndex(beginReader + length + 4); + // 获取消息长度 + int length = buffer.getUnsignedByte(beginReader + HEADER_LENGTH_68); + // 检查剩余数据是否足够 + if (buffer.readableBytes() < HEADER_LENGTH_68 + 1 + length) { + buffer.readerIndex(beginReader); + return; + } + + // 读取 data 数据 + ByteBuf frame = buffer.retainedSlice(beginReader, HEADER_LENGTH_68 + 1 + length); + buffer.readerIndex(beginReader + HEADER_LENGTH_68 + 1 + length); + out.add(frame); + } + + // 处理DNY协议消息 + private void decodeDnyMessage(ByteBuf buffer, List out, int beginReader) { + // 检查剩余数据是否足够 + if (buffer.readableBytes() < HEADER_LENGTH_DNY + 1) { + buffer.readerIndex(beginReader); + return; + } + + // 获取消息长度 + int length = buffer.getUnsignedByte(beginReader + HEADER_LENGTH_DNY); + // 检查剩余数据是否足够 + if (buffer.readableBytes() < HEADER_LENGTH_DNY + 1 + length) { + buffer.readerIndex(beginReader); + return; + } + + // 读取 data 数据 + ByteBuf frame = buffer.retainedSlice(beginReader, HEADER_LENGTH_DNY + 1 + length); + buffer.readerIndex(beginReader + HEADER_LENGTH_DNY + 1 + length); out.add(frame); } diff --git a/jsowell-netty/src/main/java/com/jsowell/netty/decoder/StartAndLengthFieldFrameDecoder2.java b/jsowell-netty/src/main/java/com/jsowell/netty/decoder/StartAndLengthFieldFrameDecoder2.java new file mode 100644 index 000000000..a6b0d5977 --- /dev/null +++ b/jsowell-netty/src/main/java/com/jsowell/netty/decoder/StartAndLengthFieldFrameDecoder2.java @@ -0,0 +1,117 @@ +package com.jsowell.netty.decoder; + +import com.jsowell.netty.domain.DnyMessage; +import com.jsowell.netty.domain.Message68; +import io.netty.buffer.ByteBuf; +import io.netty.channel.ChannelHandlerContext; +import io.netty.handler.codec.ByteToMessageDecoder; +import lombok.extern.slf4j.Slf4j; + +import java.util.List; + +@Slf4j +public class StartAndLengthFieldFrameDecoder2 extends ByteToMessageDecoder { + private static final int HEADER_LENGTH_DNY = 3; // "DNY" 包头的长度 + private static final int HEADER_LENGTH_68 = 1; // 68 包头的长度 + private static final int MAX_FRAME_LENGTH = 1024; // 最大帧长度,可以根据实际需求调整 + private static final byte[] DNY_HEADER = {'D', 'N', 'Y'}; + + @Override + protected void decode(ChannelHandlerContext ctx, ByteBuf buffer, List out) throws Exception { + while (buffer.readableBytes() > 0) { + int headerIndex = findHeaderIndex(buffer); + if (headerIndex == -1) { + // 没有找到有效的包头,丢弃所有数据 + buffer.skipBytes(buffer.readableBytes()); + return; + } + + buffer.readerIndex(headerIndex); + + if (isDnyHeader(buffer)) { + if (decodeDnyMessage(buffer, out)) { + return; + } + } else if (is68Header(buffer)) { + if (decode68Message(buffer, out)) { + return; + } + } else { + // 未知协议,跳过一个字节 + buffer.skipBytes(1); + } + } + } + + private int findHeaderIndex(ByteBuf buffer) { + int dnyIndex = buffer.indexOf(buffer.readerIndex(), buffer.writerIndex(), DNY_HEADER[0]); + int index68 = buffer.indexOf(buffer.readerIndex(), buffer.writerIndex(), (byte) 0x68); + + if (dnyIndex == -1 && index68 == -1) { + return -1; + } else if (dnyIndex == -1) { + return index68; + } else if (index68 == -1) { + return dnyIndex; + } else { + return Math.min(dnyIndex, index68); + } + } + + private boolean isDnyHeader(ByteBuf buffer) { + return buffer.readableBytes() >= HEADER_LENGTH_DNY && + buffer.getByte(buffer.readerIndex()) == 'D' && + buffer.getByte(buffer.readerIndex() + 1) == 'N' && + buffer.getByte(buffer.readerIndex() + 2) == 'Y'; + } + + private boolean is68Header(ByteBuf buffer) { + return buffer.readableBytes() >= HEADER_LENGTH_68 && + buffer.getByte(buffer.readerIndex()) == 0x68; + } + + private boolean decodeDnyMessage(ByteBuf buffer, List out) { + if (buffer.readableBytes() < HEADER_LENGTH_DNY + 2) { + return false; // 数据不足,等待更多数据 + } + + int lengthFieldIndex = buffer.readerIndex() + HEADER_LENGTH_DNY; + int length = buffer.getUnsignedShort(lengthFieldIndex); + + if (length > MAX_FRAME_LENGTH) { + log.warn("DNY frame length exceeds maximum allowed: {}", length); + buffer.skipBytes(HEADER_LENGTH_DNY + 2); + return false; + } + + if (buffer.readableBytes() < HEADER_LENGTH_DNY + 2 + length) { + return false; // 数据不足,等待更多数据 + } + + ByteBuf frame = buffer.readRetainedSlice(HEADER_LENGTH_DNY + 2 + length); + out.add(new DnyMessage(frame)); + return true; + } + + private boolean decode68Message(ByteBuf buffer, List out) { + if (buffer.readableBytes() < HEADER_LENGTH_68 + 1) { + return false; // 数据不足,等待更多数据 + } + + int length = buffer.getUnsignedByte(buffer.readerIndex() + HEADER_LENGTH_68); + + if (length > MAX_FRAME_LENGTH) { + log.warn("68 frame length exceeds maximum allowed: {}", length); + buffer.skipBytes(HEADER_LENGTH_68 + 1); + return false; + } + + if (buffer.readableBytes() < HEADER_LENGTH_68 + 1 + length) { + return false; // 数据不足,等待更多数据 + } + + ByteBuf frame = buffer.readRetainedSlice(HEADER_LENGTH_68 + 1 + length); + out.add(new Message68(frame)); + return true; + } +} diff --git a/jsowell-netty/src/main/java/com/jsowell/netty/domain/ChargingPileMessage.java b/jsowell-netty/src/main/java/com/jsowell/netty/domain/ChargingPileMessage.java new file mode 100644 index 000000000..0a3f88a28 --- /dev/null +++ b/jsowell-netty/src/main/java/com/jsowell/netty/domain/ChargingPileMessage.java @@ -0,0 +1,35 @@ +package com.jsowell.netty.domain; + +import org.apache.commons.lang3.builder.ToStringBuilder; +import org.apache.commons.lang3.builder.ToStringStyle; + +public class ChargingPileMessage { + private int physicalId; + private short messageId; + private byte command; + private byte[] data; + + public ChargingPileMessage(int physicalId, short messageId, byte command, byte[] data) { + this.physicalId = physicalId; + this.messageId = messageId; + this.command = command; + this.data = data; + } + + // Getters + public int getPhysicalId() { return physicalId; } + public short getMessageId() { return messageId; } + public byte getCommand() { return command; } + public byte[] getData() { return data; } + + + @Override + public String toString() { + return new ToStringBuilder(this, ToStringStyle.JSON_STYLE) + .append("physicalId", physicalId) + .append("messageId", messageId) + .append("command", command) + .append("data", data) + .toString(); + } +} \ No newline at end of file diff --git a/jsowell-netty/src/main/java/com/jsowell/netty/domain/DnyMessage.java b/jsowell-netty/src/main/java/com/jsowell/netty/domain/DnyMessage.java new file mode 100644 index 000000000..8a030e58f --- /dev/null +++ b/jsowell-netty/src/main/java/com/jsowell/netty/domain/DnyMessage.java @@ -0,0 +1,15 @@ +package com.jsowell.netty.domain; + +import io.netty.buffer.ByteBuf; + +public class DnyMessage { + private final ByteBuf content; + + public DnyMessage(ByteBuf content) { + this.content = content; + } + + public ByteBuf getContent() { + return content; + } +} \ No newline at end of file diff --git a/jsowell-netty/src/main/java/com/jsowell/netty/domain/Message68.java b/jsowell-netty/src/main/java/com/jsowell/netty/domain/Message68.java new file mode 100644 index 000000000..927b8f3e1 --- /dev/null +++ b/jsowell-netty/src/main/java/com/jsowell/netty/domain/Message68.java @@ -0,0 +1,15 @@ +package com.jsowell.netty.domain; + +import io.netty.buffer.ByteBuf; + +public class Message68 { + private final ByteBuf content; + + public Message68(ByteBuf content) { + this.content = content; + } + + public ByteBuf getContent() { + return content; + } +} \ No newline at end of file diff --git a/jsowell-netty/src/main/java/com/jsowell/netty/domain/ProtocolDnyMessage.java b/jsowell-netty/src/main/java/com/jsowell/netty/domain/ProtocolDnyMessage.java new file mode 100644 index 000000000..76ee060df --- /dev/null +++ b/jsowell-netty/src/main/java/com/jsowell/netty/domain/ProtocolDnyMessage.java @@ -0,0 +1,49 @@ +package com.jsowell.netty.domain; + +public class ProtocolDnyMessage { + private final byte[] header; + private final int length; + private final byte[] physicalId; + private final int messageId; + private final byte command; + private final byte[] data; + private final int checksum; + + public ProtocolDnyMessage(byte[] header, int length, byte[] physicalId, int messageId, byte command, byte[] data, int checksum) { + this.header = header; + this.length = length; + this.physicalId = physicalId; + this.messageId = messageId; + this.command = command; + this.data = data; + this.checksum = checksum; + } + + public byte[] getHeader() { + return header; + } + + public int getLength() { + return length; + } + + public byte[] getPhysicalId() { + return physicalId; + } + + public int getMessageId() { + return messageId; + } + + public byte getCommand() { + return command; + } + + public byte[] getData() { + return data; + } + + public int getChecksum() { + return checksum; + } +} \ No newline at end of file diff --git a/jsowell-netty/src/main/java/com/jsowell/netty/server/NettyServerManager.java b/jsowell-netty/src/main/java/com/jsowell/netty/server/NettyServerManager.java new file mode 100644 index 000000000..6b9c8eeda --- /dev/null +++ b/jsowell-netty/src/main/java/com/jsowell/netty/server/NettyServerManager.java @@ -0,0 +1,152 @@ +package com.jsowell.netty.server; + +import com.jsowell.common.constant.Constants; +import com.jsowell.netty.server.electricbicycles.ElectricBicyclesServerChannelInitializer; +import com.jsowell.netty.server.mqtt.BootNettyMqttChannelInboundHandler; +import com.jsowell.netty.server.yunkuaichong.NettyServerChannelInitializer; +import io.netty.bootstrap.ServerBootstrap; +import io.netty.buffer.PooledByteBufAllocator; +import io.netty.channel.*; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.socket.SocketChannel; +import io.netty.channel.socket.nio.NioServerSocketChannel; +import io.netty.handler.codec.mqtt.MqttDecoder; +import io.netty.handler.codec.mqtt.MqttEncoder; +import io.netty.handler.logging.LogLevel; +import io.netty.handler.logging.LoggingHandler; +import io.netty.handler.timeout.IdleStateHandler; +import lombok.extern.slf4j.Slf4j; +import org.springframework.boot.CommandLineRunner; +import org.springframework.stereotype.Component; + +import javax.annotation.Resource; +import java.net.InetSocketAddress; + +@Slf4j +@Component +public class NettyServerManager implements CommandLineRunner { + + @Resource + private NettyServerChannelInitializer nettyServerChannelInitializer; + + @Resource + private ElectricBicyclesServerChannelInitializer electricBicyclesServerChannelInitializer; + + @Override + public void run(String... args) throws Exception { + startNettyServer(Constants.SOCKET_IP, 9011); + startElectricBikeNettyServer(Constants.SOCKET_IP, 9012); + // startMqttSever(Constants.SOCKET_IP, 1883); + } + + public void startNettyServer(String host, int port) { + new Thread(() -> { + EventLoopGroup bossGroup = new NioEventLoopGroup(1); + EventLoopGroup workerGroup = new NioEventLoopGroup(); + + try { + ServerBootstrap bootstrap = new ServerBootstrap() + .group(bossGroup, workerGroup) + .channel(NioServerSocketChannel.class) + .handler(new LoggingHandler(LogLevel.DEBUG)) + .option(ChannelOption.SO_BACKLOG, 128) + .option(ChannelOption.SO_REUSEADDR, true) + .childOption(ChannelOption.SO_KEEPALIVE, true) + .childOption(ChannelOption.SO_REUSEADDR, true) + .childHandler(nettyServerChannelInitializer) + .localAddress(new InetSocketAddress(host, port)); + + ChannelFuture future = bootstrap.bind(port).sync(); + if (future.isSuccess()) { + log.info("NettyServer启动成功, 开始监听端口:{}", port); + } else { + log.error("NettyServer启动失败", future.cause()); + } + + future.channel().closeFuture().sync(); + } catch (Exception e) { + log.error("NettyServer.start error", e); + bossGroup.shutdownGracefully(); + workerGroup.shutdownGracefully(); + } + }).start(); + } + + public void startElectricBikeNettyServer(String host, int port) { + new Thread(() -> { + EventLoopGroup bossGroup = new NioEventLoopGroup(1); + EventLoopGroup workerGroup = new NioEventLoopGroup(); + + try { + ServerBootstrap bootstrap = new ServerBootstrap() + .group(bossGroup, workerGroup) + .channel(NioServerSocketChannel.class) + .handler(new LoggingHandler(LogLevel.DEBUG)) + .option(ChannelOption.SO_BACKLOG, 128) + .option(ChannelOption.SO_REUSEADDR, true) + .childOption(ChannelOption.SO_KEEPALIVE, true) + .childOption(ChannelOption.SO_REUSEADDR, true) + .childHandler(electricBicyclesServerChannelInitializer) + .localAddress(new InetSocketAddress(host, port)); + + ChannelFuture future = bootstrap.bind(port).sync(); + if (future.isSuccess()) { + log.info("ElectricBikeNettyServer启动成功, 开始监听端口:{}", port); + } else { + log.error("ElectricBikeNettyServer启动失败", future.cause()); + } + + future.channel().closeFuture().sync(); + } catch (Exception e) { + log.error("ElectricBikeNettyServer.start error", e); + bossGroup.shutdownGracefully(); + workerGroup.shutdownGracefully(); + } + }).start(); + } + + public void startMqttSever(String host, int port) { + new Thread(() -> { + EventLoopGroup bossGroup = new NioEventLoopGroup(1); + EventLoopGroup workerGroup = new NioEventLoopGroup(); + + try { + ServerBootstrap mqttBootstrap = new ServerBootstrap() + .group(bossGroup, workerGroup) + .channel(NioServerSocketChannel.class) + .handler(new LoggingHandler(LogLevel.DEBUG)) + .option(ChannelOption.SO_BACKLOG, 128) + .option(ChannelOption.SO_REUSEADDR, true) + .childOption(ChannelOption.SO_KEEPALIVE, true) + .childOption(ChannelOption.SO_REUSEADDR, true) + .childOption(ChannelOption.TCP_NODELAY, true) + .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT) + .localAddress(new InetSocketAddress(host, port)); + + mqttBootstrap.childHandler(new ChannelInitializer() { + protected void initChannel(SocketChannel ch) { + ChannelPipeline channelPipeline = ch.pipeline(); + // 设置读写空闲超时时间 + channelPipeline.addLast(new IdleStateHandler(600, 600, 1200)); + channelPipeline.addLast("encoder", MqttEncoder.INSTANCE); + channelPipeline.addLast("decoder", new MqttDecoder()); + channelPipeline.addLast(new BootNettyMqttChannelInboundHandler()); + } + }); + + ChannelFuture future = mqttBootstrap.bind(port).sync(); + if (future.isSuccess()) { + log.info("MqttServer启动成功, 开始监听端口:{}", port); + } else { + log.error("MqttServer启动失败", future.cause()); + } + + future.channel().closeFuture().sync(); + } catch (Exception e) { + log.error("MqttServer.start error", e); + bossGroup.shutdownGracefully(); + workerGroup.shutdownGracefully(); + } + }).start(); + } +} diff --git a/jsowell-netty/src/main/java/com/jsowell/netty/server/electricbicycles/ChargingPileHandler.java b/jsowell-netty/src/main/java/com/jsowell/netty/server/electricbicycles/ChargingPileHandler.java new file mode 100644 index 000000000..7339de6fa --- /dev/null +++ b/jsowell-netty/src/main/java/com/jsowell/netty/server/electricbicycles/ChargingPileHandler.java @@ -0,0 +1,185 @@ +package com.jsowell.netty.server.electricbicycles; + +import com.jsowell.netty.domain.ChargingPileMessage; +import io.netty.channel.ChannelHandler; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelId; +import io.netty.channel.ChannelInboundHandlerAdapter; +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Component; + +import java.net.InetSocketAddress; +import java.time.Instant; +import java.nio.ByteBuffer; +import java.nio.ByteOrder; +import java.util.concurrent.ConcurrentHashMap; + +@ChannelHandler.Sharable +@Slf4j +@Component +public class ChargingPileHandler extends ChannelInboundHandlerAdapter { + + /** + * 管理一个全局map,保存连接进服务端的通道数量 + */ + private static final ConcurrentHashMap CHANNEL_MAP = new ConcurrentHashMap<>(); + + /** + * 有客户端连接服务器会触发此函数 + * 连接被建立并且准备进行通信时被调用 + */ + @Override + public void channelActive(ChannelHandlerContext ctx) { + InetSocketAddress insocket = (InetSocketAddress) ctx.channel().remoteAddress(); + String clientIp = insocket.getAddress().getHostAddress(); + int clientPort = insocket.getPort(); + //获取连接通道唯一标识 + ChannelId channelId = ctx.channel().id(); + //如果map中不包含此连接,就保存连接 + if (CHANNEL_MAP.containsKey(channelId)) { + log.info("Handler:{}, 客户端【{}】是连接状态,连接通道数量: {}", this.getClass().getSimpleName(), channelId, CHANNEL_MAP.size()); + } else { + //保存连接 + CHANNEL_MAP.put(channelId, ctx); + log.info("Handler:{}, 客户端【{}】, 连接netty服务器【IP:{}, PORT:{}】, 连接通道数量: {}", this.getClass().getSimpleName(), channelId, clientIp, clientPort, CHANNEL_MAP.size()); + } + } + + @Override + public void channelRead(ChannelHandlerContext ctx, Object msg) { + log.info("加载客户端报文=== channelId:{}, mag:{}", ctx.channel().id(), msg.toString()); + // if (!(msg instanceof ChargingPileMessage)) { + // return; + // } + + ChargingPileMessage message = (ChargingPileMessage) msg; + byte command = message.getCommand(); + + switch (command) { + case 0x11: + handleHeartbeat(ctx, message); + break; + case 0x12: + handleTimeRequest(ctx, message); + break; + case 0x15: + handleFirmwareUpgradeRequest(ctx, message); + break; + case (byte) 0xFA: + handleFirmwareUpgradeResponse(ctx, message); + break; + case 0x31: + handleReboot(ctx, message); + break; + case 0x32: + handleCommunicationModuleReboot(ctx, message); + break; + case 0x33: + handleClearUpgradeData(ctx, message); + break; + case 0x34: + handleChangeIPAddress(ctx, message); + break; + case 0x35: + handleSubdeviceVersionUpload(ctx, message); + break; + case 0x3B: + handleFSKParameterRequest(ctx, message); + break; + default: + log.info("Unknown command: " + String.format("0x%02X", command)); + } + } + + private void handleHeartbeat(ChannelHandlerContext ctx, ChargingPileMessage message) { + // 处理心跳包,无需回复 + log.info("Received heartbeat from device ID: " + message.getPhysicalId()); + // 可以在这里更新设备状态、记录日志等 + } + + private void handleTimeRequest(ChannelHandlerContext ctx, ChargingPileMessage message) { + // 处理时间请求 + long currentTime = Instant.now().getEpochSecond(); + byte[] timeBytes = ByteBuffer.allocate(4).order(ByteOrder.LITTLE_ENDIAN).putInt((int)currentTime).array(); + + ChargingPileMessage response = new ChargingPileMessage( + message.getPhysicalId(), + message.getMessageId(), + (byte) 0x12, + timeBytes + ); + + ctx.writeAndFlush(response); + } + + private void handleFirmwareUpgradeRequest(ChannelHandlerContext ctx, ChargingPileMessage message) { + // 处理固件升级请求 + // 这里需要实现固件升级的逻辑,包括发送固件数据包等 + log.info("Firmware upgrade requested from device ID: " + message.getPhysicalId()); + // TODO: 实现固件升级逻辑 + } + + private void handleFirmwareUpgradeResponse(ChannelHandlerContext ctx, ChargingPileMessage message) { + // 处理固件升级响应 + log.info("Firmware upgrade response from device ID: " + message.getPhysicalId()); + // TODO: 根据响应继续发送下一个固件包或结束升级过程 + } + + private void handleReboot(ChannelHandlerContext ctx, ChargingPileMessage message) { + // 处理重启主机指令 + log.info("Reboot command received for device ID: " + message.getPhysicalId()); + // 发送成功响应 + sendSimpleResponse(ctx, message, (byte) 0x31, (byte) 0x00); + } + + private void handleCommunicationModuleReboot(ChannelHandlerContext ctx, ChargingPileMessage message) { + // 处理重启通信模块指令 + log.info("Communication module reboot command received for device ID: " + message.getPhysicalId()); + // 发送成功响应 + sendSimpleResponse(ctx, message, (byte) 0x32, (byte) 0x00); + } + + private void handleClearUpgradeData(ChannelHandlerContext ctx, ChargingPileMessage message) { + // 处理清空升级分机数据指令 + log.info("Clear upgrade data command received for device ID: " + message.getPhysicalId()); + // 发送成功响应 + sendSimpleResponse(ctx, message, (byte) 0x33, (byte) 0x00); + } + + private void handleChangeIPAddress(ChannelHandlerContext ctx, ChargingPileMessage message) { + // 处理更改IP地址指令 + log.info("Change IP address command received for device ID: " + message.getPhysicalId()); + // TODO: 实现IP地址更改逻辑 + // 发送成功响应 + sendSimpleResponse(ctx, message, (byte) 0x34, (byte) 0x00); + } + + private void handleSubdeviceVersionUpload(ChannelHandlerContext ctx, ChargingPileMessage message) { + // 处理上传分机版本号与设备类型 + log.info("Subdevice version upload received from device ID: " + message.getPhysicalId()); + // TODO: 处理上传的分机版本信息 + // 此命令不需要响应 + } + + private void handleFSKParameterRequest(ChannelHandlerContext ctx, ChargingPileMessage message) { + // 处理请求服务器FSK主机参数 + log.info("FSK parameter request received from device ID: " + message.getPhysicalId()); + // TODO: 实现发送FSK参数的逻辑(使用0x3A指令) + } + + private void sendSimpleResponse(ChannelHandlerContext ctx, ChargingPileMessage originalMessage, byte command, byte result) { + ChargingPileMessage response = new ChargingPileMessage( + originalMessage.getPhysicalId(), + originalMessage.getMessageId(), + command, + new byte[]{result} + ); + ctx.writeAndFlush(response); + } + + @Override + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { + cause.printStackTrace(); + ctx.close(); + } +} \ No newline at end of file diff --git a/jsowell-netty/src/main/java/com/jsowell/netty/server/electricbicycles/ElectricBicyclesServerChannelInitializer.java b/jsowell-netty/src/main/java/com/jsowell/netty/server/electricbicycles/ElectricBicyclesServerChannelInitializer.java new file mode 100644 index 000000000..fd7b91cf7 --- /dev/null +++ b/jsowell-netty/src/main/java/com/jsowell/netty/server/electricbicycles/ElectricBicyclesServerChannelInitializer.java @@ -0,0 +1,37 @@ +package com.jsowell.netty.server.electricbicycles; + +import com.jsowell.netty.decoder.ChargingPileDecoder; +import com.jsowell.netty.decoder.ProtocolDnyDecoder; +import com.jsowell.netty.decoder.StartAndLengthFieldFrameDecoder; +import com.jsowell.netty.decoder.StartAndLengthFieldFrameDecoder2; +import io.netty.channel.ChannelInitializer; +import io.netty.channel.ChannelPipeline; +import io.netty.channel.socket.SocketChannel; +import io.netty.handler.codec.bytes.ByteArrayDecoder; +import io.netty.handler.timeout.IdleStateHandler; +import org.springframework.stereotype.Component; + +import javax.annotation.Resource; +import java.util.concurrent.TimeUnit; + +@Component +public class ElectricBicyclesServerChannelInitializer extends ChannelInitializer { + + @Resource + ElectricBicyclesServerHandler electricBicyclesServerHandler; + + @Resource + ChargingPileHandler chargingPileHandler; + + @Override + protected void initChannel(SocketChannel channel) throws Exception { + ChannelPipeline pipeline = channel.pipeline(); + pipeline.addLast("frameDecoder", new ChargingPileDecoder()); + pipeline.addLast("decoder", new ByteArrayDecoder()); + pipeline.addLast("encoder", new ByteArrayDecoder()); + //读超时时间设置为10s,0表示不监控 + pipeline.addLast(new IdleStateHandler(60, 0, 0, TimeUnit.SECONDS)); + pipeline.addLast("handler", chargingPileHandler); + } + +} diff --git a/jsowell-netty/src/main/java/com/jsowell/netty/server/electricbicycles/ElectricBicyclesServerHandler.java b/jsowell-netty/src/main/java/com/jsowell/netty/server/electricbicycles/ElectricBicyclesServerHandler.java new file mode 100644 index 000000000..72f99012f --- /dev/null +++ b/jsowell-netty/src/main/java/com/jsowell/netty/server/electricbicycles/ElectricBicyclesServerHandler.java @@ -0,0 +1,234 @@ +package com.jsowell.netty.server.electricbicycles; + +import com.google.common.collect.Lists; +import com.jsowell.common.core.domain.ykc.YKCFrameTypeCode; +import com.jsowell.common.enums.ykc.PileChannelEntity; +import com.jsowell.common.util.BytesUtil; +import com.jsowell.common.util.StringUtils; +import com.jsowell.common.util.YKCUtils; +import com.jsowell.netty.service.yunkuaichong.YKCBusinessService; +import io.netty.buffer.ByteBuf; +import io.netty.channel.*; +import io.netty.handler.timeout.IdleState; +import io.netty.handler.timeout.IdleStateEvent; +import io.netty.handler.timeout.ReadTimeoutException; +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.collections4.CollectionUtils; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; + +import java.net.InetSocketAddress; +import java.util.List; +import java.util.Objects; +import java.util.concurrent.ConcurrentHashMap; + +/** + * netty服务端处理类 + */ +@ChannelHandler.Sharable +@Slf4j +@Component +public class ElectricBicyclesServerHandler extends ChannelInboundHandlerAdapter { + + @Autowired + private YKCBusinessService ykcService; + + /** + * 管理一个全局map,保存连接进服务端的通道数量 + */ + private static final ConcurrentHashMap CHANNEL_MAP = new ConcurrentHashMap<>(); + + private final List notPrintFrameTypeList = Lists.newArrayList("0x03"); + + /** + * 有客户端连接服务器会触发此函数 + * 连接被建立并且准备进行通信时被调用 + */ + @Override + public void channelActive(ChannelHandlerContext ctx) { + InetSocketAddress insocket = (InetSocketAddress) ctx.channel().remoteAddress(); + String clientIp = insocket.getAddress().getHostAddress(); + int clientPort = insocket.getPort(); + //获取连接通道唯一标识 + ChannelId channelId = ctx.channel().id(); + //如果map中不包含此连接,就保存连接 + if (CHANNEL_MAP.containsKey(channelId)) { + log.info("Handler:{}, 客户端【{}】是连接状态,连接通道数量: {}", this.getClass().getSimpleName(), channelId, CHANNEL_MAP.size()); + } else { + //保存连接 + CHANNEL_MAP.put(channelId, ctx); + log.info("Handler:{}, 客户端【{}】, 连接netty服务器【IP:{}, PORT:{}】, 连接通道数量: {}", this.getClass().getSimpleName(), channelId, clientIp, clientPort, CHANNEL_MAP.size()); + } + } + + /** + * 有客户端发消息会触发此函数 + */ + @Override + public void channelRead(ChannelHandlerContext ctx, Object message) throws Exception { + // log.info("加载客户端报文=== channelId:" + ctx.channel().id() + ", msg:" + msg); + // 下面可以解析数据,保存数据,生成返回报文,将需要返回报文写入write函数 + byte[] msg = (byte[]) message; + + // 获取帧类型 + byte[] frameTypeBytes = BytesUtil.copyBytes(msg, 5, 1); + String frameType = YKCUtils.frameType2Str(frameTypeBytes); + // 获取序列号域 + int serialNumber = BytesUtil.bytesToIntLittle(BytesUtil.copyBytes(msg, 2, 2)); + // 获取channel + Channel channel = ctx.channel(); + + // new + // String hexString = DatatypeConverter.printHexBinary(msg); + + // 心跳包0x03日志太多,造成日志文件过大,改为不打印 + if (!CollectionUtils.containsAny(notPrintFrameTypeList, frameType)) { + // log.info("【<<<<<平台收到消息<<<<<】channel:{}, 帧类型:{}, 帧名称:{}, 序列号域:{}, 报文:{}, new报文:{}", + // channel.id(), frameType, YKCFrameTypeCode.getFrameTypeStr(frameType), serialNumber, + // BytesUtil.binary(msg, 16), hexString); + log.info("【<<<<<平台收到消息<<<<<】channel:{}, 帧类型:{}, 帧名称:{}, 序列号域:{}, 报文:{}", + channel.id(), frameType, YKCFrameTypeCode.getFrameTypeStr(frameType), serialNumber, + BytesUtil.binary(msg, 16)); + } + + // 处理数据 + byte[] response = ykcService.process(msg, channel); + if (Objects.nonNull(response)) { + // 响应客户端 + ByteBuf buffer = ctx.alloc().buffer().writeBytes(response); + this.channelWrite(channel.id(), buffer); + if (!CollectionUtils.containsAny(notPrintFrameTypeList, frameType)) { + // 应答帧类型 + byte[] responseFrameTypeBytes = YKCFrameTypeCode.ResponseRelation.getResponseFrameTypeBytes(frameTypeBytes); + String responseFrameType = YKCUtils.frameType2Str(responseFrameTypeBytes); + log.info("【>>>>>平台响应消息>>>>>】channel:{}, 响应帧类型:{}, 响应帧名称:{}, 原帧类型:{}, 原帧名称:{}, 序列号域:{}, response:{}", + channel.id(), responseFrameType, YKCFrameTypeCode.getFrameTypeStr(responseFrameType), + frameType, YKCFrameTypeCode.getFrameTypeStr(frameType), serialNumber, + BytesUtil.binary(response, 16)); + } + } + } + + /** + * 有客户端终止连接服务器会触发此函数 + */ + @Override + public void channelInactive(ChannelHandlerContext ctx) { + InetSocketAddress insocket = (InetSocketAddress) ctx.channel().remoteAddress(); + String clientIp = insocket.getAddress().getHostAddress(); + ChannelId channelId = ctx.channel().id(); + //包含此客户端才去删除 + if (CHANNEL_MAP.containsKey(channelId)) { + ykcService.exit(channelId); + //删除连接 + CHANNEL_MAP.remove(channelId); + log.info("客户端【{}】, 退出netty服务器【IP:{}, PORT:{}】, 连接通道数量: {}", channelId, clientIp, insocket.getPort(), CHANNEL_MAP.size()); + } + } + + @Override + public void handlerAdded(ChannelHandlerContext ctx) throws Exception { // (2) + // Channel incoming = ctx.channel(); + // log.info("handlerAdded: handler被添加到channel的pipeline connect:" + incoming.remoteAddress()); + } + + @Override + public void handlerRemoved(ChannelHandlerContext ctx) throws Exception { // (3) + // Channel incoming = ctx.channel(); + // log.info("handlerRemoved: handler从channel的pipeline中移除 connect:" + incoming.remoteAddress()); + // ChannelMapByEntity.removeChannel(incoming); + // ChannelMap.removeChannel(incoming); + } + + @Override + public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { + Channel channel = ctx.channel(); + // log.info("channel:【{}】读数据完成", channel.id()); + super.channelReadComplete(ctx); + } + + /** + * 服务端给客户端发送消息 + * + * @param channelId 连接通道唯一id + * @param msg 需要发送的消息内容 + */ + public void channelWrite(ChannelId channelId, Object msg) throws Exception { + ChannelHandlerContext ctx = CHANNEL_MAP.get(channelId); + if (ctx == null) { + log.info("通道【{}】不存在", channelId); + return; + } + if (msg == null || msg == "") { + log.info("服务端响应空的消息"); + return; + } + //将客户端的信息直接返回写入ctx + ctx.write(msg); + //刷新缓存区 + ctx.flush(); + } + + @Override + public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { + String socketString = ctx.channel().remoteAddress().toString(); + ChannelId channelId = ctx.channel().id(); + String pileSn = PileChannelEntity.getPileSnByChannelId(channelId.asLongText()); + if (evt instanceof IdleStateEvent) { // 超时事件 + IdleStateEvent event = (IdleStateEvent) evt; + boolean flag = false; + if (event.state() == IdleState.READER_IDLE) { // 读 + flag = true; + // log.error("Client-IP:【{}】, channelId:【{}】, pileSn:【{}】, READER_IDLE 读超时", socketString, channelId, pileSn); + } else if (event.state() == IdleState.WRITER_IDLE) { // 写 + flag = true; + // log.error("Client-IP:【{}】, channelId:【{}】, pileSn:【{}】, WRITER_IDLE 写超时", socketString, channelId, pileSn); + } else if (event.state() == IdleState.ALL_IDLE) { // 全部 + flag = true; + // log.error("Client-IP:【{}】, channelId:【{}】, pileSn:【{}】, ALL_IDLE 总超时", socketString, channelId, pileSn); + } + if (flag) { + ctx.channel().close(); + // close(channelId, pileSn); + } + } + } + + /** + * 发生异常会触发此函数 + */ + @Override + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { + ChannelId channelId = ctx.channel().id(); + String channelIdShortText = channelId.asShortText(); + String pileSn = PileChannelEntity.getPileSnByChannelId(channelIdShortText); + log.error("发生异常 channelId:{}, pileSn:{}", channelIdShortText, pileSn, cause); + cause.printStackTrace(); + // 如果桩连到平台,在1分钟内没有发送数据过来,会报ReadTimeoutException异常 + if (cause instanceof ReadTimeoutException) { + if (log.isTraceEnabled()) { + log.trace("Connection timeout 【{}】", ctx.channel().remoteAddress()); + } + log.error("【{}】发生了错误, pileSn:【{}】此连接被关闭, 此时连通数量: {}", channelId, pileSn, CHANNEL_MAP.size()); + ctx.channel().close(); + } + // close(channelId, pileSn); + } + + + // 公共方法 关闭连接 + private void closeConnection(String pileSn, ChannelHandlerContext ctx) { + Channel channel = ctx.channel(); + ChannelId channelId = channel.id(); + log.error("close方法-发生异常,关闭链接,channelId:{}, pileSn:{}", channelId.asShortText(), pileSn); + if (channel != null && !channel.isActive() && !channel.isOpen() && !channel.isWritable()) { + channel.close(); + // 删除连接 + CHANNEL_MAP.remove(channelId); + } + // 删除桩编号和channel的关系 + if (StringUtils.isNotBlank(pileSn)) { + PileChannelEntity.removeByPileSn(pileSn); + } + } +} \ No newline at end of file diff --git a/jsowell-netty/src/main/java/com/jsowell/netty/server/mqtt/MqttSever.java b/jsowell-netty/src/main/java/com/jsowell/netty/server/mqtt/MqttSever.java index 09649c12d..59e8395e0 100644 --- a/jsowell-netty/src/main/java/com/jsowell/netty/server/mqtt/MqttSever.java +++ b/jsowell-netty/src/main/java/com/jsowell/netty/server/mqtt/MqttSever.java @@ -19,8 +19,9 @@ import org.springframework.stereotype.Component; import java.net.InetSocketAddress; @Slf4j -@Component -@Order(5) +@Deprecated +// @Component +// @Order(7) public class MqttSever implements CommandLineRunner { @Override diff --git a/jsowell-netty/src/main/java/com/jsowell/netty/server/yunkuaichong/NettyServer.java b/jsowell-netty/src/main/java/com/jsowell/netty/server/yunkuaichong/NettyServer.java index 4ff2a2e5e..d1eb8d88b 100644 --- a/jsowell-netty/src/main/java/com/jsowell/netty/server/yunkuaichong/NettyServer.java +++ b/jsowell-netty/src/main/java/com/jsowell/netty/server/yunkuaichong/NettyServer.java @@ -16,8 +16,9 @@ import javax.annotation.Resource; import java.net.InetSocketAddress; @Slf4j -@Component -@Order(2) +@Deprecated +// @Component +// @Order(3) public class NettyServer implements CommandLineRunner { @Resource private NettyServerChannelInitializer nettyServerChannelInitializer; diff --git a/jsowell-netty/src/main/java/com/jsowell/netty/server/yunkuaichong/NettyServerChannelInitializer.java b/jsowell-netty/src/main/java/com/jsowell/netty/server/yunkuaichong/NettyServerChannelInitializer.java index 8551a6229..b343fde1b 100644 --- a/jsowell-netty/src/main/java/com/jsowell/netty/server/yunkuaichong/NettyServerChannelInitializer.java +++ b/jsowell-netty/src/main/java/com/jsowell/netty/server/yunkuaichong/NettyServerChannelInitializer.java @@ -21,7 +21,7 @@ public class NettyServerChannelInitializer extends ChannelInitializer>>>>>>>>>>>>>>服务启动执行,执行加载数据等操作 InitializeAdapayConfig order 1 <<<<<<<<<<<<<"); /* 单商户 */ @@ -47,6 +46,7 @@ public class InitializeAdapayConfig implements CommandLineRunner { 目前有jsowell和xixiao */ multiMerchant(); + log.info(">>>>>>>>>>>>>>>服务启动执行,执行加载数据等操作 InitializeAdapayConfig order 2 <<<<<<<<<<<<<"); } /** diff --git a/jsowell-pile/src/main/java/com/jsowell/alipay/config/AliPayConfig.java b/jsowell-pile/src/main/java/com/jsowell/alipay/config/AliPayConfig.java index 968c0a227..b9c1b22d2 100644 --- a/jsowell-pile/src/main/java/com/jsowell/alipay/config/AliPayConfig.java +++ b/jsowell-pile/src/main/java/com/jsowell/alipay/config/AliPayConfig.java @@ -11,7 +11,7 @@ import org.springframework.stereotype.Component; @Slf4j @Component -@Order(2) +@Order(4) public class AliPayConfig implements CommandLineRunner { @Value("${alipay.gatewayHost}") private String gatewayHost; @@ -33,7 +33,6 @@ public class AliPayConfig implements CommandLineRunner { @Override public void run(String... args) throws Exception { - log.info(">>>>>>>>>>>>>>>服务启动执行,执行加载数据等操作 AliPayConfig order 2 <<<<<<<<<<<<<"); // 设置参数(全局只需设置一次) Config config = new Config(); config.protocol = Constants.HTTPS; @@ -57,5 +56,6 @@ public class AliPayConfig implements CommandLineRunner { // 可设置AES密钥,调用AES加解密相关接口时需要(可选) config.encryptKey = encryptKey; Factory.setOptions(config); + log.info(">>>>>>>>>>>>>>>服务启动执行,执行加载数据等操作 AliPayConfig order 4 <<<<<<<<<<<<<"); } } \ No newline at end of file diff --git a/jsowell-pile/src/main/java/com/jsowell/wxpay/config/WechatPayConfig.java b/jsowell-pile/src/main/java/com/jsowell/wxpay/config/WechatPayConfig.java index 797075a5b..dd0d2391f 100644 --- a/jsowell-pile/src/main/java/com/jsowell/wxpay/config/WechatPayConfig.java +++ b/jsowell-pile/src/main/java/com/jsowell/wxpay/config/WechatPayConfig.java @@ -3,13 +3,15 @@ package com.jsowell.wxpay.config; import com.jsowell.wxpay.common.WeChatPayParameter; import com.jsowell.wxpay.utils.WechatPayUtils; +import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Value; import org.springframework.boot.CommandLineRunner; import org.springframework.core.annotation.Order; import org.springframework.stereotype.Component; -@Order(3) +@Slf4j @Component +@Order(5) public class WechatPayConfig implements CommandLineRunner { /** * 公众号appid @@ -78,7 +80,6 @@ public class WechatPayConfig implements CommandLineRunner { @Override public void run(String... args) throws Exception { - System.out.println(">>>>>>>>>>>>>>>服务启动执行,执行加载数据等操作 WechatPayConfig order 3 <<<<<<<<<<<<<"); //微信支付 WeChatPayParameter.mchId = wechatMchId; WeChatPayParameter.appId = wechatAppId; @@ -94,5 +95,6 @@ public class WechatPayConfig implements CommandLineRunner { WeChatPayParameter.mchSerialNo = mchSerialNo; //获取平台证书 WeChatPayParameter.certificateMap = WechatPayUtils.refreshCertificate(); + log.info(">>>>>>>>>>>>>>>服务启动执行,执行加载数据等操作 WechatPayConfig order 5 <<<<<<<<<<<<<"); } }