diff --git a/jsowell-netty/src/main/java/com/jsowell/netty/server/NettyServerManager.java b/jsowell-netty/src/main/java/com/jsowell/netty/server/NettyServerManager.java index 0c050ba84..bcf9c2415 100644 --- a/jsowell-netty/src/main/java/com/jsowell/netty/server/NettyServerManager.java +++ b/jsowell-netty/src/main/java/com/jsowell/netty/server/NettyServerManager.java @@ -1,15 +1,19 @@ package com.jsowell.netty.server; import com.jsowell.common.constant.Constants; +import com.jsowell.netty.server.mqtt.BootNettyMqttChannelInboundHandler; import com.jsowell.netty.server.yunkuaichong.NettyServerChannelInitializer; import io.netty.bootstrap.ServerBootstrap; -import io.netty.channel.ChannelFuture; -import io.netty.channel.ChannelOption; -import io.netty.channel.EventLoopGroup; +import io.netty.buffer.PooledByteBufAllocator; +import io.netty.channel.*; import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; +import io.netty.handler.codec.mqtt.MqttDecoder; +import io.netty.handler.codec.mqtt.MqttEncoder; import io.netty.handler.logging.LogLevel; import io.netty.handler.logging.LoggingHandler; +import io.netty.handler.timeout.IdleStateHandler; import lombok.extern.slf4j.Slf4j; import org.springframework.boot.CommandLineRunner; import org.springframework.stereotype.Component; @@ -28,6 +32,7 @@ public class NettyServerManager implements CommandLineRunner { public void run(String... args) throws Exception { startNettyServer(Constants.SOCKET_IP, 9011); startElectricBikeNettyServer(Constants.SOCKET_IP, 9012); + startMqttSever(Constants.SOCKET_IP, 1883); } public void startNettyServer(String host, int port) { @@ -95,4 +100,49 @@ public class NettyServerManager implements CommandLineRunner { } }).start(); } + + public void startMqttSever(String host, int port) { + new Thread(() -> { + EventLoopGroup bossGroup = new NioEventLoopGroup(1); + EventLoopGroup workerGroup = new NioEventLoopGroup(); + + try { + ServerBootstrap mqttBootstrap = new ServerBootstrap() + .group(bossGroup, workerGroup) + .channel(NioServerSocketChannel.class) + .handler(new LoggingHandler(LogLevel.DEBUG)) + .option(ChannelOption.SO_BACKLOG, 128) + .option(ChannelOption.SO_REUSEADDR, true) + .childOption(ChannelOption.SO_KEEPALIVE, true) + .childOption(ChannelOption.SO_REUSEADDR, true) + .childOption(ChannelOption.TCP_NODELAY, true) + .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT) + .localAddress(new InetSocketAddress(host, port)); + + mqttBootstrap.childHandler(new ChannelInitializer() { + protected void initChannel(SocketChannel ch) { + ChannelPipeline channelPipeline = ch.pipeline(); + // 设置读写空闲超时时间 + channelPipeline.addLast(new IdleStateHandler(600, 600, 1200)); + channelPipeline.addLast("encoder", MqttEncoder.INSTANCE); + channelPipeline.addLast("decoder", new MqttDecoder()); + channelPipeline.addLast(new BootNettyMqttChannelInboundHandler()); + } + }); + + ChannelFuture future = mqttBootstrap.bind(port).sync(); + if (future.isSuccess()) { + log.info("MqttServer启动成功, 开始监听端口:{}", port); + } else { + log.error("MqttServer启动失败", future.cause()); + } + + future.channel().closeFuture().sync(); + } catch (Exception e) { + log.error("MqttServer.start error", e); + bossGroup.shutdownGracefully(); + workerGroup.shutdownGracefully(); + } + }).start(); + } }