mirror of
https://codeup.aliyun.com/67c68d4e484ca2f0a13ac3c1/ydc/jsowell-charger-web.git
synced 2026-04-20 19:15:35 +08:00
update 电单车协议
This commit is contained in:
@@ -1,6 +1,7 @@
|
||||
package com.jsowell.netty.server;
|
||||
|
||||
import com.jsowell.common.constant.Constants;
|
||||
import com.jsowell.netty.server.electricbicycles.ElectricBicyclesServerChannelInitializer;
|
||||
import com.jsowell.netty.server.mqtt.BootNettyMqttChannelInboundHandler;
|
||||
import com.jsowell.netty.server.yunkuaichong.NettyServerChannelInitializer;
|
||||
import io.netty.bootstrap.ServerBootstrap;
|
||||
@@ -28,6 +29,9 @@ public class NettyServerManager implements CommandLineRunner {
|
||||
@Resource
|
||||
private NettyServerChannelInitializer nettyServerChannelInitializer;
|
||||
|
||||
@Resource
|
||||
private ElectricBicyclesServerChannelInitializer electricBicyclesServerChannelInitializer;
|
||||
|
||||
@Override
|
||||
public void run(String... args) throws Exception {
|
||||
startNettyServer(Constants.SOCKET_IP, 9011);
|
||||
@@ -82,7 +86,7 @@ public class NettyServerManager implements CommandLineRunner {
|
||||
.option(ChannelOption.SO_REUSEADDR, true)
|
||||
.childOption(ChannelOption.SO_KEEPALIVE, true)
|
||||
.childOption(ChannelOption.SO_REUSEADDR, true)
|
||||
.childHandler(nettyServerChannelInitializer)
|
||||
.childHandler(electricBicyclesServerChannelInitializer)
|
||||
.localAddress(new InetSocketAddress(host, port));
|
||||
|
||||
ChannelFuture future = bootstrap.bind(port).sync();
|
||||
|
||||
@@ -0,0 +1,32 @@
|
||||
package com.jsowell.netty.server.electricbicycles;
|
||||
|
||||
import com.jsowell.netty.decoder.StartAndLengthFieldFrameDecoder;
|
||||
import io.netty.channel.ChannelInitializer;
|
||||
import io.netty.channel.ChannelPipeline;
|
||||
import io.netty.channel.socket.SocketChannel;
|
||||
import io.netty.handler.codec.bytes.ByteArrayDecoder;
|
||||
import io.netty.handler.timeout.IdleStateHandler;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import javax.annotation.Resource;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
@Component
|
||||
public class ElectricBicyclesServerChannelInitializer extends ChannelInitializer<SocketChannel> {
|
||||
|
||||
@Resource
|
||||
ElectricBicyclesServerHandler nettyServerHandler;
|
||||
|
||||
@Override
|
||||
protected void initChannel(SocketChannel channel) throws Exception {
|
||||
ChannelPipeline pipeline = channel.pipeline();
|
||||
// pipeline.addLast("frameDecoder",new CustomDecoder());
|
||||
pipeline.addLast("frameDecoder", new StartAndLengthFieldFrameDecoder());
|
||||
pipeline.addLast("decoder", new ByteArrayDecoder());
|
||||
pipeline.addLast("encoder", new ByteArrayDecoder());
|
||||
//读超时时间设置为10s,0表示不监控
|
||||
pipeline.addLast(new IdleStateHandler(60, 0, 0, TimeUnit.SECONDS));
|
||||
pipeline.addLast("handler", nettyServerHandler);
|
||||
}
|
||||
|
||||
}
|
||||
@@ -0,0 +1,234 @@
|
||||
package com.jsowell.netty.server.electricbicycles;
|
||||
|
||||
import com.google.common.collect.Lists;
|
||||
import com.jsowell.common.core.domain.ykc.YKCFrameTypeCode;
|
||||
import com.jsowell.common.enums.ykc.PileChannelEntity;
|
||||
import com.jsowell.common.util.BytesUtil;
|
||||
import com.jsowell.common.util.StringUtils;
|
||||
import com.jsowell.common.util.YKCUtils;
|
||||
import com.jsowell.netty.service.yunkuaichong.YKCBusinessService;
|
||||
import io.netty.buffer.ByteBuf;
|
||||
import io.netty.channel.*;
|
||||
import io.netty.handler.timeout.IdleState;
|
||||
import io.netty.handler.timeout.IdleStateEvent;
|
||||
import io.netty.handler.timeout.ReadTimeoutException;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.apache.commons.collections4.CollectionUtils;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import java.net.InetSocketAddress;
|
||||
import java.util.List;
|
||||
import java.util.Objects;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
|
||||
/**
|
||||
* netty服务端处理类
|
||||
*/
|
||||
@ChannelHandler.Sharable
|
||||
@Slf4j
|
||||
@Component
|
||||
public class ElectricBicyclesServerHandler extends ChannelInboundHandlerAdapter {
|
||||
|
||||
@Autowired
|
||||
private YKCBusinessService ykcService;
|
||||
|
||||
/**
|
||||
* 管理一个全局map,保存连接进服务端的通道数量
|
||||
*/
|
||||
private static final ConcurrentHashMap<ChannelId, ChannelHandlerContext> CHANNEL_MAP = new ConcurrentHashMap<>();
|
||||
|
||||
private final List<String> notPrintFrameTypeList = Lists.newArrayList("0x03");
|
||||
|
||||
/**
|
||||
* 有客户端连接服务器会触发此函数
|
||||
* 连接被建立并且准备进行通信时被调用
|
||||
*/
|
||||
@Override
|
||||
public void channelActive(ChannelHandlerContext ctx) {
|
||||
InetSocketAddress insocket = (InetSocketAddress) ctx.channel().remoteAddress();
|
||||
String clientIp = insocket.getAddress().getHostAddress();
|
||||
int clientPort = insocket.getPort();
|
||||
//获取连接通道唯一标识
|
||||
ChannelId channelId = ctx.channel().id();
|
||||
//如果map中不包含此连接,就保存连接
|
||||
if (CHANNEL_MAP.containsKey(channelId)) {
|
||||
log.info("Handler:{}, 客户端【{}】是连接状态,连接通道数量: {}", this.getClass().getSimpleName(), channelId, CHANNEL_MAP.size());
|
||||
} else {
|
||||
//保存连接
|
||||
CHANNEL_MAP.put(channelId, ctx);
|
||||
log.info("Handler:{}, 客户端【{}】, 连接netty服务器【IP:{}, PORT:{}】, 连接通道数量: {}", this.getClass().getSimpleName(), channelId, clientIp, clientPort, CHANNEL_MAP.size());
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 有客户端发消息会触发此函数
|
||||
*/
|
||||
@Override
|
||||
public void channelRead(ChannelHandlerContext ctx, Object message) throws Exception {
|
||||
// log.info("加载客户端报文=== channelId:" + ctx.channel().id() + ", msg:" + msg);
|
||||
// 下面可以解析数据,保存数据,生成返回报文,将需要返回报文写入write函数
|
||||
byte[] msg = (byte[]) message;
|
||||
|
||||
// 获取帧类型
|
||||
byte[] frameTypeBytes = BytesUtil.copyBytes(msg, 5, 1);
|
||||
String frameType = YKCUtils.frameType2Str(frameTypeBytes);
|
||||
// 获取序列号域
|
||||
int serialNumber = BytesUtil.bytesToIntLittle(BytesUtil.copyBytes(msg, 2, 2));
|
||||
// 获取channel
|
||||
Channel channel = ctx.channel();
|
||||
|
||||
// new
|
||||
// String hexString = DatatypeConverter.printHexBinary(msg);
|
||||
|
||||
// 心跳包0x03日志太多,造成日志文件过大,改为不打印
|
||||
if (!CollectionUtils.containsAny(notPrintFrameTypeList, frameType)) {
|
||||
// log.info("【<<<<<平台收到消息<<<<<】channel:{}, 帧类型:{}, 帧名称:{}, 序列号域:{}, 报文:{}, new报文:{}",
|
||||
// channel.id(), frameType, YKCFrameTypeCode.getFrameTypeStr(frameType), serialNumber,
|
||||
// BytesUtil.binary(msg, 16), hexString);
|
||||
log.info("【<<<<<平台收到消息<<<<<】channel:{}, 帧类型:{}, 帧名称:{}, 序列号域:{}, 报文:{}",
|
||||
channel.id(), frameType, YKCFrameTypeCode.getFrameTypeStr(frameType), serialNumber,
|
||||
BytesUtil.binary(msg, 16));
|
||||
}
|
||||
|
||||
// 处理数据
|
||||
byte[] response = ykcService.process(msg, channel);
|
||||
if (Objects.nonNull(response)) {
|
||||
// 响应客户端
|
||||
ByteBuf buffer = ctx.alloc().buffer().writeBytes(response);
|
||||
this.channelWrite(channel.id(), buffer);
|
||||
if (!CollectionUtils.containsAny(notPrintFrameTypeList, frameType)) {
|
||||
// 应答帧类型
|
||||
byte[] responseFrameTypeBytes = YKCFrameTypeCode.ResponseRelation.getResponseFrameTypeBytes(frameTypeBytes);
|
||||
String responseFrameType = YKCUtils.frameType2Str(responseFrameTypeBytes);
|
||||
log.info("【>>>>>平台响应消息>>>>>】channel:{}, 响应帧类型:{}, 响应帧名称:{}, 原帧类型:{}, 原帧名称:{}, 序列号域:{}, response:{}",
|
||||
channel.id(), responseFrameType, YKCFrameTypeCode.getFrameTypeStr(responseFrameType),
|
||||
frameType, YKCFrameTypeCode.getFrameTypeStr(frameType), serialNumber,
|
||||
BytesUtil.binary(response, 16));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 有客户端终止连接服务器会触发此函数
|
||||
*/
|
||||
@Override
|
||||
public void channelInactive(ChannelHandlerContext ctx) {
|
||||
InetSocketAddress insocket = (InetSocketAddress) ctx.channel().remoteAddress();
|
||||
String clientIp = insocket.getAddress().getHostAddress();
|
||||
ChannelId channelId = ctx.channel().id();
|
||||
//包含此客户端才去删除
|
||||
if (CHANNEL_MAP.containsKey(channelId)) {
|
||||
ykcService.exit(channelId);
|
||||
//删除连接
|
||||
CHANNEL_MAP.remove(channelId);
|
||||
log.info("客户端【{}】, 退出netty服务器【IP:{}, PORT:{}】, 连接通道数量: {}", channelId, clientIp, insocket.getPort(), CHANNEL_MAP.size());
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void handlerAdded(ChannelHandlerContext ctx) throws Exception { // (2)
|
||||
// Channel incoming = ctx.channel();
|
||||
// log.info("handlerAdded: handler被添加到channel的pipeline connect:" + incoming.remoteAddress());
|
||||
}
|
||||
|
||||
@Override
|
||||
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception { // (3)
|
||||
// Channel incoming = ctx.channel();
|
||||
// log.info("handlerRemoved: handler从channel的pipeline中移除 connect:" + incoming.remoteAddress());
|
||||
// ChannelMapByEntity.removeChannel(incoming);
|
||||
// ChannelMap.removeChannel(incoming);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
|
||||
Channel channel = ctx.channel();
|
||||
// log.info("channel:【{}】读数据完成", channel.id());
|
||||
super.channelReadComplete(ctx);
|
||||
}
|
||||
|
||||
/**
|
||||
* 服务端给客户端发送消息
|
||||
*
|
||||
* @param channelId 连接通道唯一id
|
||||
* @param msg 需要发送的消息内容
|
||||
*/
|
||||
public void channelWrite(ChannelId channelId, Object msg) throws Exception {
|
||||
ChannelHandlerContext ctx = CHANNEL_MAP.get(channelId);
|
||||
if (ctx == null) {
|
||||
log.info("通道【{}】不存在", channelId);
|
||||
return;
|
||||
}
|
||||
if (msg == null || msg == "") {
|
||||
log.info("服务端响应空的消息");
|
||||
return;
|
||||
}
|
||||
//将客户端的信息直接返回写入ctx
|
||||
ctx.write(msg);
|
||||
//刷新缓存区
|
||||
ctx.flush();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
|
||||
String socketString = ctx.channel().remoteAddress().toString();
|
||||
ChannelId channelId = ctx.channel().id();
|
||||
String pileSn = PileChannelEntity.getPileSnByChannelId(channelId.asLongText());
|
||||
if (evt instanceof IdleStateEvent) { // 超时事件
|
||||
IdleStateEvent event = (IdleStateEvent) evt;
|
||||
boolean flag = false;
|
||||
if (event.state() == IdleState.READER_IDLE) { // 读
|
||||
flag = true;
|
||||
// log.error("Client-IP:【{}】, channelId:【{}】, pileSn:【{}】, READER_IDLE 读超时", socketString, channelId, pileSn);
|
||||
} else if (event.state() == IdleState.WRITER_IDLE) { // 写
|
||||
flag = true;
|
||||
// log.error("Client-IP:【{}】, channelId:【{}】, pileSn:【{}】, WRITER_IDLE 写超时", socketString, channelId, pileSn);
|
||||
} else if (event.state() == IdleState.ALL_IDLE) { // 全部
|
||||
flag = true;
|
||||
// log.error("Client-IP:【{}】, channelId:【{}】, pileSn:【{}】, ALL_IDLE 总超时", socketString, channelId, pileSn);
|
||||
}
|
||||
if (flag) {
|
||||
ctx.channel().close();
|
||||
// close(channelId, pileSn);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 发生异常会触发此函数
|
||||
*/
|
||||
@Override
|
||||
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
|
||||
ChannelId channelId = ctx.channel().id();
|
||||
String channelIdShortText = channelId.asShortText();
|
||||
String pileSn = PileChannelEntity.getPileSnByChannelId(channelIdShortText);
|
||||
log.error("发生异常 channelId:{}, pileSn:{}", channelIdShortText, pileSn, cause);
|
||||
cause.printStackTrace();
|
||||
// 如果桩连到平台,在1分钟内没有发送数据过来,会报ReadTimeoutException异常
|
||||
if (cause instanceof ReadTimeoutException) {
|
||||
if (log.isTraceEnabled()) {
|
||||
log.trace("Connection timeout 【{}】", ctx.channel().remoteAddress());
|
||||
}
|
||||
log.error("【{}】发生了错误, pileSn:【{}】此连接被关闭, 此时连通数量: {}", channelId, pileSn, CHANNEL_MAP.size());
|
||||
ctx.channel().close();
|
||||
}
|
||||
// close(channelId, pileSn);
|
||||
}
|
||||
|
||||
|
||||
// 公共方法 关闭连接
|
||||
private void closeConnection(String pileSn, ChannelHandlerContext ctx) {
|
||||
Channel channel = ctx.channel();
|
||||
ChannelId channelId = channel.id();
|
||||
log.error("close方法-发生异常,关闭链接,channelId:{}, pileSn:{}", channelId.asShortText(), pileSn);
|
||||
if (channel != null && !channel.isActive() && !channel.isOpen() && !channel.isWritable()) {
|
||||
channel.close();
|
||||
// 删除连接
|
||||
CHANNEL_MAP.remove(channelId);
|
||||
}
|
||||
// 删除桩编号和channel的关系
|
||||
if (StringUtils.isNotBlank(pileSn)) {
|
||||
PileChannelEntity.removeByPileSn(pileSn);
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -19,8 +19,9 @@ import org.springframework.stereotype.Component;
|
||||
import java.net.InetSocketAddress;
|
||||
|
||||
@Slf4j
|
||||
@Component
|
||||
@Order(7)
|
||||
@Deprecated
|
||||
// @Component
|
||||
// @Order(7)
|
||||
public class MqttSever implements CommandLineRunner {
|
||||
|
||||
@Override
|
||||
|
||||
Reference in New Issue
Block a user