This commit is contained in:
Lemon
2024-08-01 14:56:51 +08:00
70 changed files with 162 additions and 106 deletions

View File

@@ -10,6 +10,7 @@ import com.jsowell.common.util.BytesUtil;
import com.jsowell.common.util.CRC16Util;
import com.jsowell.common.util.DateUtils;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
@@ -25,7 +26,7 @@ public abstract class AbstractHandler implements InitializingBean {
* 执行逻辑
* 有应答
*/
public byte[] supplyProcess(YKCDataProtocol ykcDataProtocol, Channel channel) {
public byte[] supplyProcess(YKCDataProtocol ykcDataProtocol, ChannelHandlerContext ctx) {
throw new UnsupportedOperationException();
}
@@ -66,23 +67,23 @@ public abstract class AbstractHandler implements InitializingBean {
* 保存桩最后链接到平台的时间
* @param pileSn 桩编号
*/
protected void saveLastTimeAndCheckChannel(String pileSn, Channel channel) {
protected void saveLastTimeAndCheckChannel(String pileSn, ChannelHandlerContext ctx) {
String redisKey = CacheConstants.PILE_LAST_CONNECTION + pileSn;
redisCache.setCacheObject(redisKey, DateUtils.getDateTime(), CacheConstants.cache_expire_time_1d);
// 保存桩号和channel的关系
PileChannelEntity.checkChannel(pileSn, channel);
PileChannelEntity.checkChannel(pileSn, ctx);
}
/**
* 阻止重复帧
* @return true 重复
*/
protected boolean verifyTheDuplicateRequest(YKCDataProtocol ykcDataProtocol, Channel channel) {
protected boolean verifyTheDuplicateRequest(YKCDataProtocol ykcDataProtocol, ChannelHandlerContext ctx) {
// 获取序列号域
int serialNumber = BytesUtil.bytesToIntLittle(ykcDataProtocol.getSerialNumber());
// 获取channelId
String channelId = channel.id().asShortText();
String channelId = ctx.channel().id().asShortText();
String redisKey = "Request_" + channelId + "_" + serialNumber;
Boolean result = redisCache.setnx(redisKey, ykcDataProtocol.getHEXString(), 30);
// result返回false说明没有设置成功就是说已经有相同请求了所以返回true重复

View File

@@ -10,6 +10,7 @@ import com.jsowell.common.util.BytesUtil;
import com.jsowell.common.util.CRC16Util;
import com.jsowell.common.util.DateUtils;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
@@ -29,6 +30,10 @@ public abstract class AbstractHandler implements InitializingBean {
throw new UnsupportedOperationException();
}
public byte[] supplyProcess(YKCDataProtocol ykcDataProtocol, ChannelHandlerContext channel) {
throw new UnsupportedOperationException();
}
/**
* 执行逻辑
* 不需要应答
@@ -72,23 +77,23 @@ public abstract class AbstractHandler implements InitializingBean {
* 保存桩最后链接到平台的时间
* @param pileSn 桩编号
*/
protected void saveLastTimeAndCheckChannel(String pileSn, Channel channel) {
protected void saveLastTimeAndCheckChannel(String pileSn, ChannelHandlerContext ctx) {
String redisKey = CacheConstants.PILE_LAST_CONNECTION + pileSn;
redisCache.setCacheObject(redisKey, DateUtils.getDateTime(), CacheConstants.cache_expire_time_1d);
// 保存桩号和channel的关系
PileChannelEntity.checkChannel(pileSn, channel);
PileChannelEntity.checkChannel(pileSn, ctx);
}
/**
* 阻止重复帧
* @return true 重复
*/
protected boolean verifyTheDuplicateRequest(YKCDataProtocol ykcDataProtocol, Channel channel) {
protected boolean verifyTheDuplicateRequest(YKCDataProtocol ykcDataProtocol, ChannelHandlerContext ctx) {
// 获取序列号域
int serialNumber = BytesUtil.bytesToIntLittle(ykcDataProtocol.getSerialNumber());
// 获取channelId
String channelId = channel.id().asShortText();
String channelId = ctx.channel().id().asShortText();
String redisKey = "Request_" + channelId + "_" + serialNumber;
Boolean result = redisCache.setnx(redisKey, ykcDataProtocol.getHEXString(), 30);
// result返回false说明没有设置成功就是说已经有相同请求了所以返回true重复

View File

@@ -6,6 +6,7 @@ import com.jsowell.common.util.BytesUtil;
import com.jsowell.common.util.YKCUtils;
import com.jsowell.netty.factory.YKCOperateFactory;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
@@ -27,7 +28,7 @@ public class BMSAbortDuringChargingPhaseHandler extends AbstractHandler{
}
@Override
public byte[] supplyProcess(YKCDataProtocol ykcDataProtocol, Channel channel) {
public byte[] supplyProcess(YKCDataProtocol ykcDataProtocol, ChannelHandlerContext channel) {
// log.info("[===充电阶段 BMS 中止===] param:{}, channel:{}", JSON.toJSONString(ykcDataProtocol), channel.toString());
// 获取消息体
byte[] msgBody = ykcDataProtocol.getMsgBody();

View File

@@ -6,6 +6,7 @@ import com.jsowell.common.util.BytesUtil;
import com.jsowell.common.util.YKCUtils;
import com.jsowell.netty.factory.YKCOperateFactory;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
@@ -27,7 +28,7 @@ public class BMSDemandAndChargerOutputHandler extends AbstractHandler{
}
@Override
public byte[] supplyProcess(YKCDataProtocol ykcDataProtocol, Channel channel) {
public byte[] supplyProcess(YKCDataProtocol ykcDataProtocol, ChannelHandlerContext channel) {
// log.info("[===充电过程 BMS 需求与充电机输出===] param:{}, channel:{}", JSON.toJSONString(ykcDataProtocol), channel.toString());
// 获取消息体
byte[] msgBody = ykcDataProtocol.getMsgBody();

View File

@@ -6,6 +6,7 @@ import com.jsowell.common.util.BytesUtil;
import com.jsowell.common.util.YKCUtils;
import com.jsowell.netty.factory.YKCOperateFactory;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
@@ -27,7 +28,7 @@ public class BMSInformationHandler extends AbstractHandler{
}
@Override
public byte[] supplyProcess(YKCDataProtocol ykcDataProtocol, Channel channel) {
public byte[] supplyProcess(YKCDataProtocol ykcDataProtocol, ChannelHandlerContext channel) {
// log.info("[===充电过程 BMS 信息===] param:{}, channel:{}", JSON.toJSONString(ykcDataProtocol), channel.toString());
// 获取消息体
byte[] msgBody = ykcDataProtocol.getMsgBody();

View File

@@ -9,6 +9,7 @@ import com.jsowell.pile.service.PileBillingTemplateService;
import com.jsowell.pile.service.YKCPushCommandService;
import com.jsowell.pile.vo.web.BillingTemplateVO;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@@ -37,7 +38,7 @@ public class BillingTemplateRequestHandler extends AbstractHandler{
}
@Override
public byte[] supplyProcess(YKCDataProtocol ykcDataProtocol, Channel channel) {
public byte[] supplyProcess(YKCDataProtocol ykcDataProtocol, ChannelHandlerContext ctx) {
// log.info("[===执行计费模板请求逻辑===] param:{}, channel:{}", JSON.toJSONString(ykcDataProtocol), channel.toString());
// 获取消息体(此请求消息体只有桩编码)
byte[] pileSnByte = ykcDataProtocol.getMsgBody();
@@ -45,7 +46,7 @@ public class BillingTemplateRequestHandler extends AbstractHandler{
// log.info("桩号:{}", pileSn);
// 保存时间
saveLastTimeAndCheckChannel(pileSn, channel);
saveLastTimeAndCheckChannel(pileSn, ctx);
// 根据桩号查询计费模板
BillingTemplateVO billingTemplateVO = pileBillingTemplateService.selectBillingTemplateDetailByPileSn(pileSn);

View File

@@ -6,6 +6,7 @@ import com.jsowell.common.util.BytesUtil;
import com.jsowell.common.util.YKCUtils;
import com.jsowell.netty.factory.YKCOperateFactory;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
@@ -26,7 +27,7 @@ public class BillingTemplateResponseHandler extends AbstractHandler{
}
@Override
public byte[] supplyProcess(YKCDataProtocol ykcDataProtocol, Channel channel) {
public byte[] supplyProcess(YKCDataProtocol ykcDataProtocol, ChannelHandlerContext ctx) {
// log.info("[===执行计费模型设置应答逻辑===] param:{}, channel:{}", JSON.toJSONString(ykcDataProtocol), channel.toString());
// 消息体
byte[] msgBody = ykcDataProtocol.getMsgBody();
@@ -36,7 +37,7 @@ public class BillingTemplateResponseHandler extends AbstractHandler{
String pileSn = BytesUtil.bcd2Str(pileSnByteArr);
// 保存时间
saveLastTimeAndCheckChannel(pileSn, channel);
saveLastTimeAndCheckChannel(pileSn, ctx);
// 设置结果 0x00 失败 0x01 成功
byte[] settingResultByteArr = BytesUtil.copyBytes(msgBody, 7, 1);

View File

@@ -5,6 +5,7 @@ import com.jsowell.common.core.domain.ykc.YKCFrameTypeCode;
import com.jsowell.common.util.YKCUtils;
import com.jsowell.netty.factory.YKCOperateFactory;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
@@ -27,7 +28,7 @@ public class BillingTemplateSettingHandler extends AbstractHandler{
}
@Override
public byte[] supplyProcess(YKCDataProtocol ykcDataProtocol, Channel channel) {
public byte[] supplyProcess(YKCDataProtocol ykcDataProtocol, ChannelHandlerContext ctx) {
// log.info("[===执行计费模型设置逻辑===] param:{}, channel:{}", JSON.toJSONString(ykcDataProtocol), channel.toString());
// 下发
// 桩编号

View File

@@ -10,6 +10,7 @@ import com.jsowell.netty.factory.YKCOperateFactory;
import com.jsowell.pile.service.PileBillingTemplateService;
import com.jsowell.pile.service.YKCPushCommandService;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@@ -38,7 +39,7 @@ public class BillingTemplateValidateRequestHandler extends AbstractHandler{
}
@Override
public byte[] supplyProcess(YKCDataProtocol ykcDataProtocol, Channel channel) {
public byte[] supplyProcess(YKCDataProtocol ykcDataProtocol, ChannelHandlerContext channel) {
// log.info("[===执行计费模板验证请求逻辑===] param:{}, channel:{}", JSON.toJSONString(ykcDataProtocol), channel.toString());
// 获取消息体
byte[] msgBody = ykcDataProtocol.getMsgBody();

View File

@@ -9,6 +9,7 @@ import com.jsowell.netty.factory.YKCOperateFactory;
import com.jsowell.pile.domain.OrderBasicInfo;
import com.jsowell.pile.service.OrderBasicInfoService;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@@ -37,7 +38,7 @@ public class ChargeEndHandler extends AbstractHandler{
}
@Override
public byte[] supplyProcess(YKCDataProtocol ykcDataProtocol, Channel channel) {
public byte[] supplyProcess(YKCDataProtocol ykcDataProtocol, ChannelHandlerContext channel) {
// log.info("[===执行充电结束逻辑===] param:{}, channel:{}", JSON.toJSONString(ykcDataProtocol), channel.toString());
// 消息体
byte[] msgBody = ykcDataProtocol.getMsgBody();

View File

@@ -6,6 +6,7 @@ import com.jsowell.common.util.BytesUtil;
import com.jsowell.common.util.YKCUtils;
import com.jsowell.netty.factory.YKCOperateFactory;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
@@ -27,7 +28,7 @@ public class ChargerAbortedDuringChargingPhaseHandler extends AbstractHandler{
}
@Override
public byte[] supplyProcess(YKCDataProtocol ykcDataProtocol, Channel channel) {
public byte[] supplyProcess(YKCDataProtocol ykcDataProtocol, ChannelHandlerContext channel) {
// log.info("[===充电阶段充电机中止===] param:{}, channel:{}", JSON.toJSONString(ykcDataProtocol), channel.toString());
// 获取消息体
byte[] msgBody = ykcDataProtocol.getMsgBody();

View File

@@ -6,6 +6,7 @@ import com.jsowell.common.util.BytesUtil;
import com.jsowell.common.util.YKCUtils;
import com.jsowell.netty.factory.YKCOperateFactory;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
@@ -27,7 +28,7 @@ public class ChargingHandshakeHandler extends AbstractHandler{
}
@Override
public byte[] supplyProcess(YKCDataProtocol ykcDataProtocol, Channel channel) {
public byte[] supplyProcess(YKCDataProtocol ykcDataProtocol, ChannelHandlerContext channel) {
// log.info("[===执行充电握手逻辑===] param:{}, channel:{}", JSON.toJSONString(ykcDataProtocol), channel.toString());
// 消息体
byte[] msgBody = ykcDataProtocol.getMsgBody();

View File

@@ -22,6 +22,7 @@ import com.jsowell.pile.service.OrderBasicInfoService;
import com.jsowell.pile.service.PileAuthCardService;
import com.jsowell.pile.service.PileMsgRecordService;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@@ -58,7 +59,7 @@ public class ConfirmStartChargingRequestHandler extends AbstractHandler{
}
@Override
public byte[] supplyProcess(YKCDataProtocol ykcDataProtocol, Channel channel) {
public byte[] supplyProcess(YKCDataProtocol ykcDataProtocol, ChannelHandlerContext channel) {
// log.info("[===充电桩主动申请启动充电===] param:{}, channel:{}", JSON.toJSONString(ykcDataProtocol), channel.toString());
// 获取消息体
byte[] msgBody = ykcDataProtocol.getMsgBody();

View File

@@ -6,6 +6,7 @@ import com.jsowell.common.util.BytesUtil;
import com.jsowell.common.util.YKCUtils;
import com.jsowell.netty.factory.YKCOperateFactory;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
@@ -27,7 +28,7 @@ public class ErrorMessageHandler extends AbstractHandler{
}
@Override
public byte[] supplyProcess(YKCDataProtocol ykcDataProtocol, Channel channel) {
public byte[] supplyProcess(YKCDataProtocol ykcDataProtocol, ChannelHandlerContext channel) {
// log.info("[===错误报文===] param:{}, channel:{}", JSON.toJSONString(ykcDataProtocol), channel.toString());
// 获取消息体
byte[] msgBody = ykcDataProtocol.getMsgBody();

View File

@@ -19,6 +19,7 @@ import com.jsowell.netty.factory.YKCOperateFactory;
import com.jsowell.pile.domain.OrderPileOccupy;
import com.jsowell.pile.service.OrderPileOccupyService;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@@ -51,7 +52,7 @@ public class GroundLockDataUploadHandler extends AbstractHandler{
}
@Override
public byte[] supplyProcess(YKCDataProtocol ykcDataProtocol, Channel channel) {
public byte[] supplyProcess(YKCDataProtocol ykcDataProtocol, ChannelHandlerContext channel) {
// log.info("[===地锁数据上送===] param:{}, channel:{}", JSON.toJSONString(ykcDataProtocol), channel.toString());
// 消息体
byte[] msgBody = ykcDataProtocol.getMsgBody();

View File

@@ -9,6 +9,7 @@ import com.jsowell.common.util.YKCUtils;
import com.jsowell.netty.factory.YKCOperateFactory;
import com.jsowell.pile.service.PileBasicInfoService;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@@ -31,7 +32,7 @@ public class HeartbeatRequestHandler extends AbstractHandler {
}
@Override
public byte[] supplyProcess(YKCDataProtocol ykcDataProtocol, Channel channel) {
public byte[] supplyProcess(YKCDataProtocol ykcDataProtocol, ChannelHandlerContext channel) {
// log.info("[===充电桩心跳包===] param:{}, channel:{}", JSON.toJSONString(ykcDataProtocol), channel.toString());
// 获取消息体
byte[] msgBody = ykcDataProtocol.getMsgBody();

View File

@@ -18,6 +18,7 @@ import com.jsowell.pile.service.PileBasicInfoService;
import com.jsowell.pile.service.PileMsgRecordService;
import com.jsowell.pile.service.YKCPushCommandService;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@@ -105,7 +106,7 @@ public class LoginRequestHandler extends AbstractHandler {
}
@Override
public byte[] supplyProcess(YKCDataProtocol ykcDataProtocol, Channel channel) {
public byte[] supplyProcess(YKCDataProtocol ykcDataProtocol, ChannelHandlerContext ctx) {
// 获取消息体
byte[] msgBody = ykcDataProtocol.getMsgBody();
@@ -117,7 +118,7 @@ public class LoginRequestHandler extends AbstractHandler {
String pileSn = BytesUtil.binary(pileSnByte, 16);
// 保存时间
saveLastTimeAndCheckChannel(pileSn, channel);
saveLastTimeAndCheckChannel(pileSn, ctx);
// 桩类型 0 表示直流桩, 1 表示交流桩
startIndex += length;
@@ -163,7 +164,7 @@ public class LoginRequestHandler extends AbstractHandler {
String business = BytesUtil.bcd2Str(businessTypeByteArr);
// *********************** 字段解析完成,下面进行逻辑处理 *********************** //
if (verifyTheDuplicateRequest(ykcDataProtocol, channel)) {
if (verifyTheDuplicateRequest(ykcDataProtocol, ctx)) {
return null;
}

View File

@@ -5,6 +5,7 @@ import com.jsowell.common.core.domain.ykc.YKCFrameTypeCode;
import com.jsowell.common.util.YKCUtils;
import com.jsowell.netty.factory.YKCOperateFactory;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
@@ -27,7 +28,7 @@ public class OfflineCardDataCleaningHandler extends AbstractHandler{
}
@Override
public byte[] supplyProcess(YKCDataProtocol ykcDataProtocol, Channel channel) {
public byte[] supplyProcess(YKCDataProtocol ykcDataProtocol, ChannelHandlerContext channel) {
// log.info("[===离线卡数据清除===] param:{}, channel:{}", JSON.toJSONString(ykcDataProtocol), channel.toString());
// 下发
// 桩编号

View File

@@ -6,6 +6,7 @@ import com.jsowell.common.util.BytesUtil;
import com.jsowell.common.util.YKCUtils;
import com.jsowell.netty.factory.YKCOperateFactory;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
@@ -26,7 +27,7 @@ public class OfflineCardDataCleaningResponseHandler extends AbstractHandler{
}
@Override
public byte[] supplyProcess(YKCDataProtocol ykcDataProtocol, Channel channel) {
public byte[] supplyProcess(YKCDataProtocol ykcDataProtocol, ChannelHandlerContext channel) {
// log.info("[===离线卡数据清除应答===] param:{}, channel:{}", JSON.toJSONString(ykcDataProtocol), channel.toString());
// 消息体
byte[] msgBody = ykcDataProtocol.getMsgBody();

View File

@@ -5,6 +5,7 @@ import com.jsowell.common.core.domain.ykc.YKCFrameTypeCode;
import com.jsowell.common.util.YKCUtils;
import com.jsowell.netty.factory.YKCOperateFactory;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
@@ -28,7 +29,7 @@ public class OfflineCardDataQueryHandler extends AbstractHandler{
}
@Override
public byte[] supplyProcess(YKCDataProtocol ykcDataProtocol, Channel channel) {
public byte[] supplyProcess(YKCDataProtocol ykcDataProtocol, ChannelHandlerContext channel) {
// log.info("[===离线卡数据查询===] param:{}, channel:{}", JSON.toJSONString(ykcDataProtocol), channel.toString());
// 下发
// 桩编码

View File

@@ -6,6 +6,7 @@ import com.jsowell.common.util.BytesUtil;
import com.jsowell.common.util.YKCUtils;
import com.jsowell.netty.factory.YKCOperateFactory;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
@@ -26,7 +27,7 @@ public class OfflineCardDataQueryResponseHandler extends AbstractHandler{
}
@Override
public byte[] supplyProcess(YKCDataProtocol ykcDataProtocol, Channel channel) {
public byte[] supplyProcess(YKCDataProtocol ykcDataProtocol, ChannelHandlerContext channel) {
// log.info("[===离线卡数据查询应答===] param:{}, channel:{}", JSON.toJSONString(ykcDataProtocol), channel.toString());
// 消息体
byte[] msgBody = ykcDataProtocol.getMsgBody();

View File

@@ -5,6 +5,7 @@ import com.jsowell.common.core.domain.ykc.YKCFrameTypeCode;
import com.jsowell.common.util.YKCUtils;
import com.jsowell.netty.factory.YKCOperateFactory;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
@@ -28,7 +29,7 @@ public class OfflineCardDataSynchronizationHandler extends AbstractHandler{
}
@Override
public byte[] supplyProcess(YKCDataProtocol ykcDataProtocol, Channel channel) {
public byte[] supplyProcess(YKCDataProtocol ykcDataProtocol, ChannelHandlerContext channel) {
// log.info("[===离线卡数据同步===] param:{}, channel:{}", JSON.toJSONString(ykcDataProtocol), channel.toString());
// 下发
// 桩编号

View File

@@ -6,6 +6,7 @@ import com.jsowell.common.util.BytesUtil;
import com.jsowell.common.util.YKCUtils;
import com.jsowell.netty.factory.YKCOperateFactory;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
@@ -26,7 +27,7 @@ public class OfflineCardDataSynchronizationResponseHandler extends AbstractHandl
}
@Override
public byte[] supplyProcess(YKCDataProtocol ykcDataProtocol, Channel channel) {
public byte[] supplyProcess(YKCDataProtocol ykcDataProtocol, ChannelHandlerContext channel) {
// log.info("[===离线卡数据同步应答===] param:{}, channel:{}", JSON.toJSONString(ykcDataProtocol), channel.toString());
//消息体
byte[] msgBody = ykcDataProtocol.getMsgBody();

View File

@@ -8,6 +8,7 @@ import com.jsowell.netty.factory.YKCOperateFactory;
import com.jsowell.pile.domain.OrderBasicInfo;
import com.jsowell.pile.service.OrderBasicInfoService;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@@ -35,7 +36,7 @@ public class ParameterConfigurationHandler extends AbstractHandler{
}
@Override
public byte[] supplyProcess(YKCDataProtocol ykcDataProtocol, Channel channel) {
public byte[] supplyProcess(YKCDataProtocol ykcDataProtocol, ChannelHandlerContext channel) {
// log.info("[===参数配置===] param:{}, channel:{}", JSON.toJSONString(ykcDataProtocol), channel.toString());
// 获取消息体
byte[] msgBody = ykcDataProtocol.getMsgBody();

View File

@@ -5,6 +5,7 @@ import com.jsowell.common.core.domain.ykc.YKCFrameTypeCode;
import com.jsowell.common.util.YKCUtils;
import com.jsowell.netty.factory.YKCOperateFactory;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
@@ -28,7 +29,7 @@ public class PileWorkingParameterSettingHandler extends AbstractHandler{
}
@Override
public byte[] supplyProcess(YKCDataProtocol ykcDataProtocol, Channel channel) {
public byte[] supplyProcess(YKCDataProtocol ykcDataProtocol, ChannelHandlerContext channel) {
// log.info("[===充电桩工作参数设置===] param:{}, channel:{}", JSON.toJSONString(ykcDataProtocol), channel.toString());
// 下发
// 桩编号

View File

@@ -6,6 +6,7 @@ import com.jsowell.common.util.BytesUtil;
import com.jsowell.common.util.YKCUtils;
import com.jsowell.netty.factory.YKCOperateFactory;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
@@ -26,7 +27,7 @@ public class PileWorkingParameterSettingResponseHandler extends AbstractHandler{
}
@Override
public byte[] supplyProcess(YKCDataProtocol ykcDataProtocol, Channel channel) {
public byte[] supplyProcess(YKCDataProtocol ykcDataProtocol, ChannelHandlerContext channel) {
// log.info("[===充电桩工作参数设置应答===] param:{}, channel:{}", JSON.toJSONString(ykcDataProtocol), channel.toString());
// 消息体
byte[] msgBody = ykcDataProtocol.getMsgBody();

View File

@@ -6,6 +6,7 @@ import com.jsowell.common.util.BytesUtil;
import com.jsowell.common.util.YKCUtils;
import com.jsowell.netty.factory.YKCOperateFactory;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
@@ -26,7 +27,7 @@ public class QueryPileWorkParamsHandler extends AbstractHandler{
}
@Override
public byte[] supplyProcess(YKCDataProtocol ykcDataProtocol, Channel channel) {
public byte[] supplyProcess(YKCDataProtocol ykcDataProtocol, ChannelHandlerContext channel) {
// log.info("[====充电桩査询工作参数回复====] param:{}", JSON.toJSONString(ykcDataProtocol));
// 获取消息体
byte[] msgBody = ykcDataProtocol.getMsgBody();

View File

@@ -9,6 +9,7 @@ import com.jsowell.common.util.YKCUtils;
import com.jsowell.netty.factory.YKCOperateFactory;
import com.jsowell.pile.service.impl.YKCPushCommandServiceImpl;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@@ -32,7 +33,7 @@ public class ReadRealTimeMonitorDataHandler extends AbstractHandler{
}
@Override
public byte[] supplyProcess(YKCDataProtocol ykcDataProtocol, Channel channel) {
public byte[] supplyProcess(YKCDataProtocol ykcDataProtocol, ChannelHandlerContext channel) {
// log.info("[===读取实时监测数据===] param:{}, channel:{}", JSON.toJSONString(ykcDataProtocol), channel.toString());
// 下发
// 桩编号

View File

@@ -6,6 +6,7 @@ import com.jsowell.common.util.BytesUtil;
import com.jsowell.common.util.YKCUtils;
import com.jsowell.netty.factory.YKCOperateFactory;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
@@ -26,7 +27,7 @@ public class RemoteAccountBalanceUpdateRequestHandler extends AbstractHandler{
}
@Override
public byte[] supplyProcess(YKCDataProtocol ykcDataProtocol, Channel channel) {
public byte[] supplyProcess(YKCDataProtocol ykcDataProtocol, ChannelHandlerContext channel) {
// log.info("[===余额更新应答===] param:{}, channel:{}", JSON.toJSONString(ykcDataProtocol), channel.toString());
// 消息体
byte[] msgBody = ykcDataProtocol.getMsgBody();

View File

@@ -5,6 +5,7 @@ import com.jsowell.common.core.domain.ykc.YKCFrameTypeCode;
import com.jsowell.common.util.YKCUtils;
import com.jsowell.netty.factory.YKCOperateFactory;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
@@ -27,7 +28,7 @@ public class RemoteControlGroundLockHandler extends AbstractHandler{
}
@Override
public byte[] supplyProcess(YKCDataProtocol ykcDataProtocol, Channel channel) {
public byte[] supplyProcess(YKCDataProtocol ykcDataProtocol, ChannelHandlerContext channel) {
// log.info("[===遥控地锁升锁与降锁命令===] param:{}, channel:{}", JSON.toJSONString(ykcDataProtocol), channel.toString());
// 下发
// 桩编码

View File

@@ -7,6 +7,7 @@ import com.jsowell.common.util.YKCUtils;
import com.jsowell.netty.factory.YKCOperateFactory;
import com.jsowell.pile.service.OrderPileOccupyService;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@@ -34,7 +35,7 @@ public class RemoteControlGroundLockResponseHandler extends AbstractHandler{
}
@Override
public byte[] supplyProcess(YKCDataProtocol ykcDataProtocol, Channel channel) {
public byte[] supplyProcess(YKCDataProtocol ykcDataProtocol, ChannelHandlerContext channel) {
// log.info("[===充电桩返回遥控地锁升锁与降锁数据(上行)===] param:{}, channel:{}", JSON.toJSONString(ykcDataProtocol), channel.toString());
// 消息体
byte[] msgBody = ykcDataProtocol.getMsgBody();

View File

@@ -8,6 +8,7 @@ import com.jsowell.common.util.BytesUtil;
import com.jsowell.common.util.YKCUtils;
import com.jsowell.netty.factory.YKCOperateFactory;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
@@ -33,7 +34,7 @@ public class RemoteIssuedQrCodeHandler extends AbstractHandler{
}
@Override
public byte[] supplyProcess(YKCDataProtocol ykcDataProtocol, Channel channel) {
public byte[] supplyProcess(YKCDataProtocol ykcDataProtocol, ChannelHandlerContext channel) {
// log.info("[===后台远程下发二维码前缀指令===] param:{}, channel:{}", JSON.toJSONString(ykcDataProtocol), channel.toString());
// 下发
// 桩编码

View File

@@ -6,6 +6,7 @@ import com.jsowell.common.util.BytesUtil;
import com.jsowell.common.util.YKCUtils;
import com.jsowell.netty.factory.YKCOperateFactory;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
@@ -26,7 +27,7 @@ public class RemoteIssuedQrCodeResponseHandler extends AbstractHandler{
}
@Override
public byte[] supplyProcess(YKCDataProtocol ykcDataProtocol, Channel channel) {
public byte[] supplyProcess(YKCDataProtocol ykcDataProtocol, ChannelHandlerContext channel) {
// log.info("[===桩应答远程下发二维码前缀指令===] param:{}, channel:{}", JSON.toJSONString(ykcDataProtocol), channel.toString());
// 消息体
byte[] msgBody = ykcDataProtocol.getMsgBody();

View File

@@ -5,6 +5,7 @@ import com.jsowell.common.core.domain.ykc.YKCFrameTypeCode;
import com.jsowell.common.util.YKCUtils;
import com.jsowell.netty.factory.YKCOperateFactory;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
@@ -28,7 +29,7 @@ public class RemoteRestartHandler extends AbstractHandler{
}
@Override
public byte[] supplyProcess(YKCDataProtocol ykcDataProtocol, Channel channel) {
public byte[] supplyProcess(YKCDataProtocol ykcDataProtocol, ChannelHandlerContext channel) {
// log.info("[===远程重启===] param:{}, channel:{}", JSON.toJSONString(ykcDataProtocol), channel.toString());
// 下发
// 桩编号

View File

@@ -8,6 +8,7 @@ import com.jsowell.common.util.YKCUtils;
import com.jsowell.netty.factory.YKCOperateFactory;
import com.jsowell.pile.service.PileMsgRecordService;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@@ -32,7 +33,7 @@ public class RemoteRestartResponseHandler extends AbstractHandler{
}
@Override
public byte[] supplyProcess(YKCDataProtocol ykcDataProtocol, Channel channel) {
public byte[] supplyProcess(YKCDataProtocol ykcDataProtocol, ChannelHandlerContext channel) {
// log.info("[===远程重启应答===] param:{}, channel:{}", JSON.toJSONString(ykcDataProtocol), channel.toString());
// 消息体
byte[] msgBody = ykcDataProtocol.getMsgBody();

View File

@@ -12,6 +12,7 @@ import com.jsowell.pile.domain.OrderBasicInfo;
import com.jsowell.pile.service.OrderBasicInfoService;
import com.jsowell.thirdparty.common.CommonService;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@@ -41,7 +42,7 @@ public class RemoteStartChargingRequestHandler extends AbstractHandler{
}
@Override
public byte[] supplyProcess(YKCDataProtocol ykcDataProtocol, Channel channel) {
public byte[] supplyProcess(YKCDataProtocol ykcDataProtocol, ChannelHandlerContext channel) {
// log.info("[===远程启动充电命令回复===] param:{}, channel:{}", JSON.toJSONString(ykcDataProtocol), channel.toString());
// 消息体
byte[] msgBody = ykcDataProtocol.getMsgBody();

View File

@@ -11,6 +11,7 @@ import com.jsowell.netty.factory.YKCOperateFactory;
import com.jsowell.pile.domain.OrderBasicInfo;
import com.jsowell.pile.service.OrderBasicInfoService;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@@ -37,7 +38,7 @@ public class RemoteStopChargingRequestHandler extends AbstractHandler{
}
@Override
public byte[] supplyProcess(YKCDataProtocol ykcDataProtocol, Channel channel) {
public byte[] supplyProcess(YKCDataProtocol ykcDataProtocol, ChannelHandlerContext channel) {
// log.info("[===远程停机命令回复===] param:{}, channel:{}", JSON.toJSONString(ykcDataProtocol), channel.toString());
// 获取消息体
byte[] msgBody = ykcDataProtocol.getMsgBody();

View File

@@ -5,6 +5,7 @@ import com.jsowell.common.core.domain.ykc.YKCFrameTypeCode;
import com.jsowell.common.util.YKCUtils;
import com.jsowell.netty.factory.YKCOperateFactory;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
@@ -28,7 +29,7 @@ public class RemoteUpdateHandler extends AbstractHandler{
}
@Override
public byte[] supplyProcess(YKCDataProtocol ykcDataProtocol, Channel channel) {
public byte[] supplyProcess(YKCDataProtocol ykcDataProtocol, ChannelHandlerContext channel) {
// log.info("[===远程更新===] param:{}, channel:{}", JSON.toJSONString(ykcDataProtocol), channel.toString());
// 下发
// 桩编号

View File

@@ -6,6 +6,7 @@ import com.jsowell.common.util.BytesUtil;
import com.jsowell.common.util.YKCUtils;
import com.jsowell.netty.factory.YKCOperateFactory;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
@@ -26,7 +27,7 @@ public class RemoteUpdateResponseHandler extends AbstractHandler{
}
@Override
public byte[] supplyProcess(YKCDataProtocol ykcDataProtocol, Channel channel) {
public byte[] supplyProcess(YKCDataProtocol ykcDataProtocol, ChannelHandlerContext channel) {
// log.info("[====远程更新应答====] param:{}, channel:{}", JSON.toJSONString(ykcDataProtocol), channel.toString());
// 消息体
byte[] msgBody = ykcDataProtocol.getMsgBody();

View File

@@ -6,6 +6,7 @@ import com.jsowell.common.util.BytesUtil;
import com.jsowell.common.util.YKCUtils;
import com.jsowell.netty.factory.YKCOperateFactory;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
@@ -24,7 +25,7 @@ public class ReservationChargingResponseHandler extends AbstractHandler{
}
@Override
public byte[] supplyProcess(YKCDataProtocol ykcDataProtocol, Channel channel) {
public byte[] supplyProcess(YKCDataProtocol ykcDataProtocol, ChannelHandlerContext channel) {
// log.info("[====远程更新应答====] param:{}, channel:{}", JSON.toJSONString(ykcDataProtocol), channel.toString());
// 消息体
byte[] msgBody = ykcDataProtocol.getMsgBody();

View File

@@ -11,6 +11,7 @@ import com.jsowell.netty.factory.YKCOperateFactory;
import com.jsowell.pile.dto.ReservationChargingStartupResult;
import com.jsowell.pile.service.PileBasicInfoService;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@@ -33,7 +34,7 @@ public class ReservationChargingStartupResultHandler extends AbstractHandler{
}
@Override
public byte[] supplyProcess(YKCDataProtocol ykcDataProtocol, Channel channel) {
public byte[] supplyProcess(YKCDataProtocol ykcDataProtocol, ChannelHandlerContext channel) {
log.info("[===预约充电启动结果上送===] param:{}, channel:{}", JSON.toJSONString(ykcDataProtocol), channel.toString());
// 消息体
byte[] msgBody = ykcDataProtocol.getMsgBody();

View File

@@ -6,6 +6,7 @@ import com.jsowell.common.util.BytesUtil;
import com.jsowell.common.util.YKCUtils;
import com.jsowell.netty.factory.YKCOperateFactory;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
@@ -26,7 +27,7 @@ public class SettingPileWorkParamsHandler extends AbstractHandler{
}
@Override
public byte[] supplyProcess(YKCDataProtocol ykcDataProtocol, Channel channel) {
public byte[] supplyProcess(YKCDataProtocol ykcDataProtocol, ChannelHandlerContext channel) {
// log.info("[====平台设置工作参数回复====] param:{}", JSON.toJSONString(ykcDataProtocol));
// 获取消息体
byte[] msgBody = ykcDataProtocol.getMsgBody();

View File

@@ -5,6 +5,7 @@ import com.jsowell.common.core.domain.ykc.YKCFrameTypeCode;
import com.jsowell.common.util.YKCUtils;
import com.jsowell.netty.factory.YKCOperateFactory;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
@@ -27,7 +28,7 @@ public class TimeCheckSettingHandler extends AbstractHandler{
}
@Override
public byte[] supplyProcess(YKCDataProtocol ykcDataProtocol, Channel channel) {
public byte[] supplyProcess(YKCDataProtocol ykcDataProtocol, ChannelHandlerContext channel) {
// log.info("[===对时设置===] param:{}, channel:{}", JSON.toJSONString(ykcDataProtocol), channel.toString());
// 下发
// 桩编号

View File

@@ -8,6 +8,7 @@ import com.jsowell.common.util.DateUtils;
import com.jsowell.common.util.YKCUtils;
import com.jsowell.netty.factory.YKCOperateFactory;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
@@ -30,7 +31,7 @@ public class TimeCheckSettingResponseHandler extends AbstractHandler{
}
@Override
public byte[] supplyProcess(YKCDataProtocol ykcDataProtocol, Channel channel) {
public byte[] supplyProcess(YKCDataProtocol ykcDataProtocol, ChannelHandlerContext channel) {
// log.info("[===对时设置应答===] param:{}, channel:{}", JSON.toJSONString(ykcDataProtocol), channel.toString());
// 消息体
byte[] msgBody = ykcDataProtocol.getMsgBody();
@@ -50,7 +51,7 @@ public class TimeCheckSettingResponseHandler extends AbstractHandler{
length = 7;
byte[] currentTimeByteArr = BytesUtil.copyBytes(msgBody, startIndex, length);
Date date = Cp56Time2aUtil.byte2Hdate(currentTimeByteArr);
log.info("对时设置应答, pileSn:{}, channelId:{}, 充电桩当前时间:{}", pileSn, channel.id().asShortText(), DateUtils.formatDateTime(date));
log.info("对时设置应答, pileSn:{}, channelId:{}, 充电桩当前时间:{}", pileSn, channel.channel().id().asShortText(), DateUtils.formatDateTime(date));
return null;
}
}

View File

@@ -23,6 +23,7 @@ import com.jsowell.pile.service.programlogic.AbstractProgramLogic;
import com.jsowell.pile.service.programlogic.ProgramLogicFactory;
import com.jsowell.thirdparty.common.CommonService;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@@ -321,7 +322,7 @@ public class TransactionRecordsRequestHandler extends AbstractHandler {
}*/
@Override
public byte[] supplyProcess(YKCDataProtocol ykcDataProtocol, Channel channel) {
public byte[] supplyProcess(YKCDataProtocol ykcDataProtocol, ChannelHandlerContext channel) {
// log.info("[===交易记录===] param:{}, channel:{}", JSON.toJSONString(ykcDataProtocol), channel.toString());
// 获取消息体
byte[] msgBody = ykcDataProtocol.getMsgBody();

View File

@@ -18,6 +18,7 @@ import com.jsowell.pile.service.PileBasicInfoService;
import com.jsowell.pile.service.OrderBasicInfoService;
import com.jsowell.thirdparty.common.CommonService;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@@ -59,7 +60,7 @@ public class UploadRealTimeMonitorHandler extends AbstractHandler {
@Override
public byte[] supplyProcess(YKCDataProtocol ykcDataProtocol, Channel channel) {
public byte[] supplyProcess(YKCDataProtocol ykcDataProtocol, ChannelHandlerContext channel) {
// log.info("[===获取桩上传的实时监测数据===] param:{}, channel:{}", JSON.toJSONString(ykcDataProtocol), channel.toString());
RealTimeMonitorData realTimeMonitorData = new RealTimeMonitorData();

View File

@@ -0,0 +1,19 @@
package com.jsowell.netty.rpc;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
public class ClientChannelInitializer extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
ChannelPipeline pipeline = socketChannel.pipeline();
pipeline.addLast(new MessageEncode());
pipeline.addLast(new MessageDecode());
pipeline.addLast(new RpcResponseHandler());
}
}

View File

@@ -0,0 +1,11 @@
package com.jsowell.netty.rpc;
import lombok.Data;
@Data
public abstract class Message {
protected Byte messageType;
}

View File

@@ -0,0 +1,22 @@
package com.jsowell.netty.rpc;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
public class MessageConstant {
public final static Byte rpcRequest = 1;
public final static Byte rpcResponse = 2;
public static Map<Byte, Class<? extends Message>> messageTypeMap = new ConcurrentHashMap<>();
static {
messageTypeMap.put(rpcRequest, RpcRequest.class);
messageTypeMap.put(rpcResponse, RpcResponse.class);
}
public static Class<? extends Message> getMessageClass(Byte messageType){
return messageTypeMap.get(messageType);
}
}

View File

@@ -0,0 +1,45 @@
package com.jsowell.netty.rpc;
import com.jsowell.common.util.bean.SerializationUtil;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;
import java.util.List;
public class MessageDecode extends ByteToMessageDecoder {
@Override
protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List<Object> list) throws Exception {
// 由于数据包的前4个字节用于记录总数据大小如果数据不够4个字节不进行读
if(byteBuf.readableBytes() < 4) {
return;
}
// 标记开始读的位置
byteBuf.markReaderIndex();
// 前四个字节记录了数据大小
int dataSize = byteBuf.readInt();
// 查看剩余可读字节是否足够,如果不是,重置读取位置,等待下一次解析
if(byteBuf.readableBytes() < dataSize) {
byteBuf.resetReaderIndex();
return;
}
// 读取消息类型
byte messageType = byteBuf.readByte();
// 读取数据, 数组大小需要剔除1个字节的消息类型
byte[] data = new byte[dataSize -1];
byteBuf.readBytes(data);
Message message = SerializationUtil.deserialize(MessageConstant.getMessageClass(messageType), data);
list.add(message);
}
}

View File

@@ -0,0 +1,23 @@
package com.jsowell.netty.rpc;
import com.jsowell.common.util.bean.SerializationUtil;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;
public class MessageEncode extends MessageToByteEncoder<Message> {
@Override
protected void encode(ChannelHandlerContext channelHandlerContext, Message message, ByteBuf byteBuf) throws Exception {
// 将对象进行序列化
byte[] data = SerializationUtil.serialize(message);
// 写数据长度前4个字节用于记录数据总长度对象 + 类型1个字节
byteBuf.writeInt(data.length + 1);
// 写记录消息类型,用于反序列选择类的类型
byteBuf.writeByte(message.getMessageType());
// 写对象
byteBuf.writeBytes(data);
}
}

View File

@@ -0,0 +1,52 @@
package com.jsowell.netty.rpc;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
public class RpcClient {
public Channel connect(String host, Integer port) {
EventLoopGroup worker = new NioEventLoopGroup();
Channel channel = null;
try {
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(worker)
.channel(NioSocketChannel.class)
.option(ChannelOption.AUTO_READ, true)
.handler(new ClientChannelInitializer());
ChannelFuture channelFuture = bootstrap.connect(host, port).sync();
System.out.println("客户端启动");
channel = channelFuture.channel();
// 添加关闭监听器
channel.closeFuture().addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture channelFuture) throws Exception {
System.out.println("关闭客户端");
worker.shutdownGracefully();
}
});
} catch (Exception e) {
e.printStackTrace();
if(channel == null || !channel.isActive()) {
worker.shutdownGracefully();
} else {
channel.close();
}
}
return channel;
}
}

View File

@@ -0,0 +1,21 @@
package com.jsowell.netty.rpc;
import lombok.Data;
import lombok.ToString;
import java.util.UUID;
@Data
@ToString
public class RpcRequest extends Message{
private String id;
private String param;
public RpcRequest() {
this.id = UUID.randomUUID().toString();
super.messageType = MessageConstant.rpcRequest;
}
}

View File

@@ -0,0 +1,36 @@
package com.jsowell.netty.rpc;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.DefaultEventLoopGroup;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.SimpleChannelInboundHandler;
public class RpcRequestHandler extends SimpleChannelInboundHandler<RpcRequest> {
private final static EventLoopGroup worker = new DefaultEventLoopGroup(Runtime.getRuntime().availableProcessors() + 1);
@Override
protected void channelRead0(ChannelHandlerContext ctx, RpcRequest msg) throws Exception {
// 为避免占用网络io此处异步进行处理
worker.submit(() -> {
System.out.println("[RpcRequestHandler] "+ Thread.currentThread().getName() +" 处理请求msg " + msg);
// 模拟处理耗时
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
RpcResponse rpcResponse = new RpcResponse();
rpcResponse.setId(msg.getId());
rpcResponse.setResult("处理" + msg.getParam());
ctx.writeAndFlush(rpcResponse);
});
}
}

View File

@@ -0,0 +1,18 @@
package com.jsowell.netty.rpc;
import lombok.Data;
import lombok.ToString;
@Data
@ToString
public class RpcResponse extends Message{
private String id;
private String result;
public RpcResponse() {
super.messageType = MessageConstant.rpcResponse;
}
}

View File

@@ -0,0 +1,23 @@
package com.jsowell.netty.rpc;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
public class RpcResponseHandler extends SimpleChannelInboundHandler<RpcResponse> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, RpcResponse msg) throws Exception {
// 根据请求id在集合中找到与外部线程通信的SyncPromise对象
SyncPromise syncPromise = RpcUtil.getSyncPromiseMap().get(msg.getId());
if(syncPromise != null) {
// 设置响应结果
syncPromise.setRpcResponse(msg);
// 唤醒外部线程
syncPromise.wake();
}
}
}

View File

@@ -0,0 +1,55 @@
package com.jsowell.netty.rpc;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
public class RpcServer {
public void bind(Integer port) {
EventLoopGroup parent = new NioEventLoopGroup();
EventLoopGroup child = new NioEventLoopGroup();
Channel channel = null;
try{
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(parent, child)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 1024)
.childHandler(new ServerChannelInitializer());
ChannelFuture channelFuture = serverBootstrap.bind(port).sync();
System.out.println("server启动");
// 非阻塞等待关闭
channelFuture.channel().closeFuture().addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture channelFuture) throws Exception {
System.out.println("server关闭");
parent.shutdownGracefully();
child.shutdownGracefully();
}
});
channel = channelFuture.channel();
} catch (Exception e) {
e.printStackTrace();
if(channel == null || !channel.isActive()) {
System.out.println("server关闭");
parent.shutdownGracefully();
child.shutdownGracefully();
} else {
channel.close();
}
}
}
}

View File

@@ -0,0 +1,63 @@
package com.jsowell.netty.rpc;
import io.netty.channel.Channel;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
public class RpcUtil {
private final static Map<String, SyncPromise> syncPromiseMap = new ConcurrentHashMap<>();
private final static Channel channel;
static{
channel = new RpcClient().connect("127.0.0.1", 8888);
}
public static RpcResponse send(RpcRequest rpcRequest, long timeout, TimeUnit unit) throws Exception{
if(channel == null) {
throw new NullPointerException("channel");
}
if(rpcRequest == null) {
throw new NullPointerException("rpcRequest");
}
if(timeout <= 0) {
throw new IllegalArgumentException("timeout must greater than 0");
}
// 创造一个容器用于存放当前线程与rpcClient中的线程交互
SyncPromise syncPromise = new SyncPromise();
syncPromiseMap.put(rpcRequest.getId(), syncPromise);
// 发送消息此处如果发送玩消息并且在get之前返回了结果下一行的get将不会进入阻塞也可以顺利拿到结果
channel.writeAndFlush(rpcRequest);
// 等待获取结果
RpcResponse rpcResponse = syncPromise.get(timeout, unit);
if(rpcResponse == null) {
if(syncPromise.isTimeout()) {
throw new TimeoutException("等待响应结果超时");
} else{
throw new Exception("其他异常");
}
}
// 移除容器
syncPromiseMap.remove(rpcRequest.getId());
return rpcResponse;
}
public static Map<String, SyncPromise> getSyncPromiseMap(){
return syncPromiseMap;
}
}

View File

@@ -0,0 +1,19 @@
package com.jsowell.netty.rpc;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
public class ServerChannelInitializer extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
ChannelPipeline pipeline = socketChannel.pipeline();
pipeline.addLast(new MessageEncode());
pipeline.addLast(new MessageDecode());
pipeline.addLast(new RpcRequestHandler());
}
}

View File

@@ -0,0 +1,49 @@
package com.jsowell.netty.rpc;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
public class SyncPromise {
// 用于接收结果
private RpcResponse rpcResponse;
private final CountDownLatch countDownLatch = new CountDownLatch(1);
// 用于判断是否超时
private boolean isTimeout = false;
/**
* 同步等待返回结果
*/
public RpcResponse get(long timeout, TimeUnit unit) throws InterruptedException {
// 等待阻塞超时时间内countDownLatch减到0将提前唤醒以此作为是否超时判断
boolean earlyWakeUp = countDownLatch.await(timeout, unit);
if(earlyWakeUp) {
// 超时时间内countDownLatch减到0提前唤醒说明已有结果
return rpcResponse;
} else {
// 超时时间内countDownLatch没有减到0自动唤醒说明超时时间内没有等到结果
isTimeout = true;
return null;
}
}
public void wake() {
countDownLatch.countDown();
}
public RpcResponse getRpcResponse() {
return rpcResponse;
}
public void setRpcResponse(RpcResponse rpcResponse) {
this.rpcResponse = rpcResponse;
}
public boolean isTimeout() {
return isTimeout;
}
}

View File

@@ -0,0 +1,48 @@
package com.jsowell.netty.rpc;
import java.util.concurrent.TimeUnit;
public class TestRpcClient {
public static void main(String[] args) throws Exception{
//Channel channel = new RpcClient().connect("127.0.0.1", 8888);
Thread thread1 = new Thread(new Runnable() {
@Override
public void run() {
RpcRequest rpcRequest = new RpcRequest();
rpcRequest.setParam("参数1");
try {
System.out.println("thread1发送请求");
RpcResponse rpcResponse = RpcUtil.send(rpcRequest, 5, TimeUnit.SECONDS);
System.out.println("thread1处理结果" + rpcResponse);
} catch (Exception e) {
throw new RuntimeException(e);
}
}
});
Thread thread2 = new Thread(new Runnable() {
@Override
public void run() {
RpcRequest rpcRequest2 = new RpcRequest();
rpcRequest2.setParam("参数2");
try {
System.out.println("thread2发送请求");
RpcResponse rpcResponse = RpcUtil.send(rpcRequest2, 2, TimeUnit.SECONDS);
System.out.println("thread2处理结果" + rpcResponse);
} catch (Exception e) {
throw new RuntimeException(e);
}
}
});
// 休眠一下,等待客户端与服务端进行连接
Thread.sleep(1000);
thread1.start();
thread2.start();
}
}

View File

@@ -0,0 +1,7 @@
package com.jsowell.netty.rpc;
public class TestRpcServer {
public static void main(String[] args) {
new RpcServer().bind(8888);
}
}

View File

@@ -92,7 +92,7 @@ public class ElectricBicyclesServerHandler extends ChannelInboundHandlerAdapter
}
// 处理数据
byte[] response = ykcService.process(msg, channel);
byte[] response = ykcService.process(msg, ctx);
if (Objects.nonNull(response)) {
// 响应客户端
ByteBuf buffer = ctx.alloc().buffer().writeBytes(response);

View File

@@ -92,7 +92,7 @@ public class NettyServerHandler extends SimpleChannelInboundHandler {
}
// 处理数据
byte[] response = ykcService.process(msg, channel);
byte[] response = ykcService.process(msg, ctx);
if (Objects.nonNull(response)) {
// 响应客户端
ByteBuf buffer = ctx.alloc().buffer().writeBytes(response);

View File

@@ -1,6 +1,7 @@
package com.jsowell.netty.service.electricbicycles;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelId;
/**
@@ -12,10 +13,10 @@ public interface YKCBusinessService {
* 处理桩发来的请求
* 不需要应答的返回null
* @param msg 请求报文
* @param channel 通道信息
* @param ctx 通道信息
* @return 结果
*/
byte[] process(byte[] msg, Channel channel);
byte[] process(byte[] msg, ChannelHandlerContext ctx);
/**
* 桩退出

View File

@@ -14,6 +14,7 @@ import com.jsowell.pile.service.PileConnectorInfoService;
import com.jsowell.pile.service.PileMsgRecordService;
import com.jsowell.pile.service.YKCPushCommandService;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelId;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
@@ -36,7 +37,7 @@ public class YKCBusinessServiceImpl2 implements YKCBusinessService {
private YKCPushCommandService ykcPushCommandService;
@Override
public byte[] process(byte[] msg, Channel channel) {
public byte[] process(byte[] msg, ChannelHandlerContext ctx) {
if (!YKCUtils.checkMsg(msg)) {
// 校验不通过,丢弃消息
return null;
@@ -46,7 +47,7 @@ public class YKCBusinessServiceImpl2 implements YKCBusinessService {
String frameType = YKCUtils.frameType2Str(ykcDataProtocol.getFrameType());
// 获取业务处理handler
AbstractHandler invokeStrategy = YKCOperateFactory.getInvokeStrategy(frameType);
return invokeStrategy.supplyProcess(ykcDataProtocol, channel);
return invokeStrategy.supplyProcess(ykcDataProtocol, ctx);
}
@Override
@@ -56,7 +57,7 @@ public class YKCBusinessServiceImpl2 implements YKCBusinessService {
if (StringUtils.isBlank(pileSn)) {
return;
}
log.info("充电桩退出:{}, channelId:{}", pileSn, PileChannelEntity.getChannelByPileSn(pileSn).id());
log.info("充电桩退出:{}, channelId:{}", pileSn, PileChannelEntity.getChannelByPileSn(pileSn).channel().id());
// 充电桩断开连接,所有枪口都设置为【离线】
pileConnectorInfoService.updateConnectorStatusByPileSn(pileSn, PileConnectorDataBaseStatusEnum.OFF_NETWORK.getValue());

View File

@@ -1,6 +1,7 @@
package com.jsowell.netty.service.yunkuaichong;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelId;
/**
@@ -12,10 +13,10 @@ public interface YKCBusinessService {
* 处理桩发来的请求
* 不需要应答的返回null
* @param msg 请求报文
* @param channel 通道信息
* @param ctx 通道信息
* @return 结果
*/
byte[] process(byte[] msg, Channel channel);
byte[] process(byte[] msg, ChannelHandlerContext ctx);
/**
* 桩退出

View File

@@ -14,6 +14,7 @@ import com.jsowell.pile.service.PileMsgRecordService;
import com.jsowell.pile.service.OrderBasicInfoService;
import com.jsowell.pile.service.YKCPushCommandService;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelId;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
@@ -36,7 +37,7 @@ public class YKCBusinessServiceImpl implements YKCBusinessService {
private YKCPushCommandService ykcPushCommandService;
@Override
public byte[] process(byte[] msg, Channel channel) {
public byte[] process(byte[] msg, ChannelHandlerContext ctx) {
if (!YKCUtils.checkMsg(msg)) {
// 校验不通过,丢弃消息
return null;
@@ -46,7 +47,7 @@ public class YKCBusinessServiceImpl implements YKCBusinessService {
String frameType = YKCUtils.frameType2Str(ykcDataProtocol.getFrameType());
// 获取业务处理handler
AbstractHandler invokeStrategy = YKCOperateFactory.getInvokeStrategy(frameType);
return invokeStrategy.supplyProcess(ykcDataProtocol, channel);
return invokeStrategy.supplyProcess(ykcDataProtocol, ctx);
}
@Override
@@ -56,7 +57,7 @@ public class YKCBusinessServiceImpl implements YKCBusinessService {
if (StringUtils.isBlank(pileSn)) {
return;
}
log.info("充电桩退出:{}, channelId:{}", pileSn, PileChannelEntity.getChannelByPileSn(pileSn).id());
log.info("充电桩退出:{}, channelId:{}", pileSn, PileChannelEntity.getChannelByPileSn(pileSn).channel().id());
// 充电桩断开连接,所有枪口都设置为【离线】
pileConnectorInfoService.updateConnectorStatusByPileSn(pileSn, PileConnectorDataBaseStatusEnum.OFF_NETWORK.getValue());