protocolSession的缓存改为同步caffeine,因为没有IO操作

This commit is contained in:
三丙
2024-10-09 10:08:32 +08:00
parent a1db728be5
commit 4356eaba85
10 changed files with 74 additions and 58 deletions

View File

@@ -210,5 +210,5 @@ service:
thread-pool: thread-pool:
sharding: sharding:
hash_function_name: "${THREAD_POOL_SHARDING_HASH_FUNCTION_NAME:murmur3_128}" # murmur3_32, murmur3_128 or sha256 hash_function_name: "${THREAD_POOL_SHARDING_HASH_FUNCTION_NAME:murmur3_128}" # murmur3_32, murmur3_128 or sha256
parallelism: "${THREAD_POOL_SHARDING_PARALLELISM:128}" parallelism: "${THREAD_POOL_SHARDING_PARALLELISM:8}"
stats-print-interval-ms: "${THREAD_POOL_SHARDING_STATS_PRINT_INTERVAL_MS:10000}" stats-print-interval-ms: "${THREAD_POOL_SHARDING_STATS_PRINT_INTERVAL_MS:10000}"

View File

@@ -0,0 +1,23 @@
/**
* 抖音关注:程序员三丙
* 知识星球https://t.zsxq.com/j9b21
*/
package sanbing.jcpp.infrastructure.util.config;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.TaskScheduler;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
@Configuration
public class ScheduledTaskConfig {
@Bean
public TaskScheduler taskScheduler() {
ThreadPoolTaskScheduler scheduler = new ThreadPoolTaskScheduler();
scheduler.setPoolSize(Runtime.getRuntime().availableProcessors());
scheduler.setThreadNamePrefix("scheduled-task-");
scheduler.initialize();
return scheduler;
}
}

View File

@@ -33,12 +33,12 @@ public class ShardingThreadPool {
@Value("${thread-pool.sharding.hash_function_name:murmur3_128}") @Value("${thread-pool.sharding.hash_function_name:murmur3_128}")
private String hashFunctionName; private String hashFunctionName;
@Value("${thread-pool.sharding.parallelism:128}") @Value("${thread-pool.sharding.parallelism:8}")
private int parallelism; private int parallelism;
private HashFunction hashFunction; private HashFunction hashFunction;
private final Map<Integer, ExecutorService> EXECUTOR_SERVICE_MAP = new ConcurrentHashMap<>(128); private final Map<Integer, ExecutorService> EXECUTOR_SERVICE_MAP = new ConcurrentHashMap<>(8);
@PostConstruct @PostConstruct
public void init() { public void init() {
@@ -59,13 +59,16 @@ public class ShardingThreadPool {
ThreadPoolExecutor threadPoolExecutor = (ThreadPoolExecutor) v; ThreadPoolExecutor threadPoolExecutor = (ThreadPoolExecutor) v;
log.info("分区 {}/{} 的线程池中剩余 {} 条待执行任务,当前正在执行的线程数 {}, 已完成任务 {} / {}", int size = threadPoolExecutor.getQueue().size();
k,
EXECUTOR_SERVICE_MAP.size(), if (size > 1) {
threadPoolExecutor.getQueue().size(), log.info("分区 {} 的线程池中剩余 {} 条待执行任务,当前正在执行的线程数 {}, 已完成任务 {} / {}",
threadPoolExecutor.getActiveCount(), k,
threadPoolExecutor.getCompletedTaskCount(), size,
threadPoolExecutor.getTaskCount()); threadPoolExecutor.getActiveCount(),
threadPoolExecutor.getCompletedTaskCount(),
threadPoolExecutor.getTaskCount());
}
}); });
} }

View File

@@ -47,7 +47,7 @@ public abstract class ProtocolMessageProcessor {
})); }));
} }
protected abstract void uplinkHandle(ListenerToHandlerMsg listenerToHandlerMsg) throws Exception; protected abstract void uplinkHandle(ListenerToHandlerMsg listenerToHandlerMsg);
public void downlinkHandle(SessionToHandlerMsg sessionToHandlerMsg, MessagesStats downlinkMsgStats) throws DownlinkException { public void downlinkHandle(SessionToHandlerMsg sessionToHandlerMsg, MessagesStats downlinkMsgStats) throws DownlinkException {
try { try {
@@ -62,5 +62,5 @@ public abstract class ProtocolMessageProcessor {
} }
} }
protected abstract void downlinkHandle(SessionToHandlerMsg sessionToHandlerMsg) throws Exception; protected abstract void downlinkHandle(SessionToHandlerMsg sessionToHandlerMsg);
} }

View File

@@ -20,7 +20,6 @@ import sanbing.jcpp.protocol.domain.ProtocolSession;
import sanbing.jcpp.protocol.provider.ProtocolSessionRegistryProvider; import sanbing.jcpp.protocol.provider.ProtocolSessionRegistryProvider;
import java.util.UUID; import java.util.UUID;
import java.util.concurrent.CompletableFuture;
/** /**
* @author baigod * @author baigod
@@ -44,14 +43,14 @@ public class DownlinkController {
final DeferredResult<ResponseEntity<String>> response = new DeferredResult<>(onDownlinkTimeout, final DeferredResult<ResponseEntity<String>> response = new DeferredResult<>(onDownlinkTimeout,
ResponseEntity.status(HttpStatus.REQUEST_TIMEOUT).build()); ResponseEntity.status(HttpStatus.REQUEST_TIMEOUT).build());
UUID protocolSessionId = new UUID(downlinkMsg.getSessionIdMSB(),downlinkMsg.getSessionIdLSB()) ; UUID protocolSessionId = new UUID(downlinkMsg.getSessionIdMSB(), downlinkMsg.getSessionIdLSB());
CompletableFuture<ProtocolSession> protocolSessionCompletableFuture = protocolSessionRegistryProvider.get(protocolSessionId); ProtocolSession protocolSession = protocolSessionRegistryProvider.get(protocolSessionId);
protocolSessionCompletableFuture.thenAccept(session -> { try {
if (session != null) { if (protocolSession != null) {
session.onDownlink(downlinkMsg); protocolSession.onDownlink(downlinkMsg);
response.setResult(ResponseEntity.status(HttpStatus.OK).build()); response.setResult(ResponseEntity.status(HttpStatus.OK).build());
} else { } else {
@@ -60,17 +59,15 @@ public class DownlinkController {
response.setResult(ResponseEntity.status(HttpStatus.NOT_FOUND).body("Protocol Session not found for ID:" + protocolSessionId)); response.setResult(ResponseEntity.status(HttpStatus.NOT_FOUND).body("Protocol Session not found for ID:" + protocolSessionId));
} }
}).whenComplete((unused, throwable) -> { } catch (Exception e) {
if (throwable != null) {
log.warn("下发报文时处理失败 sessionId: {}", protocolSessionId, throwable); log.warn("下发报文时处理失败 sessionId: {}", protocolSessionId, e);
if (!response.isSetOrExpired()) { if (!response.isSetOrExpired()) {
response.setResult(ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(throwable.getMessage())); response.setResult(ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(e.getMessage()));
}
} }
}); }
return response; return response;
} }

View File

@@ -7,7 +7,6 @@ package sanbing.jcpp.protocol.provider;
import sanbing.jcpp.protocol.domain.ProtocolSession; import sanbing.jcpp.protocol.domain.ProtocolSession;
import java.util.UUID; import java.util.UUID;
import java.util.concurrent.CompletableFuture;
/** /**
* @author baigod * @author baigod
@@ -21,7 +20,7 @@ public interface ProtocolSessionRegistryProvider {
void unregister(UUID sessionId); void unregister(UUID sessionId);
CompletableFuture<ProtocolSession> get(UUID sessionId); ProtocolSession get(UUID sessionId);
/** /**
* 活跃会话 * 活跃会话

View File

@@ -4,7 +4,7 @@
*/ */
package sanbing.jcpp.protocol.provider.impl; package sanbing.jcpp.protocol.provider.impl;
import com.github.benmanes.caffeine.cache.AsyncCache; import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine; import com.github.benmanes.caffeine.cache.Caffeine;
import jakarta.annotation.PostConstruct; import jakarta.annotation.PostConstruct;
import jakarta.annotation.PreDestroy; import jakarta.annotation.PreDestroy;
@@ -20,7 +20,6 @@ import sanbing.jcpp.protocol.provider.ProtocolSessionRegistryProvider;
import java.time.LocalDateTime; import java.time.LocalDateTime;
import java.util.UUID; import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
@@ -33,6 +32,7 @@ import java.util.concurrent.TimeUnit;
@Slf4j @Slf4j
public class DefaultProtocolSessionRegistryProvider implements ProtocolSessionRegistryProvider { public class DefaultProtocolSessionRegistryProvider implements ProtocolSessionRegistryProvider {
private static final int INIT_CACHE_LIMIT = 100_000; private static final int INIT_CACHE_LIMIT = 100_000;
private static final int MAXIMUM_SIZE = 1_000_000;
@Value("${service.protocols.sessions.default-inactivity-timeout-in-sec}") @Value("${service.protocols.sessions.default-inactivity-timeout-in-sec}")
private int defaultInactivityTimeoutInSec; private int defaultInactivityTimeoutInSec;
@@ -41,23 +41,18 @@ public class DefaultProtocolSessionRegistryProvider implements ProtocolSessionRe
private int defaultStateCheckIntervalInSec; private int defaultStateCheckIntervalInSec;
@Getter @Getter
private final AsyncCache<UUID, ProtocolSession> SESSION_CACHE = buildCache(); private final Cache<UUID, ProtocolSession> sessionCache = buildCache();
private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(JCPPThreadFactory.forName("session-state-checker")); private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(JCPPThreadFactory.forName("session-state-checker"));
@PostConstruct @PostConstruct
public void init() { public void init() {
scheduledExecutorService.scheduleAtFixedRate(() -> scheduledExecutorService.scheduleAtFixedRate(() -> sessionCache.asMap().forEach((id, session) -> {
SESSION_CACHE.asMap().forEach((id, sessionCompletableFuture) -> if (session.getLastActivityTime().isBefore(LocalDateTime.now().minusSeconds(defaultInactivityTimeoutInSec))) {
sessionCompletableFuture.whenComplete((protocolSession, throwable) -> { session.close(SessionCloseReason.INACTIVE);
if (throwable == null && protocolSession != null) { unregister(session.getId());
if (protocolSession.getLastActivityTime().isBefore(LocalDateTime.now().minusSeconds(defaultInactivityTimeoutInSec))) { }
protocolSession.close(SessionCloseReason.INACTIVE); }), defaultStateCheckIntervalInSec, defaultStateCheckIntervalInSec, TimeUnit.SECONDS);
unregister(protocolSession.getId());
}
}
})
), defaultStateCheckIntervalInSec, defaultStateCheckIntervalInSec, TimeUnit.SECONDS);
} }
@PreDestroy @PreDestroy
@@ -72,7 +67,7 @@ public class DefaultProtocolSessionRegistryProvider implements ProtocolSessionRe
log.debug("Registering session {}", protocolSession); log.debug("Registering session {}", protocolSession);
} }
SESSION_CACHE.put(protocolSession.getId(), CompletableFuture.supplyAsync(() -> protocolSession, ThreadPoolConfiguration.JCPP_COMMON_THREAD_POOL)); sessionCache.put(protocolSession.getId(), protocolSession);
} }
@@ -81,16 +76,15 @@ public class DefaultProtocolSessionRegistryProvider implements ProtocolSessionRe
log.info("Unregistering session {}", sessionId); log.info("Unregistering session {}", sessionId);
SESSION_CACHE.synchronous().invalidate(sessionId); sessionCache.invalidate(sessionId);
} }
@Override @Override
public CompletableFuture<ProtocolSession> get(UUID sessionId) { public ProtocolSession get(UUID sessionId) {
log.debug("Get session {}", sessionId); log.debug("Get session {}", sessionId);
return SESSION_CACHE.get(sessionId, uuid -> null); return sessionCache.get(sessionId, uuid -> null);
} }
@Override @Override
@@ -102,14 +96,14 @@ public class DefaultProtocolSessionRegistryProvider implements ProtocolSessionRe
protocolSession.setLastActivityTime(LocalDateTime.now()); protocolSession.setLastActivityTime(LocalDateTime.now());
SESSION_CACHE.put(protocolSession.getId(), CompletableFuture.supplyAsync(() -> protocolSession, ThreadPoolConfiguration.JCPP_COMMON_THREAD_POOL)); sessionCache.put(protocolSession.getId(), protocolSession);
} }
private AsyncCache<UUID, ProtocolSession> buildCache() { private Cache<UUID, ProtocolSession> buildCache() {
return Caffeine.newBuilder() return Caffeine.newBuilder()
.initialCapacity(INIT_CACHE_LIMIT) .initialCapacity(INIT_CACHE_LIMIT)
.maximumSize(INIT_CACHE_LIMIT * 10) .maximumSize(MAXIMUM_SIZE)
.executor(ThreadPoolConfiguration.JCPP_COMMON_THREAD_POOL) .executor(ThreadPoolConfiguration.JCPP_COMMON_THREAD_POOL)
.buildAsync(); .build();
} }
} }

View File

@@ -148,5 +148,5 @@ queue:
thread-pool: thread-pool:
sharding: sharding:
hash_function_name: "${THREAD_POOL_SHARDING_HASH_FUNCTION_NAME:murmur3_128}" # murmur3_32, murmur3_128 or sha256 hash_function_name: "${THREAD_POOL_SHARDING_HASH_FUNCTION_NAME:murmur3_128}" # murmur3_32, murmur3_128 or sha256
parallelism: "${THREAD_POOL_SHARDING_PARALLELISM:128}" parallelism: "${THREAD_POOL_SHARDING_PARALLELISM:8}"
stats-print-interval-ms: "${THREAD_POOL_SHARDING_STATS_PRINT_INTERVAL_MS:10000}" stats-print-interval-ms: "${THREAD_POOL_SHARDING_STATS_PRINT_INTERVAL_MS:10000}"

View File

@@ -121,10 +121,10 @@ public class AbstractYunKuaiChongCmdExe {
} }
protected void encodeAndWriteFlush(YunKuaiChongV150DownlinkCmdEnum downlinkCmd, protected void encodeAndWriteFlush(YunKuaiChongV150DownlinkCmdEnum downlinkCmd,
int seqNo, int seqNo,
int encryptionFlag, int encryptionFlag,
ByteBuf msgBody, ByteBuf msgBody,
TcpSession tcpSession) { TcpSession tcpSession) {
byte[] encode = encode(downlinkCmd, seqNo, encryptionFlag, msgBody); byte[] encode = encode(downlinkCmd, seqNo, encryptionFlag, msgBody);
@@ -132,8 +132,8 @@ public class AbstractYunKuaiChongCmdExe {
} }
protected void encodeAndWriteFlush(YunKuaiChongV150DownlinkCmdEnum downlinkCmd, protected void encodeAndWriteFlush(YunKuaiChongV150DownlinkCmdEnum downlinkCmd,
ByteBuf msgBody, ByteBuf msgBody,
TcpSession tcpSession) { TcpSession tcpSession) {
byte[] encode = encode(downlinkCmd, byte[] encode = encode(downlinkCmd,
tcpSession.nextSeqNo(SequenceNumberLength.SHORT), tcpSession.nextSeqNo(SequenceNumberLength.SHORT),

View File

@@ -160,7 +160,7 @@ public class YunKuaiChongV15ProtocolMessageProcessor extends ProtocolMessageProc
} }
@Override @Override
public void downlinkHandle(SessionToHandlerMsg sessionToHandlerMsg) throws Exception { public void downlinkHandle(SessionToHandlerMsg sessionToHandlerMsg) {
TcpSession session = (TcpSession) sessionToHandlerMsg.session(); TcpSession session = (TcpSession) sessionToHandlerMsg.session();
DownlinkRestMessage protocolDownlinkMsg = sessionToHandlerMsg.downlinkMsg(); DownlinkRestMessage protocolDownlinkMsg = sessionToHandlerMsg.downlinkMsg();