diff --git a/docker/Dockerfile-App b/docker/Dockerfile-App
index 144da60..f619b3d 100644
--- a/docker/Dockerfile-App
+++ b/docker/Dockerfile-App
@@ -35,6 +35,9 @@ RUN chmod a+x *.sh && mv start.sh /usr/bin
EXPOSE 8080 8080
+ENV APP_LOG_LEVEL=INFO
+ENV PROTOCOLS_LOG_LEVEL=INFO
+
CMD ["start.sh"]
diff --git a/docker/Dockerfile-Protocol b/docker/Dockerfile-Protocol
index 7aa342a..82aae02 100644
--- a/docker/Dockerfile-Protocol
+++ b/docker/Dockerfile-Protocol
@@ -35,6 +35,8 @@ RUN chmod a+x *.sh && mv start.sh /usr/bin
EXPOSE 8081 8081
+ENV PROTOCOLS_LOG_LEVEL=INFO
+
CMD ["start.sh"]
diff --git a/jcpp-app-bootstrap/src/main/resources/app-service.yml b/jcpp-app-bootstrap/src/main/resources/app-service.yml
index 7662d41..11b7c38 100644
--- a/jcpp-app-bootstrap/src/main/resources/app-service.yml
+++ b/jcpp-app-bootstrap/src/main/resources/app-service.yml
@@ -27,7 +27,7 @@ spring:
password: "${SPRING_DATASOURCE_PASSWORD:postgres}"
hikari:
leak-detection-threshold: "${SPRING_DATASOURCE_HIKARI_LEAK_DETECTION_THRESHOLD:0}"
- maximum-pool-size: "${SPRING_DATASOURCE_MAXIMUM_POOL_SIZE:32}"
+ maximum-pool-size: "${SPRING_DATASOURCE_MAXIMUM_POOL_SIZE:64}"
register-mbeans: "${SPRING_DATASOURCE_HIKARI_REGISTER_MBEANS:false}"
mybatis-plus:
@@ -66,7 +66,7 @@ queue:
in-memory:
queue-capacity: "${QUEUE_IN_MEMORY_QUEUE_CAPACITY:100000}"
stats:
- print-interval-ms: "${QUEUE_IN_MEMORY_STATS_PRINT_INTERVAL_MS:60000}"
+ print-interval-ms: "${QUEUE_IN_MEMORY_STATS_PRINT_INTERVAL_MS:10000}"
kafka:
bootstrap-servers: "${KAFKA_SERVERS:kafka:9092}"
ssl:
@@ -128,7 +128,7 @@ redis:
standalone:
host: "${REDIS_HOST:redis}"
port: "${REDIS_PORT:6379}"
- useDefaultClientConfig: "${REDIS_USE_DEFAULT_CLIENT_CONFIG:false}"
+ useDefaultClientConfig: "${REDIS_USE_DEFAULT_CLIENT_CONFIG:true}"
clientName: "${REDIS_CLIENT_NAME:standalone}"
commandTimeout: "${REDIS_CLIENT_COMMAND_TIMEOUT:30000}"
shutdownTimeout: "${REDIS_CLIENT_SHUTDOWN_TIMEOUT:1000}"
diff --git a/jcpp-app-bootstrap/src/main/resources/log4j2.xml b/jcpp-app-bootstrap/src/main/resources/log4j2.xml
index f53a4fd..d5884d8 100644
--- a/jcpp-app-bootstrap/src/main/resources/log4j2.xml
+++ b/jcpp-app-bootstrap/src/main/resources/log4j2.xml
@@ -46,6 +46,12 @@
+
+
+
+
+
diff --git a/jcpp-app/src/main/java/sanbing/jcpp/app/service/queue/consumer/ProtocolUplinkConsumerService.java b/jcpp-app/src/main/java/sanbing/jcpp/app/service/queue/consumer/ProtocolUplinkConsumerService.java
index 20ff976..0316d58 100644
--- a/jcpp-app/src/main/java/sanbing/jcpp/app/service/queue/consumer/ProtocolUplinkConsumerService.java
+++ b/jcpp-app/src/main/java/sanbing/jcpp/app/service/queue/consumer/ProtocolUplinkConsumerService.java
@@ -35,16 +35,12 @@ import sanbing.jcpp.infrastructure.util.trace.TracerRunnable;
import sanbing.jcpp.proto.gen.ProtocolProto.UplinkQueueMessage;
import java.util.List;
-import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.*;
import java.util.stream.Collectors;
-import static sanbing.jcpp.infrastructure.queue.common.QueueConstants.MSG_MD_PREFIX;
-import static sanbing.jcpp.infrastructure.queue.common.QueueConstants.MSG_MD_TS;
-import static sanbing.jcpp.infrastructure.util.trace.TracerContextUtil.JCPP_TRACER_ID;
-import static sanbing.jcpp.infrastructure.util.trace.TracerContextUtil.JCPP_TRACER_ORIGIN;
+import static sanbing.jcpp.infrastructure.queue.common.QueueConstants.*;
/**
@@ -232,23 +228,10 @@ public class ProtocolUplinkConsumerService extends AbstractConsumerService imple
}
private void tracer(ProtoQueueMsg msg) {
- if (Optional.ofNullable(msg.getHeaders().get(MSG_MD_PREFIX + JCPP_TRACER_ID))
- .map(tracerId -> {
- String origin = null;
- byte[] tracerOrigin = msg.getHeaders().get(MSG_MD_PREFIX + JCPP_TRACER_ORIGIN);
- if (tracerOrigin != null) {
- origin = ByteUtil.bytesToString(tracerOrigin);
- }
- byte[] tracerTs = msg.getHeaders().get(MSG_MD_PREFIX + MSG_MD_TS);
- long ts = tracerTs != null ? ByteUtil.bytesToLong(tracerTs) : System.currentTimeMillis();
-
- return TracerContextUtil.newTracer(ByteUtil.bytesToString(tracerId), origin, ts);
- })
- .isEmpty()) {
-
- TracerContextUtil.newTracer();
- }
+ TracerContextUtil.newTracer(ByteUtil.bytesToString(msg.getHeaders().get(MSG_MD_TRACER_ID)),
+ ByteUtil.bytesToString(msg.getHeaders().get(MSG_MD_TRACER_ORIGIN)),
+ ByteUtil.bytesToLong(msg.getHeaders().get(MSG_MD_TRACER_TS)));
MDCUtils.recordTracer();
}
diff --git a/jcpp-infrastructure-queue/src/main/java/sanbing/jcpp/infrastructure/queue/common/QueueConstants.java b/jcpp-infrastructure-queue/src/main/java/sanbing/jcpp/infrastructure/queue/common/QueueConstants.java
index 509bc52..6017c93 100644
--- a/jcpp-infrastructure-queue/src/main/java/sanbing/jcpp/infrastructure/queue/common/QueueConstants.java
+++ b/jcpp-infrastructure-queue/src/main/java/sanbing/jcpp/infrastructure/queue/common/QueueConstants.java
@@ -4,6 +4,8 @@
*/
package sanbing.jcpp.infrastructure.queue.common;
+import static sanbing.jcpp.infrastructure.util.trace.TracerContextUtil.*;
+
/**
* @author baigod
*/
@@ -11,5 +13,10 @@ public final class QueueConstants {
public static final String MSG_MD_PREFIX = "jcpp_";
- public static final String MSG_MD_TS = "ts";
+ public static final String MSG_MD_TRACER_ID = MSG_MD_PREFIX + JCPP_TRACER_ID;
+
+ public static final String MSG_MD_TRACER_ORIGIN = MSG_MD_PREFIX + JCPP_TRACER_ORIGIN;
+
+ public static final String MSG_MD_TRACER_TS = MSG_MD_PREFIX + JCPP_TRACER_TS;
+
}
\ No newline at end of file
diff --git a/jcpp-infrastructure-util/src/main/java/sanbing/jcpp/infrastructure/util/mdc/MDCUtils.java b/jcpp-infrastructure-util/src/main/java/sanbing/jcpp/infrastructure/util/mdc/MDCUtils.java
index 7f22912..2af32f0 100644
--- a/jcpp-infrastructure-util/src/main/java/sanbing/jcpp/infrastructure/util/mdc/MDCUtils.java
+++ b/jcpp-infrastructure-util/src/main/java/sanbing/jcpp/infrastructure/util/mdc/MDCUtils.java
@@ -34,7 +34,6 @@ public class MDCUtils {
}
return tracer.getTraceId();
-
}
public static void cleanTracer() {
diff --git a/jcpp-infrastructure-util/src/main/java/sanbing/jcpp/infrastructure/util/trace/TracerContextUtil.java b/jcpp-infrastructure-util/src/main/java/sanbing/jcpp/infrastructure/util/trace/TracerContextUtil.java
index 5921c7c..589fc13 100644
--- a/jcpp-infrastructure-util/src/main/java/sanbing/jcpp/infrastructure/util/trace/TracerContextUtil.java
+++ b/jcpp-infrastructure-util/src/main/java/sanbing/jcpp/infrastructure/util/trace/TracerContextUtil.java
@@ -40,7 +40,6 @@ public class TracerContextUtil {
tracer = new Tracer(traceId, origin, ts);
}
-
TRACE_ID_CONTAINER.set(tracer);
return tracer;
diff --git a/jcpp-protocol-api/src/main/java/sanbing/jcpp/protocol/forwarder/Forwarder.java b/jcpp-protocol-api/src/main/java/sanbing/jcpp/protocol/forwarder/Forwarder.java
index 9aa2e9b..b4d7fe8 100644
--- a/jcpp-protocol-api/src/main/java/sanbing/jcpp/protocol/forwarder/Forwarder.java
+++ b/jcpp-protocol-api/src/main/java/sanbing/jcpp/protocol/forwarder/Forwarder.java
@@ -25,10 +25,7 @@ import sanbing.jcpp.proto.gen.ProtocolProto.UplinkQueueMessage;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiConsumer;
-import static sanbing.jcpp.infrastructure.queue.common.QueueConstants.MSG_MD_PREFIX;
-import static sanbing.jcpp.infrastructure.queue.common.QueueConstants.MSG_MD_TS;
-import static sanbing.jcpp.infrastructure.util.trace.TracerContextUtil.JCPP_TRACER_ID;
-import static sanbing.jcpp.infrastructure.util.trace.TracerContextUtil.JCPP_TRACER_ORIGIN;
+import static sanbing.jcpp.infrastructure.queue.common.QueueConstants.*;
/**
* @author baigod
@@ -70,9 +67,9 @@ public abstract class Forwarder {
QueueMsgHeaders headers = new DefaultQueueMsgHeaders();
Tracer currentTracer = TracerContextUtil.getCurrentTracer();
- headers.put(MSG_MD_PREFIX + JCPP_TRACER_ID, ByteUtil.stringToBytes(currentTracer.getTraceId()));
- headers.put(MSG_MD_PREFIX + JCPP_TRACER_ORIGIN, ByteUtil.stringToBytes(currentTracer.getOrigin()));
- headers.put(MSG_MD_PREFIX + MSG_MD_TS, ByteUtil.longToBytes(currentTracer.getTracerTs()));
+ headers.put(MSG_MD_TRACER_ID, ByteUtil.stringToBytes(currentTracer.getTraceId()));
+ headers.put(MSG_MD_TRACER_ORIGIN, ByteUtil.stringToBytes(currentTracer.getOrigin()));
+ headers.put(MSG_MD_TRACER_TS, ByteUtil.longToBytes(currentTracer.getTracerTs()));
TopicPartitionInfo tpi = partitionProvider.resolve(ServiceType.APP, topic, key);
producer.send(tpi, new ProtoQueueMsg<>(key, msg, headers), new QueueCallback() {
diff --git a/jcpp-protocol-api/src/main/java/sanbing/jcpp/protocol/forwarder/KafkaForwarder.java b/jcpp-protocol-api/src/main/java/sanbing/jcpp/protocol/forwarder/KafkaForwarder.java
index c81b929..56d97a4 100644
--- a/jcpp-protocol-api/src/main/java/sanbing/jcpp/protocol/forwarder/KafkaForwarder.java
+++ b/jcpp-protocol-api/src/main/java/sanbing/jcpp/protocol/forwarder/KafkaForwarder.java
@@ -36,10 +36,7 @@ import java.util.Properties;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiConsumer;
-import static sanbing.jcpp.infrastructure.queue.common.QueueConstants.MSG_MD_PREFIX;
-import static sanbing.jcpp.infrastructure.queue.common.QueueConstants.MSG_MD_TS;
-import static sanbing.jcpp.infrastructure.util.trace.TracerContextUtil.JCPP_TRACER_ID;
-import static sanbing.jcpp.infrastructure.util.trace.TracerContextUtil.JCPP_TRACER_ORIGIN;
+import static sanbing.jcpp.infrastructure.queue.common.QueueConstants.*;
/**
* @author baigod
@@ -154,9 +151,9 @@ public class KafkaForwarder extends Forwarder {
Headers headers = new RecordHeaders();
Tracer currentTracer = TracerContextUtil.getCurrentTracer();
- headers.add(new RecordHeader(MSG_MD_PREFIX + JCPP_TRACER_ID, ByteUtil.stringToBytes(currentTracer.getTraceId())));
- headers.add(new RecordHeader(MSG_MD_PREFIX + JCPP_TRACER_ORIGIN, ByteUtil.stringToBytes(currentTracer.getOrigin())));
- headers.add(new RecordHeader(MSG_MD_PREFIX + MSG_MD_TS, ByteUtil.longToBytes(currentTracer.getTracerTs())));
+ headers.add(new RecordHeader(MSG_MD_TRACER_ID, ByteUtil.stringToBytes(currentTracer.getTraceId())));
+ headers.add(new RecordHeader(MSG_MD_TRACER_ORIGIN, ByteUtil.stringToBytes(currentTracer.getOrigin())));
+ headers.add(new RecordHeader(MSG_MD_TRACER_TS, ByteUtil.longToBytes(currentTracer.getTracerTs())));
if (kafkaCfg.getEncoder() == KafkaCfg.EncoderType.json) {