使用ChannelHandlerContext

This commit is contained in:
Guoqs
2024-08-01 11:43:53 +08:00
parent 1cec3f64c5
commit 3ac1d5752c
6 changed files with 30 additions and 35 deletions

View File

@@ -2,6 +2,7 @@ package com.jsowell.common.enums.ykc;
import com.jsowell.common.util.StringUtils;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import lombok.extern.slf4j.Slf4j;
import java.util.HashMap;
@@ -16,35 +17,35 @@ public class PileChannelEntity {
/**
* 管理一个全局map保存连接进服务端的通道数量
*/
private static final ConcurrentHashMap<String, Channel> manager = new ConcurrentHashMap<>();
private static final ConcurrentHashMap<String, ChannelHandlerContext> manager = new ConcurrentHashMap<>();
/**
* 校验channel是否保存
*/
public static void checkChannel(String pileSn, Channel channel) {
public static void checkChannel(String pileSn, ChannelHandlerContext ctx) {
if (!manager.containsKey(pileSn)) {
// 如果manager中不存在pileSn的连接则保存
log.info("checkChannel-manager中不存在pileSn:{}的连接,保存新的channel:{}", pileSn, channel.id().asLongText());
manager.put(pileSn, channel);
log.info("checkChannel-manager中不存在pileSn:{}的连接,保存新的channel:{}", pileSn, ctx.channel().id().asLongText());
manager.put(pileSn, ctx);
return;
}
// 如果manager中存在pileSn的连接取出来对比
Channel sourceChannel = manager.get(pileSn);
Channel sourceChannel = manager.get(pileSn).channel();
if (sourceChannel == null) {
// 为空就put
log.info("checkChannel-manager中pileSn:{}的连接为空,保存新的channel:{}", pileSn, channel.id().asLongText());
manager.put(pileSn, channel);
log.info("checkChannel-manager中pileSn:{}的连接为空,保存新的channel:{}", pileSn, ctx.channel().id().asLongText());
manager.put(pileSn, ctx);
return;
}
// 两个做对比
String sourceChannelId = sourceChannel.id().asLongText();
String channelId = channel.id().asLongText();
String channelId = ctx.channel().id().asLongText();
if (!StringUtils.equals(sourceChannelId, channelId)) {
// 不一致则更新
log.info("checkChannel-manager中pileSn:{}的连接不一致, 老channelId:{}, 保存新的channel:{}", pileSn, sourceChannelId, channelId);
manager.put(pileSn, channel);
manager.put(pileSn, ctx);
}
}
@@ -53,7 +54,7 @@ public class PileChannelEntity {
* @param pileSn
* @return
*/
public static Channel getChannelByPileSn(String pileSn) {
public static ChannelHandlerContext getChannelByPileSn(String pileSn) {
return manager.get(pileSn);
}
@@ -63,8 +64,8 @@ public class PileChannelEntity {
* @return
*/
public static String getPileSnByChannelId(String channelId) {
for (HashMap.Entry<String, Channel> entry : manager.entrySet()) {
if (entry.getValue().id().asLongText().equals(channelId)) {
for (HashMap.Entry<String, ChannelHandlerContext> entry : manager.entrySet()) {
if (entry.getValue().channel().id().asLongText().equals(channelId)) {
return entry.getKey();
}
}
@@ -75,9 +76,9 @@ public class PileChannelEntity {
* 打印
*/
public static void output() {
for (HashMap.Entry<String, Channel> entry : manager.entrySet()) {
for (HashMap.Entry<String, ChannelHandlerContext> entry : manager.entrySet()) {
System.out.println("pileSn:" + entry.getKey() +
",ChannelId:" + entry.getValue().id().asLongText());
",ChannelId:" + entry.getValue().channel().id().asLongText());
}
}

View File

@@ -10,6 +10,7 @@ import com.jsowell.common.util.BytesUtil;
import com.jsowell.common.util.CRC16Util;
import com.jsowell.common.util.DateUtils;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
@@ -66,23 +67,23 @@ public abstract class AbstractHandler implements InitializingBean {
* 保存桩最后链接到平台的时间
* @param pileSn 桩编号
*/
protected void saveLastTimeAndCheckChannel(String pileSn, Channel channel) {
protected void saveLastTimeAndCheckChannel(String pileSn, ChannelHandlerContext ctx) {
String redisKey = CacheConstants.PILE_LAST_CONNECTION + pileSn;
redisCache.setCacheObject(redisKey, DateUtils.getDateTime(), CacheConstants.cache_expire_time_1d);
// 保存桩号和channel的关系
PileChannelEntity.checkChannel(pileSn, channel);
PileChannelEntity.checkChannel(pileSn, ctx);
}
/**
* 阻止重复帧
* @return true 重复
*/
protected boolean verifyTheDuplicateRequest(YKCDataProtocol ykcDataProtocol, Channel channel) {
protected boolean verifyTheDuplicateRequest(YKCDataProtocol ykcDataProtocol, ChannelHandlerContext ctx) {
// 获取序列号域
int serialNumber = BytesUtil.bytesToIntLittle(ykcDataProtocol.getSerialNumber());
// 获取channelId
String channelId = channel.id().asShortText();
String channelId = ctx.channel().id().asShortText();
String redisKey = "Request_" + channelId + "_" + serialNumber;
Boolean result = redisCache.setnx(redisKey, ykcDataProtocol.getHEXString(), 30);
// result返回false说明没有设置成功就是说已经有相同请求了所以返回true重复

View File

@@ -77,20 +77,12 @@ public abstract class AbstractHandler implements InitializingBean {
* 保存桩最后链接到平台的时间
* @param pileSn 桩编号
*/
protected void saveLastTimeAndCheckChannel(String pileSn, Channel channel) {
String redisKey = CacheConstants.PILE_LAST_CONNECTION + pileSn;
redisCache.setCacheObject(redisKey, DateUtils.getDateTime(), CacheConstants.cache_expire_time_1d);
// 保存桩号和channel的关系
PileChannelEntity.checkChannel(pileSn, channel);
}
protected void saveLastTimeAndCheckChannel(String pileSn, ChannelHandlerContext ctx) {
String redisKey = CacheConstants.PILE_LAST_CONNECTION + pileSn;
redisCache.setCacheObject(redisKey, DateUtils.getDateTime(), CacheConstants.cache_expire_time_1d);
// 保存桩号和channel的关系
PileChannelEntity.checkChannel(pileSn, ctx.channel());
PileChannelEntity.checkChannel(pileSn, ctx);
}
/**

View File

@@ -56,7 +56,7 @@ public class YKCBusinessServiceImpl2 implements YKCBusinessService {
if (StringUtils.isBlank(pileSn)) {
return;
}
log.info("充电桩退出:{}, channelId:{}", pileSn, PileChannelEntity.getChannelByPileSn(pileSn).id());
log.info("充电桩退出:{}, channelId:{}", pileSn, PileChannelEntity.getChannelByPileSn(pileSn).channel().id());
// 充电桩断开连接,所有枪口都设置为【离线】
pileConnectorInfoService.updateConnectorStatusByPileSn(pileSn, PileConnectorDataBaseStatusEnum.OFF_NETWORK.getValue());

View File

@@ -57,7 +57,7 @@ public class YKCBusinessServiceImpl implements YKCBusinessService {
if (StringUtils.isBlank(pileSn)) {
return;
}
log.info("充电桩退出:{}, channelId:{}", pileSn, PileChannelEntity.getChannelByPileSn(pileSn).id());
log.info("充电桩退出:{}, channelId:{}", pileSn, PileChannelEntity.getChannelByPileSn(pileSn).channel().id());
// 充电桩断开连接,所有枪口都设置为【离线】
pileConnectorInfoService.updateConnectorStatusByPileSn(pileSn, PileConnectorDataBaseStatusEnum.OFF_NETWORK.getValue());

View File

@@ -18,6 +18,7 @@ import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections4.CollectionUtils;
import org.springframework.beans.factory.annotation.Autowired;
@@ -71,9 +72,9 @@ public class YKCPushCommandServiceImpl implements YKCPushCommandService {
*/
public boolean push(byte[] msg, String pileSn, Enum<YKCFrameTypeCode> frameTypeCode) {
// 通过桩编号获取channel
Channel channel = PileChannelEntity.getChannelByPileSn(pileSn);
ChannelHandlerContext ctx = PileChannelEntity.getChannelByPileSn(pileSn);
String value = ((YKCFrameTypeCode) frameTypeCode).getValue();
if (Objects.isNull(channel)) {
if (Objects.isNull(ctx)) {
log.error("push命令[{}]失败, 桩号:{}无法获取到长连接, 请检查充电桩连接状态!", value, pileSn);
return false;
}
@@ -106,18 +107,18 @@ public class YKCPushCommandServiceImpl implements YKCPushCommandService {
// 返回完整的报文 string类型
String wholeMsg = BytesUtil.binary(writeMsg, 16);
ByteBuf byteBuf = channel.alloc().buffer().writeBytes(writeMsg);
ChannelFuture channelFuture = channel.writeAndFlush(byteBuf);
ByteBuf byteBuf = ctx.channel().alloc().buffer().writeBytes(writeMsg);
ChannelFuture channelFuture = ctx.channel().writeAndFlush(byteBuf);
channelFuture.addListener((ChannelFutureListener) channelFutureListener -> {
// 检查操作的状态
if (channelFutureListener.isSuccess()) {
log.info("【push结果===>成功】, pileSn:{}, remoteAddress:{}, channelId:{}, 帧类型:{}, 报文:{}",
pileSn, channel.remoteAddress(), channel.id(), value, wholeMsg);
pileSn, ctx.channel().remoteAddress(), ctx.channel().id(), value, wholeMsg);
} else {
// 如果发生错误则访问描述原因的Throwable
Throwable cause = channelFutureListener.cause();
log.info("【push结果===>失败】, pileSn:{}, remoteAddress:{}, channelId:{}, 帧类型:{}, 报文:{}",
pileSn, channel.remoteAddress(), channel.id(), value, wholeMsg);
pileSn, ctx.channel().remoteAddress(), ctx.channel().id(), value, wholeMsg);
log.error("push发送命令失败, pileSn:{}", pileSn, cause);
}
});