package com.jsowell.netty.server; import com.jsowell.common.constant.Constants; import com.jsowell.netty.server.electricbicycles.ElectricBicyclesServerChannelInitializer; import com.jsowell.netty.server.mqtt.BootNettyMqttChannelInboundHandler; import com.jsowell.netty.server.yunkuaichong.NettyServerChannelInitializer; import io.netty.bootstrap.ServerBootstrap; import io.netty.buffer.PooledByteBufAllocator; import io.netty.channel.*; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.handler.codec.mqtt.MqttDecoder; import io.netty.handler.codec.mqtt.MqttEncoder; import io.netty.handler.logging.LogLevel; import io.netty.handler.logging.LoggingHandler; import io.netty.handler.timeout.IdleStateHandler; import lombok.extern.slf4j.Slf4j; import org.springframework.boot.CommandLineRunner; import org.springframework.stereotype.Component; import javax.annotation.Resource; import java.net.InetSocketAddress; @Slf4j @Component public class NettyServerManager implements CommandLineRunner { @Resource private NettyServerChannelInitializer nettyServerChannelInitializer; @Resource private ElectricBicyclesServerChannelInitializer electricBicyclesServerChannelInitializer; private int bossGroupSize = Runtime.getRuntime().availableProcessors(); private int workerGroupSize = bossGroupSize * 2; @Override public void run(String... args) throws Exception { startNettyServer(Constants.SOCKET_IP, 9011); startElectricBikeNettyServer(Constants.SOCKET_IP, 9012); // startMqttSever(Constants.SOCKET_IP, 1883); } public void startNettyServer(String host, int port) { new Thread(() -> { EventLoopGroup bossGroup = new NioEventLoopGroup(bossGroupSize); EventLoopGroup workerGroup = new NioEventLoopGroup(workerGroupSize); try { ServerBootstrap bootstrap = new ServerBootstrap() .group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .handler(new LoggingHandler(LogLevel.DEBUG)) // .option(ChannelOption.SO_BACKLOG, 128) // 默认128 .option(ChannelOption.SO_BACKLOG, 1024) .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT) // 启用池化内存分配器 // .option(ChannelOption.SO_REUSEADDR, true) .childOption(ChannelOption.SO_KEEPALIVE, true) // 保持连接 // .childOption(ChannelOption.SO_RCVBUF, 64 * 1024) // 接收缓冲区 .childOption(ChannelOption.SO_RCVBUF, 1024 * 1024) // 接收缓冲区 // .childOption(ChannelOption.SO_SNDBUF, 64 * 1024) // 发送缓冲区 .childOption(ChannelOption.SO_SNDBUF, 1024 * 1024) // 发送缓冲区 .childOption(ChannelOption.TCP_NODELAY, true) // 禁用 Nagle 算法 .childOption(ChannelOption.WRITE_BUFFER_WATER_MARK, new WriteBufferWaterMark(32 * 1024, 64 * 1024)) // 写缓冲水位 .childOption(ChannelOption.SO_REUSEADDR, true) // .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT) // 启用池化内存分配器 .childHandler(nettyServerChannelInitializer) .localAddress(new InetSocketAddress(host, port)); ChannelFuture future = bootstrap.bind(port).sync(); if (future.isSuccess()) { log.info("NettyServer启动成功, 开始监听端口:{}", port); } else { log.error("NettyServer启动失败", future.cause()); } future.channel().closeFuture().sync(); } catch (Exception e) { log.error("NettyServer.start error", e); bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } }).start(); } public void startElectricBikeNettyServer(String host, int port) { new Thread(() -> { EventLoopGroup bossGroup = new NioEventLoopGroup(1); EventLoopGroup workerGroup = new NioEventLoopGroup(); try { ServerBootstrap bootstrap = new ServerBootstrap() .group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .handler(new LoggingHandler(LogLevel.DEBUG)) .option(ChannelOption.SO_BACKLOG, 128) .option(ChannelOption.SO_REUSEADDR, true) // .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT) // 启用池化内存分配器 .childOption(ChannelOption.SO_KEEPALIVE, true) .childOption(ChannelOption.SO_REUSEADDR, true) .childHandler(electricBicyclesServerChannelInitializer) .localAddress(new InetSocketAddress(host, port)); ChannelFuture future = bootstrap.bind(port).sync(); if (future.isSuccess()) { log.info("ElectricBikeNettyServer启动成功, 开始监听端口:{}", port); } else { log.error("ElectricBikeNettyServer启动失败", future.cause()); } future.channel().closeFuture().sync(); } catch (Exception e) { log.error("ElectricBikeNettyServer.start error", e); bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } }).start(); } public void startMqttSever(String host, int port) { new Thread(() -> { EventLoopGroup bossGroup = new NioEventLoopGroup(1); EventLoopGroup workerGroup = new NioEventLoopGroup(); try { ServerBootstrap mqttBootstrap = new ServerBootstrap() .group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .handler(new LoggingHandler(LogLevel.DEBUG)) .option(ChannelOption.SO_BACKLOG, 128) .option(ChannelOption.SO_REUSEADDR, true) .childOption(ChannelOption.SO_KEEPALIVE, true) .childOption(ChannelOption.SO_REUSEADDR, true) .childOption(ChannelOption.TCP_NODELAY, true) .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT) .localAddress(new InetSocketAddress(host, port)); mqttBootstrap.childHandler(new ChannelInitializer() { @Override protected void initChannel(SocketChannel ch) { ChannelPipeline channelPipeline = ch.pipeline(); // 设置读写空闲超时时间 channelPipeline.addLast(new IdleStateHandler(600, 600, 1200)); channelPipeline.addLast("encoder", MqttEncoder.INSTANCE); channelPipeline.addLast("decoder", new MqttDecoder()); channelPipeline.addLast(new BootNettyMqttChannelInboundHandler()); } }); ChannelFuture future = mqttBootstrap.bind(port).sync(); if (future.isSuccess()) { log.info("MqttServer启动成功, 开始监听端口:{}", port); } else { log.error("MqttServer启动失败", future.cause()); } future.channel().closeFuture().sync(); } catch (Exception e) { log.error("MqttServer.start error", e); bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } }).start(); } }