diff --git a/jcpp-protocol-api/src/main/java/sanbing/jcpp/protocol/forwarder/KafkaForwarder.java b/jcpp-protocol-api/src/main/java/sanbing/jcpp/protocol/forwarder/KafkaForwarder.java index b1bddd1..2dcdd30 100644 --- a/jcpp-protocol-api/src/main/java/sanbing/jcpp/protocol/forwarder/KafkaForwarder.java +++ b/jcpp-protocol-api/src/main/java/sanbing/jcpp/protocol/forwarder/KafkaForwarder.java @@ -180,29 +180,22 @@ public class KafkaForwarder extends Forwarder { log.debug("Kafka 消息转发完成, success:{}", e == null); - if (consumer != null) { - onComplete(metadata, e, consumer); - } - } - - private void onComplete(RecordMetadata metadata, Exception e, BiConsumer consumer) { - if (consumer == null) { - return; - } - - ObjectNode objectNode = JacksonUtil.newObjectNode(); - objectNode.put(OFFSET, String.valueOf(metadata.offset())); - objectNode.put(PARTITION, String.valueOf(metadata.partition())); - objectNode.put(TOPIC, metadata.topic()); - if (e != null) { - objectNode.put(ERROR, e.getClass() + ": " + e.getMessage()); forwarderMessagesStats.incrementFailed(); } else { forwarderMessagesStats.incrementSuccessful(); } - consumer.accept(e == null, objectNode); + if (consumer != null) { + ObjectNode objectNode = JacksonUtil.newObjectNode(); + if (e != null) { + objectNode.put(ERROR, e.getClass() + ": " + e.getMessage()); + } + objectNode.put(OFFSET, String.valueOf(metadata.offset())); + objectNode.put(PARTITION, String.valueOf(metadata.partition())); + objectNode.put(TOPIC, metadata.topic()); + consumer.accept(e == null, objectNode); + } } } \ No newline at end of file