From 4356eaba85cf1518ecdc695d443fb97672585d7a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E4=B8=89=E4=B8=99?= <10604541+sanbing-os@user.noreply.gitee.com> Date: Wed, 9 Oct 2024 10:08:32 +0800 Subject: [PATCH] =?UTF-8?q?protocolSession=E7=9A=84=E7=BC=93=E5=AD=98?= =?UTF-8?q?=E6=94=B9=E4=B8=BA=E5=90=8C=E6=AD=A5caffeine=EF=BC=8C=E5=9B=A0?= =?UTF-8?q?=E4=B8=BA=E6=B2=A1=E6=9C=89IO=E6=93=8D=E4=BD=9C?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../src/main/resources/app-service.yml | 2 +- .../util/config/ScheduledTaskConfig.java | 23 +++++++++++ .../util/config/ShardingThreadPool.java | 21 +++++----- .../protocol/ProtocolMessageProcessor.java | 4 +- .../protocol/adapter/DownlinkController.java | 23 +++++------ .../ProtocolSessionRegistryProvider.java | 3 +- ...efaultProtocolSessionRegistryProvider.java | 40 ++++++++----------- .../src/main/resources/protocol-service.yml | 2 +- .../AbstractYunKuaiChongCmdExe.java | 12 +++--- ...nKuaiChongV15ProtocolMessageProcessor.java | 2 +- 10 files changed, 74 insertions(+), 58 deletions(-) create mode 100644 jcpp-infrastructure-util/src/main/java/sanbing/jcpp/infrastructure/util/config/ScheduledTaskConfig.java diff --git a/jcpp-app-bootstrap/src/main/resources/app-service.yml b/jcpp-app-bootstrap/src/main/resources/app-service.yml index 175b136..14d120d 100644 --- a/jcpp-app-bootstrap/src/main/resources/app-service.yml +++ b/jcpp-app-bootstrap/src/main/resources/app-service.yml @@ -210,5 +210,5 @@ service: thread-pool: sharding: hash_function_name: "${THREAD_POOL_SHARDING_HASH_FUNCTION_NAME:murmur3_128}" # murmur3_32, murmur3_128 or sha256 - parallelism: "${THREAD_POOL_SHARDING_PARALLELISM:128}" + parallelism: "${THREAD_POOL_SHARDING_PARALLELISM:8}" stats-print-interval-ms: "${THREAD_POOL_SHARDING_STATS_PRINT_INTERVAL_MS:10000}" diff --git a/jcpp-infrastructure-util/src/main/java/sanbing/jcpp/infrastructure/util/config/ScheduledTaskConfig.java b/jcpp-infrastructure-util/src/main/java/sanbing/jcpp/infrastructure/util/config/ScheduledTaskConfig.java new file mode 100644 index 0000000..c68c811 --- /dev/null +++ b/jcpp-infrastructure-util/src/main/java/sanbing/jcpp/infrastructure/util/config/ScheduledTaskConfig.java @@ -0,0 +1,23 @@ +/** + * 抖音关注:程序员三丙 + * 知识星球:https://t.zsxq.com/j9b21 + */ +package sanbing.jcpp.infrastructure.util.config; + +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.scheduling.TaskScheduler; +import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler; + +@Configuration +public class ScheduledTaskConfig { + + @Bean + public TaskScheduler taskScheduler() { + ThreadPoolTaskScheduler scheduler = new ThreadPoolTaskScheduler(); + scheduler.setPoolSize(Runtime.getRuntime().availableProcessors()); + scheduler.setThreadNamePrefix("scheduled-task-"); + scheduler.initialize(); + return scheduler; + } +} \ No newline at end of file diff --git a/jcpp-infrastructure-util/src/main/java/sanbing/jcpp/infrastructure/util/config/ShardingThreadPool.java b/jcpp-infrastructure-util/src/main/java/sanbing/jcpp/infrastructure/util/config/ShardingThreadPool.java index 4d5338b..5bf3513 100644 --- a/jcpp-infrastructure-util/src/main/java/sanbing/jcpp/infrastructure/util/config/ShardingThreadPool.java +++ b/jcpp-infrastructure-util/src/main/java/sanbing/jcpp/infrastructure/util/config/ShardingThreadPool.java @@ -33,12 +33,12 @@ public class ShardingThreadPool { @Value("${thread-pool.sharding.hash_function_name:murmur3_128}") private String hashFunctionName; - @Value("${thread-pool.sharding.parallelism:128}") + @Value("${thread-pool.sharding.parallelism:8}") private int parallelism; private HashFunction hashFunction; - private final Map EXECUTOR_SERVICE_MAP = new ConcurrentHashMap<>(128); + private final Map EXECUTOR_SERVICE_MAP = new ConcurrentHashMap<>(8); @PostConstruct public void init() { @@ -59,13 +59,16 @@ public class ShardingThreadPool { ThreadPoolExecutor threadPoolExecutor = (ThreadPoolExecutor) v; - log.info("分区 {}/{} 的线程池中剩余 {} 条待执行任务,当前正在执行的线程数 {}, 已完成任务 {} / {}", - k, - EXECUTOR_SERVICE_MAP.size(), - threadPoolExecutor.getQueue().size(), - threadPoolExecutor.getActiveCount(), - threadPoolExecutor.getCompletedTaskCount(), - threadPoolExecutor.getTaskCount()); + int size = threadPoolExecutor.getQueue().size(); + + if (size > 1) { + log.info("分区 {} 的线程池中剩余 {} 条待执行任务,当前正在执行的线程数 {}, 已完成任务 {} / {}", + k, + size, + threadPoolExecutor.getActiveCount(), + threadPoolExecutor.getCompletedTaskCount(), + threadPoolExecutor.getTaskCount()); + } }); } diff --git a/jcpp-protocol-api/src/main/java/sanbing/jcpp/protocol/ProtocolMessageProcessor.java b/jcpp-protocol-api/src/main/java/sanbing/jcpp/protocol/ProtocolMessageProcessor.java index 848cfd3..506a372 100644 --- a/jcpp-protocol-api/src/main/java/sanbing/jcpp/protocol/ProtocolMessageProcessor.java +++ b/jcpp-protocol-api/src/main/java/sanbing/jcpp/protocol/ProtocolMessageProcessor.java @@ -47,7 +47,7 @@ public abstract class ProtocolMessageProcessor { })); } - protected abstract void uplinkHandle(ListenerToHandlerMsg listenerToHandlerMsg) throws Exception; + protected abstract void uplinkHandle(ListenerToHandlerMsg listenerToHandlerMsg); public void downlinkHandle(SessionToHandlerMsg sessionToHandlerMsg, MessagesStats downlinkMsgStats) throws DownlinkException { try { @@ -62,5 +62,5 @@ public abstract class ProtocolMessageProcessor { } } - protected abstract void downlinkHandle(SessionToHandlerMsg sessionToHandlerMsg) throws Exception; + protected abstract void downlinkHandle(SessionToHandlerMsg sessionToHandlerMsg); } \ No newline at end of file diff --git a/jcpp-protocol-api/src/main/java/sanbing/jcpp/protocol/adapter/DownlinkController.java b/jcpp-protocol-api/src/main/java/sanbing/jcpp/protocol/adapter/DownlinkController.java index 8fe97a3..facf5ea 100644 --- a/jcpp-protocol-api/src/main/java/sanbing/jcpp/protocol/adapter/DownlinkController.java +++ b/jcpp-protocol-api/src/main/java/sanbing/jcpp/protocol/adapter/DownlinkController.java @@ -20,7 +20,6 @@ import sanbing.jcpp.protocol.domain.ProtocolSession; import sanbing.jcpp.protocol.provider.ProtocolSessionRegistryProvider; import java.util.UUID; -import java.util.concurrent.CompletableFuture; /** * @author baigod @@ -44,14 +43,14 @@ public class DownlinkController { final DeferredResult> response = new DeferredResult<>(onDownlinkTimeout, ResponseEntity.status(HttpStatus.REQUEST_TIMEOUT).build()); - UUID protocolSessionId = new UUID(downlinkMsg.getSessionIdMSB(),downlinkMsg.getSessionIdLSB()) ; + UUID protocolSessionId = new UUID(downlinkMsg.getSessionIdMSB(), downlinkMsg.getSessionIdLSB()); - CompletableFuture protocolSessionCompletableFuture = protocolSessionRegistryProvider.get(protocolSessionId); + ProtocolSession protocolSession = protocolSessionRegistryProvider.get(protocolSessionId); - protocolSessionCompletableFuture.thenAccept(session -> { - if (session != null) { + try { + if (protocolSession != null) { - session.onDownlink(downlinkMsg); + protocolSession.onDownlink(downlinkMsg); response.setResult(ResponseEntity.status(HttpStatus.OK).build()); } else { @@ -60,17 +59,15 @@ public class DownlinkController { response.setResult(ResponseEntity.status(HttpStatus.NOT_FOUND).body("Protocol Session not found for ID:" + protocolSessionId)); } - }).whenComplete((unused, throwable) -> { - if (throwable != null) { + } catch (Exception e) { - log.warn("下发报文时处理失败 sessionId: {}", protocolSessionId, throwable); + log.warn("下发报文时处理失败 sessionId: {}", protocolSessionId, e); - if (!response.isSetOrExpired()) { + if (!response.isSetOrExpired()) { - response.setResult(ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(throwable.getMessage())); - } + response.setResult(ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(e.getMessage())); } - }); + } return response; } diff --git a/jcpp-protocol-api/src/main/java/sanbing/jcpp/protocol/provider/ProtocolSessionRegistryProvider.java b/jcpp-protocol-api/src/main/java/sanbing/jcpp/protocol/provider/ProtocolSessionRegistryProvider.java index be674f8..b142757 100644 --- a/jcpp-protocol-api/src/main/java/sanbing/jcpp/protocol/provider/ProtocolSessionRegistryProvider.java +++ b/jcpp-protocol-api/src/main/java/sanbing/jcpp/protocol/provider/ProtocolSessionRegistryProvider.java @@ -7,7 +7,6 @@ package sanbing.jcpp.protocol.provider; import sanbing.jcpp.protocol.domain.ProtocolSession; import java.util.UUID; -import java.util.concurrent.CompletableFuture; /** * @author baigod @@ -21,7 +20,7 @@ public interface ProtocolSessionRegistryProvider { void unregister(UUID sessionId); - CompletableFuture get(UUID sessionId); + ProtocolSession get(UUID sessionId); /** * 活跃会话 diff --git a/jcpp-protocol-api/src/main/java/sanbing/jcpp/protocol/provider/impl/DefaultProtocolSessionRegistryProvider.java b/jcpp-protocol-api/src/main/java/sanbing/jcpp/protocol/provider/impl/DefaultProtocolSessionRegistryProvider.java index c32ef51..ae823b0 100644 --- a/jcpp-protocol-api/src/main/java/sanbing/jcpp/protocol/provider/impl/DefaultProtocolSessionRegistryProvider.java +++ b/jcpp-protocol-api/src/main/java/sanbing/jcpp/protocol/provider/impl/DefaultProtocolSessionRegistryProvider.java @@ -4,7 +4,7 @@ */ package sanbing.jcpp.protocol.provider.impl; -import com.github.benmanes.caffeine.cache.AsyncCache; +import com.github.benmanes.caffeine.cache.Cache; import com.github.benmanes.caffeine.cache.Caffeine; import jakarta.annotation.PostConstruct; import jakarta.annotation.PreDestroy; @@ -20,7 +20,6 @@ import sanbing.jcpp.protocol.provider.ProtocolSessionRegistryProvider; import java.time.LocalDateTime; import java.util.UUID; -import java.util.concurrent.CompletableFuture; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; @@ -33,6 +32,7 @@ import java.util.concurrent.TimeUnit; @Slf4j public class DefaultProtocolSessionRegistryProvider implements ProtocolSessionRegistryProvider { private static final int INIT_CACHE_LIMIT = 100_000; + private static final int MAXIMUM_SIZE = 1_000_000; @Value("${service.protocols.sessions.default-inactivity-timeout-in-sec}") private int defaultInactivityTimeoutInSec; @@ -41,23 +41,18 @@ public class DefaultProtocolSessionRegistryProvider implements ProtocolSessionRe private int defaultStateCheckIntervalInSec; @Getter - private final AsyncCache SESSION_CACHE = buildCache(); + private final Cache sessionCache = buildCache(); private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(JCPPThreadFactory.forName("session-state-checker")); @PostConstruct public void init() { - scheduledExecutorService.scheduleAtFixedRate(() -> - SESSION_CACHE.asMap().forEach((id, sessionCompletableFuture) -> - sessionCompletableFuture.whenComplete((protocolSession, throwable) -> { - if (throwable == null && protocolSession != null) { - if (protocolSession.getLastActivityTime().isBefore(LocalDateTime.now().minusSeconds(defaultInactivityTimeoutInSec))) { - protocolSession.close(SessionCloseReason.INACTIVE); - unregister(protocolSession.getId()); - } - } - }) - ), defaultStateCheckIntervalInSec, defaultStateCheckIntervalInSec, TimeUnit.SECONDS); + scheduledExecutorService.scheduleAtFixedRate(() -> sessionCache.asMap().forEach((id, session) -> { + if (session.getLastActivityTime().isBefore(LocalDateTime.now().minusSeconds(defaultInactivityTimeoutInSec))) { + session.close(SessionCloseReason.INACTIVE); + unregister(session.getId()); + } + }), defaultStateCheckIntervalInSec, defaultStateCheckIntervalInSec, TimeUnit.SECONDS); } @PreDestroy @@ -72,7 +67,7 @@ public class DefaultProtocolSessionRegistryProvider implements ProtocolSessionRe log.debug("Registering session {}", protocolSession); } - SESSION_CACHE.put(protocolSession.getId(), CompletableFuture.supplyAsync(() -> protocolSession, ThreadPoolConfiguration.JCPP_COMMON_THREAD_POOL)); + sessionCache.put(protocolSession.getId(), protocolSession); } @@ -81,16 +76,15 @@ public class DefaultProtocolSessionRegistryProvider implements ProtocolSessionRe log.info("Unregistering session {}", sessionId); - SESSION_CACHE.synchronous().invalidate(sessionId); + sessionCache.invalidate(sessionId); } @Override - public CompletableFuture get(UUID sessionId) { + public ProtocolSession get(UUID sessionId) { log.debug("Get session {}", sessionId); - return SESSION_CACHE.get(sessionId, uuid -> null); - + return sessionCache.get(sessionId, uuid -> null); } @Override @@ -102,14 +96,14 @@ public class DefaultProtocolSessionRegistryProvider implements ProtocolSessionRe protocolSession.setLastActivityTime(LocalDateTime.now()); - SESSION_CACHE.put(protocolSession.getId(), CompletableFuture.supplyAsync(() -> protocolSession, ThreadPoolConfiguration.JCPP_COMMON_THREAD_POOL)); + sessionCache.put(protocolSession.getId(), protocolSession); } - private AsyncCache buildCache() { + private Cache buildCache() { return Caffeine.newBuilder() .initialCapacity(INIT_CACHE_LIMIT) - .maximumSize(INIT_CACHE_LIMIT * 10) + .maximumSize(MAXIMUM_SIZE) .executor(ThreadPoolConfiguration.JCPP_COMMON_THREAD_POOL) - .buildAsync(); + .build(); } } \ No newline at end of file diff --git a/jcpp-protocol-bootstrap/src/main/resources/protocol-service.yml b/jcpp-protocol-bootstrap/src/main/resources/protocol-service.yml index 4baf214..284a6bb 100644 --- a/jcpp-protocol-bootstrap/src/main/resources/protocol-service.yml +++ b/jcpp-protocol-bootstrap/src/main/resources/protocol-service.yml @@ -148,5 +148,5 @@ queue: thread-pool: sharding: hash_function_name: "${THREAD_POOL_SHARDING_HASH_FUNCTION_NAME:murmur3_128}" # murmur3_32, murmur3_128 or sha256 - parallelism: "${THREAD_POOL_SHARDING_PARALLELISM:128}" + parallelism: "${THREAD_POOL_SHARDING_PARALLELISM:8}" stats-print-interval-ms: "${THREAD_POOL_SHARDING_STATS_PRINT_INTERVAL_MS:10000}" diff --git a/jcpp-protocol-yunkuaichong/src/main/java/sanbing/jcpp/protocol/yunkuaichong/AbstractYunKuaiChongCmdExe.java b/jcpp-protocol-yunkuaichong/src/main/java/sanbing/jcpp/protocol/yunkuaichong/AbstractYunKuaiChongCmdExe.java index f38e54a..0d98b61 100644 --- a/jcpp-protocol-yunkuaichong/src/main/java/sanbing/jcpp/protocol/yunkuaichong/AbstractYunKuaiChongCmdExe.java +++ b/jcpp-protocol-yunkuaichong/src/main/java/sanbing/jcpp/protocol/yunkuaichong/AbstractYunKuaiChongCmdExe.java @@ -121,10 +121,10 @@ public class AbstractYunKuaiChongCmdExe { } protected void encodeAndWriteFlush(YunKuaiChongV150DownlinkCmdEnum downlinkCmd, - int seqNo, - int encryptionFlag, - ByteBuf msgBody, - TcpSession tcpSession) { + int seqNo, + int encryptionFlag, + ByteBuf msgBody, + TcpSession tcpSession) { byte[] encode = encode(downlinkCmd, seqNo, encryptionFlag, msgBody); @@ -132,8 +132,8 @@ public class AbstractYunKuaiChongCmdExe { } protected void encodeAndWriteFlush(YunKuaiChongV150DownlinkCmdEnum downlinkCmd, - ByteBuf msgBody, - TcpSession tcpSession) { + ByteBuf msgBody, + TcpSession tcpSession) { byte[] encode = encode(downlinkCmd, tcpSession.nextSeqNo(SequenceNumberLength.SHORT), diff --git a/jcpp-protocol-yunkuaichong/src/main/java/sanbing/jcpp/protocol/yunkuaichong/v150/YunKuaiChongV15ProtocolMessageProcessor.java b/jcpp-protocol-yunkuaichong/src/main/java/sanbing/jcpp/protocol/yunkuaichong/v150/YunKuaiChongV15ProtocolMessageProcessor.java index 43164b8..04d18c8 100644 --- a/jcpp-protocol-yunkuaichong/src/main/java/sanbing/jcpp/protocol/yunkuaichong/v150/YunKuaiChongV15ProtocolMessageProcessor.java +++ b/jcpp-protocol-yunkuaichong/src/main/java/sanbing/jcpp/protocol/yunkuaichong/v150/YunKuaiChongV15ProtocolMessageProcessor.java @@ -160,7 +160,7 @@ public class YunKuaiChongV15ProtocolMessageProcessor extends ProtocolMessageProc } @Override - public void downlinkHandle(SessionToHandlerMsg sessionToHandlerMsg) throws Exception { + public void downlinkHandle(SessionToHandlerMsg sessionToHandlerMsg) { TcpSession session = (TcpSession) sessionToHandlerMsg.session(); DownlinkRestMessage protocolDownlinkMsg = sessionToHandlerMsg.downlinkMsg();