From 412ae4d6d41344323bc2954110ffa32cf58a2557 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E4=B8=89=E4=B8=99?= Date: Wed, 25 Jun 2025 23:32:32 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BC=98=E5=8C=96=E9=80=BB=E8=BE=91?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../protocol/forwarder/KafkaForwarder.java | 27 +++++++------------ 1 file changed, 10 insertions(+), 17 deletions(-) diff --git a/jcpp-protocol-api/src/main/java/sanbing/jcpp/protocol/forwarder/KafkaForwarder.java b/jcpp-protocol-api/src/main/java/sanbing/jcpp/protocol/forwarder/KafkaForwarder.java index b1bddd1..2dcdd30 100644 --- a/jcpp-protocol-api/src/main/java/sanbing/jcpp/protocol/forwarder/KafkaForwarder.java +++ b/jcpp-protocol-api/src/main/java/sanbing/jcpp/protocol/forwarder/KafkaForwarder.java @@ -180,29 +180,22 @@ public class KafkaForwarder extends Forwarder { log.debug("Kafka 消息转发完成, success:{}", e == null); - if (consumer != null) { - onComplete(metadata, e, consumer); - } - } - - private void onComplete(RecordMetadata metadata, Exception e, BiConsumer consumer) { - if (consumer == null) { - return; - } - - ObjectNode objectNode = JacksonUtil.newObjectNode(); - objectNode.put(OFFSET, String.valueOf(metadata.offset())); - objectNode.put(PARTITION, String.valueOf(metadata.partition())); - objectNode.put(TOPIC, metadata.topic()); - if (e != null) { - objectNode.put(ERROR, e.getClass() + ": " + e.getMessage()); forwarderMessagesStats.incrementFailed(); } else { forwarderMessagesStats.incrementSuccessful(); } - consumer.accept(e == null, objectNode); + if (consumer != null) { + ObjectNode objectNode = JacksonUtil.newObjectNode(); + if (e != null) { + objectNode.put(ERROR, e.getClass() + ": " + e.getMessage()); + } + objectNode.put(OFFSET, String.valueOf(metadata.offset())); + objectNode.put(PARTITION, String.valueOf(metadata.partition())); + objectNode.put(TOPIC, metadata.topic()); + consumer.accept(e == null, objectNode); + } } } \ No newline at end of file