Files
JChargePointProtocol/jcpp-infrastructure-queue/src/main/java/sanbing/jcpp/infrastructure/queue/AbstractQueueConsumerTemplate.java

193 lines
6.1 KiB
Java
Raw Normal View History

2024-10-08 09:38:54 +08:00
/**
2025-03-04 10:42:17 +08:00
* 开源代码仅供学习和交流研究使用商用请联系三丙
* 微信mohan_88888
* 抖音程序员三丙
* 付费课程知识星球https://t.zsxq.com/aKtXo
2024-10-08 09:38:54 +08:00
*/
package sanbing.jcpp.infrastructure.queue;
import jakarta.annotation.Nonnull;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import sanbing.jcpp.infrastructure.queue.common.TopicPartitionInfo;
import java.io.IOException;
import java.util.*;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;
import static java.util.Collections.emptyList;
@Slf4j
public abstract class AbstractQueueConsumerTemplate<R, T extends QueueMsg> implements QueueConsumer<T>{
public static final long ONE_MILLISECOND_IN_NANOS = TimeUnit.MILLISECONDS.toNanos(1);
private volatile boolean subscribed;
protected volatile boolean stopped = false;
protected volatile Set<TopicPartitionInfo> partitions;
protected final ReentrantLock consumerLock = new ReentrantLock(); //NonfairSync
final Queue<Set<TopicPartitionInfo>> subscribeQueue = new ConcurrentLinkedQueue<>();
@Getter
private final String topic;
public AbstractQueueConsumerTemplate(String topic) {
this.topic = topic;
}
@Override
public void subscribe() {
log.debug("enqueue topic subscribe {} ", topic);
if (stopped) {
log.error("trying subscribe, but consumer stopped for topic {}", topic);
return;
}
subscribeQueue.add(Collections.singleton(new TopicPartitionInfo(topic, null, true)));
}
@Override
public void subscribe(Set<TopicPartitionInfo> partitions) {
log.debug("enqueue topics subscribe {} ", partitions);
if (stopped) {
log.error("trying subscribe, but consumer stopped for topic {}", topic);
return;
}
subscribeQueue.add(partitions);
}
@Override
public List<T> poll(long durationInMillis) {
List<R> records;
long startNanos = System.nanoTime();
if (stopped) {
2024-10-11 15:27:42 +08:00
log.error("poll invoked but consumer stopped for topic {}", topic, new RuntimeException("stacktrace"));
2024-10-08 09:38:54 +08:00
return emptyList();
}
if (!subscribed && partitions == null && subscribeQueue.isEmpty()) {
return sleepAndReturnEmpty(startNanos, durationInMillis);
}
if (consumerLock.isLocked()) {
2024-10-11 15:27:42 +08:00
log.error("poll. consumerLock is locked. will wait with no timeout. it looks like a race conditions or deadlock topic {}", topic, new RuntimeException("stacktrace"));
2024-10-08 09:38:54 +08:00
}
consumerLock.lock();
try {
while (!subscribeQueue.isEmpty()) {
subscribed = false;
partitions = subscribeQueue.poll();
}
if (!subscribed) {
List<String> topicNames = getFullTopicNames();
log.info("Subscribing to topics {}", topicNames);
doSubscribe(topicNames);
subscribed = true;
}
records = partitions.isEmpty() ? emptyList() : doPoll(durationInMillis);
} finally {
consumerLock.unlock();
}
if (records.isEmpty() && !isLongPollingSupported()) {
return sleepAndReturnEmpty(startNanos, durationInMillis);
}
return decodeRecords(records);
}
@Nonnull
List<T> decodeRecords(@Nonnull List<R> records) {
List<T> result = new ArrayList<>(records.size());
records.forEach(record -> {
try {
if (record != null) {
result.add(decode(record));
}
} catch (IOException e) {
log.error("Failed decode record: [{}]", record);
throw new RuntimeException("解码记录失败: ", e);
2024-10-08 09:38:54 +08:00
}
});
return result;
}
List<T> sleepAndReturnEmpty(final long startNanos, final long durationInMillis) {
long durationNanos = TimeUnit.MILLISECONDS.toNanos(durationInMillis);
long spentNanos = System.nanoTime() - startNanos;
long nanosLeft = durationNanos - spentNanos;
if (nanosLeft >= ONE_MILLISECOND_IN_NANOS) {
try {
long sleepMs = TimeUnit.NANOSECONDS.toMillis(nanosLeft);
log.trace("Going to sleep after poll: topic {} for {}ms", topic, sleepMs);
Thread.sleep(sleepMs);
} catch (InterruptedException e) {
if (!stopped) {
log.error("Failed to wait", e);
}
}
}
return emptyList();
}
@Override
public void commit() {
if (consumerLock.isLocked()) {
2024-10-11 15:27:42 +08:00
log.error("commit. consumerLock is locked. will wait with no timeout. it looks like a race conditions or deadlock topic {}", topic, new RuntimeException("stacktrace"));
2024-10-08 09:38:54 +08:00
}
consumerLock.lock();
try {
doCommit();
} finally {
consumerLock.unlock();
}
}
@Override
public void stop() {
stopped = true;
}
@Override
public void unsubscribe() {
log.info("Unsubscribing and stopping consumer for topics {}", getFullTopicNames());
stopped = true;
consumerLock.lock();
try {
if (subscribed) {
doUnsubscribe();
}
} finally {
consumerLock.unlock();
}
}
@Override
public boolean isStopped() {
return stopped;
}
abstract protected List<R> doPoll(long durationInMillis);
abstract protected T decode(R record) throws IOException;
abstract protected void doSubscribe(List<String> topicNames);
abstract protected void doCommit();
abstract protected void doUnsubscribe();
@Override
public List<String> getFullTopicNames() {
if (partitions == null) {
return Collections.emptyList();
}
return partitions.stream().map(TopicPartitionInfo::getFullTopicName).collect(Collectors.toList());
}
protected boolean isLongPollingSupported() {
return false;
}
}