diff --git a/jsowell-admin/src/main/java/com/jsowell/service/OrderService.java b/jsowell-admin/src/main/java/com/jsowell/service/OrderService.java index b4d2a584c..8dfd3f17d 100644 --- a/jsowell-admin/src/main/java/com/jsowell/service/OrderService.java +++ b/jsowell-admin/src/main/java/com/jsowell/service/OrderService.java @@ -149,6 +149,9 @@ public class OrderService { // 引入线程池 private ThreadPoolTaskExecutor executor = SpringUtils.getBean("threadPoolTaskExecutor"); + // 引入第三方平台任务线程池 + private ThreadPoolTaskExecutor thirdpartyTaskExecutor = SpringUtils.getBean("thirdpartyTaskExecutor"); + /** * 生成订单 * @@ -796,8 +799,6 @@ public class OrderService { orderLogic.settleOrder(data, orderBasicInfo); } - - log.info("人工结算订单-end orderCode:{}", dto.getOrderCode()); // 异步推送第三方平台订单信息 @@ -807,7 +808,7 @@ public class OrderService { } catch (Exception e) { log.error("人工结算订单 推送第三方平台订单信息error, orderCode:{}", orderBasicInfo.getOrderCode(), e); } - }, executor); + }, thirdpartyTaskExecutor); return true; } @@ -1219,7 +1220,7 @@ public class OrderService { } catch (Exception e) { log.error("异步推送第三方平台启动充电逻辑 error", e); } - }, executor); + }, thirdpartyTaskExecutor); } else if (StringUtils.equals(scenarioType, ScenarioEnum.BALANCE.getValue())) { // 2-充值余额 // 充值余额成功 UpdateMemberBalanceDTO dto = new UpdateMemberBalanceDTO(); diff --git a/jsowell-admin/src/test/java/SpringBootTestController.java b/jsowell-admin/src/test/java/SpringBootTestController.java index 50a3efeb9..c73a4d0fd 100644 --- a/jsowell-admin/src/test/java/SpringBootTestController.java +++ b/jsowell-admin/src/test/java/SpringBootTestController.java @@ -49,6 +49,7 @@ import com.jsowell.common.util.id.IdUtils; import com.jsowell.common.util.id.SnowflakeIdWorker; import com.jsowell.common.util.id.UUID; import com.jsowell.common.util.ip.AddressUtils; +import com.jsowell.framework.async.JsowellThreadFactory; import com.jsowell.netty.handler.yunkuaichong.HeartbeatRequestHandler; import com.jsowell.netty.handler.yunkuaichong.TransactionRecordsRequestHandler; import com.jsowell.netty.service.camera.impl.CameraBusinessServiceImpl; @@ -114,6 +115,9 @@ import java.nio.charset.StandardCharsets; import java.text.ParseException; import java.text.SimpleDateFormat; import java.util.*; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ThreadFactory; import java.util.stream.Collectors; @ActiveProfiles("dev") @@ -297,6 +301,15 @@ public class SpringBootTestController { @DubboReference private JcppService jcppService; + ThreadFactory threadFactory = JsowellThreadFactory.forName("test-thread-factory"); + + private final ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(10, JsowellThreadFactory.forName("test-thread-factory")); + + @Test + public void testThread() { + threadFactory.newThread(() -> System.out.println("testThread:"+ Thread.currentThread().getName())); + } + @Test public void testDubbo() { String s = jcppService.sayHello("jsowell-charger-web"); diff --git a/jsowell-framework/src/main/java/com/jsowell/framework/async/JsowellThreadFactory.java b/jsowell-framework/src/main/java/com/jsowell/framework/async/JsowellThreadFactory.java new file mode 100644 index 000000000..0bb3b1e3d --- /dev/null +++ b/jsowell-framework/src/main/java/com/jsowell/framework/async/JsowellThreadFactory.java @@ -0,0 +1,48 @@ +/** + * 开源代码,仅供学习和交流研究使用,商用请联系三丙 + * 微信:mohan_88888 + * 抖音:程序员三丙 + * 付费课程知识星球:https://t.zsxq.com/aKtXo + */ +package com.jsowell.framework.async; + +import com.google.common.util.concurrent.ThreadFactoryBuilder; + +import java.util.concurrent.ThreadFactory; + +public class JsowellThreadFactory { + public static final String THREAD_TOPIC_SEPARATOR = " | "; + + public static ThreadFactory forName(String name) { + return new ThreadFactoryBuilder() + .setNameFormat(name) + .setDaemon(true) + .setPriority(Thread.NORM_PRIORITY) + .build(); + } + + public static ThreadFactory forName(String name, int priority) { + return new ThreadFactoryBuilder() + .setNameFormat(name) + .setDaemon(true) + .setPriority(priority) + .build(); + } + public static void updateCurrentThreadName(String threadSuffix) { + String name = Thread.currentThread().getName(); + int spliteratorIndex = name.indexOf(THREAD_TOPIC_SEPARATOR); + if (spliteratorIndex > 0) { + name = name.substring(0, spliteratorIndex); + } + name = name + THREAD_TOPIC_SEPARATOR + threadSuffix; + Thread.currentThread().setName(name); + } + + public static void addThreadNamePrefix(String prefix) { + String name = Thread.currentThread().getName(); + name = prefix + "-" + name; + Thread.currentThread().setName(name); + } + + +} diff --git a/jsowell-framework/src/main/java/com/jsowell/framework/config/ThreadPoolConfig.java b/jsowell-framework/src/main/java/com/jsowell/framework/config/ThreadPoolConfig.java index 9264cfe20..1c1623d73 100644 --- a/jsowell-framework/src/main/java/com/jsowell/framework/config/ThreadPoolConfig.java +++ b/jsowell-framework/src/main/java/com/jsowell/framework/config/ThreadPoolConfig.java @@ -49,6 +49,22 @@ public class ThreadPoolConfig { return executor; } + /** + * 用于处理互联互通任务的线程池 + */ + @Bean(name = "thirdpartyTaskExecutor") + public ThreadPoolTaskExecutor thirdpartyTaskExecutor() { + ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); + executor.setMaxPoolSize(maxPoolSize/10); + executor.setCorePoolSize(corePoolSize/10); + executor.setQueueCapacity(queueCapacity/10); + executor.setKeepAliveSeconds(keepAliveSeconds); + // 线程池对拒绝任务(无线程可用)的处理策略 + executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); + // log.info("threadPoolTaskExecutor创建成功"); + return executor; + } + /** * 执行周期性或定时任务 */ diff --git a/jsowell-netty/src/main/java/com/jsowell/netty/handler/yunkuaichong/RemoteStartChargingRequestHandler.java b/jsowell-netty/src/main/java/com/jsowell/netty/handler/yunkuaichong/RemoteStartChargingRequestHandler.java index 8c83ac67d..7c980424b 100644 --- a/jsowell-netty/src/main/java/com/jsowell/netty/handler/yunkuaichong/RemoteStartChargingRequestHandler.java +++ b/jsowell-netty/src/main/java/com/jsowell/netty/handler/yunkuaichong/RemoteStartChargingRequestHandler.java @@ -37,8 +37,8 @@ public class RemoteStartChargingRequestHandler extends AbstractYkcHandler { @Autowired private CommonService commonService; - // 引入线程池 - private ThreadPoolTaskExecutor executor = SpringUtils.getBean("threadPoolTaskExecutor"); + // 引入第三方平台任务线程池 + private ThreadPoolTaskExecutor thirdpartyTaskExecutor = SpringUtils.getBean("thirdpartyTaskExecutor"); @Override public void afterPropertiesSet() throws Exception { @@ -126,7 +126,7 @@ public class RemoteStartChargingRequestHandler extends AbstractYkcHandler { log.error("统一推送第三方平台订单信息error, ", e); } } - }, executor); + }, thirdpartyTaskExecutor); return null; } } diff --git a/jsowell-netty/src/main/java/com/jsowell/netty/handler/yunkuaichong/TransactionRecordsRequestHandler.java b/jsowell-netty/src/main/java/com/jsowell/netty/handler/yunkuaichong/TransactionRecordsRequestHandler.java index 7c01e555b..8b1fe8311 100644 --- a/jsowell-netty/src/main/java/com/jsowell/netty/handler/yunkuaichong/TransactionRecordsRequestHandler.java +++ b/jsowell-netty/src/main/java/com/jsowell/netty/handler/yunkuaichong/TransactionRecordsRequestHandler.java @@ -51,8 +51,8 @@ public class TransactionRecordsRequestHandler extends AbstractYkcHandler { private final String type = YKCUtils.frameType2Str(YKCFrameTypeCode.TRANSACTION_RECORDS_CODE.getBytes()); private final String oldVersionType = YKCUtils.frameType2Str(YKCFrameTypeCode.TRANSACTION_RECORDS_OLD_VERSION_CODE.getBytes()); - // 引入线程池 - private ThreadPoolTaskExecutor executor = SpringUtils.getBean("threadPoolTaskExecutor"); + // 引入第三方平台任务线程池 + private ThreadPoolTaskExecutor thirdpartyTaskExecutor = SpringUtils.getBean("thirdpartyTaskExecutor"); @Override public void afterPropertiesSet() throws Exception { @@ -667,23 +667,23 @@ public class TransactionRecordsRequestHandler extends AbstractYkcHandler { OrderBasicInfo finalOrderBasicInfo = orderBasicInfo; - // 异步推送第三方平台订单信息 + // TODO 异步推送第三方平台订单信息 CompletableFuture.runAsync(() -> { try { commonService.commonPushOrderInfo(finalOrderBasicInfo); } catch (Exception e) { log.error("推送第三方平台订单信息error, ", e); } - }, executor); + }, thirdpartyTaskExecutor); - // 异步推送第三方平台订单信息V2 + // TODO 异步推送第三方平台订单信息V2 CompletableFuture.runAsync(() -> { try { commonService.commonPushOrderInfoV2(finalOrderBasicInfo); } catch (Exception e) { log.error("推送第三方平台订单信息error, ", e); } - }, executor); + }, thirdpartyTaskExecutor); // 异步推送充电订单算法平台 CompletableFuture.runAsync(() -> { @@ -693,7 +693,7 @@ public class TransactionRecordsRequestHandler extends AbstractYkcHandler { } catch (Exception e) { log.error("异步推送充电订单算法平台 error, ", e); } - }, executor); + }, thirdpartyTaskExecutor); } else { // 平台没有查到订单 diff --git a/jsowell-netty/src/main/java/com/jsowell/netty/handler/yunkuaichong/UploadRealTimeMonitorHandler.java b/jsowell-netty/src/main/java/com/jsowell/netty/handler/yunkuaichong/UploadRealTimeMonitorHandler.java index 212570561..973b3b085 100644 --- a/jsowell-netty/src/main/java/com/jsowell/netty/handler/yunkuaichong/UploadRealTimeMonitorHandler.java +++ b/jsowell-netty/src/main/java/com/jsowell/netty/handler/yunkuaichong/UploadRealTimeMonitorHandler.java @@ -51,6 +51,9 @@ public class UploadRealTimeMonitorHandler extends AbstractYkcHandler { // 引入线程池 private ThreadPoolTaskExecutor executor = SpringUtils.getBean("threadPoolTaskExecutor"); + // 引入第三方平台任务线程池 + private ThreadPoolTaskExecutor thirdpartyTaskExecutor = SpringUtils.getBean("thirdpartyTaskExecutor"); + @Autowired private PileBasicInfoService pileBasicInfoService; @@ -352,15 +355,6 @@ public class UploadRealTimeMonitorHandler extends AbstractYkcHandler { } } - // 异步推送第三方平台实时数据 - // CompletableFuture.runAsync(() -> { - // try { - // commonService.pushRealTimeInfo(pileSn, connectorCode, connectorStatus, realTimeMonitorData, transactionCode); - // } catch (Exception e) { - // log.error("统一推送第三方平台实时数据 error,", e); - // } - // }, executor); - // 异步推送第三方平台实时数据V2 CompletableFuture.runAsync(() -> { try { @@ -369,7 +363,7 @@ public class UploadRealTimeMonitorHandler extends AbstractYkcHandler { } catch (Exception e) { log.error("统一推送第三方平台实时数据V2 error, ", e); } - }, executor); + }, thirdpartyTaskExecutor); if (StringUtils.equals(connectorStatus, Constants.ONE)) { // 故障 @@ -380,7 +374,7 @@ public class UploadRealTimeMonitorHandler extends AbstractYkcHandler { } catch (Exception e) { log.error("统一推送第三方平台告警信息 error, ", e); } - }, executor); + }, thirdpartyTaskExecutor); } return null; diff --git a/jsowell-netty/src/main/java/com/jsowell/netty/service/rabbitmq/PileRabbitListener.java b/jsowell-netty/src/main/java/com/jsowell/netty/service/rabbitmq/PileRabbitListener.java index 971ec438f..94a827682 100644 --- a/jsowell-netty/src/main/java/com/jsowell/netty/service/rabbitmq/PileRabbitListener.java +++ b/jsowell-netty/src/main/java/com/jsowell/netty/service/rabbitmq/PileRabbitListener.java @@ -73,6 +73,9 @@ public class PileRabbitListener { // 引入线程池 private ThreadPoolTaskExecutor executor = SpringUtils.getBean("threadPoolTaskExecutor"); + // 引入第三方平台任务线程池 + private ThreadPoolTaskExecutor thirdpartyTaskExecutor = SpringUtils.getBean("thirdpartyTaskExecutor"); + // ========================= 消费消息 ========================== // // @RabbitListener(queues = "ykc.pileLogin-topic.device-group") // public void testRabbitMQMessage(String msg) { @@ -378,7 +381,7 @@ public class PileRabbitListener { } catch (Exception e) { log.error("统一推送第三方平台实时数据 error,", e); } - }, executor); + }, thirdpartyTaskExecutor); // 异步推送第三方平台实时数据V2 CompletableFuture.runAsync(() -> { @@ -387,7 +390,7 @@ public class PileRabbitListener { } catch (Exception e) { log.error("统一推送第三方平台实时数据V2 error, ", e); } - }, executor); + }, thirdpartyTaskExecutor); if (StringUtils.equals(connectorStatus, Constants.ONE)) { // 故障 @@ -398,7 +401,7 @@ public class PileRabbitListener { } catch (Exception e) { log.error("统一推送第三方平台告警信息 error, ", e); } - }, executor); + }, thirdpartyTaskExecutor); } }