/** * 抖音关注:程序员三丙 * 知识星球:https://t.zsxq.com/j9b21 */ package sanbing.jcpp.infrastructure.cache; import lombok.Getter; import lombok.RequiredArgsConstructor; import org.springframework.cache.Cache; import org.springframework.cache.CacheManager; import java.io.Serializable; import java.util.*; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; @RequiredArgsConstructor public abstract class CaffeineTransactionalCache implements TransactionalCache { @Getter protected final String cacheName; protected final Cache cache; protected final Lock lock = new ReentrantLock(); private final Map> objectTransactions = new HashMap<>(); private final Map> transactions = new HashMap<>(); public CaffeineTransactionalCache(CacheManager cacheManager, String cacheName) { this.cacheName = cacheName; this.cache = Optional.ofNullable(cacheManager.getCache(cacheName)) .orElseThrow(() -> new IllegalArgumentException("Cache '" + cacheName + "' is not configured")); } @Override public CacheValueWrapper get(K key) { return SimpleCacheValueWrapper.wrap(cache.get(key)); } @Override public void put(K key, V value) { lock.lock(); try { failAllTransactionsByKey(key); cache.put(key, value); } finally { lock.unlock(); } } @Override public void putIfAbsent(K key, V value) { lock.lock(); try { failAllTransactionsByKey(key); doPutIfAbsent(key, value); } finally { lock.unlock(); } } @Override public void evict(K key) { lock.lock(); try { failAllTransactionsByKey(key); doEvict(key); } finally { lock.unlock(); } } @Override public void evict(Collection keys) { lock.lock(); try { keys.forEach(key -> { failAllTransactionsByKey(key); doEvict(key); }); } finally { lock.unlock(); } } @Override public void evictOrPut(K key, V value) { evict(key); } @Override public CacheTransaction newTransactionForKey(K key) { return newTransaction(Collections.singletonList(key)); } @Override public CacheTransaction newTransactionForKeys(List keys) { return newTransaction(keys); } void doPutIfAbsent(K key, V value) { cache.putIfAbsent(key, value); } void doEvict(K key) { cache.evict(key); } CacheTransaction newTransaction(List keys) { lock.lock(); try { var transaction = new CaffeineCacheTransaction<>(this, keys); var transactionId = transaction.getId(); for (K key : keys) { objectTransactions.computeIfAbsent(key, k -> new HashSet<>()).add(transactionId); } transactions.put(transactionId, transaction); return transaction; } finally { lock.unlock(); } } public boolean commit(UUID trId, Map pendingPuts) { lock.lock(); try { var tr = transactions.get(trId); var success = !tr.isFailed(); if (success) { for (K key : tr.getKeys()) { Set otherTransactions = objectTransactions.get(key); if (otherTransactions != null) { for (UUID otherTrId : otherTransactions) { if (trId == null || !trId.equals(otherTrId)) { transactions.get(otherTrId).setFailed(true); } } } } pendingPuts.forEach(this::doPutIfAbsent); } removeTransaction(trId); return success; } finally { lock.unlock(); } } void rollback(UUID id) { lock.lock(); try { removeTransaction(id); } finally { lock.unlock(); } } private void removeTransaction(UUID id) { CaffeineCacheTransaction transaction = transactions.remove(id); if (transaction != null) { for (var key : transaction.getKeys()) { Set transactions = objectTransactions.get(key); if (transactions != null) { transactions.remove(id); if (transactions.isEmpty()) { objectTransactions.remove(key); } } } } } protected void failAllTransactionsByKey(K key) { Set transactionsIds = objectTransactions.get(key); if (transactionsIds != null) { for (UUID otherTrId : transactionsIds) { transactions.get(otherTrId).setFailed(true); } } } }