mirror of
https://codeup.aliyun.com/67c68d4e484ca2f0a13ac3c1/ydc/jsowell-charger-web.git
synced 2026-04-20 11:05:18 +08:00
Merge branch 'electricbicycles' into merger
This commit is contained in:
@@ -0,0 +1,88 @@
|
||||
package com.jsowell.netty.decoder;
|
||||
|
||||
import com.jsowell.netty.domain.ChargingPileMessage;
|
||||
import io.netty.buffer.ByteBuf;
|
||||
import io.netty.channel.ChannelHandlerContext;
|
||||
import io.netty.handler.codec.ByteToMessageDecoder;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
@Slf4j
|
||||
public class ChargingPileDecoder extends ByteToMessageDecoder {
|
||||
|
||||
private static final byte[] FRAME_HEADER = {'D', 'N', 'Y'}; // 包头为"DNY"
|
||||
private static final int MIN_FRAME_LENGTH = 14; // 最小帧长度:包头(3)+长度(2)+物理ID(4)+消息ID(2)+命令(1)+校验(2)
|
||||
|
||||
@Override
|
||||
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
|
||||
while (in.readableBytes() >= MIN_FRAME_LENGTH) {
|
||||
in.markReaderIndex();
|
||||
|
||||
// 检查包头
|
||||
byte[] header = new byte[3];
|
||||
in.readBytes(header);
|
||||
if (!isValidHeader(header)) {
|
||||
in.resetReaderIndex();
|
||||
return;
|
||||
}
|
||||
|
||||
// 读取长度
|
||||
short length = in.readShort();
|
||||
if (in.readableBytes() < length - 5) { // 5 = 包头(3) + 长度(2)
|
||||
in.resetReaderIndex();
|
||||
return;
|
||||
}
|
||||
|
||||
// 读取物理ID
|
||||
int physicalId = in.readInt();
|
||||
log.info("physicalId:{}", physicalId);
|
||||
|
||||
// 读取消息ID
|
||||
short messageId = in.readShort();
|
||||
log.info("messageId:{}", messageId);
|
||||
|
||||
// 读取命令
|
||||
byte command = in.readByte();
|
||||
log.info("command:{}", command);
|
||||
|
||||
// 读取数据
|
||||
int dataLength = length - 13; // 13 = 包头(3) + 长度(2) + 物理ID(4) + 消息ID(2) + 命令(1) + 校验(2)
|
||||
byte[] data = new byte[dataLength];
|
||||
in.readBytes(data);
|
||||
log.info("data:{}", data);
|
||||
|
||||
// 读取校验和
|
||||
short checksum = in.readShort();
|
||||
log.info("checksum:{}", checksum);
|
||||
|
||||
// 验证校验和
|
||||
short calculatedChecksum = calculateChecksum(in, length);
|
||||
log.info("calculatedChecksum:{}", calculatedChecksum);
|
||||
if (checksum != calculatedChecksum) {
|
||||
log.info("校验和错误,丢弃此帧");
|
||||
continue;
|
||||
}
|
||||
|
||||
// 创建消息对象并添加到输出列表
|
||||
ChargingPileMessage message = new ChargingPileMessage(physicalId, messageId, command, data);
|
||||
log.info("ChargingPileMessage:{}", message.toString());
|
||||
out.add(message);
|
||||
}
|
||||
}
|
||||
|
||||
private boolean isValidHeader(byte[] header) {
|
||||
log.info("isValidHeader header:{}", header);
|
||||
return header[0] == FRAME_HEADER[0] && header[1] == FRAME_HEADER[1] && header[2] == FRAME_HEADER[2];
|
||||
}
|
||||
|
||||
private short calculateChecksum(ByteBuf buf, int length) {
|
||||
log.info("calculateChecksum ByteBuf:{}, length:{}", buf.toString(), length);
|
||||
short sum = 0;
|
||||
for (int i = 0; i < length - 2; i++) {
|
||||
sum += buf.getByte(buf.readerIndex() - length + i) & 0xFF;
|
||||
}
|
||||
return sum;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -0,0 +1,117 @@
|
||||
package com.jsowell.netty.decoder;
|
||||
|
||||
|
||||
import com.jsowell.netty.domain.ProtocolDnyMessage;
|
||||
import io.netty.buffer.ByteBuf;
|
||||
import io.netty.channel.ChannelHandlerContext;
|
||||
import io.netty.handler.codec.ByteToMessageDecoder;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
|
||||
import java.nio.ByteOrder;
|
||||
import java.util.List;
|
||||
|
||||
@Slf4j
|
||||
public class ProtocolDnyDecoder extends ByteToMessageDecoder {
|
||||
private static final int HEADER_LENGTH = 3; // 包头长度
|
||||
private static final int LENGTH_FIELD_LENGTH = 2; // 长度字段长度
|
||||
private static final int PHYSICAL_ID_LENGTH = 4; // 物理ID长度
|
||||
private static final int MESSAGE_ID_LENGTH = 2; // 消息ID长度
|
||||
private static final int COMMAND_LENGTH = 1; // 命令长度
|
||||
private static final int CHECKSUM_LENGTH = 2; // 校验字段长度
|
||||
|
||||
private static final int MIN_FRAME_LENGTH = HEADER_LENGTH + LENGTH_FIELD_LENGTH + PHYSICAL_ID_LENGTH + MESSAGE_ID_LENGTH + COMMAND_LENGTH + CHECKSUM_LENGTH;
|
||||
|
||||
@Override
|
||||
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
|
||||
// 检查是否有足够的数据来读取最小帧长度
|
||||
if (in.readableBytes() < MIN_FRAME_LENGTH) {
|
||||
return;
|
||||
}
|
||||
|
||||
// 标记当前读位置
|
||||
in.markReaderIndex();
|
||||
|
||||
// 检查包头
|
||||
byte[] header = new byte[HEADER_LENGTH];
|
||||
in.readBytes(header);
|
||||
if (header[0] != 'D' || header[1] != 'N' || header[2] != 'Y') {
|
||||
in.resetReaderIndex();
|
||||
throw new IllegalStateException("Invalid start bytes: " + new String(header));
|
||||
}
|
||||
|
||||
// 读取长度字段
|
||||
in.order(ByteOrder.LITTLE_ENDIAN); // 确保使用小端模式
|
||||
int length = in.readUnsignedShort();
|
||||
if (in.readableBytes() < length) {
|
||||
in.resetReaderIndex();
|
||||
return;
|
||||
}
|
||||
|
||||
// 读取物理ID
|
||||
byte[] physicalId = new byte[PHYSICAL_ID_LENGTH];
|
||||
in.readBytes(physicalId);
|
||||
|
||||
// 读取消息ID
|
||||
int messageId = in.readUnsignedShort();
|
||||
|
||||
// 读取命令
|
||||
byte command = in.readByte();
|
||||
|
||||
// 读取数据
|
||||
int dataLength = length - (PHYSICAL_ID_LENGTH + MESSAGE_ID_LENGTH + COMMAND_LENGTH + CHECKSUM_LENGTH);
|
||||
byte[] data = new byte[dataLength];
|
||||
in.readBytes(data);
|
||||
|
||||
// 读取校验字段
|
||||
int checksum = in.readUnsignedShort();
|
||||
|
||||
// 计算校验值并验证
|
||||
if (!verifyChecksum(header, length, physicalId, messageId, command, data, checksum)) {
|
||||
in.resetReaderIndex();
|
||||
throw new IllegalStateException("Invalid checksum");
|
||||
}
|
||||
|
||||
// 创建协议消息对象
|
||||
ProtocolDnyMessage message = new ProtocolDnyMessage(header, length, physicalId, messageId, command, data, checksum);
|
||||
|
||||
// 将解码后的消息添加到输出列表中
|
||||
out.add(message);
|
||||
}
|
||||
|
||||
// 校验帧校验域的函数
|
||||
private boolean verifyChecksum(byte[] header, int length, byte[] physicalId, int messageId, byte command, byte[] data, int checksum) {
|
||||
// 这里需要实现校验逻辑,假设我们有一个累加和校验函数sumCheck
|
||||
int calculatedChecksum = sumCheck(header, length, physicalId, messageId, command, data);
|
||||
return calculatedChecksum == checksum;
|
||||
}
|
||||
|
||||
// 累加和校验函数,计算从包头到数据的累加和
|
||||
private int sumCheck(byte[] header, int length, byte[] physicalId, int messageId, byte command, byte[] data) {
|
||||
int sum = 0;
|
||||
|
||||
for (byte b : header) {
|
||||
sum += b & 0xFF;
|
||||
}
|
||||
|
||||
sum += length & 0xFF;
|
||||
sum += (length >> 8) & 0xFF;
|
||||
|
||||
for (byte b : physicalId) {
|
||||
sum += b & 0xFF;
|
||||
}
|
||||
|
||||
sum += messageId & 0xFF;
|
||||
sum += (messageId >> 8) & 0xFF;
|
||||
|
||||
sum += command & 0xFF;
|
||||
|
||||
for (byte b : data) {
|
||||
sum += b & 0xFF;
|
||||
}
|
||||
|
||||
return sum & 0xFFFF; // 返回16位结果
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
|
||||
@@ -5,77 +5,127 @@ import io.netty.channel.ChannelHandlerContext;
|
||||
import io.netty.handler.codec.ByteToMessageDecoder;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.util.List;
|
||||
|
||||
@Slf4j
|
||||
public class StartAndLengthFieldFrameDecoder extends ByteToMessageDecoder {
|
||||
// 起始标志
|
||||
private int HEAD_DATA;
|
||||
private static final int HEADER_LENGTH_DNY = 3; // "DNY" 包头的长度
|
||||
private static final int HEADER_LENGTH_68 = 1; // 68 包头的长度
|
||||
|
||||
public StartAndLengthFieldFrameDecoder(int HEAD_DATA) {
|
||||
this.HEAD_DATA = HEAD_DATA;
|
||||
}
|
||||
// 构造函数,初始化起始标志
|
||||
public StartAndLengthFieldFrameDecoder() {}
|
||||
|
||||
/**
|
||||
* <pre>
|
||||
* 协议开始的标准head_data,int类型,占据1个字节.
|
||||
* 表示数据的长度contentLength,int类型,占据1个字节.
|
||||
* </pre>
|
||||
*/
|
||||
public final int BASE_LENGTH = 1 + 1;
|
||||
|
||||
@Override
|
||||
protected void decode(ChannelHandlerContext ctx, ByteBuf buffer, List<Object> out) throws Exception {
|
||||
// 可读长度必须大于基本长度
|
||||
if (buffer.readableBytes() <= BASE_LENGTH) {
|
||||
log.warn("可读字节数:{}小于基础长度:{}", buffer.readableBytes(), BASE_LENGTH);
|
||||
return;
|
||||
}
|
||||
|
||||
// 记录包头开始的index
|
||||
int beginReader;
|
||||
|
||||
// 循环查找包头
|
||||
while (true) {
|
||||
if (buffer.readableBytes() < Math.min(HEADER_LENGTH_DNY, HEADER_LENGTH_68)) {
|
||||
return; // 数据长度不足,等待更多数据
|
||||
}
|
||||
|
||||
// 获取包头开始的index
|
||||
beginReader = buffer.readerIndex();
|
||||
// log.info("包头开始的index:{}", beginReader);
|
||||
// 标记包头开始的index
|
||||
buffer.markReaderIndex();
|
||||
// 读到了协议的开始标志,结束while循环
|
||||
if (buffer.getUnsignedByte(beginReader) == HEAD_DATA) {
|
||||
// log.info("读到了协议的开始标志,结束while循环 byte:{}, HEAD_DATA:{}", buffer.getUnsignedByte(beginReader), HEAD_DATA);
|
||||
break;
|
||||
|
||||
// 判断是否为DNY包头或68包头
|
||||
if (isStartOfDnyHeader(buffer, beginReader) || isStartOf68Header(buffer, beginReader)) {
|
||||
break; // 读到了协议的开始标志,结束while循环
|
||||
}
|
||||
|
||||
// 未读到包头,略过一个字节
|
||||
// 每次略过,一个字节,去读取,包头信息的开始标记
|
||||
buffer.resetReaderIndex();
|
||||
buffer.readByte();
|
||||
}
|
||||
|
||||
// 当略过,一个字节之后,
|
||||
// 数据包的长度,又变得不满足
|
||||
// 此时,应该结束。等待后面的数据到达
|
||||
if (buffer.readableBytes() < BASE_LENGTH) {
|
||||
log.debug("数据包的长度不满足 readableBytes:{}, BASE_LENGTH:{}", buffer.readableBytes(), BASE_LENGTH);
|
||||
// 检查包头是否是 "DNY"
|
||||
if (buffer.readableBytes() >= HEADER_LENGTH_DNY) {
|
||||
byte[] headerBytes = new byte[HEADER_LENGTH_DNY];
|
||||
buffer.getBytes(beginReader, headerBytes, 0, HEADER_LENGTH_DNY);
|
||||
String header = new String(headerBytes, StandardCharsets.UTF_8);
|
||||
|
||||
if ("DNY".equals(header)) {
|
||||
// 处理 DNY 协议
|
||||
decodeDnyMessage(buffer, out, beginReader);
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
// 消息的长度
|
||||
int length = buffer.getUnsignedByte(beginReader + 1);
|
||||
// 判断请求数据包数据是否到齐
|
||||
if (buffer.readableBytes() < length + 4) {
|
||||
// log.info("请求数据包数据没有到齐,还原读指针 readableBytes:{}, 消息的长度:{}", buffer.readableBytes(), length);
|
||||
// 还原读指针
|
||||
// 检查包头是否是 68 协议
|
||||
if (buffer.readableBytes() >= HEADER_LENGTH_68) {
|
||||
if (buffer.getUnsignedByte(beginReader) == 0x68) {
|
||||
// 处理 68 协议
|
||||
decode68Message(buffer, out, beginReader);
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
// 未知协议,还原读指针
|
||||
buffer.resetReaderIndex();
|
||||
}
|
||||
|
||||
// 判断是否为DNY包头
|
||||
private boolean isStartOfDnyHeader(ByteBuf buffer, int beginReader) {
|
||||
if (buffer.readableBytes() >= HEADER_LENGTH_DNY) {
|
||||
byte[] headerBytes = new byte[HEADER_LENGTH_DNY];
|
||||
buffer.getBytes(beginReader, headerBytes, 0, HEADER_LENGTH_DNY);
|
||||
String header = new String(headerBytes, StandardCharsets.UTF_8);
|
||||
return "DNY".equals(header);
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
// 判断是否为68包头
|
||||
private boolean isStartOf68Header(ByteBuf buffer, int beginReader) {
|
||||
if (buffer.readableBytes() >= HEADER_LENGTH_68) {
|
||||
return buffer.getUnsignedByte(beginReader) == 0x68;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
// 处理68协议消息
|
||||
private void decode68Message(ByteBuf buffer, List<Object> out, int beginReader) {
|
||||
// 检查剩余数据是否足够
|
||||
if (buffer.readableBytes() < HEADER_LENGTH_68 + 1) {
|
||||
buffer.readerIndex(beginReader);
|
||||
return;
|
||||
}
|
||||
|
||||
// 读取data数据
|
||||
byte[] data = new byte[length + 4];
|
||||
buffer.readBytes(data);
|
||||
ByteBuf frame = buffer.retainedSlice(beginReader, length + 4);
|
||||
buffer.readerIndex(beginReader + length + 4);
|
||||
// 获取消息长度
|
||||
int length = buffer.getUnsignedByte(beginReader + HEADER_LENGTH_68);
|
||||
// 检查剩余数据是否足够
|
||||
if (buffer.readableBytes() < HEADER_LENGTH_68 + 1 + length) {
|
||||
buffer.readerIndex(beginReader);
|
||||
return;
|
||||
}
|
||||
|
||||
// 读取 data 数据
|
||||
ByteBuf frame = buffer.retainedSlice(beginReader, HEADER_LENGTH_68 + 1 + length);
|
||||
buffer.readerIndex(beginReader + HEADER_LENGTH_68 + 1 + length);
|
||||
out.add(frame);
|
||||
}
|
||||
|
||||
// 处理DNY协议消息
|
||||
private void decodeDnyMessage(ByteBuf buffer, List<Object> out, int beginReader) {
|
||||
// 检查剩余数据是否足够
|
||||
if (buffer.readableBytes() < HEADER_LENGTH_DNY + 1) {
|
||||
buffer.readerIndex(beginReader);
|
||||
return;
|
||||
}
|
||||
|
||||
// 获取消息长度
|
||||
int length = buffer.getUnsignedByte(beginReader + HEADER_LENGTH_DNY);
|
||||
// 检查剩余数据是否足够
|
||||
if (buffer.readableBytes() < HEADER_LENGTH_DNY + 1 + length) {
|
||||
buffer.readerIndex(beginReader);
|
||||
return;
|
||||
}
|
||||
|
||||
// 读取 data 数据
|
||||
ByteBuf frame = buffer.retainedSlice(beginReader, HEADER_LENGTH_DNY + 1 + length);
|
||||
buffer.readerIndex(beginReader + HEADER_LENGTH_DNY + 1 + length);
|
||||
out.add(frame);
|
||||
}
|
||||
|
||||
|
||||
@@ -0,0 +1,117 @@
|
||||
package com.jsowell.netty.decoder;
|
||||
|
||||
import com.jsowell.netty.domain.DnyMessage;
|
||||
import com.jsowell.netty.domain.Message68;
|
||||
import io.netty.buffer.ByteBuf;
|
||||
import io.netty.channel.ChannelHandlerContext;
|
||||
import io.netty.handler.codec.ByteToMessageDecoder;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
@Slf4j
|
||||
public class StartAndLengthFieldFrameDecoder2 extends ByteToMessageDecoder {
|
||||
private static final int HEADER_LENGTH_DNY = 3; // "DNY" 包头的长度
|
||||
private static final int HEADER_LENGTH_68 = 1; // 68 包头的长度
|
||||
private static final int MAX_FRAME_LENGTH = 1024; // 最大帧长度,可以根据实际需求调整
|
||||
private static final byte[] DNY_HEADER = {'D', 'N', 'Y'};
|
||||
|
||||
@Override
|
||||
protected void decode(ChannelHandlerContext ctx, ByteBuf buffer, List<Object> out) throws Exception {
|
||||
while (buffer.readableBytes() > 0) {
|
||||
int headerIndex = findHeaderIndex(buffer);
|
||||
if (headerIndex == -1) {
|
||||
// 没有找到有效的包头,丢弃所有数据
|
||||
buffer.skipBytes(buffer.readableBytes());
|
||||
return;
|
||||
}
|
||||
|
||||
buffer.readerIndex(headerIndex);
|
||||
|
||||
if (isDnyHeader(buffer)) {
|
||||
if (decodeDnyMessage(buffer, out)) {
|
||||
return;
|
||||
}
|
||||
} else if (is68Header(buffer)) {
|
||||
if (decode68Message(buffer, out)) {
|
||||
return;
|
||||
}
|
||||
} else {
|
||||
// 未知协议,跳过一个字节
|
||||
buffer.skipBytes(1);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private int findHeaderIndex(ByteBuf buffer) {
|
||||
int dnyIndex = buffer.indexOf(buffer.readerIndex(), buffer.writerIndex(), DNY_HEADER[0]);
|
||||
int index68 = buffer.indexOf(buffer.readerIndex(), buffer.writerIndex(), (byte) 0x68);
|
||||
|
||||
if (dnyIndex == -1 && index68 == -1) {
|
||||
return -1;
|
||||
} else if (dnyIndex == -1) {
|
||||
return index68;
|
||||
} else if (index68 == -1) {
|
||||
return dnyIndex;
|
||||
} else {
|
||||
return Math.min(dnyIndex, index68);
|
||||
}
|
||||
}
|
||||
|
||||
private boolean isDnyHeader(ByteBuf buffer) {
|
||||
return buffer.readableBytes() >= HEADER_LENGTH_DNY &&
|
||||
buffer.getByte(buffer.readerIndex()) == 'D' &&
|
||||
buffer.getByte(buffer.readerIndex() + 1) == 'N' &&
|
||||
buffer.getByte(buffer.readerIndex() + 2) == 'Y';
|
||||
}
|
||||
|
||||
private boolean is68Header(ByteBuf buffer) {
|
||||
return buffer.readableBytes() >= HEADER_LENGTH_68 &&
|
||||
buffer.getByte(buffer.readerIndex()) == 0x68;
|
||||
}
|
||||
|
||||
private boolean decodeDnyMessage(ByteBuf buffer, List<Object> out) {
|
||||
if (buffer.readableBytes() < HEADER_LENGTH_DNY + 2) {
|
||||
return false; // 数据不足,等待更多数据
|
||||
}
|
||||
|
||||
int lengthFieldIndex = buffer.readerIndex() + HEADER_LENGTH_DNY;
|
||||
int length = buffer.getUnsignedShort(lengthFieldIndex);
|
||||
|
||||
if (length > MAX_FRAME_LENGTH) {
|
||||
log.warn("DNY frame length exceeds maximum allowed: {}", length);
|
||||
buffer.skipBytes(HEADER_LENGTH_DNY + 2);
|
||||
return false;
|
||||
}
|
||||
|
||||
if (buffer.readableBytes() < HEADER_LENGTH_DNY + 2 + length) {
|
||||
return false; // 数据不足,等待更多数据
|
||||
}
|
||||
|
||||
ByteBuf frame = buffer.readRetainedSlice(HEADER_LENGTH_DNY + 2 + length);
|
||||
out.add(new DnyMessage(frame));
|
||||
return true;
|
||||
}
|
||||
|
||||
private boolean decode68Message(ByteBuf buffer, List<Object> out) {
|
||||
if (buffer.readableBytes() < HEADER_LENGTH_68 + 1) {
|
||||
return false; // 数据不足,等待更多数据
|
||||
}
|
||||
|
||||
int length = buffer.getUnsignedByte(buffer.readerIndex() + HEADER_LENGTH_68);
|
||||
|
||||
if (length > MAX_FRAME_LENGTH) {
|
||||
log.warn("68 frame length exceeds maximum allowed: {}", length);
|
||||
buffer.skipBytes(HEADER_LENGTH_68 + 1);
|
||||
return false;
|
||||
}
|
||||
|
||||
if (buffer.readableBytes() < HEADER_LENGTH_68 + 1 + length) {
|
||||
return false; // 数据不足,等待更多数据
|
||||
}
|
||||
|
||||
ByteBuf frame = buffer.readRetainedSlice(HEADER_LENGTH_68 + 1 + length);
|
||||
out.add(new Message68(frame));
|
||||
return true;
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,35 @@
|
||||
package com.jsowell.netty.domain;
|
||||
|
||||
import org.apache.commons.lang3.builder.ToStringBuilder;
|
||||
import org.apache.commons.lang3.builder.ToStringStyle;
|
||||
|
||||
public class ChargingPileMessage {
|
||||
private int physicalId;
|
||||
private short messageId;
|
||||
private byte command;
|
||||
private byte[] data;
|
||||
|
||||
public ChargingPileMessage(int physicalId, short messageId, byte command, byte[] data) {
|
||||
this.physicalId = physicalId;
|
||||
this.messageId = messageId;
|
||||
this.command = command;
|
||||
this.data = data;
|
||||
}
|
||||
|
||||
// Getters
|
||||
public int getPhysicalId() { return physicalId; }
|
||||
public short getMessageId() { return messageId; }
|
||||
public byte getCommand() { return command; }
|
||||
public byte[] getData() { return data; }
|
||||
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return new ToStringBuilder(this, ToStringStyle.JSON_STYLE)
|
||||
.append("physicalId", physicalId)
|
||||
.append("messageId", messageId)
|
||||
.append("command", command)
|
||||
.append("data", data)
|
||||
.toString();
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,15 @@
|
||||
package com.jsowell.netty.domain;
|
||||
|
||||
import io.netty.buffer.ByteBuf;
|
||||
|
||||
public class DnyMessage {
|
||||
private final ByteBuf content;
|
||||
|
||||
public DnyMessage(ByteBuf content) {
|
||||
this.content = content;
|
||||
}
|
||||
|
||||
public ByteBuf getContent() {
|
||||
return content;
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,15 @@
|
||||
package com.jsowell.netty.domain;
|
||||
|
||||
import io.netty.buffer.ByteBuf;
|
||||
|
||||
public class Message68 {
|
||||
private final ByteBuf content;
|
||||
|
||||
public Message68(ByteBuf content) {
|
||||
this.content = content;
|
||||
}
|
||||
|
||||
public ByteBuf getContent() {
|
||||
return content;
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,49 @@
|
||||
package com.jsowell.netty.domain;
|
||||
|
||||
public class ProtocolDnyMessage {
|
||||
private final byte[] header;
|
||||
private final int length;
|
||||
private final byte[] physicalId;
|
||||
private final int messageId;
|
||||
private final byte command;
|
||||
private final byte[] data;
|
||||
private final int checksum;
|
||||
|
||||
public ProtocolDnyMessage(byte[] header, int length, byte[] physicalId, int messageId, byte command, byte[] data, int checksum) {
|
||||
this.header = header;
|
||||
this.length = length;
|
||||
this.physicalId = physicalId;
|
||||
this.messageId = messageId;
|
||||
this.command = command;
|
||||
this.data = data;
|
||||
this.checksum = checksum;
|
||||
}
|
||||
|
||||
public byte[] getHeader() {
|
||||
return header;
|
||||
}
|
||||
|
||||
public int getLength() {
|
||||
return length;
|
||||
}
|
||||
|
||||
public byte[] getPhysicalId() {
|
||||
return physicalId;
|
||||
}
|
||||
|
||||
public int getMessageId() {
|
||||
return messageId;
|
||||
}
|
||||
|
||||
public byte getCommand() {
|
||||
return command;
|
||||
}
|
||||
|
||||
public byte[] getData() {
|
||||
return data;
|
||||
}
|
||||
|
||||
public int getChecksum() {
|
||||
return checksum;
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,152 @@
|
||||
package com.jsowell.netty.server;
|
||||
|
||||
import com.jsowell.common.constant.Constants;
|
||||
import com.jsowell.netty.server.electricbicycles.ElectricBicyclesServerChannelInitializer;
|
||||
import com.jsowell.netty.server.mqtt.BootNettyMqttChannelInboundHandler;
|
||||
import com.jsowell.netty.server.yunkuaichong.NettyServerChannelInitializer;
|
||||
import io.netty.bootstrap.ServerBootstrap;
|
||||
import io.netty.buffer.PooledByteBufAllocator;
|
||||
import io.netty.channel.*;
|
||||
import io.netty.channel.nio.NioEventLoopGroup;
|
||||
import io.netty.channel.socket.SocketChannel;
|
||||
import io.netty.channel.socket.nio.NioServerSocketChannel;
|
||||
import io.netty.handler.codec.mqtt.MqttDecoder;
|
||||
import io.netty.handler.codec.mqtt.MqttEncoder;
|
||||
import io.netty.handler.logging.LogLevel;
|
||||
import io.netty.handler.logging.LoggingHandler;
|
||||
import io.netty.handler.timeout.IdleStateHandler;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.boot.CommandLineRunner;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import javax.annotation.Resource;
|
||||
import java.net.InetSocketAddress;
|
||||
|
||||
@Slf4j
|
||||
@Component
|
||||
public class NettyServerManager implements CommandLineRunner {
|
||||
|
||||
@Resource
|
||||
private NettyServerChannelInitializer nettyServerChannelInitializer;
|
||||
|
||||
@Resource
|
||||
private ElectricBicyclesServerChannelInitializer electricBicyclesServerChannelInitializer;
|
||||
|
||||
@Override
|
||||
public void run(String... args) throws Exception {
|
||||
startNettyServer(Constants.SOCKET_IP, 9011);
|
||||
startElectricBikeNettyServer(Constants.SOCKET_IP, 9012);
|
||||
// startMqttSever(Constants.SOCKET_IP, 1883);
|
||||
}
|
||||
|
||||
public void startNettyServer(String host, int port) {
|
||||
new Thread(() -> {
|
||||
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
|
||||
EventLoopGroup workerGroup = new NioEventLoopGroup();
|
||||
|
||||
try {
|
||||
ServerBootstrap bootstrap = new ServerBootstrap()
|
||||
.group(bossGroup, workerGroup)
|
||||
.channel(NioServerSocketChannel.class)
|
||||
.handler(new LoggingHandler(LogLevel.DEBUG))
|
||||
.option(ChannelOption.SO_BACKLOG, 128)
|
||||
.option(ChannelOption.SO_REUSEADDR, true)
|
||||
.childOption(ChannelOption.SO_KEEPALIVE, true)
|
||||
.childOption(ChannelOption.SO_REUSEADDR, true)
|
||||
.childHandler(nettyServerChannelInitializer)
|
||||
.localAddress(new InetSocketAddress(host, port));
|
||||
|
||||
ChannelFuture future = bootstrap.bind(port).sync();
|
||||
if (future.isSuccess()) {
|
||||
log.info("NettyServer启动成功, 开始监听端口:{}", port);
|
||||
} else {
|
||||
log.error("NettyServer启动失败", future.cause());
|
||||
}
|
||||
|
||||
future.channel().closeFuture().sync();
|
||||
} catch (Exception e) {
|
||||
log.error("NettyServer.start error", e);
|
||||
bossGroup.shutdownGracefully();
|
||||
workerGroup.shutdownGracefully();
|
||||
}
|
||||
}).start();
|
||||
}
|
||||
|
||||
public void startElectricBikeNettyServer(String host, int port) {
|
||||
new Thread(() -> {
|
||||
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
|
||||
EventLoopGroup workerGroup = new NioEventLoopGroup();
|
||||
|
||||
try {
|
||||
ServerBootstrap bootstrap = new ServerBootstrap()
|
||||
.group(bossGroup, workerGroup)
|
||||
.channel(NioServerSocketChannel.class)
|
||||
.handler(new LoggingHandler(LogLevel.DEBUG))
|
||||
.option(ChannelOption.SO_BACKLOG, 128)
|
||||
.option(ChannelOption.SO_REUSEADDR, true)
|
||||
.childOption(ChannelOption.SO_KEEPALIVE, true)
|
||||
.childOption(ChannelOption.SO_REUSEADDR, true)
|
||||
.childHandler(electricBicyclesServerChannelInitializer)
|
||||
.localAddress(new InetSocketAddress(host, port));
|
||||
|
||||
ChannelFuture future = bootstrap.bind(port).sync();
|
||||
if (future.isSuccess()) {
|
||||
log.info("ElectricBikeNettyServer启动成功, 开始监听端口:{}", port);
|
||||
} else {
|
||||
log.error("ElectricBikeNettyServer启动失败", future.cause());
|
||||
}
|
||||
|
||||
future.channel().closeFuture().sync();
|
||||
} catch (Exception e) {
|
||||
log.error("ElectricBikeNettyServer.start error", e);
|
||||
bossGroup.shutdownGracefully();
|
||||
workerGroup.shutdownGracefully();
|
||||
}
|
||||
}).start();
|
||||
}
|
||||
|
||||
public void startMqttSever(String host, int port) {
|
||||
new Thread(() -> {
|
||||
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
|
||||
EventLoopGroup workerGroup = new NioEventLoopGroup();
|
||||
|
||||
try {
|
||||
ServerBootstrap mqttBootstrap = new ServerBootstrap()
|
||||
.group(bossGroup, workerGroup)
|
||||
.channel(NioServerSocketChannel.class)
|
||||
.handler(new LoggingHandler(LogLevel.DEBUG))
|
||||
.option(ChannelOption.SO_BACKLOG, 128)
|
||||
.option(ChannelOption.SO_REUSEADDR, true)
|
||||
.childOption(ChannelOption.SO_KEEPALIVE, true)
|
||||
.childOption(ChannelOption.SO_REUSEADDR, true)
|
||||
.childOption(ChannelOption.TCP_NODELAY, true)
|
||||
.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
|
||||
.localAddress(new InetSocketAddress(host, port));
|
||||
|
||||
mqttBootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
|
||||
protected void initChannel(SocketChannel ch) {
|
||||
ChannelPipeline channelPipeline = ch.pipeline();
|
||||
// 设置读写空闲超时时间
|
||||
channelPipeline.addLast(new IdleStateHandler(600, 600, 1200));
|
||||
channelPipeline.addLast("encoder", MqttEncoder.INSTANCE);
|
||||
channelPipeline.addLast("decoder", new MqttDecoder());
|
||||
channelPipeline.addLast(new BootNettyMqttChannelInboundHandler());
|
||||
}
|
||||
});
|
||||
|
||||
ChannelFuture future = mqttBootstrap.bind(port).sync();
|
||||
if (future.isSuccess()) {
|
||||
log.info("MqttServer启动成功, 开始监听端口:{}", port);
|
||||
} else {
|
||||
log.error("MqttServer启动失败", future.cause());
|
||||
}
|
||||
|
||||
future.channel().closeFuture().sync();
|
||||
} catch (Exception e) {
|
||||
log.error("MqttServer.start error", e);
|
||||
bossGroup.shutdownGracefully();
|
||||
workerGroup.shutdownGracefully();
|
||||
}
|
||||
}).start();
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,185 @@
|
||||
package com.jsowell.netty.server.electricbicycles;
|
||||
|
||||
import com.jsowell.netty.domain.ChargingPileMessage;
|
||||
import io.netty.channel.ChannelHandler;
|
||||
import io.netty.channel.ChannelHandlerContext;
|
||||
import io.netty.channel.ChannelId;
|
||||
import io.netty.channel.ChannelInboundHandlerAdapter;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import java.net.InetSocketAddress;
|
||||
import java.time.Instant;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.nio.ByteOrder;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
|
||||
@ChannelHandler.Sharable
|
||||
@Slf4j
|
||||
@Component
|
||||
public class ChargingPileHandler extends ChannelInboundHandlerAdapter {
|
||||
|
||||
/**
|
||||
* 管理一个全局map,保存连接进服务端的通道数量
|
||||
*/
|
||||
private static final ConcurrentHashMap<ChannelId, ChannelHandlerContext> CHANNEL_MAP = new ConcurrentHashMap<>();
|
||||
|
||||
/**
|
||||
* 有客户端连接服务器会触发此函数
|
||||
* 连接被建立并且准备进行通信时被调用
|
||||
*/
|
||||
@Override
|
||||
public void channelActive(ChannelHandlerContext ctx) {
|
||||
InetSocketAddress insocket = (InetSocketAddress) ctx.channel().remoteAddress();
|
||||
String clientIp = insocket.getAddress().getHostAddress();
|
||||
int clientPort = insocket.getPort();
|
||||
//获取连接通道唯一标识
|
||||
ChannelId channelId = ctx.channel().id();
|
||||
//如果map中不包含此连接,就保存连接
|
||||
if (CHANNEL_MAP.containsKey(channelId)) {
|
||||
log.info("Handler:{}, 客户端【{}】是连接状态,连接通道数量: {}", this.getClass().getSimpleName(), channelId, CHANNEL_MAP.size());
|
||||
} else {
|
||||
//保存连接
|
||||
CHANNEL_MAP.put(channelId, ctx);
|
||||
log.info("Handler:{}, 客户端【{}】, 连接netty服务器【IP:{}, PORT:{}】, 连接通道数量: {}", this.getClass().getSimpleName(), channelId, clientIp, clientPort, CHANNEL_MAP.size());
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void channelRead(ChannelHandlerContext ctx, Object msg) {
|
||||
log.info("加载客户端报文=== channelId:{}, mag:{}", ctx.channel().id(), msg.toString());
|
||||
// if (!(msg instanceof ChargingPileMessage)) {
|
||||
// return;
|
||||
// }
|
||||
|
||||
ChargingPileMessage message = (ChargingPileMessage) msg;
|
||||
byte command = message.getCommand();
|
||||
|
||||
switch (command) {
|
||||
case 0x11:
|
||||
handleHeartbeat(ctx, message);
|
||||
break;
|
||||
case 0x12:
|
||||
handleTimeRequest(ctx, message);
|
||||
break;
|
||||
case 0x15:
|
||||
handleFirmwareUpgradeRequest(ctx, message);
|
||||
break;
|
||||
case (byte) 0xFA:
|
||||
handleFirmwareUpgradeResponse(ctx, message);
|
||||
break;
|
||||
case 0x31:
|
||||
handleReboot(ctx, message);
|
||||
break;
|
||||
case 0x32:
|
||||
handleCommunicationModuleReboot(ctx, message);
|
||||
break;
|
||||
case 0x33:
|
||||
handleClearUpgradeData(ctx, message);
|
||||
break;
|
||||
case 0x34:
|
||||
handleChangeIPAddress(ctx, message);
|
||||
break;
|
||||
case 0x35:
|
||||
handleSubdeviceVersionUpload(ctx, message);
|
||||
break;
|
||||
case 0x3B:
|
||||
handleFSKParameterRequest(ctx, message);
|
||||
break;
|
||||
default:
|
||||
log.info("Unknown command: " + String.format("0x%02X", command));
|
||||
}
|
||||
}
|
||||
|
||||
private void handleHeartbeat(ChannelHandlerContext ctx, ChargingPileMessage message) {
|
||||
// 处理心跳包,无需回复
|
||||
log.info("Received heartbeat from device ID: " + message.getPhysicalId());
|
||||
// 可以在这里更新设备状态、记录日志等
|
||||
}
|
||||
|
||||
private void handleTimeRequest(ChannelHandlerContext ctx, ChargingPileMessage message) {
|
||||
// 处理时间请求
|
||||
long currentTime = Instant.now().getEpochSecond();
|
||||
byte[] timeBytes = ByteBuffer.allocate(4).order(ByteOrder.LITTLE_ENDIAN).putInt((int)currentTime).array();
|
||||
|
||||
ChargingPileMessage response = new ChargingPileMessage(
|
||||
message.getPhysicalId(),
|
||||
message.getMessageId(),
|
||||
(byte) 0x12,
|
||||
timeBytes
|
||||
);
|
||||
|
||||
ctx.writeAndFlush(response);
|
||||
}
|
||||
|
||||
private void handleFirmwareUpgradeRequest(ChannelHandlerContext ctx, ChargingPileMessage message) {
|
||||
// 处理固件升级请求
|
||||
// 这里需要实现固件升级的逻辑,包括发送固件数据包等
|
||||
log.info("Firmware upgrade requested from device ID: " + message.getPhysicalId());
|
||||
// TODO: 实现固件升级逻辑
|
||||
}
|
||||
|
||||
private void handleFirmwareUpgradeResponse(ChannelHandlerContext ctx, ChargingPileMessage message) {
|
||||
// 处理固件升级响应
|
||||
log.info("Firmware upgrade response from device ID: " + message.getPhysicalId());
|
||||
// TODO: 根据响应继续发送下一个固件包或结束升级过程
|
||||
}
|
||||
|
||||
private void handleReboot(ChannelHandlerContext ctx, ChargingPileMessage message) {
|
||||
// 处理重启主机指令
|
||||
log.info("Reboot command received for device ID: " + message.getPhysicalId());
|
||||
// 发送成功响应
|
||||
sendSimpleResponse(ctx, message, (byte) 0x31, (byte) 0x00);
|
||||
}
|
||||
|
||||
private void handleCommunicationModuleReboot(ChannelHandlerContext ctx, ChargingPileMessage message) {
|
||||
// 处理重启通信模块指令
|
||||
log.info("Communication module reboot command received for device ID: " + message.getPhysicalId());
|
||||
// 发送成功响应
|
||||
sendSimpleResponse(ctx, message, (byte) 0x32, (byte) 0x00);
|
||||
}
|
||||
|
||||
private void handleClearUpgradeData(ChannelHandlerContext ctx, ChargingPileMessage message) {
|
||||
// 处理清空升级分机数据指令
|
||||
log.info("Clear upgrade data command received for device ID: " + message.getPhysicalId());
|
||||
// 发送成功响应
|
||||
sendSimpleResponse(ctx, message, (byte) 0x33, (byte) 0x00);
|
||||
}
|
||||
|
||||
private void handleChangeIPAddress(ChannelHandlerContext ctx, ChargingPileMessage message) {
|
||||
// 处理更改IP地址指令
|
||||
log.info("Change IP address command received for device ID: " + message.getPhysicalId());
|
||||
// TODO: 实现IP地址更改逻辑
|
||||
// 发送成功响应
|
||||
sendSimpleResponse(ctx, message, (byte) 0x34, (byte) 0x00);
|
||||
}
|
||||
|
||||
private void handleSubdeviceVersionUpload(ChannelHandlerContext ctx, ChargingPileMessage message) {
|
||||
// 处理上传分机版本号与设备类型
|
||||
log.info("Subdevice version upload received from device ID: " + message.getPhysicalId());
|
||||
// TODO: 处理上传的分机版本信息
|
||||
// 此命令不需要响应
|
||||
}
|
||||
|
||||
private void handleFSKParameterRequest(ChannelHandlerContext ctx, ChargingPileMessage message) {
|
||||
// 处理请求服务器FSK主机参数
|
||||
log.info("FSK parameter request received from device ID: " + message.getPhysicalId());
|
||||
// TODO: 实现发送FSK参数的逻辑(使用0x3A指令)
|
||||
}
|
||||
|
||||
private void sendSimpleResponse(ChannelHandlerContext ctx, ChargingPileMessage originalMessage, byte command, byte result) {
|
||||
ChargingPileMessage response = new ChargingPileMessage(
|
||||
originalMessage.getPhysicalId(),
|
||||
originalMessage.getMessageId(),
|
||||
command,
|
||||
new byte[]{result}
|
||||
);
|
||||
ctx.writeAndFlush(response);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
|
||||
cause.printStackTrace();
|
||||
ctx.close();
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,37 @@
|
||||
package com.jsowell.netty.server.electricbicycles;
|
||||
|
||||
import com.jsowell.netty.decoder.ChargingPileDecoder;
|
||||
import com.jsowell.netty.decoder.ProtocolDnyDecoder;
|
||||
import com.jsowell.netty.decoder.StartAndLengthFieldFrameDecoder;
|
||||
import com.jsowell.netty.decoder.StartAndLengthFieldFrameDecoder2;
|
||||
import io.netty.channel.ChannelInitializer;
|
||||
import io.netty.channel.ChannelPipeline;
|
||||
import io.netty.channel.socket.SocketChannel;
|
||||
import io.netty.handler.codec.bytes.ByteArrayDecoder;
|
||||
import io.netty.handler.timeout.IdleStateHandler;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import javax.annotation.Resource;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
@Component
|
||||
public class ElectricBicyclesServerChannelInitializer extends ChannelInitializer<SocketChannel> {
|
||||
|
||||
@Resource
|
||||
ElectricBicyclesServerHandler electricBicyclesServerHandler;
|
||||
|
||||
@Resource
|
||||
ChargingPileHandler chargingPileHandler;
|
||||
|
||||
@Override
|
||||
protected void initChannel(SocketChannel channel) throws Exception {
|
||||
ChannelPipeline pipeline = channel.pipeline();
|
||||
pipeline.addLast("frameDecoder", new ChargingPileDecoder());
|
||||
pipeline.addLast("decoder", new ByteArrayDecoder());
|
||||
pipeline.addLast("encoder", new ByteArrayDecoder());
|
||||
//读超时时间设置为10s,0表示不监控
|
||||
pipeline.addLast(new IdleStateHandler(60, 0, 0, TimeUnit.SECONDS));
|
||||
pipeline.addLast("handler", chargingPileHandler);
|
||||
}
|
||||
|
||||
}
|
||||
@@ -0,0 +1,234 @@
|
||||
package com.jsowell.netty.server.electricbicycles;
|
||||
|
||||
import com.google.common.collect.Lists;
|
||||
import com.jsowell.common.core.domain.ykc.YKCFrameTypeCode;
|
||||
import com.jsowell.common.enums.ykc.PileChannelEntity;
|
||||
import com.jsowell.common.util.BytesUtil;
|
||||
import com.jsowell.common.util.StringUtils;
|
||||
import com.jsowell.common.util.YKCUtils;
|
||||
import com.jsowell.netty.service.yunkuaichong.YKCBusinessService;
|
||||
import io.netty.buffer.ByteBuf;
|
||||
import io.netty.channel.*;
|
||||
import io.netty.handler.timeout.IdleState;
|
||||
import io.netty.handler.timeout.IdleStateEvent;
|
||||
import io.netty.handler.timeout.ReadTimeoutException;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.apache.commons.collections4.CollectionUtils;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import java.net.InetSocketAddress;
|
||||
import java.util.List;
|
||||
import java.util.Objects;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
|
||||
/**
|
||||
* netty服务端处理类
|
||||
*/
|
||||
@ChannelHandler.Sharable
|
||||
@Slf4j
|
||||
@Component
|
||||
public class ElectricBicyclesServerHandler extends ChannelInboundHandlerAdapter {
|
||||
|
||||
@Autowired
|
||||
private YKCBusinessService ykcService;
|
||||
|
||||
/**
|
||||
* 管理一个全局map,保存连接进服务端的通道数量
|
||||
*/
|
||||
private static final ConcurrentHashMap<ChannelId, ChannelHandlerContext> CHANNEL_MAP = new ConcurrentHashMap<>();
|
||||
|
||||
private final List<String> notPrintFrameTypeList = Lists.newArrayList("0x03");
|
||||
|
||||
/**
|
||||
* 有客户端连接服务器会触发此函数
|
||||
* 连接被建立并且准备进行通信时被调用
|
||||
*/
|
||||
@Override
|
||||
public void channelActive(ChannelHandlerContext ctx) {
|
||||
InetSocketAddress insocket = (InetSocketAddress) ctx.channel().remoteAddress();
|
||||
String clientIp = insocket.getAddress().getHostAddress();
|
||||
int clientPort = insocket.getPort();
|
||||
//获取连接通道唯一标识
|
||||
ChannelId channelId = ctx.channel().id();
|
||||
//如果map中不包含此连接,就保存连接
|
||||
if (CHANNEL_MAP.containsKey(channelId)) {
|
||||
log.info("Handler:{}, 客户端【{}】是连接状态,连接通道数量: {}", this.getClass().getSimpleName(), channelId, CHANNEL_MAP.size());
|
||||
} else {
|
||||
//保存连接
|
||||
CHANNEL_MAP.put(channelId, ctx);
|
||||
log.info("Handler:{}, 客户端【{}】, 连接netty服务器【IP:{}, PORT:{}】, 连接通道数量: {}", this.getClass().getSimpleName(), channelId, clientIp, clientPort, CHANNEL_MAP.size());
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 有客户端发消息会触发此函数
|
||||
*/
|
||||
@Override
|
||||
public void channelRead(ChannelHandlerContext ctx, Object message) throws Exception {
|
||||
// log.info("加载客户端报文=== channelId:" + ctx.channel().id() + ", msg:" + msg);
|
||||
// 下面可以解析数据,保存数据,生成返回报文,将需要返回报文写入write函数
|
||||
byte[] msg = (byte[]) message;
|
||||
|
||||
// 获取帧类型
|
||||
byte[] frameTypeBytes = BytesUtil.copyBytes(msg, 5, 1);
|
||||
String frameType = YKCUtils.frameType2Str(frameTypeBytes);
|
||||
// 获取序列号域
|
||||
int serialNumber = BytesUtil.bytesToIntLittle(BytesUtil.copyBytes(msg, 2, 2));
|
||||
// 获取channel
|
||||
Channel channel = ctx.channel();
|
||||
|
||||
// new
|
||||
// String hexString = DatatypeConverter.printHexBinary(msg);
|
||||
|
||||
// 心跳包0x03日志太多,造成日志文件过大,改为不打印
|
||||
if (!CollectionUtils.containsAny(notPrintFrameTypeList, frameType)) {
|
||||
// log.info("【<<<<<平台收到消息<<<<<】channel:{}, 帧类型:{}, 帧名称:{}, 序列号域:{}, 报文:{}, new报文:{}",
|
||||
// channel.id(), frameType, YKCFrameTypeCode.getFrameTypeStr(frameType), serialNumber,
|
||||
// BytesUtil.binary(msg, 16), hexString);
|
||||
log.info("【<<<<<平台收到消息<<<<<】channel:{}, 帧类型:{}, 帧名称:{}, 序列号域:{}, 报文:{}",
|
||||
channel.id(), frameType, YKCFrameTypeCode.getFrameTypeStr(frameType), serialNumber,
|
||||
BytesUtil.binary(msg, 16));
|
||||
}
|
||||
|
||||
// 处理数据
|
||||
byte[] response = ykcService.process(msg, channel);
|
||||
if (Objects.nonNull(response)) {
|
||||
// 响应客户端
|
||||
ByteBuf buffer = ctx.alloc().buffer().writeBytes(response);
|
||||
this.channelWrite(channel.id(), buffer);
|
||||
if (!CollectionUtils.containsAny(notPrintFrameTypeList, frameType)) {
|
||||
// 应答帧类型
|
||||
byte[] responseFrameTypeBytes = YKCFrameTypeCode.ResponseRelation.getResponseFrameTypeBytes(frameTypeBytes);
|
||||
String responseFrameType = YKCUtils.frameType2Str(responseFrameTypeBytes);
|
||||
log.info("【>>>>>平台响应消息>>>>>】channel:{}, 响应帧类型:{}, 响应帧名称:{}, 原帧类型:{}, 原帧名称:{}, 序列号域:{}, response:{}",
|
||||
channel.id(), responseFrameType, YKCFrameTypeCode.getFrameTypeStr(responseFrameType),
|
||||
frameType, YKCFrameTypeCode.getFrameTypeStr(frameType), serialNumber,
|
||||
BytesUtil.binary(response, 16));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 有客户端终止连接服务器会触发此函数
|
||||
*/
|
||||
@Override
|
||||
public void channelInactive(ChannelHandlerContext ctx) {
|
||||
InetSocketAddress insocket = (InetSocketAddress) ctx.channel().remoteAddress();
|
||||
String clientIp = insocket.getAddress().getHostAddress();
|
||||
ChannelId channelId = ctx.channel().id();
|
||||
//包含此客户端才去删除
|
||||
if (CHANNEL_MAP.containsKey(channelId)) {
|
||||
ykcService.exit(channelId);
|
||||
//删除连接
|
||||
CHANNEL_MAP.remove(channelId);
|
||||
log.info("客户端【{}】, 退出netty服务器【IP:{}, PORT:{}】, 连接通道数量: {}", channelId, clientIp, insocket.getPort(), CHANNEL_MAP.size());
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void handlerAdded(ChannelHandlerContext ctx) throws Exception { // (2)
|
||||
// Channel incoming = ctx.channel();
|
||||
// log.info("handlerAdded: handler被添加到channel的pipeline connect:" + incoming.remoteAddress());
|
||||
}
|
||||
|
||||
@Override
|
||||
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception { // (3)
|
||||
// Channel incoming = ctx.channel();
|
||||
// log.info("handlerRemoved: handler从channel的pipeline中移除 connect:" + incoming.remoteAddress());
|
||||
// ChannelMapByEntity.removeChannel(incoming);
|
||||
// ChannelMap.removeChannel(incoming);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
|
||||
Channel channel = ctx.channel();
|
||||
// log.info("channel:【{}】读数据完成", channel.id());
|
||||
super.channelReadComplete(ctx);
|
||||
}
|
||||
|
||||
/**
|
||||
* 服务端给客户端发送消息
|
||||
*
|
||||
* @param channelId 连接通道唯一id
|
||||
* @param msg 需要发送的消息内容
|
||||
*/
|
||||
public void channelWrite(ChannelId channelId, Object msg) throws Exception {
|
||||
ChannelHandlerContext ctx = CHANNEL_MAP.get(channelId);
|
||||
if (ctx == null) {
|
||||
log.info("通道【{}】不存在", channelId);
|
||||
return;
|
||||
}
|
||||
if (msg == null || msg == "") {
|
||||
log.info("服务端响应空的消息");
|
||||
return;
|
||||
}
|
||||
//将客户端的信息直接返回写入ctx
|
||||
ctx.write(msg);
|
||||
//刷新缓存区
|
||||
ctx.flush();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
|
||||
String socketString = ctx.channel().remoteAddress().toString();
|
||||
ChannelId channelId = ctx.channel().id();
|
||||
String pileSn = PileChannelEntity.getPileSnByChannelId(channelId.asLongText());
|
||||
if (evt instanceof IdleStateEvent) { // 超时事件
|
||||
IdleStateEvent event = (IdleStateEvent) evt;
|
||||
boolean flag = false;
|
||||
if (event.state() == IdleState.READER_IDLE) { // 读
|
||||
flag = true;
|
||||
// log.error("Client-IP:【{}】, channelId:【{}】, pileSn:【{}】, READER_IDLE 读超时", socketString, channelId, pileSn);
|
||||
} else if (event.state() == IdleState.WRITER_IDLE) { // 写
|
||||
flag = true;
|
||||
// log.error("Client-IP:【{}】, channelId:【{}】, pileSn:【{}】, WRITER_IDLE 写超时", socketString, channelId, pileSn);
|
||||
} else if (event.state() == IdleState.ALL_IDLE) { // 全部
|
||||
flag = true;
|
||||
// log.error("Client-IP:【{}】, channelId:【{}】, pileSn:【{}】, ALL_IDLE 总超时", socketString, channelId, pileSn);
|
||||
}
|
||||
if (flag) {
|
||||
ctx.channel().close();
|
||||
// close(channelId, pileSn);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 发生异常会触发此函数
|
||||
*/
|
||||
@Override
|
||||
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
|
||||
ChannelId channelId = ctx.channel().id();
|
||||
String channelIdShortText = channelId.asShortText();
|
||||
String pileSn = PileChannelEntity.getPileSnByChannelId(channelIdShortText);
|
||||
log.error("发生异常 channelId:{}, pileSn:{}", channelIdShortText, pileSn, cause);
|
||||
cause.printStackTrace();
|
||||
// 如果桩连到平台,在1分钟内没有发送数据过来,会报ReadTimeoutException异常
|
||||
if (cause instanceof ReadTimeoutException) {
|
||||
if (log.isTraceEnabled()) {
|
||||
log.trace("Connection timeout 【{}】", ctx.channel().remoteAddress());
|
||||
}
|
||||
log.error("【{}】发生了错误, pileSn:【{}】此连接被关闭, 此时连通数量: {}", channelId, pileSn, CHANNEL_MAP.size());
|
||||
ctx.channel().close();
|
||||
}
|
||||
// close(channelId, pileSn);
|
||||
}
|
||||
|
||||
|
||||
// 公共方法 关闭连接
|
||||
private void closeConnection(String pileSn, ChannelHandlerContext ctx) {
|
||||
Channel channel = ctx.channel();
|
||||
ChannelId channelId = channel.id();
|
||||
log.error("close方法-发生异常,关闭链接,channelId:{}, pileSn:{}", channelId.asShortText(), pileSn);
|
||||
if (channel != null && !channel.isActive() && !channel.isOpen() && !channel.isWritable()) {
|
||||
channel.close();
|
||||
// 删除连接
|
||||
CHANNEL_MAP.remove(channelId);
|
||||
}
|
||||
// 删除桩编号和channel的关系
|
||||
if (StringUtils.isNotBlank(pileSn)) {
|
||||
PileChannelEntity.removeByPileSn(pileSn);
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -19,8 +19,9 @@ import org.springframework.stereotype.Component;
|
||||
import java.net.InetSocketAddress;
|
||||
|
||||
@Slf4j
|
||||
@Component
|
||||
@Order(5)
|
||||
@Deprecated
|
||||
// @Component
|
||||
// @Order(7)
|
||||
public class MqttSever implements CommandLineRunner {
|
||||
|
||||
@Override
|
||||
|
||||
@@ -16,8 +16,9 @@ import javax.annotation.Resource;
|
||||
import java.net.InetSocketAddress;
|
||||
|
||||
@Slf4j
|
||||
@Component
|
||||
@Order(2)
|
||||
@Deprecated
|
||||
// @Component
|
||||
// @Order(3)
|
||||
public class NettyServer implements CommandLineRunner {
|
||||
@Resource
|
||||
private NettyServerChannelInitializer nettyServerChannelInitializer;
|
||||
|
||||
@@ -21,7 +21,7 @@ public class NettyServerChannelInitializer extends ChannelInitializer<SocketChan
|
||||
protected void initChannel(SocketChannel channel) throws Exception {
|
||||
ChannelPipeline pipeline = channel.pipeline();
|
||||
// pipeline.addLast("frameDecoder",new CustomDecoder());
|
||||
pipeline.addLast("frameDecoder", new StartAndLengthFieldFrameDecoder(0x68));
|
||||
pipeline.addLast("frameDecoder", new StartAndLengthFieldFrameDecoder());
|
||||
pipeline.addLast("decoder", new ByteArrayDecoder());
|
||||
pipeline.addLast("encoder", new ByteArrayDecoder());
|
||||
//读超时时间设置为10s,0表示不监控
|
||||
|
||||
Reference in New Issue
Block a user