diff --git a/jcpp-app-bootstrap/src/main/resources/app-service.yml b/jcpp-app-bootstrap/src/main/resources/app-service.yml index c5f1cc4..9bb5108 100644 --- a/jcpp-app-bootstrap/src/main/resources/app-service.yml +++ b/jcpp-app-bootstrap/src/main/resources/app-service.yml @@ -93,7 +93,7 @@ queue: auto-offset-reset: "${QUEUE_KAFKA_AUTO_OFFSET_RESET:earliest}" other-inline: "${QUEUE_KAFKA_OTHER_PROPERTIES:}" topic-properties: - app: "${QUEUE_KAFKA_APP_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:52428800;retention.bytes:1048576000;partitions:1;min.insync.replicas:1}" + app: "${QUEUE_KAFKA_APP_TOPIC_PROPERTIES:retention.ms:86400000;segment.bytes:52428800;retention.bytes:1048576000;partitions:1;min.insync.replicas:1}" consumer-stats: enabled: "${QUEUE_KAFKA_CONSUMER_STATS_ENABLED:true}" print-interval-ms: "${QUEUE_KAFKA_CONSUMER_STATS_MIN_PRINT_INTERVAL_MS:60000}" diff --git a/jcpp-app/src/main/java/sanbing/jcpp/app/service/queue/consumer/ProtocolUplinkConsumerService.java b/jcpp-app/src/main/java/sanbing/jcpp/app/service/queue/consumer/ProtocolUplinkConsumerService.java index e7ab6d0..c3c4014 100644 --- a/jcpp-app/src/main/java/sanbing/jcpp/app/service/queue/consumer/ProtocolUplinkConsumerService.java +++ b/jcpp-app/src/main/java/sanbing/jcpp/app/service/queue/consumer/ProtocolUplinkConsumerService.java @@ -101,6 +101,7 @@ public class ProtocolUplinkConsumerService extends AbstractConsumerService imple } + @Override @PreDestroy public void destroy() { super.destroy(); @@ -132,7 +133,7 @@ public class ProtocolUplinkConsumerService extends AbstractConsumerService imple processingTimeoutLatch, pendingMap, new ConcurrentHashMap<>()); PendingMsgHolder pendingMsgHolder = new PendingMsgHolder(); Future packSubmitFuture = consumersExecutor.submit(new TracerRunnable(() -> - orderedMsgList.forEach((element) -> { + orderedMsgList.forEach(element -> { UUID id = element.getUuid(); ProtoQueueMsg msg = element.getMsg(); tracer(msg); @@ -157,11 +158,11 @@ public class ProtocolUplinkConsumerService extends AbstractConsumerService imple pileProtocolService.onSetPricingResponse(uplinkQueueMsg, callback); } else if (uplinkQueueMsg.hasRemoteStartChargingResponse()) { pileProtocolService.onRemoteStartChargingResponse(uplinkQueueMsg, callback); - } else if (uplinkQueueMsg.hasRemoteStopChargingResponse()) { + } else if (uplinkQueueMsg.hasRemoteStopChargingResponse()) { pileProtocolService.onRemoteStopChargingResponse(uplinkQueueMsg, callback); - } else if(uplinkQueueMsg.hasTransactionRecord()){ + } else if (uplinkQueueMsg.hasTransactionRecord()) { pileProtocolService.onTransactionRecord(uplinkQueueMsg, callback); - }else { + } else { callback.onSuccess(); } @@ -189,7 +190,7 @@ public class ProtocolUplinkConsumerService extends AbstractConsumerService imple } private void tracer(ProtoQueueMsg msg) { - Optional.ofNullable(msg.getHeaders().get(MSG_MD_PREFIX + JCPP_TRACER_ID)) + if (Optional.ofNullable(msg.getHeaders().get(MSG_MD_PREFIX + JCPP_TRACER_ID)) .map(tracerId -> { String origin = null; byte[] tracerOrigin = msg.getHeaders().get(MSG_MD_PREFIX + JCPP_TRACER_ORIGIN); @@ -205,7 +206,10 @@ public class ProtocolUplinkConsumerService extends AbstractConsumerService imple return TracerContextUtil.newTracer(ByteUtil.bytesToString(tracerId), origin, ts); }) - .orElseGet(TracerContextUtil::newTracer); + .isEmpty()) { + + TracerContextUtil.newTracer(); + } MDCUtils.recordTracer(); } @@ -231,6 +235,6 @@ public class ProtocolUplinkConsumerService extends AbstractConsumerService imple @Setter @Getter private static class PendingMsgHolder { - private volatile UplinkQueueMessage uplinkQueueMessage; + private UplinkQueueMessage uplinkQueueMessage; } } \ No newline at end of file diff --git a/jcpp-protocol-bootstrap/src/main/resources/protocol-service.yml b/jcpp-protocol-bootstrap/src/main/resources/protocol-service.yml index 2b57f32..f417977 100644 --- a/jcpp-protocol-bootstrap/src/main/resources/protocol-service.yml +++ b/jcpp-protocol-bootstrap/src/main/resources/protocol-service.yml @@ -128,7 +128,7 @@ queue: auto-offset-reset: "${QUEUE_KAFKA_AUTO_OFFSET_RESET:earliest}" other-inline: "${QUEUE_KAFKA_OTHER_PROPERTIES:}" topic-properties: - app: "${QUEUE_KAFKA_APP_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:52428800;retention.bytes:1048576000;partitions:1;min.insync.replicas:1}" + app: "${QUEUE_KAFKA_APP_TOPIC_PROPERTIES:retention.ms:86400000;segment.bytes:52428800;retention.bytes:1048576000;partitions:1;min.insync.replicas:1}" consumer-stats: enabled: "${QUEUE_KAFKA_CONSUMER_STATS_ENABLED:true}" print-interval-ms: "${QUEUE_KAFKA_CONSUMER_STATS_MIN_PRINT_INTERVAL_MS:60000}"