protocolSession的缓存改为同步caffeine,因为没有IO操作

This commit is contained in:
三丙
2024-10-09 10:08:32 +08:00
parent a1db728be5
commit c3295ce01c
11 changed files with 75 additions and 59 deletions

View File

@@ -7,7 +7,6 @@ package sanbing.jcpp.protocol.provider;
import sanbing.jcpp.protocol.domain.ProtocolSession;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
/**
* @author baigod
@@ -21,7 +20,7 @@ public interface ProtocolSessionRegistryProvider {
void unregister(UUID sessionId);
CompletableFuture<ProtocolSession> get(UUID sessionId);
ProtocolSession get(UUID sessionId);
/**
* 活跃会话

View File

@@ -4,7 +4,7 @@
*/
package sanbing.jcpp.protocol.provider.impl;
import com.github.benmanes.caffeine.cache.AsyncCache;
import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import jakarta.annotation.PostConstruct;
import jakarta.annotation.PreDestroy;
@@ -20,7 +20,6 @@ import sanbing.jcpp.protocol.provider.ProtocolSessionRegistryProvider;
import java.time.LocalDateTime;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
@@ -33,6 +32,7 @@ import java.util.concurrent.TimeUnit;
@Slf4j
public class DefaultProtocolSessionRegistryProvider implements ProtocolSessionRegistryProvider {
private static final int INIT_CACHE_LIMIT = 100_000;
private static final int MAXIMUM_SIZE = 1_000_000;
@Value("${service.protocols.sessions.default-inactivity-timeout-in-sec}")
private int defaultInactivityTimeoutInSec;
@@ -41,23 +41,18 @@ public class DefaultProtocolSessionRegistryProvider implements ProtocolSessionRe
private int defaultStateCheckIntervalInSec;
@Getter
private final AsyncCache<UUID, ProtocolSession> SESSION_CACHE = buildCache();
private final Cache<UUID, ProtocolSession> sessionCache = buildCache();
private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(JCPPThreadFactory.forName("session-state-checker"));
@PostConstruct
public void init() {
scheduledExecutorService.scheduleAtFixedRate(() ->
SESSION_CACHE.asMap().forEach((id, sessionCompletableFuture) ->
sessionCompletableFuture.whenComplete((protocolSession, throwable) -> {
if (throwable == null && protocolSession != null) {
if (protocolSession.getLastActivityTime().isBefore(LocalDateTime.now().minusSeconds(defaultInactivityTimeoutInSec))) {
protocolSession.close(SessionCloseReason.INACTIVE);
unregister(protocolSession.getId());
}
}
})
), defaultStateCheckIntervalInSec, defaultStateCheckIntervalInSec, TimeUnit.SECONDS);
scheduledExecutorService.scheduleAtFixedRate(() -> sessionCache.asMap().forEach((id, session) -> {
if (session.getLastActivityTime().isBefore(LocalDateTime.now().minusSeconds(defaultInactivityTimeoutInSec))) {
session.close(SessionCloseReason.INACTIVE);
unregister(session.getId());
}
}), defaultStateCheckIntervalInSec, defaultStateCheckIntervalInSec, TimeUnit.SECONDS);
}
@PreDestroy
@@ -72,7 +67,7 @@ public class DefaultProtocolSessionRegistryProvider implements ProtocolSessionRe
log.debug("Registering session {}", protocolSession);
}
SESSION_CACHE.put(protocolSession.getId(), CompletableFuture.supplyAsync(() -> protocolSession, ThreadPoolConfiguration.JCPP_COMMON_THREAD_POOL));
sessionCache.put(protocolSession.getId(), protocolSession);
}
@@ -81,16 +76,15 @@ public class DefaultProtocolSessionRegistryProvider implements ProtocolSessionRe
log.info("Unregistering session {}", sessionId);
SESSION_CACHE.synchronous().invalidate(sessionId);
sessionCache.invalidate(sessionId);
}
@Override
public CompletableFuture<ProtocolSession> get(UUID sessionId) {
public ProtocolSession get(UUID sessionId) {
log.debug("Get session {}", sessionId);
return SESSION_CACHE.get(sessionId, uuid -> null);
return sessionCache.get(sessionId, uuid -> null);
}
@Override
@@ -102,14 +96,14 @@ public class DefaultProtocolSessionRegistryProvider implements ProtocolSessionRe
protocolSession.setLastActivityTime(LocalDateTime.now());
SESSION_CACHE.put(protocolSession.getId(), CompletableFuture.supplyAsync(() -> protocolSession, ThreadPoolConfiguration.JCPP_COMMON_THREAD_POOL));
sessionCache.put(protocolSession.getId(), protocolSession);
}
private AsyncCache<UUID, ProtocolSession> buildCache() {
private Cache<UUID, ProtocolSession> buildCache() {
return Caffeine.newBuilder()
.initialCapacity(INIT_CACHE_LIMIT)
.maximumSize(INIT_CACHE_LIMIT * 10)
.maximumSize(MAXIMUM_SIZE)
.executor(ThreadPoolConfiguration.JCPP_COMMON_THREAD_POOL)
.buildAsync();
.build();
}
}