引入第三方平台任务线程池

This commit is contained in:
Guoqs
2025-07-22 16:48:54 +08:00
parent 57487e5892
commit 7b71191daa
8 changed files with 103 additions and 28 deletions

View File

@@ -149,6 +149,9 @@ public class OrderService {
// 引入线程池 // 引入线程池
private ThreadPoolTaskExecutor executor = SpringUtils.getBean("threadPoolTaskExecutor"); private ThreadPoolTaskExecutor executor = SpringUtils.getBean("threadPoolTaskExecutor");
// 引入第三方平台任务线程池
private ThreadPoolTaskExecutor thirdpartyTaskExecutor = SpringUtils.getBean("thirdpartyTaskExecutor");
/** /**
* 生成订单 * 生成订单
* *
@@ -796,8 +799,6 @@ public class OrderService {
orderLogic.settleOrder(data, orderBasicInfo); orderLogic.settleOrder(data, orderBasicInfo);
} }
log.info("人工结算订单-end orderCode:{}", dto.getOrderCode()); log.info("人工结算订单-end orderCode:{}", dto.getOrderCode());
// 异步推送第三方平台订单信息 // 异步推送第三方平台订单信息
@@ -807,7 +808,7 @@ public class OrderService {
} catch (Exception e) { } catch (Exception e) {
log.error("人工结算订单 推送第三方平台订单信息error, orderCode:{}", orderBasicInfo.getOrderCode(), e); log.error("人工结算订单 推送第三方平台订单信息error, orderCode:{}", orderBasicInfo.getOrderCode(), e);
} }
}, executor); }, thirdpartyTaskExecutor);
return true; return true;
} }
@@ -1219,7 +1220,7 @@ public class OrderService {
} catch (Exception e) { } catch (Exception e) {
log.error("异步推送第三方平台启动充电逻辑 error", e); log.error("异步推送第三方平台启动充电逻辑 error", e);
} }
}, executor); }, thirdpartyTaskExecutor);
} else if (StringUtils.equals(scenarioType, ScenarioEnum.BALANCE.getValue())) { // 2-充值余额 } else if (StringUtils.equals(scenarioType, ScenarioEnum.BALANCE.getValue())) { // 2-充值余额
// 充值余额成功 // 充值余额成功
UpdateMemberBalanceDTO dto = new UpdateMemberBalanceDTO(); UpdateMemberBalanceDTO dto = new UpdateMemberBalanceDTO();

View File

@@ -49,6 +49,7 @@ import com.jsowell.common.util.id.IdUtils;
import com.jsowell.common.util.id.SnowflakeIdWorker; import com.jsowell.common.util.id.SnowflakeIdWorker;
import com.jsowell.common.util.id.UUID; import com.jsowell.common.util.id.UUID;
import com.jsowell.common.util.ip.AddressUtils; import com.jsowell.common.util.ip.AddressUtils;
import com.jsowell.framework.async.JsowellThreadFactory;
import com.jsowell.netty.handler.yunkuaichong.HeartbeatRequestHandler; import com.jsowell.netty.handler.yunkuaichong.HeartbeatRequestHandler;
import com.jsowell.netty.handler.yunkuaichong.TransactionRecordsRequestHandler; import com.jsowell.netty.handler.yunkuaichong.TransactionRecordsRequestHandler;
import com.jsowell.netty.service.camera.impl.CameraBusinessServiceImpl; import com.jsowell.netty.service.camera.impl.CameraBusinessServiceImpl;
@@ -114,6 +115,9 @@ import java.nio.charset.StandardCharsets;
import java.text.ParseException; import java.text.ParseException;
import java.text.SimpleDateFormat; import java.text.SimpleDateFormat;
import java.util.*; import java.util.*;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.stream.Collectors; import java.util.stream.Collectors;
@ActiveProfiles("dev") @ActiveProfiles("dev")
@@ -297,6 +301,15 @@ public class SpringBootTestController {
@DubboReference @DubboReference
private JcppService jcppService; private JcppService jcppService;
ThreadFactory threadFactory = JsowellThreadFactory.forName("test-thread-factory");
private final ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(10, JsowellThreadFactory.forName("test-thread-factory"));
@Test
public void testThread() {
threadFactory.newThread(() -> System.out.println("testThread:"+ Thread.currentThread().getName()));
}
@Test @Test
public void testDubbo() { public void testDubbo() {
String s = jcppService.sayHello("jsowell-charger-web"); String s = jcppService.sayHello("jsowell-charger-web");

View File

@@ -0,0 +1,48 @@
/**
* 开源代码,仅供学习和交流研究使用,商用请联系三丙
* 微信mohan_88888
* 抖音:程序员三丙
* 付费课程知识星球https://t.zsxq.com/aKtXo
*/
package com.jsowell.framework.async;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.util.concurrent.ThreadFactory;
public class JsowellThreadFactory {
public static final String THREAD_TOPIC_SEPARATOR = " | ";
public static ThreadFactory forName(String name) {
return new ThreadFactoryBuilder()
.setNameFormat(name)
.setDaemon(true)
.setPriority(Thread.NORM_PRIORITY)
.build();
}
public static ThreadFactory forName(String name, int priority) {
return new ThreadFactoryBuilder()
.setNameFormat(name)
.setDaemon(true)
.setPriority(priority)
.build();
}
public static void updateCurrentThreadName(String threadSuffix) {
String name = Thread.currentThread().getName();
int spliteratorIndex = name.indexOf(THREAD_TOPIC_SEPARATOR);
if (spliteratorIndex > 0) {
name = name.substring(0, spliteratorIndex);
}
name = name + THREAD_TOPIC_SEPARATOR + threadSuffix;
Thread.currentThread().setName(name);
}
public static void addThreadNamePrefix(String prefix) {
String name = Thread.currentThread().getName();
name = prefix + "-" + name;
Thread.currentThread().setName(name);
}
}

View File

@@ -49,6 +49,22 @@ public class ThreadPoolConfig {
return executor; return executor;
} }
/**
* 用于处理互联互通任务的线程池
*/
@Bean(name = "thirdpartyTaskExecutor")
public ThreadPoolTaskExecutor thirdpartyTaskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setMaxPoolSize(maxPoolSize/10);
executor.setCorePoolSize(corePoolSize/10);
executor.setQueueCapacity(queueCapacity/10);
executor.setKeepAliveSeconds(keepAliveSeconds);
// 线程池对拒绝任务(无线程可用)的处理策略
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
// log.info("threadPoolTaskExecutor创建成功");
return executor;
}
/** /**
* 执行周期性或定时任务 * 执行周期性或定时任务
*/ */

View File

@@ -37,8 +37,8 @@ public class RemoteStartChargingRequestHandler extends AbstractYkcHandler {
@Autowired @Autowired
private CommonService commonService; private CommonService commonService;
// 引入线程池 // 引入第三方平台任务线程池
private ThreadPoolTaskExecutor executor = SpringUtils.getBean("threadPoolTaskExecutor"); private ThreadPoolTaskExecutor thirdpartyTaskExecutor = SpringUtils.getBean("thirdpartyTaskExecutor");
@Override @Override
public void afterPropertiesSet() throws Exception { public void afterPropertiesSet() throws Exception {
@@ -126,7 +126,7 @@ public class RemoteStartChargingRequestHandler extends AbstractYkcHandler {
log.error("统一推送第三方平台订单信息error, ", e); log.error("统一推送第三方平台订单信息error, ", e);
} }
} }
}, executor); }, thirdpartyTaskExecutor);
return null; return null;
} }
} }

View File

@@ -51,8 +51,8 @@ public class TransactionRecordsRequestHandler extends AbstractYkcHandler {
private final String type = YKCUtils.frameType2Str(YKCFrameTypeCode.TRANSACTION_RECORDS_CODE.getBytes()); private final String type = YKCUtils.frameType2Str(YKCFrameTypeCode.TRANSACTION_RECORDS_CODE.getBytes());
private final String oldVersionType = YKCUtils.frameType2Str(YKCFrameTypeCode.TRANSACTION_RECORDS_OLD_VERSION_CODE.getBytes()); private final String oldVersionType = YKCUtils.frameType2Str(YKCFrameTypeCode.TRANSACTION_RECORDS_OLD_VERSION_CODE.getBytes());
// 引入线程池 // 引入第三方平台任务线程池
private ThreadPoolTaskExecutor executor = SpringUtils.getBean("threadPoolTaskExecutor"); private ThreadPoolTaskExecutor thirdpartyTaskExecutor = SpringUtils.getBean("thirdpartyTaskExecutor");
@Override @Override
public void afterPropertiesSet() throws Exception { public void afterPropertiesSet() throws Exception {
@@ -667,23 +667,23 @@ public class TransactionRecordsRequestHandler extends AbstractYkcHandler {
OrderBasicInfo finalOrderBasicInfo = orderBasicInfo; OrderBasicInfo finalOrderBasicInfo = orderBasicInfo;
// 异步推送第三方平台订单信息 // TODO 异步推送第三方平台订单信息
CompletableFuture.runAsync(() -> { CompletableFuture.runAsync(() -> {
try { try {
commonService.commonPushOrderInfo(finalOrderBasicInfo); commonService.commonPushOrderInfo(finalOrderBasicInfo);
} catch (Exception e) { } catch (Exception e) {
log.error("推送第三方平台订单信息error, ", e); log.error("推送第三方平台订单信息error, ", e);
} }
}, executor); }, thirdpartyTaskExecutor);
// 异步推送第三方平台订单信息V2 // TODO 异步推送第三方平台订单信息V2
CompletableFuture.runAsync(() -> { CompletableFuture.runAsync(() -> {
try { try {
commonService.commonPushOrderInfoV2(finalOrderBasicInfo); commonService.commonPushOrderInfoV2(finalOrderBasicInfo);
} catch (Exception e) { } catch (Exception e) {
log.error("推送第三方平台订单信息error, ", e); log.error("推送第三方平台订单信息error, ", e);
} }
}, executor); }, thirdpartyTaskExecutor);
// 异步推送充电订单算法平台 // 异步推送充电订单算法平台
CompletableFuture.runAsync(() -> { CompletableFuture.runAsync(() -> {
@@ -693,7 +693,7 @@ public class TransactionRecordsRequestHandler extends AbstractYkcHandler {
} catch (Exception e) { } catch (Exception e) {
log.error("异步推送充电订单算法平台 error, ", e); log.error("异步推送充电订单算法平台 error, ", e);
} }
}, executor); }, thirdpartyTaskExecutor);
} else { } else {
// 平台没有查到订单 // 平台没有查到订单

View File

@@ -51,6 +51,9 @@ public class UploadRealTimeMonitorHandler extends AbstractYkcHandler {
// 引入线程池 // 引入线程池
private ThreadPoolTaskExecutor executor = SpringUtils.getBean("threadPoolTaskExecutor"); private ThreadPoolTaskExecutor executor = SpringUtils.getBean("threadPoolTaskExecutor");
// 引入第三方平台任务线程池
private ThreadPoolTaskExecutor thirdpartyTaskExecutor = SpringUtils.getBean("thirdpartyTaskExecutor");
@Autowired @Autowired
private PileBasicInfoService pileBasicInfoService; private PileBasicInfoService pileBasicInfoService;
@@ -352,15 +355,6 @@ public class UploadRealTimeMonitorHandler extends AbstractYkcHandler {
} }
} }
// 异步推送第三方平台实时数据
// CompletableFuture.runAsync(() -> {
// try {
// commonService.pushRealTimeInfo(pileSn, connectorCode, connectorStatus, realTimeMonitorData, transactionCode);
// } catch (Exception e) {
// log.error("统一推送第三方平台实时数据 error,", e);
// }
// }, executor);
// 异步推送第三方平台实时数据V2 // 异步推送第三方平台实时数据V2
CompletableFuture.runAsync(() -> { CompletableFuture.runAsync(() -> {
try { try {
@@ -369,7 +363,7 @@ public class UploadRealTimeMonitorHandler extends AbstractYkcHandler {
} catch (Exception e) { } catch (Exception e) {
log.error("统一推送第三方平台实时数据V2 error, ", e); log.error("统一推送第三方平台实时数据V2 error, ", e);
} }
}, executor); }, thirdpartyTaskExecutor);
if (StringUtils.equals(connectorStatus, Constants.ONE)) { if (StringUtils.equals(connectorStatus, Constants.ONE)) {
// 故障 // 故障
@@ -380,7 +374,7 @@ public class UploadRealTimeMonitorHandler extends AbstractYkcHandler {
} catch (Exception e) { } catch (Exception e) {
log.error("统一推送第三方平台告警信息 error, ", e); log.error("统一推送第三方平台告警信息 error, ", e);
} }
}, executor); }, thirdpartyTaskExecutor);
} }
return null; return null;

View File

@@ -73,6 +73,9 @@ public class PileRabbitListener {
// 引入线程池 // 引入线程池
private ThreadPoolTaskExecutor executor = SpringUtils.getBean("threadPoolTaskExecutor"); private ThreadPoolTaskExecutor executor = SpringUtils.getBean("threadPoolTaskExecutor");
// 引入第三方平台任务线程池
private ThreadPoolTaskExecutor thirdpartyTaskExecutor = SpringUtils.getBean("thirdpartyTaskExecutor");
// ========================= 消费消息 ========================== // // ========================= 消费消息 ========================== //
// @RabbitListener(queues = "ykc.pileLogin-topic.device-group") // @RabbitListener(queues = "ykc.pileLogin-topic.device-group")
// public void testRabbitMQMessage(String msg) { // public void testRabbitMQMessage(String msg) {
@@ -378,7 +381,7 @@ public class PileRabbitListener {
} catch (Exception e) { } catch (Exception e) {
log.error("统一推送第三方平台实时数据 error,", e); log.error("统一推送第三方平台实时数据 error,", e);
} }
}, executor); }, thirdpartyTaskExecutor);
// 异步推送第三方平台实时数据V2 // 异步推送第三方平台实时数据V2
CompletableFuture.runAsync(() -> { CompletableFuture.runAsync(() -> {
@@ -387,7 +390,7 @@ public class PileRabbitListener {
} catch (Exception e) { } catch (Exception e) {
log.error("统一推送第三方平台实时数据V2 error, ", e); log.error("统一推送第三方平台实时数据V2 error, ", e);
} }
}, executor); }, thirdpartyTaskExecutor);
if (StringUtils.equals(connectorStatus, Constants.ONE)) { if (StringUtils.equals(connectorStatus, Constants.ONE)) {
// 故障 // 故障
@@ -398,7 +401,7 @@ public class PileRabbitListener {
} catch (Exception e) { } catch (Exception e) {
log.error("统一推送第三方平台告警信息 error, ", e); log.error("统一推送第三方平台告警信息 error, ", e);
} }
}, executor); }, thirdpartyTaskExecutor);
} }
} }