/** * 抖音关注:程序员三丙 * 知识星球:https://t.zsxq.com/j9b21 */ package sanbing.jcpp.infrastructure.queue; import lombok.Getter; import lombok.extern.slf4j.Slf4j; import java.util.UUID; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; @Slf4j public class PackProcessingContext { private final AtomicInteger pendingCount; private final CountDownLatch processingTimeoutLatch; @Getter private final ConcurrentMap ackMap; @Getter private final ConcurrentMap failedMap; public PackProcessingContext(CountDownLatch processingTimeoutLatch, ConcurrentMap ackMap, ConcurrentMap failedMap) { this.processingTimeoutLatch = processingTimeoutLatch; this.pendingCount = new AtomicInteger(ackMap.size()); this.ackMap = ackMap; this.failedMap = failedMap; } public boolean await(long packProcessingTimeout, TimeUnit milliseconds) throws InterruptedException { return processingTimeoutLatch.await(packProcessingTimeout, milliseconds); } public void onSuccess(UUID id) { boolean empty = false; T msg = ackMap.remove(id); if (msg != null) { empty = pendingCount.decrementAndGet() == 0; } if (empty) { processingTimeoutLatch.countDown(); } else { if (log.isTraceEnabled()) { log.trace("Items left: {}", ackMap.size()); for (T t : ackMap.values()) { log.trace("left item: {}", t); } } } } public void onFailure(UUID id, Throwable t) { boolean empty = false; T msg = ackMap.remove(id); if (msg != null) { empty = pendingCount.decrementAndGet() == 0; failedMap.put(id, msg); if (log.isTraceEnabled()) { log.trace("Items left: {}", ackMap.size()); for (T v : ackMap.values()) { log.trace("left item: {}", v); } } } if (empty) { processingTimeoutLatch.countDown(); } } }