根据压测结果调参

This commit is contained in:
三丙
2024-10-22 15:04:37 +08:00
parent 6bb3d3738f
commit 09281ca396
10 changed files with 34 additions and 41 deletions

View File

@@ -35,6 +35,9 @@ RUN chmod a+x *.sh && mv start.sh /usr/bin
EXPOSE 8080 8080 EXPOSE 8080 8080
ENV APP_LOG_LEVEL=INFO
ENV PROTOCOLS_LOG_LEVEL=INFO
CMD ["start.sh"] CMD ["start.sh"]

View File

@@ -35,6 +35,8 @@ RUN chmod a+x *.sh && mv start.sh /usr/bin
EXPOSE 8081 8081 EXPOSE 8081 8081
ENV PROTOCOLS_LOG_LEVEL=INFO
CMD ["start.sh"] CMD ["start.sh"]

View File

@@ -27,7 +27,7 @@ spring:
password: "${SPRING_DATASOURCE_PASSWORD:postgres}" password: "${SPRING_DATASOURCE_PASSWORD:postgres}"
hikari: hikari:
leak-detection-threshold: "${SPRING_DATASOURCE_HIKARI_LEAK_DETECTION_THRESHOLD:0}" leak-detection-threshold: "${SPRING_DATASOURCE_HIKARI_LEAK_DETECTION_THRESHOLD:0}"
maximum-pool-size: "${SPRING_DATASOURCE_MAXIMUM_POOL_SIZE:32}" maximum-pool-size: "${SPRING_DATASOURCE_MAXIMUM_POOL_SIZE:64}"
register-mbeans: "${SPRING_DATASOURCE_HIKARI_REGISTER_MBEANS:false}" register-mbeans: "${SPRING_DATASOURCE_HIKARI_REGISTER_MBEANS:false}"
mybatis-plus: mybatis-plus:
@@ -66,7 +66,7 @@ queue:
in-memory: in-memory:
queue-capacity: "${QUEUE_IN_MEMORY_QUEUE_CAPACITY:100000}" queue-capacity: "${QUEUE_IN_MEMORY_QUEUE_CAPACITY:100000}"
stats: stats:
print-interval-ms: "${QUEUE_IN_MEMORY_STATS_PRINT_INTERVAL_MS:60000}" print-interval-ms: "${QUEUE_IN_MEMORY_STATS_PRINT_INTERVAL_MS:10000}"
kafka: kafka:
bootstrap-servers: "${KAFKA_SERVERS:kafka:9092}" bootstrap-servers: "${KAFKA_SERVERS:kafka:9092}"
ssl: ssl:
@@ -128,7 +128,7 @@ redis:
standalone: standalone:
host: "${REDIS_HOST:redis}" host: "${REDIS_HOST:redis}"
port: "${REDIS_PORT:6379}" port: "${REDIS_PORT:6379}"
useDefaultClientConfig: "${REDIS_USE_DEFAULT_CLIENT_CONFIG:false}" useDefaultClientConfig: "${REDIS_USE_DEFAULT_CLIENT_CONFIG:true}"
clientName: "${REDIS_CLIENT_NAME:standalone}" clientName: "${REDIS_CLIENT_NAME:standalone}"
commandTimeout: "${REDIS_CLIENT_COMMAND_TIMEOUT:30000}" commandTimeout: "${REDIS_CLIENT_COMMAND_TIMEOUT:30000}"
shutdownTimeout: "${REDIS_CLIENT_SHUTDOWN_TIMEOUT:1000}" shutdownTimeout: "${REDIS_CLIENT_SHUTDOWN_TIMEOUT:1000}"

View File

@@ -46,6 +46,12 @@
<AppenderRef ref="ROLLING_FILE"/> <AppenderRef ref="ROLLING_FILE"/>
</AsyncLogger> </AsyncLogger>
<AsyncLogger name="sanbing.jcpp.app" level="${env:APP_LOG_LEVEL:-TRACE}"
additivity="false" includeLocation="false">
<AppenderRef ref="CONSOLE"/>
<AppenderRef ref="ROLLING_FILE"/>
</AsyncLogger>
<AsyncLogger name="sanbing.jcpp.protocol" level="${env:PROTOCOLS_LOG_LEVEL:-TRACE}" <AsyncLogger name="sanbing.jcpp.protocol" level="${env:PROTOCOLS_LOG_LEVEL:-TRACE}"
additivity="false" includeLocation="false"> additivity="false" includeLocation="false">
<AppenderRef ref="CONSOLE"/> <AppenderRef ref="CONSOLE"/>

View File

@@ -35,16 +35,12 @@ import sanbing.jcpp.infrastructure.util.trace.TracerRunnable;
import sanbing.jcpp.proto.gen.ProtocolProto.UplinkQueueMessage; import sanbing.jcpp.proto.gen.ProtocolProto.UplinkQueueMessage;
import java.util.List; import java.util.List;
import java.util.Optional;
import java.util.Set; import java.util.Set;
import java.util.UUID; import java.util.UUID;
import java.util.concurrent.*; import java.util.concurrent.*;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import static sanbing.jcpp.infrastructure.queue.common.QueueConstants.MSG_MD_PREFIX; import static sanbing.jcpp.infrastructure.queue.common.QueueConstants.*;
import static sanbing.jcpp.infrastructure.queue.common.QueueConstants.MSG_MD_TS;
import static sanbing.jcpp.infrastructure.util.trace.TracerContextUtil.JCPP_TRACER_ID;
import static sanbing.jcpp.infrastructure.util.trace.TracerContextUtil.JCPP_TRACER_ORIGIN;
/** /**
@@ -232,23 +228,10 @@ public class ProtocolUplinkConsumerService extends AbstractConsumerService imple
} }
private void tracer(ProtoQueueMsg<UplinkQueueMessage> msg) { private void tracer(ProtoQueueMsg<UplinkQueueMessage> msg) {
if (Optional.ofNullable(msg.getHeaders().get(MSG_MD_PREFIX + JCPP_TRACER_ID))
.map(tracerId -> {
String origin = null;
byte[] tracerOrigin = msg.getHeaders().get(MSG_MD_PREFIX + JCPP_TRACER_ORIGIN);
if (tracerOrigin != null) {
origin = ByteUtil.bytesToString(tracerOrigin);
}
byte[] tracerTs = msg.getHeaders().get(MSG_MD_PREFIX + MSG_MD_TS); TracerContextUtil.newTracer(ByteUtil.bytesToString(msg.getHeaders().get(MSG_MD_TRACER_ID)),
long ts = tracerTs != null ? ByteUtil.bytesToLong(tracerTs) : System.currentTimeMillis(); ByteUtil.bytesToString(msg.getHeaders().get(MSG_MD_TRACER_ORIGIN)),
ByteUtil.bytesToLong(msg.getHeaders().get(MSG_MD_TRACER_TS)));
return TracerContextUtil.newTracer(ByteUtil.bytesToString(tracerId), origin, ts);
})
.isEmpty()) {
TracerContextUtil.newTracer();
}
MDCUtils.recordTracer(); MDCUtils.recordTracer();
} }

View File

@@ -4,6 +4,8 @@
*/ */
package sanbing.jcpp.infrastructure.queue.common; package sanbing.jcpp.infrastructure.queue.common;
import static sanbing.jcpp.infrastructure.util.trace.TracerContextUtil.*;
/** /**
* @author baigod * @author baigod
*/ */
@@ -11,5 +13,10 @@ public final class QueueConstants {
public static final String MSG_MD_PREFIX = "jcpp_"; public static final String MSG_MD_PREFIX = "jcpp_";
public static final String MSG_MD_TS = "ts"; public static final String MSG_MD_TRACER_ID = MSG_MD_PREFIX + JCPP_TRACER_ID;
public static final String MSG_MD_TRACER_ORIGIN = MSG_MD_PREFIX + JCPP_TRACER_ORIGIN;
public static final String MSG_MD_TRACER_TS = MSG_MD_PREFIX + JCPP_TRACER_TS;
} }

View File

@@ -34,7 +34,6 @@ public class MDCUtils {
} }
return tracer.getTraceId(); return tracer.getTraceId();
} }
public static void cleanTracer() { public static void cleanTracer() {

View File

@@ -40,7 +40,6 @@ public class TracerContextUtil {
tracer = new Tracer(traceId, origin, ts); tracer = new Tracer(traceId, origin, ts);
} }
TRACE_ID_CONTAINER.set(tracer); TRACE_ID_CONTAINER.set(tracer);
return tracer; return tracer;

View File

@@ -25,10 +25,7 @@ import sanbing.jcpp.proto.gen.ProtocolProto.UplinkQueueMessage;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiConsumer; import java.util.function.BiConsumer;
import static sanbing.jcpp.infrastructure.queue.common.QueueConstants.MSG_MD_PREFIX; import static sanbing.jcpp.infrastructure.queue.common.QueueConstants.*;
import static sanbing.jcpp.infrastructure.queue.common.QueueConstants.MSG_MD_TS;
import static sanbing.jcpp.infrastructure.util.trace.TracerContextUtil.JCPP_TRACER_ID;
import static sanbing.jcpp.infrastructure.util.trace.TracerContextUtil.JCPP_TRACER_ORIGIN;
/** /**
* @author baigod * @author baigod
@@ -70,9 +67,9 @@ public abstract class Forwarder {
QueueMsgHeaders headers = new DefaultQueueMsgHeaders(); QueueMsgHeaders headers = new DefaultQueueMsgHeaders();
Tracer currentTracer = TracerContextUtil.getCurrentTracer(); Tracer currentTracer = TracerContextUtil.getCurrentTracer();
headers.put(MSG_MD_PREFIX + JCPP_TRACER_ID, ByteUtil.stringToBytes(currentTracer.getTraceId())); headers.put(MSG_MD_TRACER_ID, ByteUtil.stringToBytes(currentTracer.getTraceId()));
headers.put(MSG_MD_PREFIX + JCPP_TRACER_ORIGIN, ByteUtil.stringToBytes(currentTracer.getOrigin())); headers.put(MSG_MD_TRACER_ORIGIN, ByteUtil.stringToBytes(currentTracer.getOrigin()));
headers.put(MSG_MD_PREFIX + MSG_MD_TS, ByteUtil.longToBytes(currentTracer.getTracerTs())); headers.put(MSG_MD_TRACER_TS, ByteUtil.longToBytes(currentTracer.getTracerTs()));
TopicPartitionInfo tpi = partitionProvider.resolve(ServiceType.APP, topic, key); TopicPartitionInfo tpi = partitionProvider.resolve(ServiceType.APP, topic, key);
producer.send(tpi, new ProtoQueueMsg<>(key, msg, headers), new QueueCallback() { producer.send(tpi, new ProtoQueueMsg<>(key, msg, headers), new QueueCallback() {

View File

@@ -36,10 +36,7 @@ import java.util.Properties;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiConsumer; import java.util.function.BiConsumer;
import static sanbing.jcpp.infrastructure.queue.common.QueueConstants.MSG_MD_PREFIX; import static sanbing.jcpp.infrastructure.queue.common.QueueConstants.*;
import static sanbing.jcpp.infrastructure.queue.common.QueueConstants.MSG_MD_TS;
import static sanbing.jcpp.infrastructure.util.trace.TracerContextUtil.JCPP_TRACER_ID;
import static sanbing.jcpp.infrastructure.util.trace.TracerContextUtil.JCPP_TRACER_ORIGIN;
/** /**
* @author baigod * @author baigod
@@ -154,9 +151,9 @@ public class KafkaForwarder extends Forwarder {
Headers headers = new RecordHeaders(); Headers headers = new RecordHeaders();
Tracer currentTracer = TracerContextUtil.getCurrentTracer(); Tracer currentTracer = TracerContextUtil.getCurrentTracer();
headers.add(new RecordHeader(MSG_MD_PREFIX + JCPP_TRACER_ID, ByteUtil.stringToBytes(currentTracer.getTraceId()))); headers.add(new RecordHeader(MSG_MD_TRACER_ID, ByteUtil.stringToBytes(currentTracer.getTraceId())));
headers.add(new RecordHeader(MSG_MD_PREFIX + JCPP_TRACER_ORIGIN, ByteUtil.stringToBytes(currentTracer.getOrigin()))); headers.add(new RecordHeader(MSG_MD_TRACER_ORIGIN, ByteUtil.stringToBytes(currentTracer.getOrigin())));
headers.add(new RecordHeader(MSG_MD_PREFIX + MSG_MD_TS, ByteUtil.longToBytes(currentTracer.getTracerTs()))); headers.add(new RecordHeader(MSG_MD_TRACER_TS, ByteUtil.longToBytes(currentTracer.getTracerTs())));
if (kafkaCfg.getEncoder() == KafkaCfg.EncoderType.json) { if (kafkaCfg.getEncoder() == KafkaCfg.EncoderType.json) {