diff --git a/jsowell-common/src/main/java/com/jsowell/common/util/YKCUtils.java b/jsowell-common/src/main/java/com/jsowell/common/util/YKCUtils.java index f96ee1580..7436462c9 100644 --- a/jsowell-common/src/main/java/com/jsowell/common/util/YKCUtils.java +++ b/jsowell-common/src/main/java/com/jsowell/common/util/YKCUtils.java @@ -55,7 +55,7 @@ public class YKCUtils { // 如果是0x03帧,则不进行crc校验 if (0x03 == frameType[0]) { - log.info("0x03帧,则不进行crc校验"); + log.debug("0x03帧,则不进行crc校验"); return true; } @@ -109,7 +109,7 @@ public class YKCUtils { // 如果是0x03帧,则不进行crc校验 if (0x03 == frameType[0]) { - log.info("0x03帧,则不进行crc校验"); + log.debug("0x03帧,则不进行crc校验"); return true; } diff --git a/jsowell-netty/src/main/java/com/jsowell/netty/handler/yunkuaichong/ReservationChargingResponseHandler.java b/jsowell-netty/src/main/java/com/jsowell/netty/handler/yunkuaichong/ReservationChargingResponseHandler.java index 53ff0db12..0efc676d7 100644 --- a/jsowell-netty/src/main/java/com/jsowell/netty/handler/yunkuaichong/ReservationChargingResponseHandler.java +++ b/jsowell-netty/src/main/java/com/jsowell/netty/handler/yunkuaichong/ReservationChargingResponseHandler.java @@ -5,7 +5,9 @@ import com.jsowell.common.constant.CacheConstants; import com.jsowell.common.core.domain.ykc.YKCDataProtocol; import com.jsowell.common.core.domain.ykc.YKCFrameTypeCode; import com.jsowell.common.core.redis.RedisCache; +import com.jsowell.common.protocol.SyncPromise; import com.jsowell.common.util.BytesUtil; +import com.jsowell.common.util.RpcUtil; import com.jsowell.common.util.YKCUtils; import com.jsowell.netty.factory.YKCOperateFactory; import com.jsowell.pile.domain.PileReservationInfo; @@ -36,7 +38,7 @@ public class ReservationChargingResponseHandler extends AbstractYkcHandler { } @Override - public byte[] supplyProcess(YKCDataProtocol ykcDataProtocol, ChannelHandlerContext channel) { + public byte[] supplyProcess(YKCDataProtocol ykcDataProtocol, ChannelHandlerContext ctx) { // log.info("[====远程更新应答====] param:{}, channel:{}", JSON.toJSONString(ykcDataProtocol), channel.toString()); // 消息体 byte[] msgBody = ykcDataProtocol.getMsgBody(); @@ -55,7 +57,7 @@ public class ReservationChargingResponseHandler extends AbstractYkcHandler { String pileSn = BytesUtil.bcd2Str(pileSnByteArr); // 保存时间 - saveLastTimeAndCheckChannel(pileSn, channel); + saveLastTimeAndCheckChannel(pileSn, ctx); // 枪口号 startIndex += length; @@ -78,6 +80,18 @@ public class ReservationChargingResponseHandler extends AbstractYkcHandler { log.info("0x59预约充电响应, 交易流水号:{}, 桩SN:{}, 枪口号:{}, 结果:{}, 失败原因:{}", transactionCode, pileSn, connectorCode, resultCode, failedReason); + // 根据请求id,在集合中找到与外部线程通信的SyncPromise对象 + String msgId = ctx.channel().id().toString() + "_" + YKCFrameTypeCode.RESERVATION_CHARGING_SETUP_CODE.getCode(); + log.info("同步获取响应数据-收到消息, msgId:{}", msgId); + SyncPromise syncPromise = RpcUtil.getSyncPromiseMap().get(msgId); + if(syncPromise != null) { + // 设置响应结果 + syncPromise.setRpcResult(ykcDataProtocol.getBytes()); + // 唤醒外部线程 + log.info("同步获取响应数据-唤醒外部线程, SyncPromise:{}", JSON.toJSONString(syncPromise)); + syncPromise.wake(); + } + // 如果收到成功, 从redis取值, 保存到数据库 if ("01".equals(resultCode)) { // 预约成功, 删除redis中的预约信息 diff --git a/jsowell-netty/src/main/java/com/jsowell/netty/handler/yunkuaichong/TimeCheckSettingResponseHandler.java b/jsowell-netty/src/main/java/com/jsowell/netty/handler/yunkuaichong/TimeCheckSettingResponseHandler.java index 5d0f3e495..a66599e95 100644 --- a/jsowell-netty/src/main/java/com/jsowell/netty/handler/yunkuaichong/TimeCheckSettingResponseHandler.java +++ b/jsowell-netty/src/main/java/com/jsowell/netty/handler/yunkuaichong/TimeCheckSettingResponseHandler.java @@ -50,7 +50,7 @@ public class TimeCheckSettingResponseHandler extends AbstractYkcHandler { length = 7; byte[] currentTimeByteArr = BytesUtil.copyBytes(msgBody, startIndex, length); Date date = Cp56Time2aUtil.byte2Hdate(currentTimeByteArr); - log.info("对时设置应答, pileSn:{}, channelId:{}, 充电桩当前时间:{}", pileSn, channel.channel().id().asShortText(), DateUtils.formatDateTime(date)); + log.info("[===对时设置充电桩应答===], pileSn:{}, channelId:{}, 充电桩当前时间:{}", pileSn, channel.channel().id().asShortText(), DateUtils.formatDateTime(date)); return null; } } diff --git a/jsowell-netty/src/main/java/com/jsowell/netty/server/NettyServerManager.java b/jsowell-netty/src/main/java/com/jsowell/netty/server/NettyServerManager.java index 25ec0111a..61b2f3247 100644 --- a/jsowell-netty/src/main/java/com/jsowell/netty/server/NettyServerManager.java +++ b/jsowell-netty/src/main/java/com/jsowell/netty/server/NettyServerManager.java @@ -51,9 +51,14 @@ public class NettyServerManager implements CommandLineRunner { .handler(new LoggingHandler(LogLevel.DEBUG)) // .option(ChannelOption.SO_BACKLOG, 128) // 默认128 .option(ChannelOption.SO_BACKLOG, 1024) - .option(ChannelOption.SO_REUSEADDR, true) - .childOption(ChannelOption.SO_KEEPALIVE, true) + // .option(ChannelOption.SO_REUSEADDR, true) + .childOption(ChannelOption.SO_KEEPALIVE, true) // 保持连接 + .childOption(ChannelOption.TCP_NODELAY, true) // 禁用 Nagle 算法 + .childOption(ChannelOption.SO_RCVBUF, 64 * 1024) // 接收缓冲区 + .childOption(ChannelOption.SO_SNDBUF, 64 * 1024) // 发送缓冲区 + .childOption(ChannelOption.WRITE_BUFFER_WATER_MARK, new WriteBufferWaterMark(32 * 1024, 64 * 1024)) // 写缓冲水位 .childOption(ChannelOption.SO_REUSEADDR, true) + .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT) // 启用池化内存分配器 .childHandler(nettyServerChannelInitializer) .localAddress(new InetSocketAddress(host, port)); diff --git a/jsowell-netty/src/main/java/com/jsowell/netty/server/yunkuaichong/NettyServerChannelInitializer.java b/jsowell-netty/src/main/java/com/jsowell/netty/server/yunkuaichong/NettyServerChannelInitializer.java index 4db0989f3..a8ecf9fab 100644 --- a/jsowell-netty/src/main/java/com/jsowell/netty/server/yunkuaichong/NettyServerChannelInitializer.java +++ b/jsowell-netty/src/main/java/com/jsowell/netty/server/yunkuaichong/NettyServerChannelInitializer.java @@ -26,8 +26,8 @@ public class NettyServerChannelInitializer extends ChannelInitializer CHANNEL_MAP = new ConcurrentHashMap<>(); - private final List notPrintFrameTypeList = Lists.newArrayList("0x03"); + private final List notPrintFrameTypeList = Lists.newArrayList(); // "0x03" /** * 有客户端连接服务器会触发此函数 @@ -136,21 +134,21 @@ public class NettyServerHandler extends ChannelInboundHandlerAdapter { String frameType = YKCUtils.frameType2Str(frameTypeBytes); // 判断该帧类型是否为某请求帧的应答帧 - String requestFrameType = YKCFrameTypeCode.PileAnswersRelation.getRequestFrameType(frameType); + // String requestFrameType = YKCFrameTypeCode.PileAnswersRelation.getRequestFrameType(frameType); // log.info("同步获取响应数据-判断该帧类型是否为某请求帧的应答帧, frameType:{}, requestFrameType:{}", frameType, requestFrameType); - if (StringUtils.isNotBlank(requestFrameType)) { - // 根据请求id,在集合中找到与外部线程通信的SyncPromise对象 - String msgId = ctx.channel().id().toString() + "_" + requestFrameType; - // log.info("同步获取响应数据-收到消息, msgId:{}", msgId); - SyncPromise syncPromise = RpcUtil.getSyncPromiseMap().get(msgId); - if(syncPromise != null) { - // 设置响应结果 - syncPromise.setRpcResult(ykcDataProtocol.getBytes()); - // 唤醒外部线程 - // log.info("同步获取响应数据-唤醒外部线程, SyncPromise:{}", JSON.toJSONString(syncPromise)); - syncPromise.wake(); - } - } + // if (StringUtils.isNotBlank(requestFrameType)) { + // // 根据请求id,在集合中找到与外部线程通信的SyncPromise对象 + // String msgId = ctx.channel().id().toString() + "_" + requestFrameType; + // // log.info("同步获取响应数据-收到消息, msgId:{}", msgId); + // SyncPromise syncPromise = RpcUtil.getSyncPromiseMap().get(msgId); + // if(syncPromise != null) { + // // 设置响应结果 + // syncPromise.setRpcResult(ykcDataProtocol.getBytes()); + // // 唤醒外部线程 + // // log.info("同步获取响应数据-唤醒外部线程, SyncPromise:{}", JSON.toJSONString(syncPromise)); + // syncPromise.wake(); + // } + // } // 获取序列号域 int serialNumber = BytesUtil.bytesToIntLittle(ykcDataProtocol.getSerialNumber()); diff --git a/jsowell-pile/src/main/java/com/jsowell/pile/service/impl/YKCPushCommandServiceImpl.java b/jsowell-pile/src/main/java/com/jsowell/pile/service/impl/YKCPushCommandServiceImpl.java index 5d3a9a608..b3852c339 100644 --- a/jsowell-pile/src/main/java/com/jsowell/pile/service/impl/YKCPushCommandServiceImpl.java +++ b/jsowell-pile/src/main/java/com/jsowell/pile/service/impl/YKCPushCommandServiceImpl.java @@ -64,7 +64,7 @@ public class YKCPushCommandServiceImpl implements YKCPushCommandService { /** * 发送消息,无返回值 */ - public void runSend(byte[] msg, String pileSn, Enum frameTypeCode) throws Exception { + public void runSend(byte[] msg, String pileSn, Enum frameTypeCode) { // 通过桩编号获取channel ChannelHandlerContext ctx = PileChannelEntity.getChannelByPileSn(pileSn); String value = ((YKCFrameTypeCode) frameTypeCode).getValue(); // 帧类型名称 @@ -73,7 +73,7 @@ public class YKCPushCommandServiceImpl implements YKCPushCommandService { throw new NullPointerException("channel"); } if(msg == null) { - throw new NullPointerException("msg"); + throw new NullPointerException("发送消息msg为空"); } /* @@ -157,7 +157,7 @@ public class YKCPushCommandServiceImpl implements YKCPushCommandService { throw new NullPointerException("channel"); } if(msg == null) { - throw new NullPointerException("msg"); + throw new NullPointerException("发送消息msg为空"); } if(timeout <= 0) { throw new IllegalArgumentException("timeout must greater than 0"); @@ -285,7 +285,7 @@ public class YKCPushCommandServiceImpl implements YKCPushCommandService { byte[] msgBody = Bytes.concat(orderIdByteArr, pileSnByteArr, connectorCodeByteArr, logicCardNumByteArr, physicsCardNumByteArr, accountBalanceByteArr); try { - this.supplySend(msgBody, pileSn, YKCFrameTypeCode.REMOTE_CONTROL_START_CHARGING_CODE); + this.runSend(msgBody, pileSn, YKCFrameTypeCode.REMOTE_CONTROL_START_CHARGING_CODE); } catch (Exception e) { throw new RuntimeException(e); } @@ -306,7 +306,7 @@ public class YKCPushCommandServiceImpl implements YKCPushCommandService { // 远程停机 byte[] msgBody = Bytes.concat(BytesUtil.str2Bcd(pileSn), BytesUtil.str2Bcd(connectorCode)); try { - this.supplySend(msgBody, pileSn, YKCFrameTypeCode.REMOTE_CONTROL_STOP_CHARGING_CODE); + this.runSend(msgBody, pileSn, YKCFrameTypeCode.REMOTE_CONTROL_STOP_CHARGING_CODE); } catch (Exception e) { log.error("发送停止充电 error:", e); } @@ -319,7 +319,7 @@ public class YKCPushCommandServiceImpl implements YKCPushCommandService { String connectorCode = command.getConnectorCode(); byte[] msg = BytesUtil.str2Bcd(pileSn + connectorCode); try { - this.supplySend(msg, pileSn, YKCFrameTypeCode.READ_REAL_TIME_MONITOR_DATA_CODE); + this.runSend(msg, pileSn, YKCFrameTypeCode.READ_REAL_TIME_MONITOR_DATA_CODE); } catch (Exception e) { throw new RuntimeException(e); } @@ -332,7 +332,7 @@ public class YKCPushCommandServiceImpl implements YKCPushCommandService { byte[] msg = BytesUtil.str2Bcd(pileSn + Constants.ZERO_ONE); log.info("【=====平台下发指令=====】:重启充电桩:,{}", pileSn); try { - this.supplySend(msg, pileSn, YKCFrameTypeCode.REMOTE_RESTART_CODE); + this.runSend(msg, pileSn, YKCFrameTypeCode.REMOTE_RESTART_CODE); } catch (Exception e) { throw new RuntimeException(e); } @@ -377,7 +377,7 @@ public class YKCPushCommandServiceImpl implements YKCPushCommandService { // push消息 try { - this.supplySend(msg, pileSn, YKCFrameTypeCode.REMOTE_ISSUE_QRCODE_CODE); + this.runSend(msg, pileSn, YKCFrameTypeCode.REMOTE_ISSUE_QRCODE_CODE); } catch (Exception e) { throw new RuntimeException(e); } @@ -403,7 +403,7 @@ public class YKCPushCommandServiceImpl implements YKCPushCommandService { byte[] msg = Bytes.concat(pileSnByteArr, dateBytes); try { - this.supplySend(msg, pileSn, YKCFrameTypeCode.TIME_CHECK_SETTING_CODE); + this.runSend(msg, pileSn, YKCFrameTypeCode.TIME_CHECK_SETTING_CODE); } catch (Exception e) { throw new RuntimeException(e); } @@ -487,7 +487,7 @@ public class YKCPushCommandServiceImpl implements YKCPushCommandService { updateServerPortByteArr, userNameByteArr, passwordByteArr, filePathByteArr, performTypeByteArr, overTimeByteArr); try { - this.supplySend(msgBody, pileModelInfoVO.getPileSn(), YKCFrameTypeCode.REMOTE_UPDATE_CODE); + this.runSend(msgBody, pileModelInfoVO.getPileSn(), YKCFrameTypeCode.REMOTE_UPDATE_CODE); } catch (Exception e) { throw new RuntimeException(e); } @@ -521,7 +521,7 @@ public class YKCPushCommandServiceImpl implements YKCPushCommandService { byte[] msg = Bytes.concat(pileSnByteArr, workingStateByteArr, maxPowerByteArr); try { - this.supplySend(msg, pileSn, YKCFrameTypeCode.CHARGING_PILE_WORKING_PARAMETER_SETTING_CODE); + this.runSend(msg, pileSn, YKCFrameTypeCode.CHARGING_PILE_WORKING_PARAMETER_SETTING_CODE); } catch (Exception e) { throw new RuntimeException(e); } @@ -552,7 +552,7 @@ public class YKCPushCommandServiceImpl implements YKCPushCommandService { byte[] msg = Bytes.concat(pileSnByteArr, pileType); try { - this.supplySend(msg, pileSn, YKCFrameTypeCode.QUERY_PILE_WORK_PARAMS_CODE); + this.runSend(msg, pileSn, YKCFrameTypeCode.QUERY_PILE_WORK_PARAMS_CODE); } catch (Exception e) { throw new RuntimeException(e); } @@ -582,7 +582,7 @@ public class YKCPushCommandServiceImpl implements YKCPushCommandService { byte[] msg = Bytes.concat(pileSnByteArr, connectorCodeByteArr, logicByteArr, priceByte); try { - this.supplySend(msg, pileSn, YKCFrameTypeCode.REMOTE_ACCOUNT_BALANCE_UPDATE_CODE); + this.runSend(msg, pileSn, YKCFrameTypeCode.REMOTE_ACCOUNT_BALANCE_UPDATE_CODE); } catch (Exception e) { throw new RuntimeException(e); } @@ -619,7 +619,7 @@ public class YKCPushCommandServiceImpl implements YKCPushCommandService { byte[] msg = Bytes.concat(pileSnByteArr, connectorCodeByteArr, operateByteArr, obligateByteArr); try { - this.supplySend(msg, pileSn, YKCFrameTypeCode.REMOTE_CONTROL_GROUND_LOCK_CODE); + this.runSend(msg, pileSn, YKCFrameTypeCode.REMOTE_CONTROL_GROUND_LOCK_CODE); } catch (Exception e) { throw new RuntimeException(e); } diff --git a/jsowell-thirdparty/src/main/java/com/jsowell/thirdparty/common/NotificationService.java b/jsowell-thirdparty/src/main/java/com/jsowell/thirdparty/common/NotificationService.java index 25e6d627d..262f531f6 100644 --- a/jsowell-thirdparty/src/main/java/com/jsowell/thirdparty/common/NotificationService.java +++ b/jsowell-thirdparty/src/main/java/com/jsowell/thirdparty/common/NotificationService.java @@ -58,7 +58,8 @@ public class NotificationService { String postResult = platformService.notificationStationInfo(stationId); result.append(postResult).append("\n"); } catch (Exception e) { - logger.error("充电站信息变化推送error", e); + logger.error("平台类型:{}, 平台名称:{}, 站点id:{}, 充电站信息变化推送error:{}", + secretInfoVO.getPlatformType(), secretInfoVO.getPlatformName(), stationId, e.getMessage()); } } return result.toString(); @@ -93,7 +94,8 @@ public class NotificationService { ThirdPartyPlatformService platformService = ThirdPartyPlatformFactory.getInvokeStrategy(secretInfoVO.getPlatformType()); platformService.notificationStationStatus(stationId, pileConnectorCode, status, secretInfoVO); } catch (Exception e) { - logger.error("设备状态变化推送error", e); + logger.error("平台类型:{}, 平台名称:{}, 站点id:{}, 枪口编号:{}, 设备状态变化推送error:{}", + secretInfoVO.getPlatformType(), secretInfoVO.getPlatformName(), stationId, pileConnectorCode, e.getMessage()); } } } @@ -125,9 +127,9 @@ public class NotificationService { ThirdPartyPlatformService platformService = ThirdPartyPlatformFactory.getInvokeStrategy(secretInfoVO.getPlatformType()); try { platformService.notificationConnectorChargeStatus(orderCode, secretInfoVO); - } catch (Exception e) { - logger.error("设备充电中状态变化推送error:", e); + logger.error("平台类型:{}, 平台名称:{}, 站点id:{}, 订单编号:{}, 设备充电中状态变化推送error:{}", + secretInfoVO.getPlatformType(), secretInfoVO.getPlatformName(), stationId, orderCode, e.getMessage()); } try { platformService.notificationEquipChargeStatus(orderCode);