From 09281ca3960032a77495c456aa9f3a9602ed24c3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E4=B8=89=E4=B8=99?= Date: Tue, 22 Oct 2024 15:04:37 +0800 Subject: [PATCH] =?UTF-8?q?=E6=A0=B9=E6=8D=AE=E5=8E=8B=E6=B5=8B=E7=BB=93?= =?UTF-8?q?=E6=9E=9C=E8=B0=83=E5=8F=82?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- docker/Dockerfile-App | 3 +++ docker/Dockerfile-Protocol | 2 ++ .../src/main/resources/app-service.yml | 6 ++--- .../src/main/resources/log4j2.xml | 6 +++++ .../ProtocolUplinkConsumerService.java | 25 +++---------------- .../queue/common/QueueConstants.java | 9 ++++++- .../infrastructure/util/mdc/MDCUtils.java | 1 - .../util/trace/TracerContextUtil.java | 1 - .../jcpp/protocol/forwarder/Forwarder.java | 11 +++----- .../protocol/forwarder/KafkaForwarder.java | 11 +++----- 10 files changed, 34 insertions(+), 41 deletions(-) diff --git a/docker/Dockerfile-App b/docker/Dockerfile-App index 144da60..f619b3d 100644 --- a/docker/Dockerfile-App +++ b/docker/Dockerfile-App @@ -35,6 +35,9 @@ RUN chmod a+x *.sh && mv start.sh /usr/bin EXPOSE 8080 8080 +ENV APP_LOG_LEVEL=INFO +ENV PROTOCOLS_LOG_LEVEL=INFO + CMD ["start.sh"] diff --git a/docker/Dockerfile-Protocol b/docker/Dockerfile-Protocol index 7aa342a..82aae02 100644 --- a/docker/Dockerfile-Protocol +++ b/docker/Dockerfile-Protocol @@ -35,6 +35,8 @@ RUN chmod a+x *.sh && mv start.sh /usr/bin EXPOSE 8081 8081 +ENV PROTOCOLS_LOG_LEVEL=INFO + CMD ["start.sh"] diff --git a/jcpp-app-bootstrap/src/main/resources/app-service.yml b/jcpp-app-bootstrap/src/main/resources/app-service.yml index 7662d41..11b7c38 100644 --- a/jcpp-app-bootstrap/src/main/resources/app-service.yml +++ b/jcpp-app-bootstrap/src/main/resources/app-service.yml @@ -27,7 +27,7 @@ spring: password: "${SPRING_DATASOURCE_PASSWORD:postgres}" hikari: leak-detection-threshold: "${SPRING_DATASOURCE_HIKARI_LEAK_DETECTION_THRESHOLD:0}" - maximum-pool-size: "${SPRING_DATASOURCE_MAXIMUM_POOL_SIZE:32}" + maximum-pool-size: "${SPRING_DATASOURCE_MAXIMUM_POOL_SIZE:64}" register-mbeans: "${SPRING_DATASOURCE_HIKARI_REGISTER_MBEANS:false}" mybatis-plus: @@ -66,7 +66,7 @@ queue: in-memory: queue-capacity: "${QUEUE_IN_MEMORY_QUEUE_CAPACITY:100000}" stats: - print-interval-ms: "${QUEUE_IN_MEMORY_STATS_PRINT_INTERVAL_MS:60000}" + print-interval-ms: "${QUEUE_IN_MEMORY_STATS_PRINT_INTERVAL_MS:10000}" kafka: bootstrap-servers: "${KAFKA_SERVERS:kafka:9092}" ssl: @@ -128,7 +128,7 @@ redis: standalone: host: "${REDIS_HOST:redis}" port: "${REDIS_PORT:6379}" - useDefaultClientConfig: "${REDIS_USE_DEFAULT_CLIENT_CONFIG:false}" + useDefaultClientConfig: "${REDIS_USE_DEFAULT_CLIENT_CONFIG:true}" clientName: "${REDIS_CLIENT_NAME:standalone}" commandTimeout: "${REDIS_CLIENT_COMMAND_TIMEOUT:30000}" shutdownTimeout: "${REDIS_CLIENT_SHUTDOWN_TIMEOUT:1000}" diff --git a/jcpp-app-bootstrap/src/main/resources/log4j2.xml b/jcpp-app-bootstrap/src/main/resources/log4j2.xml index f53a4fd..d5884d8 100644 --- a/jcpp-app-bootstrap/src/main/resources/log4j2.xml +++ b/jcpp-app-bootstrap/src/main/resources/log4j2.xml @@ -46,6 +46,12 @@ + + + + + diff --git a/jcpp-app/src/main/java/sanbing/jcpp/app/service/queue/consumer/ProtocolUplinkConsumerService.java b/jcpp-app/src/main/java/sanbing/jcpp/app/service/queue/consumer/ProtocolUplinkConsumerService.java index 20ff976..0316d58 100644 --- a/jcpp-app/src/main/java/sanbing/jcpp/app/service/queue/consumer/ProtocolUplinkConsumerService.java +++ b/jcpp-app/src/main/java/sanbing/jcpp/app/service/queue/consumer/ProtocolUplinkConsumerService.java @@ -35,16 +35,12 @@ import sanbing.jcpp.infrastructure.util.trace.TracerRunnable; import sanbing.jcpp.proto.gen.ProtocolProto.UplinkQueueMessage; import java.util.List; -import java.util.Optional; import java.util.Set; import java.util.UUID; import java.util.concurrent.*; import java.util.stream.Collectors; -import static sanbing.jcpp.infrastructure.queue.common.QueueConstants.MSG_MD_PREFIX; -import static sanbing.jcpp.infrastructure.queue.common.QueueConstants.MSG_MD_TS; -import static sanbing.jcpp.infrastructure.util.trace.TracerContextUtil.JCPP_TRACER_ID; -import static sanbing.jcpp.infrastructure.util.trace.TracerContextUtil.JCPP_TRACER_ORIGIN; +import static sanbing.jcpp.infrastructure.queue.common.QueueConstants.*; /** @@ -232,23 +228,10 @@ public class ProtocolUplinkConsumerService extends AbstractConsumerService imple } private void tracer(ProtoQueueMsg msg) { - if (Optional.ofNullable(msg.getHeaders().get(MSG_MD_PREFIX + JCPP_TRACER_ID)) - .map(tracerId -> { - String origin = null; - byte[] tracerOrigin = msg.getHeaders().get(MSG_MD_PREFIX + JCPP_TRACER_ORIGIN); - if (tracerOrigin != null) { - origin = ByteUtil.bytesToString(tracerOrigin); - } - byte[] tracerTs = msg.getHeaders().get(MSG_MD_PREFIX + MSG_MD_TS); - long ts = tracerTs != null ? ByteUtil.bytesToLong(tracerTs) : System.currentTimeMillis(); - - return TracerContextUtil.newTracer(ByteUtil.bytesToString(tracerId), origin, ts); - }) - .isEmpty()) { - - TracerContextUtil.newTracer(); - } + TracerContextUtil.newTracer(ByteUtil.bytesToString(msg.getHeaders().get(MSG_MD_TRACER_ID)), + ByteUtil.bytesToString(msg.getHeaders().get(MSG_MD_TRACER_ORIGIN)), + ByteUtil.bytesToLong(msg.getHeaders().get(MSG_MD_TRACER_TS))); MDCUtils.recordTracer(); } diff --git a/jcpp-infrastructure-queue/src/main/java/sanbing/jcpp/infrastructure/queue/common/QueueConstants.java b/jcpp-infrastructure-queue/src/main/java/sanbing/jcpp/infrastructure/queue/common/QueueConstants.java index 509bc52..6017c93 100644 --- a/jcpp-infrastructure-queue/src/main/java/sanbing/jcpp/infrastructure/queue/common/QueueConstants.java +++ b/jcpp-infrastructure-queue/src/main/java/sanbing/jcpp/infrastructure/queue/common/QueueConstants.java @@ -4,6 +4,8 @@ */ package sanbing.jcpp.infrastructure.queue.common; +import static sanbing.jcpp.infrastructure.util.trace.TracerContextUtil.*; + /** * @author baigod */ @@ -11,5 +13,10 @@ public final class QueueConstants { public static final String MSG_MD_PREFIX = "jcpp_"; - public static final String MSG_MD_TS = "ts"; + public static final String MSG_MD_TRACER_ID = MSG_MD_PREFIX + JCPP_TRACER_ID; + + public static final String MSG_MD_TRACER_ORIGIN = MSG_MD_PREFIX + JCPP_TRACER_ORIGIN; + + public static final String MSG_MD_TRACER_TS = MSG_MD_PREFIX + JCPP_TRACER_TS; + } \ No newline at end of file diff --git a/jcpp-infrastructure-util/src/main/java/sanbing/jcpp/infrastructure/util/mdc/MDCUtils.java b/jcpp-infrastructure-util/src/main/java/sanbing/jcpp/infrastructure/util/mdc/MDCUtils.java index 7f22912..2af32f0 100644 --- a/jcpp-infrastructure-util/src/main/java/sanbing/jcpp/infrastructure/util/mdc/MDCUtils.java +++ b/jcpp-infrastructure-util/src/main/java/sanbing/jcpp/infrastructure/util/mdc/MDCUtils.java @@ -34,7 +34,6 @@ public class MDCUtils { } return tracer.getTraceId(); - } public static void cleanTracer() { diff --git a/jcpp-infrastructure-util/src/main/java/sanbing/jcpp/infrastructure/util/trace/TracerContextUtil.java b/jcpp-infrastructure-util/src/main/java/sanbing/jcpp/infrastructure/util/trace/TracerContextUtil.java index 5921c7c..589fc13 100644 --- a/jcpp-infrastructure-util/src/main/java/sanbing/jcpp/infrastructure/util/trace/TracerContextUtil.java +++ b/jcpp-infrastructure-util/src/main/java/sanbing/jcpp/infrastructure/util/trace/TracerContextUtil.java @@ -40,7 +40,6 @@ public class TracerContextUtil { tracer = new Tracer(traceId, origin, ts); } - TRACE_ID_CONTAINER.set(tracer); return tracer; diff --git a/jcpp-protocol-api/src/main/java/sanbing/jcpp/protocol/forwarder/Forwarder.java b/jcpp-protocol-api/src/main/java/sanbing/jcpp/protocol/forwarder/Forwarder.java index 9aa2e9b..b4d7fe8 100644 --- a/jcpp-protocol-api/src/main/java/sanbing/jcpp/protocol/forwarder/Forwarder.java +++ b/jcpp-protocol-api/src/main/java/sanbing/jcpp/protocol/forwarder/Forwarder.java @@ -25,10 +25,7 @@ import sanbing.jcpp.proto.gen.ProtocolProto.UplinkQueueMessage; import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.BiConsumer; -import static sanbing.jcpp.infrastructure.queue.common.QueueConstants.MSG_MD_PREFIX; -import static sanbing.jcpp.infrastructure.queue.common.QueueConstants.MSG_MD_TS; -import static sanbing.jcpp.infrastructure.util.trace.TracerContextUtil.JCPP_TRACER_ID; -import static sanbing.jcpp.infrastructure.util.trace.TracerContextUtil.JCPP_TRACER_ORIGIN; +import static sanbing.jcpp.infrastructure.queue.common.QueueConstants.*; /** * @author baigod @@ -70,9 +67,9 @@ public abstract class Forwarder { QueueMsgHeaders headers = new DefaultQueueMsgHeaders(); Tracer currentTracer = TracerContextUtil.getCurrentTracer(); - headers.put(MSG_MD_PREFIX + JCPP_TRACER_ID, ByteUtil.stringToBytes(currentTracer.getTraceId())); - headers.put(MSG_MD_PREFIX + JCPP_TRACER_ORIGIN, ByteUtil.stringToBytes(currentTracer.getOrigin())); - headers.put(MSG_MD_PREFIX + MSG_MD_TS, ByteUtil.longToBytes(currentTracer.getTracerTs())); + headers.put(MSG_MD_TRACER_ID, ByteUtil.stringToBytes(currentTracer.getTraceId())); + headers.put(MSG_MD_TRACER_ORIGIN, ByteUtil.stringToBytes(currentTracer.getOrigin())); + headers.put(MSG_MD_TRACER_TS, ByteUtil.longToBytes(currentTracer.getTracerTs())); TopicPartitionInfo tpi = partitionProvider.resolve(ServiceType.APP, topic, key); producer.send(tpi, new ProtoQueueMsg<>(key, msg, headers), new QueueCallback() { diff --git a/jcpp-protocol-api/src/main/java/sanbing/jcpp/protocol/forwarder/KafkaForwarder.java b/jcpp-protocol-api/src/main/java/sanbing/jcpp/protocol/forwarder/KafkaForwarder.java index c81b929..56d97a4 100644 --- a/jcpp-protocol-api/src/main/java/sanbing/jcpp/protocol/forwarder/KafkaForwarder.java +++ b/jcpp-protocol-api/src/main/java/sanbing/jcpp/protocol/forwarder/KafkaForwarder.java @@ -36,10 +36,7 @@ import java.util.Properties; import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.BiConsumer; -import static sanbing.jcpp.infrastructure.queue.common.QueueConstants.MSG_MD_PREFIX; -import static sanbing.jcpp.infrastructure.queue.common.QueueConstants.MSG_MD_TS; -import static sanbing.jcpp.infrastructure.util.trace.TracerContextUtil.JCPP_TRACER_ID; -import static sanbing.jcpp.infrastructure.util.trace.TracerContextUtil.JCPP_TRACER_ORIGIN; +import static sanbing.jcpp.infrastructure.queue.common.QueueConstants.*; /** * @author baigod @@ -154,9 +151,9 @@ public class KafkaForwarder extends Forwarder { Headers headers = new RecordHeaders(); Tracer currentTracer = TracerContextUtil.getCurrentTracer(); - headers.add(new RecordHeader(MSG_MD_PREFIX + JCPP_TRACER_ID, ByteUtil.stringToBytes(currentTracer.getTraceId()))); - headers.add(new RecordHeader(MSG_MD_PREFIX + JCPP_TRACER_ORIGIN, ByteUtil.stringToBytes(currentTracer.getOrigin()))); - headers.add(new RecordHeader(MSG_MD_PREFIX + MSG_MD_TS, ByteUtil.longToBytes(currentTracer.getTracerTs()))); + headers.add(new RecordHeader(MSG_MD_TRACER_ID, ByteUtil.stringToBytes(currentTracer.getTraceId()))); + headers.add(new RecordHeader(MSG_MD_TRACER_ORIGIN, ByteUtil.stringToBytes(currentTracer.getOrigin()))); + headers.add(new RecordHeader(MSG_MD_TRACER_TS, ByteUtil.longToBytes(currentTracer.getTracerTs()))); if (kafkaCfg.getEncoder() == KafkaCfg.EncoderType.json) {