diff --git a/jsowell-common/src/main/java/com/jsowell/common/util/ChannelManagerUtil.java b/jsowell-common/src/main/java/com/jsowell/common/util/ChannelManagerUtil.java new file mode 100644 index 000000000..0dd81dde1 --- /dev/null +++ b/jsowell-common/src/main/java/com/jsowell/common/util/ChannelManagerUtil.java @@ -0,0 +1,63 @@ +package com.jsowell.common.util; + +import io.netty.channel.ChannelHandlerContext; +import lombok.extern.slf4j.Slf4j; + +import java.net.InetSocketAddress; +import java.util.concurrent.ConcurrentHashMap; + +/** + * 管理一个全局map,保存连接进服务端的通道数量 + */ +@Slf4j +public class ChannelManagerUtil { + + // 使用 ConcurrentHashMap 来保证线程安全 + private static final ConcurrentHashMap channelMap = new ConcurrentHashMap<>(); + + /** + * 添加通道到 map 中 + * + * @param channelId 通道 ID + * @param ctx 连接的 Channel 对象 + */ + public static void addChannel(String channelId, ChannelHandlerContext ctx) { + if (channelMap.containsKey(channelId)) { + log.info("客户端【{}】是连接状态,连接通道数量: {}", channelId, channelMap.size()); + } else { + channelMap.put(channelId, ctx); + InetSocketAddress socket = (InetSocketAddress) ctx.channel().remoteAddress(); + String clientIp = socket.getAddress().getHostAddress(); + int clientPort = socket.getPort(); + log.info("客户端【{}】, 连接netty服务器【IP:{}, PORT:{}】, 连接通道数量: {}", channelId, clientIp, clientPort, channelMap.size()); + } + } + + /** + * 移除指定的通道 + * + * @param channelId 通道 ID + */ + public static void removeChannel(String channelId) { + channelMap.remove(channelId); + } + + /** + * 获取当前连接的通道数量 + * + * @return 通道数量 + */ + public static int getChannelCount() { + return channelMap.size(); + } + + /** + * 根据通道 ID 获取 Channel 对象 + * + * @param channelId 通道 ID + * @return Channel 对象或 null 如果不存在 + */ + public static ChannelHandlerContext getChannel(String channelId) { + return channelMap.get(channelId); + } +} \ No newline at end of file diff --git a/jsowell-netty/src/main/java/com/jsowell/netty/server/electricbicycles/ChargingPileHandler.java b/jsowell-netty/src/main/java/com/jsowell/netty/server/electricbicycles/ChargingPileHandler.java deleted file mode 100644 index e78d3cd68..000000000 --- a/jsowell-netty/src/main/java/com/jsowell/netty/server/electricbicycles/ChargingPileHandler.java +++ /dev/null @@ -1,19 +0,0 @@ -package com.jsowell.netty.server.electricbicycles; - -import com.alibaba.fastjson2.JSON; -import com.jsowell.pile.domain.ebike.AbsEBikeMessage; -import io.netty.channel.*; -import lombok.extern.slf4j.Slf4j; -import org.springframework.stereotype.Component; - -@ChannelHandler.Sharable -@Slf4j -@Component -public class ChargingPileHandler extends SimpleChannelInboundHandler { - - @Override - protected void channelRead0(ChannelHandlerContext ctx, AbsEBikeMessage msg) throws Exception { - log.info("收到消息, channelId:{}, msg:{}", ctx.channel().id().toString(), JSON.toJSONString(msg)); - } - -} \ No newline at end of file diff --git a/jsowell-netty/src/main/java/com/jsowell/netty/server/electricbicycles/ElectricBicyclesServerChannelInitializer.java b/jsowell-netty/src/main/java/com/jsowell/netty/server/electricbicycles/ElectricBicyclesServerChannelInitializer.java index 9f4d35f63..a94749fd7 100644 --- a/jsowell-netty/src/main/java/com/jsowell/netty/server/electricbicycles/ElectricBicyclesServerChannelInitializer.java +++ b/jsowell-netty/src/main/java/com/jsowell/netty/server/electricbicycles/ElectricBicyclesServerChannelInitializer.java @@ -16,10 +16,7 @@ import java.util.concurrent.TimeUnit; public class ElectricBicyclesServerChannelInitializer extends ChannelInitializer { @Resource - ElectricBicyclesServerHandler electricBicyclesServerHandler; - - @Resource - ChargingPileHandler chargingPileHandler; + ElectricBicyclesServerHandler chargingPileHandler; @Override protected void initChannel(SocketChannel channel) throws Exception { diff --git a/jsowell-netty/src/main/java/com/jsowell/netty/server/electricbicycles/ElectricBicyclesServerHandler.java b/jsowell-netty/src/main/java/com/jsowell/netty/server/electricbicycles/ElectricBicyclesServerHandler.java index 375c8c6d0..82957fb76 100644 --- a/jsowell-netty/src/main/java/com/jsowell/netty/server/electricbicycles/ElectricBicyclesServerHandler.java +++ b/jsowell-netty/src/main/java/com/jsowell/netty/server/electricbicycles/ElectricBicyclesServerHandler.java @@ -1,234 +1,24 @@ package com.jsowell.netty.server.electricbicycles; -import com.google.common.collect.Lists; -import com.jsowell.common.core.domain.ykc.YKCFrameTypeCode; -import com.jsowell.common.enums.ykc.PileChannelEntity; -import com.jsowell.common.util.BytesUtil; -import com.jsowell.common.util.StringUtils; -import com.jsowell.common.util.YKCUtils; -import com.jsowell.netty.service.yunkuaichong.YKCBusinessService; -import io.netty.buffer.ByteBuf; +import com.alibaba.fastjson2.JSON; +import com.jsowell.netty.service.electricbicycles.EBikeBusinessService; +import com.jsowell.pile.domain.ebike.AbsEBikeMessage; import io.netty.channel.*; -import io.netty.handler.timeout.IdleState; -import io.netty.handler.timeout.IdleStateEvent; -import io.netty.handler.timeout.ReadTimeoutException; import lombok.extern.slf4j.Slf4j; -import org.apache.commons.collections4.CollectionUtils; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; -import java.net.InetSocketAddress; -import java.util.List; -import java.util.Objects; -import java.util.concurrent.ConcurrentHashMap; - -/** - * netty服务端处理类 - */ @ChannelHandler.Sharable @Slf4j @Component -public class ElectricBicyclesServerHandler extends ChannelInboundHandlerAdapter { +public class ElectricBicyclesServerHandler extends SimpleChannelInboundHandler { - @Autowired - private YKCBusinessService ykcService; + @Autowired + private EBikeBusinessService eBikeService; - /** - * 管理一个全局map,保存连接进服务端的通道数量 - */ - private static final ConcurrentHashMap CHANNEL_MAP = new ConcurrentHashMap<>(); + @Override + protected void channelRead0(ChannelHandlerContext ctx, AbsEBikeMessage msg) throws Exception { + log.info("收到消息, channelId:{}, msg:{}", ctx.channel().id().toString(), JSON.toJSONString(msg)); + } - private final List notPrintFrameTypeList = Lists.newArrayList("0x03"); - - /** - * 有客户端连接服务器会触发此函数 - * 连接被建立并且准备进行通信时被调用 - */ - @Override - public void channelActive(ChannelHandlerContext ctx) { - InetSocketAddress insocket = (InetSocketAddress) ctx.channel().remoteAddress(); - String clientIp = insocket.getAddress().getHostAddress(); - int clientPort = insocket.getPort(); - //获取连接通道唯一标识 - ChannelId channelId = ctx.channel().id(); - //如果map中不包含此连接,就保存连接 - if (CHANNEL_MAP.containsKey(channelId)) { - log.info("Handler:{}, 客户端【{}】是连接状态,连接通道数量: {}", this.getClass().getSimpleName(), channelId, CHANNEL_MAP.size()); - } else { - //保存连接 - CHANNEL_MAP.put(channelId, ctx); - log.info("Handler:{}, 客户端【{}】, 连接netty服务器【IP:{}, PORT:{}】, 连接通道数量: {}", this.getClass().getSimpleName(), channelId, clientIp, clientPort, CHANNEL_MAP.size()); - } - } - - /** - * 有客户端发消息会触发此函数 - */ - @Override - public void channelRead(ChannelHandlerContext ctx, Object message) throws Exception { - // log.info("加载客户端报文=== channelId:" + ctx.channel().id() + ", msg:" + msg); - // 下面可以解析数据,保存数据,生成返回报文,将需要返回报文写入write函数 - byte[] msg = (byte[]) message; - - // 获取帧类型 - byte[] frameTypeBytes = BytesUtil.copyBytes(msg, 5, 1); - String frameType = YKCUtils.frameType2Str(frameTypeBytes); - // 获取序列号域 - int serialNumber = BytesUtil.bytesToIntLittle(BytesUtil.copyBytes(msg, 2, 2)); - // 获取channel - Channel channel = ctx.channel(); - - // new - // String hexString = DatatypeConverter.printHexBinary(msg); - - // 心跳包0x03日志太多,造成日志文件过大,改为不打印 - if (!CollectionUtils.containsAny(notPrintFrameTypeList, frameType)) { - // log.info("【<<<<<平台收到消息<<<<<】channel:{}, 帧类型:{}, 帧名称:{}, 序列号域:{}, 报文:{}, new报文:{}", - // channel.id(), frameType, YKCFrameTypeCode.getFrameTypeStr(frameType), serialNumber, - // BytesUtil.binary(msg, 16), hexString); - log.info("【<<<<<平台收到消息<<<<<】channel:{}, 帧类型:{}, 帧名称:{}, 序列号域:{}, 报文:{}", - channel.id(), frameType, YKCFrameTypeCode.getFrameTypeStr(frameType), serialNumber, - BytesUtil.binary(msg, 16)); - } - - // 处理数据 - byte[] response = ykcService.process(msg, ctx); - if (Objects.nonNull(response)) { - // 响应客户端 - ByteBuf buffer = ctx.alloc().buffer().writeBytes(response); - this.channelWrite(channel.id(), buffer); - if (!CollectionUtils.containsAny(notPrintFrameTypeList, frameType)) { - // 应答帧类型 - byte[] responseFrameTypeBytes = YKCFrameTypeCode.PlatformAnswersRelation.getResponseFrameTypeBytes(frameTypeBytes); - String responseFrameType = YKCUtils.frameType2Str(responseFrameTypeBytes); - log.info("【>>>>>平台响应消息>>>>>】channel:{}, 响应帧类型:{}, 响应帧名称:{}, 原帧类型:{}, 原帧名称:{}, 序列号域:{}, response:{}", - channel.id(), responseFrameType, YKCFrameTypeCode.getFrameTypeStr(responseFrameType), - frameType, YKCFrameTypeCode.getFrameTypeStr(frameType), serialNumber, - BytesUtil.binary(response, 16)); - } - } - } - - /** - * 有客户端终止连接服务器会触发此函数 - */ - @Override - public void channelInactive(ChannelHandlerContext ctx) { - InetSocketAddress insocket = (InetSocketAddress) ctx.channel().remoteAddress(); - String clientIp = insocket.getAddress().getHostAddress(); - ChannelId channelId = ctx.channel().id(); - //包含此客户端才去删除 - if (CHANNEL_MAP.containsKey(channelId)) { - ykcService.exit(channelId); - //删除连接 - CHANNEL_MAP.remove(channelId); - log.info("客户端【{}】, 退出netty服务器【IP:{}, PORT:{}】, 连接通道数量: {}", channelId, clientIp, insocket.getPort(), CHANNEL_MAP.size()); - } - } - - @Override - public void handlerAdded(ChannelHandlerContext ctx) throws Exception { // (2) - // Channel incoming = ctx.channel(); - // log.info("handlerAdded: handler被添加到channel的pipeline connect:" + incoming.remoteAddress()); - } - - @Override - public void handlerRemoved(ChannelHandlerContext ctx) throws Exception { // (3) - // Channel incoming = ctx.channel(); - // log.info("handlerRemoved: handler从channel的pipeline中移除 connect:" + incoming.remoteAddress()); - // ChannelMapByEntity.removeChannel(incoming); - // ChannelMap.removeChannel(incoming); - } - - @Override - public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { - Channel channel = ctx.channel(); - // log.info("channel:【{}】读数据完成", channel.id()); - super.channelReadComplete(ctx); - } - - /** - * 服务端给客户端发送消息 - * - * @param channelId 连接通道唯一id - * @param msg 需要发送的消息内容 - */ - public void channelWrite(ChannelId channelId, Object msg) throws Exception { - ChannelHandlerContext ctx = CHANNEL_MAP.get(channelId); - if (ctx == null) { - log.info("通道【{}】不存在", channelId); - return; - } - if (msg == null || msg == "") { - log.info("服务端响应空的消息"); - return; - } - //将客户端的信息直接返回写入ctx - ctx.write(msg); - //刷新缓存区 - ctx.flush(); - } - - @Override - public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { - String socketString = ctx.channel().remoteAddress().toString(); - ChannelId channelId = ctx.channel().id(); - String pileSn = PileChannelEntity.getPileSnByChannelId(channelId.asLongText()); - if (evt instanceof IdleStateEvent) { // 超时事件 - IdleStateEvent event = (IdleStateEvent) evt; - boolean flag = false; - if (event.state() == IdleState.READER_IDLE) { // 读 - flag = true; - // log.error("Client-IP:【{}】, channelId:【{}】, pileSn:【{}】, READER_IDLE 读超时", socketString, channelId, pileSn); - } else if (event.state() == IdleState.WRITER_IDLE) { // 写 - flag = true; - // log.error("Client-IP:【{}】, channelId:【{}】, pileSn:【{}】, WRITER_IDLE 写超时", socketString, channelId, pileSn); - } else if (event.state() == IdleState.ALL_IDLE) { // 全部 - flag = true; - // log.error("Client-IP:【{}】, channelId:【{}】, pileSn:【{}】, ALL_IDLE 总超时", socketString, channelId, pileSn); - } - if (flag) { - ctx.channel().close(); - // close(channelId, pileSn); - } - } - } - - /** - * 发生异常会触发此函数 - */ - @Override - public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { - ChannelId channelId = ctx.channel().id(); - String channelIdShortText = channelId.asShortText(); - String pileSn = PileChannelEntity.getPileSnByChannelId(channelIdShortText); - log.error("发生异常 channelId:{}, pileSn:{}", channelIdShortText, pileSn, cause); - cause.printStackTrace(); - // 如果桩连到平台,在1分钟内没有发送数据过来,会报ReadTimeoutException异常 - if (cause instanceof ReadTimeoutException) { - if (log.isTraceEnabled()) { - log.trace("Connection timeout 【{}】", ctx.channel().remoteAddress()); - } - log.error("【{}】发生了错误, pileSn:【{}】此连接被关闭, 此时连通数量: {}", channelId, pileSn, CHANNEL_MAP.size()); - ctx.channel().close(); - } - // close(channelId, pileSn); - } - - - // 公共方法 关闭连接 - private void closeConnection(String pileSn, ChannelHandlerContext ctx) { - Channel channel = ctx.channel(); - ChannelId channelId = channel.id(); - log.error("close方法-发生异常,关闭链接,channelId:{}, pileSn:{}", channelId.asShortText(), pileSn); - if (channel != null && !channel.isActive() && !channel.isOpen() && !channel.isWritable()) { - channel.close(); - // 删除连接 - CHANNEL_MAP.remove(channelId); - } - // 删除桩编号和channel的关系 - if (StringUtils.isNotBlank(pileSn)) { - PileChannelEntity.removeByPileSn(pileSn); - } - } } \ No newline at end of file diff --git a/jsowell-netty/src/main/java/com/jsowell/netty/service/electricbicycles/YKCBusinessService.java b/jsowell-netty/src/main/java/com/jsowell/netty/service/electricbicycles/EBikeBusinessService.java similarity index 86% rename from jsowell-netty/src/main/java/com/jsowell/netty/service/electricbicycles/YKCBusinessService.java rename to jsowell-netty/src/main/java/com/jsowell/netty/service/electricbicycles/EBikeBusinessService.java index 4595dcb5f..1d7be7e68 100644 --- a/jsowell-netty/src/main/java/com/jsowell/netty/service/electricbicycles/YKCBusinessService.java +++ b/jsowell-netty/src/main/java/com/jsowell/netty/service/electricbicycles/EBikeBusinessService.java @@ -1,6 +1,5 @@ package com.jsowell.netty.service.electricbicycles; -import io.netty.channel.Channel; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelId; @@ -8,7 +7,7 @@ import io.netty.channel.ChannelId; * 云快充处理service */ -public interface YKCBusinessService { +public interface EBikeBusinessService { /** * 处理桩发来的请求 * 不需要应答的返回null diff --git a/jsowell-netty/src/main/java/com/jsowell/netty/service/electricbicycles/impl/YKCBusinessServiceImpl2.java b/jsowell-netty/src/main/java/com/jsowell/netty/service/electricbicycles/impl/EBikeBusinessServiceImpl.java similarity index 91% rename from jsowell-netty/src/main/java/com/jsowell/netty/service/electricbicycles/impl/YKCBusinessServiceImpl2.java rename to jsowell-netty/src/main/java/com/jsowell/netty/service/electricbicycles/impl/EBikeBusinessServiceImpl.java index 7be8231e6..00a33e808 100644 --- a/jsowell-netty/src/main/java/com/jsowell/netty/service/electricbicycles/impl/YKCBusinessServiceImpl2.java +++ b/jsowell-netty/src/main/java/com/jsowell/netty/service/electricbicycles/impl/EBikeBusinessServiceImpl.java @@ -8,11 +8,10 @@ import com.jsowell.common.util.StringUtils; import com.jsowell.common.util.YKCUtils; import com.jsowell.netty.factory.YKCOperateFactory; import com.jsowell.netty.handler.yunkuaichong.AbstractYkcHandler; -import com.jsowell.netty.service.electricbicycles.YKCBusinessService; +import com.jsowell.netty.service.electricbicycles.EBikeBusinessService; import com.jsowell.pile.service.OrderBasicInfoService; import com.jsowell.pile.service.PileConnectorInfoService; import com.jsowell.pile.service.PileMsgRecordService; -import com.jsowell.pile.service.YKCPushCommandService; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelId; import lombok.extern.slf4j.Slf4j; @@ -21,7 +20,7 @@ import org.springframework.stereotype.Service; @Slf4j @Service -public class YKCBusinessServiceImpl2 implements YKCBusinessService { +public class EBikeBusinessServiceImpl implements EBikeBusinessService { @Autowired private PileMsgRecordService pileMsgRecordService; @@ -32,9 +31,6 @@ public class YKCBusinessServiceImpl2 implements YKCBusinessService { @Autowired private OrderBasicInfoService orderBasicInfoService; - @Autowired - private YKCPushCommandService ykcPushCommandService; - @Override public byte[] process(byte[] msg, ChannelHandlerContext ctx) { if (!YKCUtils.checkMsg(msg)) { diff --git a/jsowell-netty/src/main/java/com/jsowell/netty/service/yunkuaichong/impl/YKCBusinessServiceImpl.java b/jsowell-netty/src/main/java/com/jsowell/netty/service/yunkuaichong/impl/YKCBusinessServiceImpl.java index cb27d30dc..7a996d323 100644 --- a/jsowell-netty/src/main/java/com/jsowell/netty/service/yunkuaichong/impl/YKCBusinessServiceImpl.java +++ b/jsowell-netty/src/main/java/com/jsowell/netty/service/yunkuaichong/impl/YKCBusinessServiceImpl.java @@ -9,10 +9,9 @@ import com.jsowell.common.util.YKCUtils; import com.jsowell.netty.factory.YKCOperateFactory; import com.jsowell.netty.handler.yunkuaichong.AbstractYkcHandler; import com.jsowell.netty.service.yunkuaichong.YKCBusinessService; +import com.jsowell.pile.service.OrderBasicInfoService; import com.jsowell.pile.service.PileConnectorInfoService; import com.jsowell.pile.service.PileMsgRecordService; -import com.jsowell.pile.service.OrderBasicInfoService; -import com.jsowell.pile.service.YKCPushCommandService; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelId; import lombok.extern.slf4j.Slf4j; @@ -32,9 +31,6 @@ public class YKCBusinessServiceImpl implements YKCBusinessService { @Autowired private OrderBasicInfoService orderBasicInfoService; - @Autowired - private YKCPushCommandService ykcPushCommandService; - @Override public byte[] process(byte[] msg, ChannelHandlerContext ctx) { if (!YKCUtils.checkMsg(msg)) {