本地队列容量可配置

This commit is contained in:
三丙
2024-10-16 16:15:59 +08:00
parent 7d8a072084
commit 73bc580a0a
4 changed files with 8 additions and 6 deletions

View File

@@ -63,7 +63,8 @@ queue:
type: "${QUEUE_TYPE:memory}"
partitions:
hash_function_name: "${QUEUE_PARTITIONS_HASH_FUNCTION_NAME:murmur3_128}" # murmur3_32, murmur3_128 or sha256
in_memory:
in-memory:
queue-capacity: "${QUEUE_IN_MEMORY_QUEUE_CAPACITY:100000}"
stats:
print-interval-ms: "${QUEUE_IN_MEMORY_STATS_PRINT_INTERVAL_MS:60000}"
kafka:

View File

@@ -5,6 +5,7 @@
package sanbing.jcpp.infrastructure.queue.memory;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import sanbing.jcpp.infrastructure.queue.QueueMsg;
@@ -18,6 +19,9 @@ import java.util.concurrent.LinkedBlockingQueue;
public final class DefaultInMemoryStorage implements InMemoryStorage {
private final ConcurrentHashMap<String, BlockingQueue<QueueMsg>> storage = new ConcurrentHashMap<>();
@Value("${queue.in-memory.queue-capacity:100000}")
private int queueCapacity;
@Override
public void printStats() {
storage.forEach((topic, queue) -> {
@@ -39,7 +43,7 @@ public final class DefaultInMemoryStorage implements InMemoryStorage {
@Override
public boolean put(String topic, QueueMsg msg) {
return storage.computeIfAbsent(topic, t -> new LinkedBlockingQueue<>(10000)).add(msg);
return storage.computeIfAbsent(topic, t -> new LinkedBlockingQueue<>(queueCapacity)).add(msg);
}
@Override

View File

@@ -40,7 +40,7 @@ public class InMemoryAppQueueFactory implements AppQueueFactory {
return new InMemoryQueueProducer<>(storage, topic);
}
@Scheduled(fixedRateString = "${queue.in_memory.stats.print-interval-ms:60000}")
@Scheduled(fixedRateString = "${queue.in-memory.stats.print-interval-ms:60000}")
private void printInMemoryStats() {
storage.printStats();
}

View File

@@ -98,9 +98,6 @@ queue:
type: "${QUEUE_TYPE:kafka}"
partitions:
hash_function_name: "${QUEUE_PARTITIONS_HASH_FUNCTION_NAME:murmur3_128}" # murmur3_32, murmur3_128 or sha256
in_memory:
stats:
print-interval-ms: "${QUEUE_IN_MEMORY_STATS_PRINT_INTERVAL_MS:60000}"
kafka:
bootstrap-servers: "${KAFKA_SERVERS:kafka:9092}"
ssl: