mirror of
https://codeup.aliyun.com/67c68d4e484ca2f0a13ac3c1/ydc/jsowell-charger-web.git
synced 2026-05-05 18:40:14 +08:00
Merge branch 'dev-new' into dev-new-rabbitmq
This commit is contained in:
@@ -55,7 +55,7 @@ public class YKCUtils {
|
||||
|
||||
// 如果是0x03帧,则不进行crc校验
|
||||
if (0x03 == frameType[0]) {
|
||||
log.info("0x03帧,则不进行crc校验");
|
||||
log.debug("0x03帧,则不进行crc校验");
|
||||
return true;
|
||||
}
|
||||
|
||||
@@ -109,7 +109,7 @@ public class YKCUtils {
|
||||
|
||||
// 如果是0x03帧,则不进行crc校验
|
||||
if (0x03 == frameType[0]) {
|
||||
log.info("0x03帧,则不进行crc校验");
|
||||
log.debug("0x03帧,则不进行crc校验");
|
||||
return true;
|
||||
}
|
||||
|
||||
|
||||
@@ -5,7 +5,9 @@ import com.jsowell.common.constant.CacheConstants;
|
||||
import com.jsowell.common.core.domain.ykc.YKCDataProtocol;
|
||||
import com.jsowell.common.core.domain.ykc.YKCFrameTypeCode;
|
||||
import com.jsowell.common.core.redis.RedisCache;
|
||||
import com.jsowell.common.protocol.SyncPromise;
|
||||
import com.jsowell.common.util.BytesUtil;
|
||||
import com.jsowell.common.util.RpcUtil;
|
||||
import com.jsowell.common.util.YKCUtils;
|
||||
import com.jsowell.netty.factory.YKCOperateFactory;
|
||||
import com.jsowell.pile.domain.PileReservationInfo;
|
||||
@@ -36,7 +38,7 @@ public class ReservationChargingResponseHandler extends AbstractYkcHandler {
|
||||
}
|
||||
|
||||
@Override
|
||||
public byte[] supplyProcess(YKCDataProtocol ykcDataProtocol, ChannelHandlerContext channel) {
|
||||
public byte[] supplyProcess(YKCDataProtocol ykcDataProtocol, ChannelHandlerContext ctx) {
|
||||
// log.info("[====远程更新应答====] param:{}, channel:{}", JSON.toJSONString(ykcDataProtocol), channel.toString());
|
||||
// 消息体
|
||||
byte[] msgBody = ykcDataProtocol.getMsgBody();
|
||||
@@ -55,7 +57,7 @@ public class ReservationChargingResponseHandler extends AbstractYkcHandler {
|
||||
String pileSn = BytesUtil.bcd2Str(pileSnByteArr);
|
||||
|
||||
// 保存时间
|
||||
saveLastTimeAndCheckChannel(pileSn, channel);
|
||||
saveLastTimeAndCheckChannel(pileSn, ctx);
|
||||
|
||||
// 枪口号
|
||||
startIndex += length;
|
||||
@@ -78,6 +80,18 @@ public class ReservationChargingResponseHandler extends AbstractYkcHandler {
|
||||
log.info("0x59预约充电响应, 交易流水号:{}, 桩SN:{}, 枪口号:{}, 结果:{}, 失败原因:{}",
|
||||
transactionCode, pileSn, connectorCode, resultCode, failedReason);
|
||||
|
||||
// 根据请求id,在集合中找到与外部线程通信的SyncPromise对象
|
||||
String msgId = ctx.channel().id().toString() + "_" + YKCFrameTypeCode.RESERVATION_CHARGING_SETUP_CODE.getCode();
|
||||
log.info("同步获取响应数据-收到消息, msgId:{}", msgId);
|
||||
SyncPromise syncPromise = RpcUtil.getSyncPromiseMap().get(msgId);
|
||||
if(syncPromise != null) {
|
||||
// 设置响应结果
|
||||
syncPromise.setRpcResult(ykcDataProtocol.getBytes());
|
||||
// 唤醒外部线程
|
||||
log.info("同步获取响应数据-唤醒外部线程, SyncPromise:{}", JSON.toJSONString(syncPromise));
|
||||
syncPromise.wake();
|
||||
}
|
||||
|
||||
// 如果收到成功, 从redis取值, 保存到数据库
|
||||
if ("01".equals(resultCode)) {
|
||||
// 预约成功, 删除redis中的预约信息
|
||||
|
||||
@@ -50,7 +50,7 @@ public class TimeCheckSettingResponseHandler extends AbstractYkcHandler {
|
||||
length = 7;
|
||||
byte[] currentTimeByteArr = BytesUtil.copyBytes(msgBody, startIndex, length);
|
||||
Date date = Cp56Time2aUtil.byte2Hdate(currentTimeByteArr);
|
||||
log.info("对时设置应答, pileSn:{}, channelId:{}, 充电桩当前时间:{}", pileSn, channel.channel().id().asShortText(), DateUtils.formatDateTime(date));
|
||||
log.info("[===对时设置充电桩应答===], pileSn:{}, channelId:{}, 充电桩当前时间:{}", pileSn, channel.channel().id().asShortText(), DateUtils.formatDateTime(date));
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -51,9 +51,14 @@ public class NettyServerManager implements CommandLineRunner {
|
||||
.handler(new LoggingHandler(LogLevel.DEBUG))
|
||||
// .option(ChannelOption.SO_BACKLOG, 128) // 默认128
|
||||
.option(ChannelOption.SO_BACKLOG, 1024)
|
||||
.option(ChannelOption.SO_REUSEADDR, true)
|
||||
.childOption(ChannelOption.SO_KEEPALIVE, true)
|
||||
// .option(ChannelOption.SO_REUSEADDR, true)
|
||||
.childOption(ChannelOption.SO_KEEPALIVE, true) // 保持连接
|
||||
.childOption(ChannelOption.TCP_NODELAY, true) // 禁用 Nagle 算法
|
||||
.childOption(ChannelOption.SO_RCVBUF, 64 * 1024) // 接收缓冲区
|
||||
.childOption(ChannelOption.SO_SNDBUF, 64 * 1024) // 发送缓冲区
|
||||
.childOption(ChannelOption.WRITE_BUFFER_WATER_MARK, new WriteBufferWaterMark(32 * 1024, 64 * 1024)) // 写缓冲水位
|
||||
.childOption(ChannelOption.SO_REUSEADDR, true)
|
||||
.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT) // 启用池化内存分配器
|
||||
.childHandler(nettyServerChannelInitializer)
|
||||
.localAddress(new InetSocketAddress(host, port));
|
||||
|
||||
|
||||
@@ -26,8 +26,8 @@ public class NettyServerChannelInitializer extends ChannelInitializer<SocketChan
|
||||
// pipeline.addLast("frameDecoder", new YkcProtocolDecoder());
|
||||
pipeline.addLast("decoder", new ByteArrayDecoder());
|
||||
pipeline.addLast("encoder", new ByteArrayDecoder());
|
||||
//读超时时间设置为10s,0表示不监控
|
||||
pipeline.addLast(new IdleStateHandler(60, 0, 0, TimeUnit.SECONDS));
|
||||
// 读超时时间设置为30s,0表示不监控
|
||||
pipeline.addLast(new IdleStateHandler(30, 0, 0, TimeUnit.SECONDS));
|
||||
pipeline.addLast("handler", nettyServerHandler);
|
||||
}
|
||||
|
||||
|
||||
@@ -7,8 +7,6 @@ import com.jsowell.common.enums.ykc.PileChannelEntity;
|
||||
import com.jsowell.common.util.BytesUtil;
|
||||
import com.jsowell.common.util.StringUtils;
|
||||
import com.jsowell.common.util.YKCUtils;
|
||||
import com.jsowell.common.util.RpcUtil;
|
||||
import com.jsowell.common.protocol.SyncPromise;
|
||||
import com.jsowell.netty.service.yunkuaichong.YKCBusinessService;
|
||||
import io.netty.buffer.ByteBuf;
|
||||
import io.netty.channel.*;
|
||||
@@ -41,7 +39,7 @@ public class NettyServerHandler extends ChannelInboundHandlerAdapter {
|
||||
*/
|
||||
private static final ConcurrentHashMap<ChannelId, ChannelHandlerContext> CHANNEL_MAP = new ConcurrentHashMap<>();
|
||||
|
||||
private final List<String> notPrintFrameTypeList = Lists.newArrayList("0x03");
|
||||
private final List<String> notPrintFrameTypeList = Lists.newArrayList(); // "0x03"
|
||||
|
||||
/**
|
||||
* 有客户端连接服务器会触发此函数
|
||||
@@ -136,21 +134,21 @@ public class NettyServerHandler extends ChannelInboundHandlerAdapter {
|
||||
String frameType = YKCUtils.frameType2Str(frameTypeBytes);
|
||||
|
||||
// 判断该帧类型是否为某请求帧的应答帧
|
||||
String requestFrameType = YKCFrameTypeCode.PileAnswersRelation.getRequestFrameType(frameType);
|
||||
// String requestFrameType = YKCFrameTypeCode.PileAnswersRelation.getRequestFrameType(frameType);
|
||||
// log.info("同步获取响应数据-判断该帧类型是否为某请求帧的应答帧, frameType:{}, requestFrameType:{}", frameType, requestFrameType);
|
||||
if (StringUtils.isNotBlank(requestFrameType)) {
|
||||
// 根据请求id,在集合中找到与外部线程通信的SyncPromise对象
|
||||
String msgId = ctx.channel().id().toString() + "_" + requestFrameType;
|
||||
// log.info("同步获取响应数据-收到消息, msgId:{}", msgId);
|
||||
SyncPromise syncPromise = RpcUtil.getSyncPromiseMap().get(msgId);
|
||||
if(syncPromise != null) {
|
||||
// 设置响应结果
|
||||
syncPromise.setRpcResult(ykcDataProtocol.getBytes());
|
||||
// 唤醒外部线程
|
||||
// log.info("同步获取响应数据-唤醒外部线程, SyncPromise:{}", JSON.toJSONString(syncPromise));
|
||||
syncPromise.wake();
|
||||
}
|
||||
}
|
||||
// if (StringUtils.isNotBlank(requestFrameType)) {
|
||||
// // 根据请求id,在集合中找到与外部线程通信的SyncPromise对象
|
||||
// String msgId = ctx.channel().id().toString() + "_" + requestFrameType;
|
||||
// // log.info("同步获取响应数据-收到消息, msgId:{}", msgId);
|
||||
// SyncPromise syncPromise = RpcUtil.getSyncPromiseMap().get(msgId);
|
||||
// if(syncPromise != null) {
|
||||
// // 设置响应结果
|
||||
// syncPromise.setRpcResult(ykcDataProtocol.getBytes());
|
||||
// // 唤醒外部线程
|
||||
// // log.info("同步获取响应数据-唤醒外部线程, SyncPromise:{}", JSON.toJSONString(syncPromise));
|
||||
// syncPromise.wake();
|
||||
// }
|
||||
// }
|
||||
|
||||
// 获取序列号域
|
||||
int serialNumber = BytesUtil.bytesToIntLittle(ykcDataProtocol.getSerialNumber());
|
||||
|
||||
@@ -64,7 +64,7 @@ public class YKCPushCommandServiceImpl implements YKCPushCommandService {
|
||||
/**
|
||||
* 发送消息,无返回值
|
||||
*/
|
||||
public void runSend(byte[] msg, String pileSn, Enum<YKCFrameTypeCode> frameTypeCode) throws Exception {
|
||||
public void runSend(byte[] msg, String pileSn, Enum<YKCFrameTypeCode> frameTypeCode) {
|
||||
// 通过桩编号获取channel
|
||||
ChannelHandlerContext ctx = PileChannelEntity.getChannelByPileSn(pileSn);
|
||||
String value = ((YKCFrameTypeCode) frameTypeCode).getValue(); // 帧类型名称
|
||||
@@ -73,7 +73,7 @@ public class YKCPushCommandServiceImpl implements YKCPushCommandService {
|
||||
throw new NullPointerException("channel");
|
||||
}
|
||||
if(msg == null) {
|
||||
throw new NullPointerException("msg");
|
||||
throw new NullPointerException("发送消息msg为空");
|
||||
}
|
||||
|
||||
/*
|
||||
@@ -157,7 +157,7 @@ public class YKCPushCommandServiceImpl implements YKCPushCommandService {
|
||||
throw new NullPointerException("channel");
|
||||
}
|
||||
if(msg == null) {
|
||||
throw new NullPointerException("msg");
|
||||
throw new NullPointerException("发送消息msg为空");
|
||||
}
|
||||
if(timeout <= 0) {
|
||||
throw new IllegalArgumentException("timeout must greater than 0");
|
||||
@@ -285,7 +285,7 @@ public class YKCPushCommandServiceImpl implements YKCPushCommandService {
|
||||
|
||||
byte[] msgBody = Bytes.concat(orderIdByteArr, pileSnByteArr, connectorCodeByteArr, logicCardNumByteArr, physicsCardNumByteArr, accountBalanceByteArr);
|
||||
try {
|
||||
this.supplySend(msgBody, pileSn, YKCFrameTypeCode.REMOTE_CONTROL_START_CHARGING_CODE);
|
||||
this.runSend(msgBody, pileSn, YKCFrameTypeCode.REMOTE_CONTROL_START_CHARGING_CODE);
|
||||
} catch (Exception e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
@@ -306,7 +306,7 @@ public class YKCPushCommandServiceImpl implements YKCPushCommandService {
|
||||
// 远程停机
|
||||
byte[] msgBody = Bytes.concat(BytesUtil.str2Bcd(pileSn), BytesUtil.str2Bcd(connectorCode));
|
||||
try {
|
||||
this.supplySend(msgBody, pileSn, YKCFrameTypeCode.REMOTE_CONTROL_STOP_CHARGING_CODE);
|
||||
this.runSend(msgBody, pileSn, YKCFrameTypeCode.REMOTE_CONTROL_STOP_CHARGING_CODE);
|
||||
} catch (Exception e) {
|
||||
log.error("发送停止充电 error:", e);
|
||||
}
|
||||
@@ -319,7 +319,7 @@ public class YKCPushCommandServiceImpl implements YKCPushCommandService {
|
||||
String connectorCode = command.getConnectorCode();
|
||||
byte[] msg = BytesUtil.str2Bcd(pileSn + connectorCode);
|
||||
try {
|
||||
this.supplySend(msg, pileSn, YKCFrameTypeCode.READ_REAL_TIME_MONITOR_DATA_CODE);
|
||||
this.runSend(msg, pileSn, YKCFrameTypeCode.READ_REAL_TIME_MONITOR_DATA_CODE);
|
||||
} catch (Exception e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
@@ -332,7 +332,7 @@ public class YKCPushCommandServiceImpl implements YKCPushCommandService {
|
||||
byte[] msg = BytesUtil.str2Bcd(pileSn + Constants.ZERO_ONE);
|
||||
log.info("【=====平台下发指令=====】:重启充电桩:,{}", pileSn);
|
||||
try {
|
||||
this.supplySend(msg, pileSn, YKCFrameTypeCode.REMOTE_RESTART_CODE);
|
||||
this.runSend(msg, pileSn, YKCFrameTypeCode.REMOTE_RESTART_CODE);
|
||||
} catch (Exception e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
@@ -377,7 +377,7 @@ public class YKCPushCommandServiceImpl implements YKCPushCommandService {
|
||||
|
||||
// push消息
|
||||
try {
|
||||
this.supplySend(msg, pileSn, YKCFrameTypeCode.REMOTE_ISSUE_QRCODE_CODE);
|
||||
this.runSend(msg, pileSn, YKCFrameTypeCode.REMOTE_ISSUE_QRCODE_CODE);
|
||||
} catch (Exception e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
@@ -403,7 +403,7 @@ public class YKCPushCommandServiceImpl implements YKCPushCommandService {
|
||||
byte[] msg = Bytes.concat(pileSnByteArr, dateBytes);
|
||||
|
||||
try {
|
||||
this.supplySend(msg, pileSn, YKCFrameTypeCode.TIME_CHECK_SETTING_CODE);
|
||||
this.runSend(msg, pileSn, YKCFrameTypeCode.TIME_CHECK_SETTING_CODE);
|
||||
} catch (Exception e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
@@ -487,7 +487,7 @@ public class YKCPushCommandServiceImpl implements YKCPushCommandService {
|
||||
updateServerPortByteArr, userNameByteArr, passwordByteArr, filePathByteArr, performTypeByteArr, overTimeByteArr);
|
||||
|
||||
try {
|
||||
this.supplySend(msgBody, pileModelInfoVO.getPileSn(), YKCFrameTypeCode.REMOTE_UPDATE_CODE);
|
||||
this.runSend(msgBody, pileModelInfoVO.getPileSn(), YKCFrameTypeCode.REMOTE_UPDATE_CODE);
|
||||
} catch (Exception e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
@@ -521,7 +521,7 @@ public class YKCPushCommandServiceImpl implements YKCPushCommandService {
|
||||
byte[] msg = Bytes.concat(pileSnByteArr, workingStateByteArr, maxPowerByteArr);
|
||||
|
||||
try {
|
||||
this.supplySend(msg, pileSn, YKCFrameTypeCode.CHARGING_PILE_WORKING_PARAMETER_SETTING_CODE);
|
||||
this.runSend(msg, pileSn, YKCFrameTypeCode.CHARGING_PILE_WORKING_PARAMETER_SETTING_CODE);
|
||||
} catch (Exception e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
@@ -552,7 +552,7 @@ public class YKCPushCommandServiceImpl implements YKCPushCommandService {
|
||||
byte[] msg = Bytes.concat(pileSnByteArr, pileType);
|
||||
|
||||
try {
|
||||
this.supplySend(msg, pileSn, YKCFrameTypeCode.QUERY_PILE_WORK_PARAMS_CODE);
|
||||
this.runSend(msg, pileSn, YKCFrameTypeCode.QUERY_PILE_WORK_PARAMS_CODE);
|
||||
} catch (Exception e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
@@ -582,7 +582,7 @@ public class YKCPushCommandServiceImpl implements YKCPushCommandService {
|
||||
byte[] msg = Bytes.concat(pileSnByteArr, connectorCodeByteArr, logicByteArr, priceByte);
|
||||
|
||||
try {
|
||||
this.supplySend(msg, pileSn, YKCFrameTypeCode.REMOTE_ACCOUNT_BALANCE_UPDATE_CODE);
|
||||
this.runSend(msg, pileSn, YKCFrameTypeCode.REMOTE_ACCOUNT_BALANCE_UPDATE_CODE);
|
||||
} catch (Exception e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
@@ -619,7 +619,7 @@ public class YKCPushCommandServiceImpl implements YKCPushCommandService {
|
||||
byte[] msg = Bytes.concat(pileSnByteArr, connectorCodeByteArr, operateByteArr, obligateByteArr);
|
||||
|
||||
try {
|
||||
this.supplySend(msg, pileSn, YKCFrameTypeCode.REMOTE_CONTROL_GROUND_LOCK_CODE);
|
||||
this.runSend(msg, pileSn, YKCFrameTypeCode.REMOTE_CONTROL_GROUND_LOCK_CODE);
|
||||
} catch (Exception e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
|
||||
@@ -58,7 +58,8 @@ public class NotificationService {
|
||||
String postResult = platformService.notificationStationInfo(stationId);
|
||||
result.append(postResult).append("\n");
|
||||
} catch (Exception e) {
|
||||
logger.error("充电站信息变化推送error", e);
|
||||
logger.error("平台类型:{}, 平台名称:{}, 站点id:{}, 充电站信息变化推送error:{}",
|
||||
secretInfoVO.getPlatformType(), secretInfoVO.getPlatformName(), stationId, e.getMessage());
|
||||
}
|
||||
}
|
||||
return result.toString();
|
||||
@@ -93,7 +94,8 @@ public class NotificationService {
|
||||
ThirdPartyPlatformService platformService = ThirdPartyPlatformFactory.getInvokeStrategy(secretInfoVO.getPlatformType());
|
||||
platformService.notificationStationStatus(stationId, pileConnectorCode, status, secretInfoVO);
|
||||
} catch (Exception e) {
|
||||
logger.error("设备状态变化推送error", e);
|
||||
logger.error("平台类型:{}, 平台名称:{}, 站点id:{}, 枪口编号:{}, 设备状态变化推送error:{}",
|
||||
secretInfoVO.getPlatformType(), secretInfoVO.getPlatformName(), stationId, pileConnectorCode, e.getMessage());
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -125,9 +127,9 @@ public class NotificationService {
|
||||
ThirdPartyPlatformService platformService = ThirdPartyPlatformFactory.getInvokeStrategy(secretInfoVO.getPlatformType());
|
||||
try {
|
||||
platformService.notificationConnectorChargeStatus(orderCode, secretInfoVO);
|
||||
|
||||
} catch (Exception e) {
|
||||
logger.error("设备充电中状态变化推送error:", e);
|
||||
logger.error("平台类型:{}, 平台名称:{}, 站点id:{}, 订单编号:{}, 设备充电中状态变化推送error:{}",
|
||||
secretInfoVO.getPlatformType(), secretInfoVO.getPlatformName(), stationId, orderCode, e.getMessage());
|
||||
}
|
||||
try {
|
||||
platformService.notificationEquipChargeStatus(orderCode);
|
||||
|
||||
Reference in New Issue
Block a user