本地队列最大消费大小可设置

This commit is contained in:
三丙
2024-12-16 11:28:55 +08:00
parent 53e39d737e
commit b05c3ae71a
2 changed files with 5 additions and 2 deletions

View File

@@ -65,6 +65,7 @@ queue:
hash_function_name: "${QUEUE_PARTITIONS_HASH_FUNCTION_NAME:murmur3_128}" # murmur3_32, murmur3_128 or sha256 hash_function_name: "${QUEUE_PARTITIONS_HASH_FUNCTION_NAME:murmur3_128}" # murmur3_32, murmur3_128 or sha256
memory: memory:
queue-capacity: "${QUEUE_MEMORY_QUEUE_CAPACITY:100000}" queue-capacity: "${QUEUE_MEMORY_QUEUE_CAPACITY:100000}"
max-pool-size: "${QUEUE_MEMORY_MAX_POOL_SIZE:999}"
stats: stats:
print-interval-ms: "${QUEUE_MEMORY_STATS_PRINT_INTERVAL_MS:10000}" print-interval-ms: "${QUEUE_MEMORY_STATS_PRINT_INTERVAL_MS:10000}"
kafka: kafka:

View File

@@ -21,6 +21,8 @@ public final class DefaultInMemoryStorage implements InMemoryStorage {
@Value("${queue.memory.queue-capacity:100000}") @Value("${queue.memory.queue-capacity:100000}")
private int queueCapacity; private int queueCapacity;
@Value("${queue.memory.max-pool-size:999}")
private int maxPoolSize;
@Override @Override
public void printStats() { public void printStats() {
@@ -54,9 +56,9 @@ public final class DefaultInMemoryStorage implements InMemoryStorage {
if (firstMsg != null) { if (firstMsg != null) {
final int queueSize = queue.size(); final int queueSize = queue.size();
if (queueSize > 0) { if (queueSize > 0) {
final List<QueueMsg> entities = new ArrayList<>(Math.min(queueSize, 999) + 1); final List<QueueMsg> entities = new ArrayList<>(Math.min(queueSize, maxPoolSize) + 1);
entities.add(firstMsg); entities.add(firstMsg);
queue.drainTo(entities, 999); queue.drainTo(entities, maxPoolSize);
return entities; return entities;
} }
return Collections.singletonList(firstMsg); return Collections.singletonList(firstMsg);