/** * 抖音关注:程序员三丙 * 知识星球:https://t.zsxq.com/j9b21 */ package sanbing.jcpp.infrastructure.queue; import jakarta.annotation.Nonnull; import lombok.Getter; import lombok.extern.slf4j.Slf4j; import sanbing.jcpp.infrastructure.queue.common.TopicPartitionInfo; import java.io.IOException; import java.util.*; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.ReentrantLock; import java.util.stream.Collectors; import static java.util.Collections.emptyList; @Slf4j public abstract class AbstractQueueConsumerTemplate implements QueueConsumer{ public static final long ONE_MILLISECOND_IN_NANOS = TimeUnit.MILLISECONDS.toNanos(1); private volatile boolean subscribed; protected volatile boolean stopped = false; protected volatile Set partitions; protected final ReentrantLock consumerLock = new ReentrantLock(); //NonfairSync final Queue> subscribeQueue = new ConcurrentLinkedQueue<>(); @Getter private final String topic; public AbstractQueueConsumerTemplate(String topic) { this.topic = topic; } @Override public void subscribe() { log.debug("enqueue topic subscribe {} ", topic); if (stopped) { log.error("trying subscribe, but consumer stopped for topic {}", topic); return; } subscribeQueue.add(Collections.singleton(new TopicPartitionInfo(topic, null, true))); } @Override public void subscribe(Set partitions) { log.debug("enqueue topics subscribe {} ", partitions); if (stopped) { log.error("trying subscribe, but consumer stopped for topic {}", topic); return; } subscribeQueue.add(partitions); } @Override public List poll(long durationInMillis) { List records; long startNanos = System.nanoTime(); if (stopped) { log.error("poll invoked but consumer stopped for topic {}", topic, new RuntimeException("stacktrace")); return emptyList(); } if (!subscribed && partitions == null && subscribeQueue.isEmpty()) { return sleepAndReturnEmpty(startNanos, durationInMillis); } if (consumerLock.isLocked()) { log.error("poll. consumerLock is locked. will wait with no timeout. it looks like a race conditions or deadlock topic {}", topic, new RuntimeException("stacktrace")); } consumerLock.lock(); try { while (!subscribeQueue.isEmpty()) { subscribed = false; partitions = subscribeQueue.poll(); } if (!subscribed) { List topicNames = getFullTopicNames(); log.info("Subscribing to topics {}", topicNames); doSubscribe(topicNames); subscribed = true; } records = partitions.isEmpty() ? emptyList() : doPoll(durationInMillis); } finally { consumerLock.unlock(); } if (records.isEmpty() && !isLongPollingSupported()) { return sleepAndReturnEmpty(startNanos, durationInMillis); } return decodeRecords(records); } @Nonnull List decodeRecords(@Nonnull List records) { List result = new ArrayList<>(records.size()); records.forEach(record -> { try { if (record != null) { result.add(decode(record)); } } catch (IOException e) { log.error("Failed decode record: [{}]", record); throw new RuntimeException("Failed to decode record: ", e); } }); return result; } List sleepAndReturnEmpty(final long startNanos, final long durationInMillis) { long durationNanos = TimeUnit.MILLISECONDS.toNanos(durationInMillis); long spentNanos = System.nanoTime() - startNanos; long nanosLeft = durationNanos - spentNanos; if (nanosLeft >= ONE_MILLISECOND_IN_NANOS) { try { long sleepMs = TimeUnit.NANOSECONDS.toMillis(nanosLeft); log.trace("Going to sleep after poll: topic {} for {}ms", topic, sleepMs); Thread.sleep(sleepMs); } catch (InterruptedException e) { if (!stopped) { log.error("Failed to wait", e); } } } return emptyList(); } @Override public void commit() { if (consumerLock.isLocked()) { log.error("commit. consumerLock is locked. will wait with no timeout. it looks like a race conditions or deadlock topic {}", topic, new RuntimeException("stacktrace")); } consumerLock.lock(); try { doCommit(); } finally { consumerLock.unlock(); } } @Override public void stop() { stopped = true; } @Override public void unsubscribe() { log.info("Unsubscribing and stopping consumer for topics {}", getFullTopicNames()); stopped = true; consumerLock.lock(); try { if (subscribed) { doUnsubscribe(); } } finally { consumerLock.unlock(); } } @Override public boolean isStopped() { return stopped; } abstract protected List doPoll(long durationInMillis); abstract protected T decode(R record) throws IOException; abstract protected void doSubscribe(List topicNames); abstract protected void doCommit(); abstract protected void doUnsubscribe(); @Override public List getFullTopicNames() { if (partitions == null) { return Collections.emptyList(); } return partitions.stream().map(TopicPartitionInfo::getFullTopicName).collect(Collectors.toList()); } protected boolean isLongPollingSupported() { return false; } }