From 6f5fdc8a429808beea81f19d4bd738eadb7f3ddd Mon Sep 17 00:00:00 2001 From: Lemon Date: Mon, 9 Sep 2024 21:32:31 +0800 Subject: [PATCH] update --- .../java/com/jsowell/service/PileService.java | 8 +- .../com/jsowell/common/util/FutureUtil.java | 181 ++++++++++++++++++ 2 files changed, 185 insertions(+), 4 deletions(-) create mode 100644 jsowell-common/src/main/java/com/jsowell/common/util/FutureUtil.java diff --git a/jsowell-admin/src/main/java/com/jsowell/service/PileService.java b/jsowell-admin/src/main/java/com/jsowell/service/PileService.java index 08dfd299c..618ef48ee 100644 --- a/jsowell-admin/src/main/java/com/jsowell/service/PileService.java +++ b/jsowell-admin/src/main/java/com/jsowell/service/PileService.java @@ -187,18 +187,18 @@ public class PileService { } // 查询充电桩下枪口信息 - CompletableFuture> connectorInfoListFuture = CompletableFuture.supplyAsync(() -> pileConnectorInfoService.selectConnectorInfoList(pileSn)); + CompletableFuture> connectorInfoListFuture = FutureUtil.supplyAsync(() -> pileConnectorInfoService.selectConnectorInfoList(pileSn)); log.info("查询充电枪口详情-supplyAsync-selectConnectorInfoList:{}", connectorInfoListFuture); // 查计费模板信息 - CompletableFuture> billingPriceFuture = CompletableFuture.supplyAsync(() -> pileBillingTemplateService.queryBillingPrice(pileInfoVO.getStationId())); + CompletableFuture> billingPriceFuture = FutureUtil.supplyAsync(() -> pileBillingTemplateService.queryBillingPrice(pileInfoVO.getStationId())); log.info("查询充电枪口详情-supplyAsync-queryBillingPrice:{}", billingPriceFuture); // 查询运营商信息 - CompletableFuture merchantInfoVOFuture = CompletableFuture.supplyAsync(() -> pileMerchantInfoService.getMerchantInfoVO(pileInfoVO.getMerchantId())); + CompletableFuture merchantInfoVOFuture = FutureUtil.supplyAsync(() -> pileMerchantInfoService.getMerchantInfoVO(pileInfoVO.getMerchantId())); log.info("查询充电枪口详情-supplyAsync-getMerchantInfoVO:{}", merchantInfoVOFuture); - CompletableFuture all = CompletableFuture.allOf(connectorInfoListFuture, merchantInfoVOFuture, billingPriceFuture); + CompletableFuture all = FutureUtil.allOf(connectorInfoListFuture, merchantInfoVOFuture, billingPriceFuture); // .join()和.get()都会阻塞并获取线程的执行情况 // .join()会抛出未经检查的异常,不会强制开发者处理异常 .get()会抛出检查异常,需要开发者处理 all.join(); diff --git a/jsowell-common/src/main/java/com/jsowell/common/util/FutureUtil.java b/jsowell-common/src/main/java/com/jsowell/common/util/FutureUtil.java new file mode 100644 index 000000000..6b655dce5 --- /dev/null +++ b/jsowell-common/src/main/java/com/jsowell/common/util/FutureUtil.java @@ -0,0 +1,181 @@ +package com.jsowell.common.util; + +/** + * TODO + * + * @author Lemon + * @Date 2024/9/9 21:28:43 + */ + +import lombok.extern.slf4j.Slf4j; + +import java.util.concurrent.*; +import java.util.function.Function; +import java.util.function.Supplier; + +/** + * 多任务处理工具类 + * @Author WhHao + * @Date 2022/8/10 16:04 + * @Return + */ +@Slf4j +public class FutureUtil { + + /** + * cpu 核心数 + */ + private static final int AVALIABLE_PROCESSORS = Runtime.getRuntime().availableProcessors(); + + // 最大超时时间 + private static final int TIMEOUT_VALUE = 1500; + // 时间单位 + private static final TimeUnit TIMEOUT_UNIT = TimeUnit.MILLISECONDS; + + + /** + * Singleton delay scheduler, used only for starting and * cancelling tasks. + */ + public static final class Delayer { + + static final ScheduledThreadPoolExecutor delayer; + + /** + * 异常线程,不做请求处理,只抛出异常 + */ + static { + delayer = new ScheduledThreadPoolExecutor(1, new DaemonThreadFactory()); + delayer.setRemoveOnCancelPolicy(true); + } + + static ScheduledFuture delay(Runnable command, long delay, TimeUnit unit) { + return delayer.schedule(command, delay, unit); + } + + static final class DaemonThreadFactory implements ThreadFactory { + @Override + public Thread newThread(Runnable r) { + Thread t = new Thread(r); + t.setDaemon(true); + t.setName("CompletableFutureScheduler"); + return t; + } + } + } + + /** + * 根据服务器cpu自定义线程池 + */ + private static ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor( + AVALIABLE_PROCESSORS, + 3 * AVALIABLE_PROCESSORS, + 3, + TimeUnit.SECONDS, + new LinkedBlockingDeque<>(20), + new ThreadPoolExecutor.CallerRunsPolicy() + ); + + /** + * 有返回值的异步 + * + * @param supplier + * @param + * @return + */ + public static CompletableFuture supplyAsync(Supplier supplier) { + return supplyAsync(TIMEOUT_VALUE, TIMEOUT_UNIT, supplier); + } + + /** + * 有返回值的异步 - 可设置超时时间 + * + * @param timeout + * @param unit + * @param supplier + * @param + * @return + */ + public static CompletableFuture supplyAsync(long timeout, TimeUnit unit, Supplier supplier) { + return CompletableFuture.supplyAsync(supplier, threadPoolExecutor) + .applyToEither(timeoutAfter(timeout, unit), Function.identity()) + .exceptionally(throwable -> { + throwable.printStackTrace(); + log.error(throwable.getMessage()); + return null; + }); + } + + /** + * 无返回值的异步 + * + * @param runnable + * @return + */ + public static CompletableFuture runAsync(Runnable runnable) { + return runAsync(TIMEOUT_VALUE, TIMEOUT_UNIT, runnable); + } + + /** + * 无返回值的异步 - 可设置超时时间 + * + * @param runnable + * @return + */ + public static CompletableFuture runAsync(long timeout, TimeUnit unit, Runnable runnable) { + return CompletableFuture.runAsync(runnable, threadPoolExecutor) + .applyToEither(timeoutAfter(timeout, unit), Function.identity()) + .exceptionally(throwable -> { + throwable.printStackTrace(); + log.error(throwable.getMessage()); + return null; + }); + } + + /** + * 统一处理异步结果 + * + * @param futures + * @return + */ + public static CompletableFuture allOf(CompletableFuture... futures) { + return allOf(TIMEOUT_VALUE, TIMEOUT_UNIT, futures); + } + + /** + * 统一处理异步结果 - 可设置超时时间 + * + * @param futures + * @return + */ + public static CompletableFuture allOf(long timeout, TimeUnit unit, CompletableFuture... futures) { + return CompletableFuture.allOf(futures) + .applyToEither(timeoutAfter(timeout, unit), Function.identity()) + .exceptionally(throwable -> { + throwable.printStackTrace(); + log.error(throwable.getMessage()); + return null; + }); + } + + /** + * 异步超时处理 + * + * @param timeout + * @param unit + * @param + * @return + */ + public static CompletableFuture timeoutAfter(long timeout, TimeUnit unit) { + CompletableFuture result = new CompletableFuture(); + // timeout 时间后 抛出TimeoutException 类似于sentinel / watcher + Delayer.delayer.schedule(() -> result.completeExceptionally(new TimeoutException()), timeout, unit); + return result; + } + + public static CompletableFuture timeoutAfter() { + CompletableFuture result = new CompletableFuture(); + // timeout 时间后 抛出TimeoutException 类似于sentinel / watcher + Delayer.delayer.schedule(() -> result.completeExceptionally(new TimeoutException()), TIMEOUT_VALUE, TIMEOUT_UNIT); + return result; + } +}