mirror of
https://gitee.com/san-bing/JChargePointProtocol
synced 2026-05-04 18:09:54 +08:00
指标修正
This commit is contained in:
@@ -134,58 +134,100 @@ public class ProtocolUplinkConsumerService extends AbstractConsumerService imple
|
||||
PendingMsgHolder pendingMsgHolder = new PendingMsgHolder();
|
||||
Future<?> packSubmitFuture = consumersExecutor.submit(new TracerRunnable(() ->
|
||||
orderedMsgList.forEach(element -> {
|
||||
|
||||
UUID id = element.getUuid();
|
||||
|
||||
ProtoQueueMsg<UplinkQueueMessage> msg = element.getMsg();
|
||||
|
||||
tracer(msg);
|
||||
log.trace("[{}] Creating main callback for message: {}", id, msg.getValue());
|
||||
|
||||
log.trace("[{}] Creating PackCallback for message: {}", id, msg.getValue());
|
||||
|
||||
Callback callback = new PackCallback<>(id, ctx);
|
||||
|
||||
try {
|
||||
UplinkQueueMessage uplinkQueueMsg = msg.getValue();
|
||||
pendingMsgHolder.setUplinkQueueMessage(uplinkQueueMsg);
|
||||
if (uplinkQueueMsg.hasLoginRequest()) {
|
||||
pileProtocolService.pileLogin(uplinkQueueMsg, callback);
|
||||
} else if (uplinkQueueMsg.hasHeartBeatRequest()) {
|
||||
pileProtocolService.heartBeat(uplinkQueueMsg, callback);
|
||||
} else if (uplinkQueueMsg.hasVerifyPricingRequest()) {
|
||||
pileProtocolService.verifyPricing(uplinkQueueMsg, callback);
|
||||
} else if (uplinkQueueMsg.hasQueryPricingRequest()) {
|
||||
pileProtocolService.queryPricing(uplinkQueueMsg, callback);
|
||||
} else if (uplinkQueueMsg.hasGunRunStatusProto()) {
|
||||
pileProtocolService.postGunRunStatus(uplinkQueueMsg, callback);
|
||||
} else if (uplinkQueueMsg.hasChargingProgressProto()) {
|
||||
pileProtocolService.postChargingProgress(uplinkQueueMsg, callback);
|
||||
} else if (uplinkQueueMsg.hasSetPricingResponse()) {
|
||||
pileProtocolService.onSetPricingResponse(uplinkQueueMsg, callback);
|
||||
} else if (uplinkQueueMsg.hasRemoteStartChargingResponse()) {
|
||||
pileProtocolService.onRemoteStartChargingResponse(uplinkQueueMsg, callback);
|
||||
} else if (uplinkQueueMsg.hasRemoteStopChargingResponse()) {
|
||||
pileProtocolService.onRemoteStopChargingResponse(uplinkQueueMsg, callback);
|
||||
} else if (uplinkQueueMsg.hasTransactionRecord()) {
|
||||
pileProtocolService.onTransactionRecord(uplinkQueueMsg, callback);
|
||||
} else {
|
||||
callback.onSuccess();
|
||||
}
|
||||
|
||||
if (statsEnabled) {
|
||||
stats.log(uplinkQueueMsg);
|
||||
}
|
||||
|
||||
pendingMsgHolder.setUplinkQueueMessage(uplinkQueueMsg);
|
||||
|
||||
if (uplinkQueueMsg.hasLoginRequest()) {
|
||||
|
||||
pileProtocolService.pileLogin(uplinkQueueMsg, callback);
|
||||
|
||||
} else if (uplinkQueueMsg.hasHeartBeatRequest()) {
|
||||
|
||||
pileProtocolService.heartBeat(uplinkQueueMsg, callback);
|
||||
|
||||
} else if (uplinkQueueMsg.hasVerifyPricingRequest()) {
|
||||
|
||||
pileProtocolService.verifyPricing(uplinkQueueMsg, callback);
|
||||
|
||||
} else if (uplinkQueueMsg.hasQueryPricingRequest()) {
|
||||
|
||||
pileProtocolService.queryPricing(uplinkQueueMsg, callback);
|
||||
|
||||
} else if (uplinkQueueMsg.hasGunRunStatusProto()) {
|
||||
|
||||
pileProtocolService.postGunRunStatus(uplinkQueueMsg, callback);
|
||||
|
||||
} else if (uplinkQueueMsg.hasChargingProgressProto()) {
|
||||
|
||||
pileProtocolService.postChargingProgress(uplinkQueueMsg, callback);
|
||||
|
||||
} else if (uplinkQueueMsg.hasSetPricingResponse()) {
|
||||
|
||||
pileProtocolService.onSetPricingResponse(uplinkQueueMsg, callback);
|
||||
|
||||
} else if (uplinkQueueMsg.hasRemoteStartChargingResponse()) {
|
||||
|
||||
pileProtocolService.onRemoteStartChargingResponse(uplinkQueueMsg, callback);
|
||||
|
||||
} else if (uplinkQueueMsg.hasRemoteStopChargingResponse()) {
|
||||
|
||||
pileProtocolService.onRemoteStopChargingResponse(uplinkQueueMsg, callback);
|
||||
|
||||
} else if (uplinkQueueMsg.hasTransactionRecord()) {
|
||||
|
||||
pileProtocolService.onTransactionRecord(uplinkQueueMsg, callback);
|
||||
|
||||
} else {
|
||||
|
||||
callback.onSuccess();
|
||||
}
|
||||
|
||||
} catch (Throwable e) {
|
||||
|
||||
log.warn("[{}] Failed to process message: {}", id, msg, e);
|
||||
|
||||
callback.onFailure(e);
|
||||
}
|
||||
}))
|
||||
);
|
||||
if (!processingTimeoutLatch.await(packProcessingTimeout, TimeUnit.MILLISECONDS)) {
|
||||
|
||||
if (!ctx.await(packProcessingTimeout, TimeUnit.MILLISECONDS)) {
|
||||
|
||||
if (!packSubmitFuture.isDone()) {
|
||||
|
||||
packSubmitFuture.cancel(true);
|
||||
|
||||
UplinkQueueMessage lastSubmitMsg = pendingMsgHolder.getUplinkQueueMessage();
|
||||
|
||||
log.warn("Timeout to process message: {}", lastSubmitMsg);
|
||||
}
|
||||
|
||||
if (log.isDebugEnabled()) {
|
||||
ctx.getAckMap().forEach((id, msg) -> log.debug("[{}] Timeout to process message: {}", id, msg.getValue()));
|
||||
|
||||
ctx.getAckMap().forEach((id, msg) -> log.info("[{}] Timeout to process message: {}", id, msg.getValue()));
|
||||
|
||||
}
|
||||
|
||||
ctx.getFailedMap().forEach((id, msg) -> log.warn("[{}] Failed to process message: {}", id, msg.getValue()));
|
||||
}
|
||||
|
||||
consumer.commit();
|
||||
}
|
||||
|
||||
@@ -198,11 +240,8 @@ public class ProtocolUplinkConsumerService extends AbstractConsumerService imple
|
||||
origin = ByteUtil.bytesToString(tracerOrigin);
|
||||
}
|
||||
|
||||
long ts = System.currentTimeMillis();
|
||||
byte[] tracerTs = msg.getHeaders().get(MSG_MD_PREFIX + MSG_MD_TS);
|
||||
if (tracerTs != null) {
|
||||
ts = ByteUtil.bytesToLong(tracerTs);
|
||||
}
|
||||
long ts = tracerTs != null ? ByteUtil.bytesToLong(tracerTs) : System.currentTimeMillis();
|
||||
|
||||
return TracerContextUtil.newTracer(ByteUtil.bytesToString(tracerId), origin, ts);
|
||||
})
|
||||
|
||||
Reference in New Issue
Block a user