From 73bc580a0a281372caa76ab8a6f385ccad80694b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E4=B8=89=E4=B8=99?= Date: Wed, 16 Oct 2024 16:15:59 +0800 Subject: [PATCH] =?UTF-8?q?=E6=9C=AC=E5=9C=B0=E9=98=9F=E5=88=97=E5=AE=B9?= =?UTF-8?q?=E9=87=8F=E5=8F=AF=E9=85=8D=E7=BD=AE?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- jcpp-app-bootstrap/src/main/resources/app-service.yml | 3 ++- .../infrastructure/queue/memory/DefaultInMemoryStorage.java | 6 +++++- .../queue/provider/InMemoryAppQueueFactory.java | 2 +- .../src/main/resources/protocol-service.yml | 3 --- 4 files changed, 8 insertions(+), 6 deletions(-) diff --git a/jcpp-app-bootstrap/src/main/resources/app-service.yml b/jcpp-app-bootstrap/src/main/resources/app-service.yml index 7fbfb91..7662d41 100644 --- a/jcpp-app-bootstrap/src/main/resources/app-service.yml +++ b/jcpp-app-bootstrap/src/main/resources/app-service.yml @@ -63,7 +63,8 @@ queue: type: "${QUEUE_TYPE:memory}" partitions: hash_function_name: "${QUEUE_PARTITIONS_HASH_FUNCTION_NAME:murmur3_128}" # murmur3_32, murmur3_128 or sha256 - in_memory: + in-memory: + queue-capacity: "${QUEUE_IN_MEMORY_QUEUE_CAPACITY:100000}" stats: print-interval-ms: "${QUEUE_IN_MEMORY_STATS_PRINT_INTERVAL_MS:60000}" kafka: diff --git a/jcpp-infrastructure-queue/src/main/java/sanbing/jcpp/infrastructure/queue/memory/DefaultInMemoryStorage.java b/jcpp-infrastructure-queue/src/main/java/sanbing/jcpp/infrastructure/queue/memory/DefaultInMemoryStorage.java index 347c88a..fd06cff 100644 --- a/jcpp-infrastructure-queue/src/main/java/sanbing/jcpp/infrastructure/queue/memory/DefaultInMemoryStorage.java +++ b/jcpp-infrastructure-queue/src/main/java/sanbing/jcpp/infrastructure/queue/memory/DefaultInMemoryStorage.java @@ -5,6 +5,7 @@ package sanbing.jcpp.infrastructure.queue.memory; import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Component; import sanbing.jcpp.infrastructure.queue.QueueMsg; @@ -18,6 +19,9 @@ import java.util.concurrent.LinkedBlockingQueue; public final class DefaultInMemoryStorage implements InMemoryStorage { private final ConcurrentHashMap> storage = new ConcurrentHashMap<>(); + @Value("${queue.in-memory.queue-capacity:100000}") + private int queueCapacity; + @Override public void printStats() { storage.forEach((topic, queue) -> { @@ -39,7 +43,7 @@ public final class DefaultInMemoryStorage implements InMemoryStorage { @Override public boolean put(String topic, QueueMsg msg) { - return storage.computeIfAbsent(topic, t -> new LinkedBlockingQueue<>(10000)).add(msg); + return storage.computeIfAbsent(topic, t -> new LinkedBlockingQueue<>(queueCapacity)).add(msg); } @Override diff --git a/jcpp-infrastructure-queue/src/main/java/sanbing/jcpp/infrastructure/queue/provider/InMemoryAppQueueFactory.java b/jcpp-infrastructure-queue/src/main/java/sanbing/jcpp/infrastructure/queue/provider/InMemoryAppQueueFactory.java index 82cb1ad..21719f9 100644 --- a/jcpp-infrastructure-queue/src/main/java/sanbing/jcpp/infrastructure/queue/provider/InMemoryAppQueueFactory.java +++ b/jcpp-infrastructure-queue/src/main/java/sanbing/jcpp/infrastructure/queue/provider/InMemoryAppQueueFactory.java @@ -40,7 +40,7 @@ public class InMemoryAppQueueFactory implements AppQueueFactory { return new InMemoryQueueProducer<>(storage, topic); } - @Scheduled(fixedRateString = "${queue.in_memory.stats.print-interval-ms:60000}") + @Scheduled(fixedRateString = "${queue.in-memory.stats.print-interval-ms:60000}") private void printInMemoryStats() { storage.printStats(); } diff --git a/jcpp-protocol-bootstrap/src/main/resources/protocol-service.yml b/jcpp-protocol-bootstrap/src/main/resources/protocol-service.yml index f417977..7fd56f0 100644 --- a/jcpp-protocol-bootstrap/src/main/resources/protocol-service.yml +++ b/jcpp-protocol-bootstrap/src/main/resources/protocol-service.yml @@ -98,9 +98,6 @@ queue: type: "${QUEUE_TYPE:kafka}" partitions: hash_function_name: "${QUEUE_PARTITIONS_HASH_FUNCTION_NAME:murmur3_128}" # murmur3_32, murmur3_128 or sha256 - in_memory: - stats: - print-interval-ms: "${QUEUE_IN_MEMORY_STATS_PRINT_INTERVAL_MS:60000}" kafka: bootstrap-servers: "${KAFKA_SERVERS:kafka:9092}" ssl: