From 45da17b220bbd524a5a95c7ddcb3f46a31f94d58 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E4=B8=89=E4=B8=99?= Date: Tue, 22 Oct 2024 11:16:12 +0800 Subject: [PATCH] =?UTF-8?q?=E8=BD=AC=E5=8F=91=E6=B6=88=E6=81=AF=E9=87=8F?= =?UTF-8?q?=E6=8C=87=E6=A0=87?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../java/sanbing/jcpp/protocol/forwarder/Forwarder.java | 7 ++++++- .../sanbing/jcpp/protocol/forwarder/KafkaForwarder.java | 5 +++++ 2 files changed, 11 insertions(+), 1 deletion(-) diff --git a/jcpp-protocol-api/src/main/java/sanbing/jcpp/protocol/forwarder/Forwarder.java b/jcpp-protocol-api/src/main/java/sanbing/jcpp/protocol/forwarder/Forwarder.java index 7cd2c22..9aa2e9b 100644 --- a/jcpp-protocol-api/src/main/java/sanbing/jcpp/protocol/forwarder/Forwarder.java +++ b/jcpp-protocol-api/src/main/java/sanbing/jcpp/protocol/forwarder/Forwarder.java @@ -51,7 +51,7 @@ public abstract class Forwarder { protected final boolean isMonolith; protected QueueProducer> producer; - public Forwarder(String protocolName, StatsFactory statsFactory, PartitionProvider partitionProvider, ServiceInfoProvider serviceInfoProvider) { + protected Forwarder(String protocolName, StatsFactory statsFactory, PartitionProvider partitionProvider, ServiceInfoProvider serviceInfoProvider) { this.protocolName = protocolName; this.partitionProvider = partitionProvider; this.serviceInfoProvider = serviceInfoProvider; @@ -66,6 +66,7 @@ public abstract class Forwarder { public abstract void destroy(); protected void jcppForward(String topic, String key, UplinkQueueMessage msg, BiConsumer consumer) { + forwarderMessagesStats.incrementTotal(); QueueMsgHeaders headers = new DefaultQueueMsgHeaders(); Tracer currentTracer = TracerContextUtil.getCurrentTracer(); @@ -80,11 +81,13 @@ public abstract class Forwarder { TracerContextUtil.newTracer(currentTracer.getTraceId(), currentTracer.getOrigin(), currentTracer.getTracerTs()); MDCUtils.recordTracer(); + log.trace("单体消息转发成功 key:{}", key); if (consumer != null) { consumer.accept(true, JacksonUtil.newObjectNode()); } + forwarderMessagesStats.incrementSuccessful(); } @Override @@ -92,6 +95,7 @@ public abstract class Forwarder { TracerContextUtil.newTracer(currentTracer.getTraceId(), currentTracer.getOrigin(), currentTracer.getTracerTs()); MDCUtils.recordTracer(); + log.warn("单体消息转发异常", t); if (consumer != null) { @@ -99,6 +103,7 @@ public abstract class Forwarder { objectNode.put(ERROR, t.getClass() + ": " + t.getMessage()); consumer.accept(true, objectNode); } + forwarderMessagesStats.incrementFailed(); } }); } diff --git a/jcpp-protocol-api/src/main/java/sanbing/jcpp/protocol/forwarder/KafkaForwarder.java b/jcpp-protocol-api/src/main/java/sanbing/jcpp/protocol/forwarder/KafkaForwarder.java index ae1ebf3..c81b929 100644 --- a/jcpp-protocol-api/src/main/java/sanbing/jcpp/protocol/forwarder/KafkaForwarder.java +++ b/jcpp-protocol-api/src/main/java/sanbing/jcpp/protocol/forwarder/KafkaForwarder.java @@ -150,6 +150,7 @@ public class KafkaForwarder extends Forwarder { } private void kafkaForward(String topic, String key, UplinkQueueMessage msg, BiConsumer consumer) throws InvalidProtocolBufferException { + forwarderMessagesStats.incrementTotal(); Headers headers = new RecordHeaders(); Tracer currentTracer = TracerContextUtil.getCurrentTracer(); @@ -177,6 +178,7 @@ public class KafkaForwarder extends Forwarder { private void logAndDoConsumer(BiConsumer consumer, RecordMetadata metadata, Exception e, Tracer currentTracer) { TracerContextUtil.newTracer(currentTracer.getTraceId(), currentTracer.getOrigin(), currentTracer.getTracerTs()); MDCUtils.recordTracer(); + log.debug("Kafka 消息转发完成, success:{}", e == null); if (consumer != null) { @@ -196,6 +198,9 @@ public class KafkaForwarder extends Forwarder { if (e != null) { objectNode.put(ERROR, e.getClass() + ": " + e.getMessage()); + forwarderMessagesStats.incrementFailed(); + } else { + forwarderMessagesStats.incrementSuccessful(); } consumer.accept(e == null, objectNode);