转发消息量指标

This commit is contained in:
三丙
2024-10-22 11:16:12 +08:00
parent fb0834ce1d
commit 45da17b220
2 changed files with 11 additions and 1 deletions

View File

@@ -51,7 +51,7 @@ public abstract class Forwarder {
protected final boolean isMonolith;
protected QueueProducer<ProtoQueueMsg<UplinkQueueMessage>> producer;
public Forwarder(String protocolName, StatsFactory statsFactory, PartitionProvider partitionProvider, ServiceInfoProvider serviceInfoProvider) {
protected Forwarder(String protocolName, StatsFactory statsFactory, PartitionProvider partitionProvider, ServiceInfoProvider serviceInfoProvider) {
this.protocolName = protocolName;
this.partitionProvider = partitionProvider;
this.serviceInfoProvider = serviceInfoProvider;
@@ -66,6 +66,7 @@ public abstract class Forwarder {
public abstract void destroy();
protected void jcppForward(String topic, String key, UplinkQueueMessage msg, BiConsumer<Boolean, ObjectNode> consumer) {
forwarderMessagesStats.incrementTotal();
QueueMsgHeaders headers = new DefaultQueueMsgHeaders();
Tracer currentTracer = TracerContextUtil.getCurrentTracer();
@@ -80,11 +81,13 @@ public abstract class Forwarder {
TracerContextUtil.newTracer(currentTracer.getTraceId(), currentTracer.getOrigin(), currentTracer.getTracerTs());
MDCUtils.recordTracer();
log.trace("单体消息转发成功 key:{}", key);
if (consumer != null) {
consumer.accept(true, JacksonUtil.newObjectNode());
}
forwarderMessagesStats.incrementSuccessful();
}
@Override
@@ -92,6 +95,7 @@ public abstract class Forwarder {
TracerContextUtil.newTracer(currentTracer.getTraceId(), currentTracer.getOrigin(), currentTracer.getTracerTs());
MDCUtils.recordTracer();
log.warn("单体消息转发异常", t);
if (consumer != null) {
@@ -99,6 +103,7 @@ public abstract class Forwarder {
objectNode.put(ERROR, t.getClass() + ": " + t.getMessage());
consumer.accept(true, objectNode);
}
forwarderMessagesStats.incrementFailed();
}
});
}

View File

@@ -150,6 +150,7 @@ public class KafkaForwarder extends Forwarder {
}
private void kafkaForward(String topic, String key, UplinkQueueMessage msg, BiConsumer<Boolean, ObjectNode> consumer) throws InvalidProtocolBufferException {
forwarderMessagesStats.incrementTotal();
Headers headers = new RecordHeaders();
Tracer currentTracer = TracerContextUtil.getCurrentTracer();
@@ -177,6 +178,7 @@ public class KafkaForwarder extends Forwarder {
private void logAndDoConsumer(BiConsumer<Boolean, ObjectNode> consumer, RecordMetadata metadata, Exception e, Tracer currentTracer) {
TracerContextUtil.newTracer(currentTracer.getTraceId(), currentTracer.getOrigin(), currentTracer.getTracerTs());
MDCUtils.recordTracer();
log.debug("Kafka 消息转发完成, success:{}", e == null);
if (consumer != null) {
@@ -196,6 +198,9 @@ public class KafkaForwarder extends Forwarder {
if (e != null) {
objectNode.put(ERROR, e.getClass() + ": " + e.getMessage());
forwarderMessagesStats.incrementFailed();
} else {
forwarderMessagesStats.incrementSuccessful();
}
consumer.accept(e == null, objectNode);