修改kafka topic参数

This commit is contained in:
三丙
2024-10-16 09:37:36 +08:00
parent 2780298b60
commit 72baea61e5
3 changed files with 13 additions and 9 deletions

View File

@@ -93,7 +93,7 @@ queue:
auto-offset-reset: "${QUEUE_KAFKA_AUTO_OFFSET_RESET:earliest}" auto-offset-reset: "${QUEUE_KAFKA_AUTO_OFFSET_RESET:earliest}"
other-inline: "${QUEUE_KAFKA_OTHER_PROPERTIES:}" other-inline: "${QUEUE_KAFKA_OTHER_PROPERTIES:}"
topic-properties: topic-properties:
app: "${QUEUE_KAFKA_APP_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:52428800;retention.bytes:1048576000;partitions:1;min.insync.replicas:1}" app: "${QUEUE_KAFKA_APP_TOPIC_PROPERTIES:retention.ms:86400000;segment.bytes:52428800;retention.bytes:1048576000;partitions:1;min.insync.replicas:1}"
consumer-stats: consumer-stats:
enabled: "${QUEUE_KAFKA_CONSUMER_STATS_ENABLED:true}" enabled: "${QUEUE_KAFKA_CONSUMER_STATS_ENABLED:true}"
print-interval-ms: "${QUEUE_KAFKA_CONSUMER_STATS_MIN_PRINT_INTERVAL_MS:60000}" print-interval-ms: "${QUEUE_KAFKA_CONSUMER_STATS_MIN_PRINT_INTERVAL_MS:60000}"

View File

@@ -101,6 +101,7 @@ public class ProtocolUplinkConsumerService extends AbstractConsumerService imple
} }
@Override
@PreDestroy @PreDestroy
public void destroy() { public void destroy() {
super.destroy(); super.destroy();
@@ -132,7 +133,7 @@ public class ProtocolUplinkConsumerService extends AbstractConsumerService imple
processingTimeoutLatch, pendingMap, new ConcurrentHashMap<>()); processingTimeoutLatch, pendingMap, new ConcurrentHashMap<>());
PendingMsgHolder pendingMsgHolder = new PendingMsgHolder(); PendingMsgHolder pendingMsgHolder = new PendingMsgHolder();
Future<?> packSubmitFuture = consumersExecutor.submit(new TracerRunnable(() -> Future<?> packSubmitFuture = consumersExecutor.submit(new TracerRunnable(() ->
orderedMsgList.forEach((element) -> { orderedMsgList.forEach(element -> {
UUID id = element.getUuid(); UUID id = element.getUuid();
ProtoQueueMsg<UplinkQueueMessage> msg = element.getMsg(); ProtoQueueMsg<UplinkQueueMessage> msg = element.getMsg();
tracer(msg); tracer(msg);
@@ -159,9 +160,9 @@ public class ProtocolUplinkConsumerService extends AbstractConsumerService imple
pileProtocolService.onRemoteStartChargingResponse(uplinkQueueMsg, callback); pileProtocolService.onRemoteStartChargingResponse(uplinkQueueMsg, callback);
} else if (uplinkQueueMsg.hasRemoteStopChargingResponse()) { } else if (uplinkQueueMsg.hasRemoteStopChargingResponse()) {
pileProtocolService.onRemoteStopChargingResponse(uplinkQueueMsg, callback); pileProtocolService.onRemoteStopChargingResponse(uplinkQueueMsg, callback);
} else if(uplinkQueueMsg.hasTransactionRecord()){ } else if (uplinkQueueMsg.hasTransactionRecord()) {
pileProtocolService.onTransactionRecord(uplinkQueueMsg, callback); pileProtocolService.onTransactionRecord(uplinkQueueMsg, callback);
}else { } else {
callback.onSuccess(); callback.onSuccess();
} }
@@ -189,7 +190,7 @@ public class ProtocolUplinkConsumerService extends AbstractConsumerService imple
} }
private void tracer(ProtoQueueMsg<UplinkQueueMessage> msg) { private void tracer(ProtoQueueMsg<UplinkQueueMessage> msg) {
Optional.ofNullable(msg.getHeaders().get(MSG_MD_PREFIX + JCPP_TRACER_ID)) if (Optional.ofNullable(msg.getHeaders().get(MSG_MD_PREFIX + JCPP_TRACER_ID))
.map(tracerId -> { .map(tracerId -> {
String origin = null; String origin = null;
byte[] tracerOrigin = msg.getHeaders().get(MSG_MD_PREFIX + JCPP_TRACER_ORIGIN); byte[] tracerOrigin = msg.getHeaders().get(MSG_MD_PREFIX + JCPP_TRACER_ORIGIN);
@@ -205,7 +206,10 @@ public class ProtocolUplinkConsumerService extends AbstractConsumerService imple
return TracerContextUtil.newTracer(ByteUtil.bytesToString(tracerId), origin, ts); return TracerContextUtil.newTracer(ByteUtil.bytesToString(tracerId), origin, ts);
}) })
.orElseGet(TracerContextUtil::newTracer); .isEmpty()) {
TracerContextUtil.newTracer();
}
MDCUtils.recordTracer(); MDCUtils.recordTracer();
} }
@@ -231,6 +235,6 @@ public class ProtocolUplinkConsumerService extends AbstractConsumerService imple
@Setter @Setter
@Getter @Getter
private static class PendingMsgHolder { private static class PendingMsgHolder {
private volatile UplinkQueueMessage uplinkQueueMessage; private UplinkQueueMessage uplinkQueueMessage;
} }
} }

View File

@@ -128,7 +128,7 @@ queue:
auto-offset-reset: "${QUEUE_KAFKA_AUTO_OFFSET_RESET:earliest}" auto-offset-reset: "${QUEUE_KAFKA_AUTO_OFFSET_RESET:earliest}"
other-inline: "${QUEUE_KAFKA_OTHER_PROPERTIES:}" other-inline: "${QUEUE_KAFKA_OTHER_PROPERTIES:}"
topic-properties: topic-properties:
app: "${QUEUE_KAFKA_APP_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:52428800;retention.bytes:1048576000;partitions:1;min.insync.replicas:1}" app: "${QUEUE_KAFKA_APP_TOPIC_PROPERTIES:retention.ms:86400000;segment.bytes:52428800;retention.bytes:1048576000;partitions:1;min.insync.replicas:1}"
consumer-stats: consumer-stats:
enabled: "${QUEUE_KAFKA_CONSUMER_STATS_ENABLED:true}" enabled: "${QUEUE_KAFKA_CONSUMER_STATS_ENABLED:true}"
print-interval-ms: "${QUEUE_KAFKA_CONSUMER_STATS_MIN_PRINT_INTERVAL_MS:60000}" print-interval-ms: "${QUEUE_KAFKA_CONSUMER_STATS_MIN_PRINT_INTERVAL_MS:60000}"