优化逻辑

This commit is contained in:
三丙
2025-06-25 23:32:32 +08:00
parent 47e1b5d9d0
commit 412ae4d6d4

View File

@@ -180,29 +180,22 @@ public class KafkaForwarder extends Forwarder {
log.debug("Kafka 消息转发完成, success:{}", e == null);
if (consumer != null) {
onComplete(metadata, e, consumer);
}
}
private void onComplete(RecordMetadata metadata, Exception e, BiConsumer<Boolean, ObjectNode> consumer) {
if (consumer == null) {
return;
}
ObjectNode objectNode = JacksonUtil.newObjectNode();
objectNode.put(OFFSET, String.valueOf(metadata.offset()));
objectNode.put(PARTITION, String.valueOf(metadata.partition()));
objectNode.put(TOPIC, metadata.topic());
if (e != null) {
objectNode.put(ERROR, e.getClass() + ": " + e.getMessage());
forwarderMessagesStats.incrementFailed();
} else {
forwarderMessagesStats.incrementSuccessful();
}
consumer.accept(e == null, objectNode);
if (consumer != null) {
ObjectNode objectNode = JacksonUtil.newObjectNode();
if (e != null) {
objectNode.put(ERROR, e.getClass() + ": " + e.getMessage());
}
objectNode.put(OFFSET, String.valueOf(metadata.offset()));
objectNode.put(PARTITION, String.valueOf(metadata.partition()));
objectNode.put(TOPIC, metadata.topic());
consumer.accept(e == null, objectNode);
}
}
}