diff --git a/jsowell-netty/src/main/java/com/jsowell/netty/server/NettyServerManager.java b/jsowell-netty/src/main/java/com/jsowell/netty/server/NettyServerManager.java index bcf9c2415..a12be555b 100644 --- a/jsowell-netty/src/main/java/com/jsowell/netty/server/NettyServerManager.java +++ b/jsowell-netty/src/main/java/com/jsowell/netty/server/NettyServerManager.java @@ -1,6 +1,7 @@ package com.jsowell.netty.server; import com.jsowell.common.constant.Constants; +import com.jsowell.netty.server.electricbicycles.ElectricBicyclesServerChannelInitializer; import com.jsowell.netty.server.mqtt.BootNettyMqttChannelInboundHandler; import com.jsowell.netty.server.yunkuaichong.NettyServerChannelInitializer; import io.netty.bootstrap.ServerBootstrap; @@ -28,6 +29,9 @@ public class NettyServerManager implements CommandLineRunner { @Resource private NettyServerChannelInitializer nettyServerChannelInitializer; + @Resource + private ElectricBicyclesServerChannelInitializer electricBicyclesServerChannelInitializer; + @Override public void run(String... args) throws Exception { startNettyServer(Constants.SOCKET_IP, 9011); @@ -82,7 +86,7 @@ public class NettyServerManager implements CommandLineRunner { .option(ChannelOption.SO_REUSEADDR, true) .childOption(ChannelOption.SO_KEEPALIVE, true) .childOption(ChannelOption.SO_REUSEADDR, true) - .childHandler(nettyServerChannelInitializer) + .childHandler(electricBicyclesServerChannelInitializer) .localAddress(new InetSocketAddress(host, port)); ChannelFuture future = bootstrap.bind(port).sync(); diff --git a/jsowell-netty/src/main/java/com/jsowell/netty/server/electricbicycles/ElectricBicyclesServerChannelInitializer.java b/jsowell-netty/src/main/java/com/jsowell/netty/server/electricbicycles/ElectricBicyclesServerChannelInitializer.java new file mode 100644 index 000000000..4b84422f5 --- /dev/null +++ b/jsowell-netty/src/main/java/com/jsowell/netty/server/electricbicycles/ElectricBicyclesServerChannelInitializer.java @@ -0,0 +1,32 @@ +package com.jsowell.netty.server.electricbicycles; + +import com.jsowell.netty.decoder.StartAndLengthFieldFrameDecoder; +import io.netty.channel.ChannelInitializer; +import io.netty.channel.ChannelPipeline; +import io.netty.channel.socket.SocketChannel; +import io.netty.handler.codec.bytes.ByteArrayDecoder; +import io.netty.handler.timeout.IdleStateHandler; +import org.springframework.stereotype.Component; + +import javax.annotation.Resource; +import java.util.concurrent.TimeUnit; + +@Component +public class ElectricBicyclesServerChannelInitializer extends ChannelInitializer { + + @Resource + ElectricBicyclesServerHandler nettyServerHandler; + + @Override + protected void initChannel(SocketChannel channel) throws Exception { + ChannelPipeline pipeline = channel.pipeline(); + // pipeline.addLast("frameDecoder",new CustomDecoder()); + pipeline.addLast("frameDecoder", new StartAndLengthFieldFrameDecoder()); + pipeline.addLast("decoder", new ByteArrayDecoder()); + pipeline.addLast("encoder", new ByteArrayDecoder()); + //读超时时间设置为10s,0表示不监控 + pipeline.addLast(new IdleStateHandler(60, 0, 0, TimeUnit.SECONDS)); + pipeline.addLast("handler", nettyServerHandler); + } + +} diff --git a/jsowell-netty/src/main/java/com/jsowell/netty/server/electricbicycles/ElectricBicyclesServerHandler.java b/jsowell-netty/src/main/java/com/jsowell/netty/server/electricbicycles/ElectricBicyclesServerHandler.java new file mode 100644 index 000000000..72f99012f --- /dev/null +++ b/jsowell-netty/src/main/java/com/jsowell/netty/server/electricbicycles/ElectricBicyclesServerHandler.java @@ -0,0 +1,234 @@ +package com.jsowell.netty.server.electricbicycles; + +import com.google.common.collect.Lists; +import com.jsowell.common.core.domain.ykc.YKCFrameTypeCode; +import com.jsowell.common.enums.ykc.PileChannelEntity; +import com.jsowell.common.util.BytesUtil; +import com.jsowell.common.util.StringUtils; +import com.jsowell.common.util.YKCUtils; +import com.jsowell.netty.service.yunkuaichong.YKCBusinessService; +import io.netty.buffer.ByteBuf; +import io.netty.channel.*; +import io.netty.handler.timeout.IdleState; +import io.netty.handler.timeout.IdleStateEvent; +import io.netty.handler.timeout.ReadTimeoutException; +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.collections4.CollectionUtils; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; + +import java.net.InetSocketAddress; +import java.util.List; +import java.util.Objects; +import java.util.concurrent.ConcurrentHashMap; + +/** + * netty服务端处理类 + */ +@ChannelHandler.Sharable +@Slf4j +@Component +public class ElectricBicyclesServerHandler extends ChannelInboundHandlerAdapter { + + @Autowired + private YKCBusinessService ykcService; + + /** + * 管理一个全局map,保存连接进服务端的通道数量 + */ + private static final ConcurrentHashMap CHANNEL_MAP = new ConcurrentHashMap<>(); + + private final List notPrintFrameTypeList = Lists.newArrayList("0x03"); + + /** + * 有客户端连接服务器会触发此函数 + * 连接被建立并且准备进行通信时被调用 + */ + @Override + public void channelActive(ChannelHandlerContext ctx) { + InetSocketAddress insocket = (InetSocketAddress) ctx.channel().remoteAddress(); + String clientIp = insocket.getAddress().getHostAddress(); + int clientPort = insocket.getPort(); + //获取连接通道唯一标识 + ChannelId channelId = ctx.channel().id(); + //如果map中不包含此连接,就保存连接 + if (CHANNEL_MAP.containsKey(channelId)) { + log.info("Handler:{}, 客户端【{}】是连接状态,连接通道数量: {}", this.getClass().getSimpleName(), channelId, CHANNEL_MAP.size()); + } else { + //保存连接 + CHANNEL_MAP.put(channelId, ctx); + log.info("Handler:{}, 客户端【{}】, 连接netty服务器【IP:{}, PORT:{}】, 连接通道数量: {}", this.getClass().getSimpleName(), channelId, clientIp, clientPort, CHANNEL_MAP.size()); + } + } + + /** + * 有客户端发消息会触发此函数 + */ + @Override + public void channelRead(ChannelHandlerContext ctx, Object message) throws Exception { + // log.info("加载客户端报文=== channelId:" + ctx.channel().id() + ", msg:" + msg); + // 下面可以解析数据,保存数据,生成返回报文,将需要返回报文写入write函数 + byte[] msg = (byte[]) message; + + // 获取帧类型 + byte[] frameTypeBytes = BytesUtil.copyBytes(msg, 5, 1); + String frameType = YKCUtils.frameType2Str(frameTypeBytes); + // 获取序列号域 + int serialNumber = BytesUtil.bytesToIntLittle(BytesUtil.copyBytes(msg, 2, 2)); + // 获取channel + Channel channel = ctx.channel(); + + // new + // String hexString = DatatypeConverter.printHexBinary(msg); + + // 心跳包0x03日志太多,造成日志文件过大,改为不打印 + if (!CollectionUtils.containsAny(notPrintFrameTypeList, frameType)) { + // log.info("【<<<<<平台收到消息<<<<<】channel:{}, 帧类型:{}, 帧名称:{}, 序列号域:{}, 报文:{}, new报文:{}", + // channel.id(), frameType, YKCFrameTypeCode.getFrameTypeStr(frameType), serialNumber, + // BytesUtil.binary(msg, 16), hexString); + log.info("【<<<<<平台收到消息<<<<<】channel:{}, 帧类型:{}, 帧名称:{}, 序列号域:{}, 报文:{}", + channel.id(), frameType, YKCFrameTypeCode.getFrameTypeStr(frameType), serialNumber, + BytesUtil.binary(msg, 16)); + } + + // 处理数据 + byte[] response = ykcService.process(msg, channel); + if (Objects.nonNull(response)) { + // 响应客户端 + ByteBuf buffer = ctx.alloc().buffer().writeBytes(response); + this.channelWrite(channel.id(), buffer); + if (!CollectionUtils.containsAny(notPrintFrameTypeList, frameType)) { + // 应答帧类型 + byte[] responseFrameTypeBytes = YKCFrameTypeCode.ResponseRelation.getResponseFrameTypeBytes(frameTypeBytes); + String responseFrameType = YKCUtils.frameType2Str(responseFrameTypeBytes); + log.info("【>>>>>平台响应消息>>>>>】channel:{}, 响应帧类型:{}, 响应帧名称:{}, 原帧类型:{}, 原帧名称:{}, 序列号域:{}, response:{}", + channel.id(), responseFrameType, YKCFrameTypeCode.getFrameTypeStr(responseFrameType), + frameType, YKCFrameTypeCode.getFrameTypeStr(frameType), serialNumber, + BytesUtil.binary(response, 16)); + } + } + } + + /** + * 有客户端终止连接服务器会触发此函数 + */ + @Override + public void channelInactive(ChannelHandlerContext ctx) { + InetSocketAddress insocket = (InetSocketAddress) ctx.channel().remoteAddress(); + String clientIp = insocket.getAddress().getHostAddress(); + ChannelId channelId = ctx.channel().id(); + //包含此客户端才去删除 + if (CHANNEL_MAP.containsKey(channelId)) { + ykcService.exit(channelId); + //删除连接 + CHANNEL_MAP.remove(channelId); + log.info("客户端【{}】, 退出netty服务器【IP:{}, PORT:{}】, 连接通道数量: {}", channelId, clientIp, insocket.getPort(), CHANNEL_MAP.size()); + } + } + + @Override + public void handlerAdded(ChannelHandlerContext ctx) throws Exception { // (2) + // Channel incoming = ctx.channel(); + // log.info("handlerAdded: handler被添加到channel的pipeline connect:" + incoming.remoteAddress()); + } + + @Override + public void handlerRemoved(ChannelHandlerContext ctx) throws Exception { // (3) + // Channel incoming = ctx.channel(); + // log.info("handlerRemoved: handler从channel的pipeline中移除 connect:" + incoming.remoteAddress()); + // ChannelMapByEntity.removeChannel(incoming); + // ChannelMap.removeChannel(incoming); + } + + @Override + public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { + Channel channel = ctx.channel(); + // log.info("channel:【{}】读数据完成", channel.id()); + super.channelReadComplete(ctx); + } + + /** + * 服务端给客户端发送消息 + * + * @param channelId 连接通道唯一id + * @param msg 需要发送的消息内容 + */ + public void channelWrite(ChannelId channelId, Object msg) throws Exception { + ChannelHandlerContext ctx = CHANNEL_MAP.get(channelId); + if (ctx == null) { + log.info("通道【{}】不存在", channelId); + return; + } + if (msg == null || msg == "") { + log.info("服务端响应空的消息"); + return; + } + //将客户端的信息直接返回写入ctx + ctx.write(msg); + //刷新缓存区 + ctx.flush(); + } + + @Override + public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { + String socketString = ctx.channel().remoteAddress().toString(); + ChannelId channelId = ctx.channel().id(); + String pileSn = PileChannelEntity.getPileSnByChannelId(channelId.asLongText()); + if (evt instanceof IdleStateEvent) { // 超时事件 + IdleStateEvent event = (IdleStateEvent) evt; + boolean flag = false; + if (event.state() == IdleState.READER_IDLE) { // 读 + flag = true; + // log.error("Client-IP:【{}】, channelId:【{}】, pileSn:【{}】, READER_IDLE 读超时", socketString, channelId, pileSn); + } else if (event.state() == IdleState.WRITER_IDLE) { // 写 + flag = true; + // log.error("Client-IP:【{}】, channelId:【{}】, pileSn:【{}】, WRITER_IDLE 写超时", socketString, channelId, pileSn); + } else if (event.state() == IdleState.ALL_IDLE) { // 全部 + flag = true; + // log.error("Client-IP:【{}】, channelId:【{}】, pileSn:【{}】, ALL_IDLE 总超时", socketString, channelId, pileSn); + } + if (flag) { + ctx.channel().close(); + // close(channelId, pileSn); + } + } + } + + /** + * 发生异常会触发此函数 + */ + @Override + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { + ChannelId channelId = ctx.channel().id(); + String channelIdShortText = channelId.asShortText(); + String pileSn = PileChannelEntity.getPileSnByChannelId(channelIdShortText); + log.error("发生异常 channelId:{}, pileSn:{}", channelIdShortText, pileSn, cause); + cause.printStackTrace(); + // 如果桩连到平台,在1分钟内没有发送数据过来,会报ReadTimeoutException异常 + if (cause instanceof ReadTimeoutException) { + if (log.isTraceEnabled()) { + log.trace("Connection timeout 【{}】", ctx.channel().remoteAddress()); + } + log.error("【{}】发生了错误, pileSn:【{}】此连接被关闭, 此时连通数量: {}", channelId, pileSn, CHANNEL_MAP.size()); + ctx.channel().close(); + } + // close(channelId, pileSn); + } + + + // 公共方法 关闭连接 + private void closeConnection(String pileSn, ChannelHandlerContext ctx) { + Channel channel = ctx.channel(); + ChannelId channelId = channel.id(); + log.error("close方法-发生异常,关闭链接,channelId:{}, pileSn:{}", channelId.asShortText(), pileSn); + if (channel != null && !channel.isActive() && !channel.isOpen() && !channel.isWritable()) { + channel.close(); + // 删除连接 + CHANNEL_MAP.remove(channelId); + } + // 删除桩编号和channel的关系 + if (StringUtils.isNotBlank(pileSn)) { + PileChannelEntity.removeByPileSn(pileSn); + } + } +} \ No newline at end of file diff --git a/jsowell-netty/src/main/java/com/jsowell/netty/server/mqtt/MqttSever.java b/jsowell-netty/src/main/java/com/jsowell/netty/server/mqtt/MqttSever.java index c2970de0b..59e8395e0 100644 --- a/jsowell-netty/src/main/java/com/jsowell/netty/server/mqtt/MqttSever.java +++ b/jsowell-netty/src/main/java/com/jsowell/netty/server/mqtt/MqttSever.java @@ -19,8 +19,9 @@ import org.springframework.stereotype.Component; import java.net.InetSocketAddress; @Slf4j -@Component -@Order(7) +@Deprecated +// @Component +// @Order(7) public class MqttSever implements CommandLineRunner { @Override