diff --git a/jcpp-app/src/main/java/sanbing/jcpp/app/service/queue/consumer/ProtocolUplinkConsumerService.java b/jcpp-app/src/main/java/sanbing/jcpp/app/service/queue/consumer/ProtocolUplinkConsumerService.java index c3c4014..20ff976 100644 --- a/jcpp-app/src/main/java/sanbing/jcpp/app/service/queue/consumer/ProtocolUplinkConsumerService.java +++ b/jcpp-app/src/main/java/sanbing/jcpp/app/service/queue/consumer/ProtocolUplinkConsumerService.java @@ -134,58 +134,100 @@ public class ProtocolUplinkConsumerService extends AbstractConsumerService imple PendingMsgHolder pendingMsgHolder = new PendingMsgHolder(); Future packSubmitFuture = consumersExecutor.submit(new TracerRunnable(() -> orderedMsgList.forEach(element -> { + UUID id = element.getUuid(); + ProtoQueueMsg msg = element.getMsg(); + tracer(msg); - log.trace("[{}] Creating main callback for message: {}", id, msg.getValue()); + + log.trace("[{}] Creating PackCallback for message: {}", id, msg.getValue()); + Callback callback = new PackCallback<>(id, ctx); + try { UplinkQueueMessage uplinkQueueMsg = msg.getValue(); - pendingMsgHolder.setUplinkQueueMessage(uplinkQueueMsg); - if (uplinkQueueMsg.hasLoginRequest()) { - pileProtocolService.pileLogin(uplinkQueueMsg, callback); - } else if (uplinkQueueMsg.hasHeartBeatRequest()) { - pileProtocolService.heartBeat(uplinkQueueMsg, callback); - } else if (uplinkQueueMsg.hasVerifyPricingRequest()) { - pileProtocolService.verifyPricing(uplinkQueueMsg, callback); - } else if (uplinkQueueMsg.hasQueryPricingRequest()) { - pileProtocolService.queryPricing(uplinkQueueMsg, callback); - } else if (uplinkQueueMsg.hasGunRunStatusProto()) { - pileProtocolService.postGunRunStatus(uplinkQueueMsg, callback); - } else if (uplinkQueueMsg.hasChargingProgressProto()) { - pileProtocolService.postChargingProgress(uplinkQueueMsg, callback); - } else if (uplinkQueueMsg.hasSetPricingResponse()) { - pileProtocolService.onSetPricingResponse(uplinkQueueMsg, callback); - } else if (uplinkQueueMsg.hasRemoteStartChargingResponse()) { - pileProtocolService.onRemoteStartChargingResponse(uplinkQueueMsg, callback); - } else if (uplinkQueueMsg.hasRemoteStopChargingResponse()) { - pileProtocolService.onRemoteStopChargingResponse(uplinkQueueMsg, callback); - } else if (uplinkQueueMsg.hasTransactionRecord()) { - pileProtocolService.onTransactionRecord(uplinkQueueMsg, callback); - } else { - callback.onSuccess(); - } if (statsEnabled) { stats.log(uplinkQueueMsg); } + + pendingMsgHolder.setUplinkQueueMessage(uplinkQueueMsg); + + if (uplinkQueueMsg.hasLoginRequest()) { + + pileProtocolService.pileLogin(uplinkQueueMsg, callback); + + } else if (uplinkQueueMsg.hasHeartBeatRequest()) { + + pileProtocolService.heartBeat(uplinkQueueMsg, callback); + + } else if (uplinkQueueMsg.hasVerifyPricingRequest()) { + + pileProtocolService.verifyPricing(uplinkQueueMsg, callback); + + } else if (uplinkQueueMsg.hasQueryPricingRequest()) { + + pileProtocolService.queryPricing(uplinkQueueMsg, callback); + + } else if (uplinkQueueMsg.hasGunRunStatusProto()) { + + pileProtocolService.postGunRunStatus(uplinkQueueMsg, callback); + + } else if (uplinkQueueMsg.hasChargingProgressProto()) { + + pileProtocolService.postChargingProgress(uplinkQueueMsg, callback); + + } else if (uplinkQueueMsg.hasSetPricingResponse()) { + + pileProtocolService.onSetPricingResponse(uplinkQueueMsg, callback); + + } else if (uplinkQueueMsg.hasRemoteStartChargingResponse()) { + + pileProtocolService.onRemoteStartChargingResponse(uplinkQueueMsg, callback); + + } else if (uplinkQueueMsg.hasRemoteStopChargingResponse()) { + + pileProtocolService.onRemoteStopChargingResponse(uplinkQueueMsg, callback); + + } else if (uplinkQueueMsg.hasTransactionRecord()) { + + pileProtocolService.onTransactionRecord(uplinkQueueMsg, callback); + + } else { + + callback.onSuccess(); + } + } catch (Throwable e) { + log.warn("[{}] Failed to process message: {}", id, msg, e); + callback.onFailure(e); } })) ); - if (!processingTimeoutLatch.await(packProcessingTimeout, TimeUnit.MILLISECONDS)) { + + if (!ctx.await(packProcessingTimeout, TimeUnit.MILLISECONDS)) { + if (!packSubmitFuture.isDone()) { + packSubmitFuture.cancel(true); + UplinkQueueMessage lastSubmitMsg = pendingMsgHolder.getUplinkQueueMessage(); + log.warn("Timeout to process message: {}", lastSubmitMsg); } + if (log.isDebugEnabled()) { - ctx.getAckMap().forEach((id, msg) -> log.debug("[{}] Timeout to process message: {}", id, msg.getValue())); + + ctx.getAckMap().forEach((id, msg) -> log.info("[{}] Timeout to process message: {}", id, msg.getValue())); + } + ctx.getFailedMap().forEach((id, msg) -> log.warn("[{}] Failed to process message: {}", id, msg.getValue())); } + consumer.commit(); } @@ -198,11 +240,8 @@ public class ProtocolUplinkConsumerService extends AbstractConsumerService imple origin = ByteUtil.bytesToString(tracerOrigin); } - long ts = System.currentTimeMillis(); byte[] tracerTs = msg.getHeaders().get(MSG_MD_PREFIX + MSG_MD_TS); - if (tracerTs != null) { - ts = ByteUtil.bytesToLong(tracerTs); - } + long ts = tracerTs != null ? ByteUtil.bytesToLong(tracerTs) : System.currentTimeMillis(); return TracerContextUtil.newTracer(ByteUtil.bytesToString(tracerId), origin, ts); })