update task

This commit is contained in:
Guoqs
2026-03-13 14:41:48 +08:00
parent 5a7a546ef9
commit a07cef51ea

View File

@@ -542,67 +542,70 @@ public class JsowellTask {
} }
/** /**
* 批量处理 adapay_unsplit_record 未分账记录,向汇付发起分账请求(无参入口,使用默认参数 * 批量处理 adapay_unsplit_record 未分账记录,向汇付发起分账请求(无参入口)
* 默认appId=DEFAULT_APP_ID每页500条每次请求间隔60秒 * 默认appId=DEFAULT_APP_ID并行线程数=10
* jsowellTask.processUnsplitRecordBatch() * jsowellTask.processUnsplitRecordBatch()
*/ */
public void processUnsplitRecordBatch() { public void processUnsplitRecordBatch() {
processUnsplitRecordBatch(Constants.DEFAULT_APP_ID, 500, 60_000L); processUnsplitRecordBatch(Constants.DEFAULT_APP_ID, 10);
} }
/** /**
* 批量处理 adapay_unsplit_record 未分账记录,向汇付发起分账请求 * 批量处理 adapay_unsplit_record 未分账记录,向汇付发起分账请求(并行版本)
* 流程: * 流程:
* 1. Redis setnx 防并发,锁有效期 = requestIntervalMs/1000 + 65 * 1. Redis setnx 防并发重入,锁有效期 120
* 2. 分页查询待分账记录queryList每页 pageSize 条 * 2. 全量查询所有待分账记录(避免 ThreadLocal 分页在并行流中失效)
* 3. 每条记录先查询汇付最新可分账金额,再发起分账确认请求 * 3. 使用自定义 ForkJoinPool 并行发起分账请求,并发度由 parallelism 控制
* 4. 每次分账请求后 sleep requestIntervalMs 毫秒,满足汇付速率限制 * 4. 成功则更新 confirmed_split_amount 和 split_flag=SUCCESS失败标记 FAILED
* 5. 成功则更新 confirmed_split_amount 和 split_flag=SUCCESS失败标记 FAILED * jsowellTask.processUnsplitRecordBatch(wechatAppId, parallelism)
* jsowellTask.processUnsplitRecordBatch(wechatAppId, pageSize, requestIntervalMs) * 示例:jsowellTask.processUnsplitRecordBatch('wx_app_id', 10)
* 示例jsowellTask.processUnsplitRecordBatch('wx_app_id', 500, 60000)
*/ */
public void processUnsplitRecordBatch(String wechatAppId, Integer pageSize, Long requestIntervalMs) { public void processUnsplitRecordBatch(String wechatAppId, Integer parallelism) {
int size = (pageSize == null || pageSize <= 0) ? 500 : pageSize; int threads = (parallelism == null || parallelism <= 0) ? 10 : parallelism;
long intervalMs = (requestIntervalMs == null || requestIntervalMs < 0) ? 60_000L : requestIntervalMs;
// 锁有效期略大于间隔,防止任务还在跑时锁已过期被重入
long lockTtlSeconds = intervalMs / 1000 + 65;
// Redis 分布式锁,防止定时任务并发执行 // Redis 分布式锁,防止并发执行
Boolean acquired = redisCache.setnx(CacheConstants.PROCESS_UNSPLIT_ORDERS, Constants.ONE, lockTtlSeconds); Boolean acquired = redisCache.setnx(CacheConstants.PROCESS_UNSPLIT_ORDERS, Constants.ONE, 120L);
if (Boolean.FALSE.equals(acquired)) { if (Boolean.FALSE.equals(acquired)) {
log.info("[processUnsplitRecordBatch] 上一批次仍在执行中,跳过本次调度"); log.info("[processUnsplitRecordBatch] 上一批次仍在执行中,跳过本次调度");
return; return;
} }
int total = 0, success = 0, skipped = 0, failed = 0;
int pageNum = 1;
try { try {
while (true) { // 全量查询待分账记录queryList 已过滤 waitSplitAmount<=0 的记录)
PageUtils.startPage(pageNum, size); // 注意:不使用 PageUtils.startPage,因为 ThreadLocal 分页在并行流中会失效
List<AdapayUnsplitRecordVO> list = adapayUnsplitRecordService.queryList(); List<AdapayUnsplitRecordVO> list = adapayUnsplitRecordService.queryList();
if (CollectionUtils.isEmpty(list)) { if (CollectionUtils.isEmpty(list)) {
break; log.info("[processUnsplitRecordBatch] 无待分账记录,直接返回");
return;
} }
log.info("[processUnsplitRecordBatch] pageNum:{}, 当前页:{}", pageNum, list.size()); log.info("[processUnsplitRecordBatch] 共查询到{}条待分账记录,并行线程数:{}", list.size(), threads);
for (AdapayUnsplitRecordVO item : list) { java.util.concurrent.atomic.AtomicInteger total = new java.util.concurrent.atomic.AtomicInteger();
total++; java.util.concurrent.atomic.AtomicInteger success = new java.util.concurrent.atomic.AtomicInteger();
java.util.concurrent.atomic.AtomicInteger skipped = new java.util.concurrent.atomic.AtomicInteger();
java.util.concurrent.atomic.AtomicInteger failed = new java.util.concurrent.atomic.AtomicInteger();
// 自定义 ForkJoinPool 控制并发度,避免占满公共池影响其他业务
java.util.concurrent.ForkJoinPool forkJoinPool = new java.util.concurrent.ForkJoinPool(threads);
try {
forkJoinPool.submit(() ->
list.parallelStream().forEach(item -> {
total.incrementAndGet();
String paymentId = item.getPaymentId(); String paymentId = item.getPaymentId();
String orderCode = item.getOrderCode(); String orderCode = item.getOrderCode();
BigDecimal waitSplitAmount = parseAmount(item.getWaitSplitAmount()); BigDecimal waitSplitAmount = parseAmount(item.getWaitSplitAmount());
if (StringUtils.isBlank(paymentId) || waitSplitAmount.compareTo(BigDecimal.ZERO) <= 0) { if (StringUtils.isBlank(paymentId) || waitSplitAmount.compareTo(BigDecimal.ZERO) <= 0) {
skipped++; skipped.incrementAndGet();
continue; return;
} }
// 查询汇付最新可分账金额,以实际剩余可分账金额为准 // 查询汇付最新可分账金额,以实际剩余可分账金额为准
BigDecimal confirmAmt = getLatestConfirmAmount(waitSplitAmount, item.getPayAmount(), paymentId, wechatAppId); BigDecimal confirmAmt = getLatestConfirmAmount(waitSplitAmount, item.getPayAmount(), paymentId, wechatAppId);
if (confirmAmt.compareTo(BigDecimal.ZERO) <= 0) { if (confirmAmt.compareTo(BigDecimal.ZERO) <= 0) {
skipped++; skipped.incrementAndGet();
log.info("[processUnsplitRecordBatch] 可分账金额为0跳过, paymentId:{}, orderCode:{}", paymentId, orderCode); log.info("[processUnsplitRecordBatch] 可分账金额为0跳过, paymentId:{}, orderCode:{}", paymentId, orderCode);
continue; return;
} }
// 发起汇付分账请求 // 发起汇付分账请求
@@ -622,47 +625,40 @@ public class JsowellTask {
.build(); .build();
response = adapayService.createPaymentConfirmRequest(param); response = adapayService.createPaymentConfirmRequest(param);
} catch (Exception e) { } catch (Exception e) {
failed++; failed.incrementAndGet();
log.error("[processUnsplitRecordBatch] 发起分账请求异常, paymentId:{}, orderCode:{}, confirmAmt:{}", log.error("[processUnsplitRecordBatch] 发起分账请求异常, paymentId:{}, orderCode:{}, confirmAmt:{}",
paymentId, orderCode, confirmAmt, e); paymentId, orderCode, confirmAmt, e);
markSplitResult(paymentId, "FAILED"); markSplitResult(paymentId, "FAILED");
// 请求异常也需要等待,避免持续冲击汇付接口 return;
sleepForRateLimit(intervalMs);
continue;
} }
if (response != null && response.isSuccess()) { if (response != null && response.isSuccess()) {
success++; success.incrementAndGet();
updateConfirmedSplitAmount(item, confirmAmt, paymentId); updateConfirmedSplitAmount(item, confirmAmt, paymentId);
markSplitResult(paymentId, "SUCCESS"); markSplitResult(paymentId, "SUCCESS");
log.info("[processUnsplitRecordBatch] 分账成功, paymentId:{}, orderCode:{}, confirmAmt:{}", log.info("[processUnsplitRecordBatch] 分账成功, paymentId:{}, orderCode:{}, confirmAmt:{}",
paymentId, orderCode, confirmAmt); paymentId, orderCode, confirmAmt);
} else { } else {
failed++; failed.incrementAndGet();
String errorCode = response == null ? "response_null" : response.getError_code(); String errorCode = response == null ? "response_null" : response.getError_code();
String errorMsg = response == null ? "response_is_null" : response.getError_msg(); String errorMsg = response == null ? "response_is_null" : response.getError_msg();
log.error("[processUnsplitRecordBatch] 分账失败, paymentId:{}, orderCode:{}, confirmAmt:{}, errorCode:{}, errorMsg:{}", log.error("[processUnsplitRecordBatch] 分账失败, paymentId:{}, orderCode:{}, confirmAmt:{}, errorCode:{}, errorMsg:{}",
paymentId, orderCode, confirmAmt, errorCode, errorMsg); paymentId, orderCode, confirmAmt, errorCode, errorMsg);
markSplitResult(paymentId, "FAILED"); markSplitResult(paymentId, "FAILED");
} }
})
// 每次请求后按配置间隔等待满足汇付速率限制默认60秒/次) ).get();
sleepForRateLimit(intervalMs); } catch (Exception e) {
// 每次请求后续期锁,防止长时间运行时锁过期 log.error("[processUnsplitRecordBatch] 并行执行异常", e);
redisCache.setnx(CacheConstants.PROCESS_UNSPLIT_ORDERS, Constants.ONE, lockTtlSeconds);
}
if (list.size() < size) {
break;
}
pageNum++;
}
} finally { } finally {
redisCache.deleteObject(CacheConstants.PROCESS_UNSPLIT_ORDERS); forkJoinPool.shutdown();
} }
log.info("[processUnsplitRecordBatch] 执行结束, total:{}, success:{}, skipped:{}, failed:{}", log.info("[processUnsplitRecordBatch] 执行结束, total:{}, success:{}, skipped:{}, failed:{}",
total, success, skipped, failed); total.get(), success.get(), skipped.get(), failed.get());
} finally {
redisCache.deleteObject(CacheConstants.PROCESS_UNSPLIT_ORDERS);
}
} }
/** /**
@@ -676,18 +672,6 @@ public class JsowellTask {
log.info("补齐未分账数据缺失字段完成, startTime:{}, endTime:{}, 更新:{}条", startTime, endTime, updatedCount); log.info("补齐未分账数据缺失字段完成, startTime:{}, endTime:{}, 更新:{}条", startTime, endTime, updatedCount);
} }
private void sleepForRateLimit(long intervalMs) {
if (intervalMs <= 0) {
return;
}
try {
Thread.sleep(intervalMs);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
log.warn("[processUnsplitRecordBatch] sleep 被中断");
}
}
/** /**
* 从Excel导入adapay_unsplit_record并补齐缺失字段 * 从Excel导入adapay_unsplit_record并补齐缺失字段
* 流程: * 流程: