From a07cef51ea5358576d9397fe413a5af22358f3d9 Mon Sep 17 00:00:00 2001 From: Guoqs <123456@jsowell.com> Date: Fri, 13 Mar 2026 14:41:48 +0800 Subject: [PATCH] update task --- .../com/jsowell/quartz/task/JsowellTask.java | 212 ++++++++---------- 1 file changed, 98 insertions(+), 114 deletions(-) diff --git a/jsowell-quartz/src/main/java/com/jsowell/quartz/task/JsowellTask.java b/jsowell-quartz/src/main/java/com/jsowell/quartz/task/JsowellTask.java index 7042ecdaa..f68047f06 100644 --- a/jsowell-quartz/src/main/java/com/jsowell/quartz/task/JsowellTask.java +++ b/jsowell-quartz/src/main/java/com/jsowell/quartz/task/JsowellTask.java @@ -542,127 +542,123 @@ public class JsowellTask { } /** - * 批量处理 adapay_unsplit_record 未分账记录,向汇付发起分账请求(无参入口,使用默认参数) - * 默认:appId=DEFAULT_APP_ID,每页500条,每次请求间隔60秒 + * 批量处理 adapay_unsplit_record 未分账记录,向汇付发起分账请求(无参入口) + * 默认:appId=DEFAULT_APP_ID,并行线程数=10 * jsowellTask.processUnsplitRecordBatch() */ public void processUnsplitRecordBatch() { - processUnsplitRecordBatch(Constants.DEFAULT_APP_ID, 500, 60_000L); + processUnsplitRecordBatch(Constants.DEFAULT_APP_ID, 10); } /** - * 批量处理 adapay_unsplit_record 未分账记录,向汇付发起分账请求 + * 批量处理 adapay_unsplit_record 未分账记录,向汇付发起分账请求(并行版本) * 流程: - * 1. Redis setnx 防并发,锁有效期 = requestIntervalMs/1000 + 65 秒 - * 2. 分页查询待分账记录(queryList),每页 pageSize 条 - * 3. 每条记录先查询汇付最新可分账金额,再发起分账确认请求 - * 4. 每次分账请求后 sleep requestIntervalMs 毫秒,满足汇付速率限制 - * 5. 成功则更新 confirmed_split_amount 和 split_flag=SUCCESS,失败标记 FAILED - * jsowellTask.processUnsplitRecordBatch(wechatAppId, pageSize, requestIntervalMs) - * 示例:jsowellTask.processUnsplitRecordBatch('wx_app_id', 500, 60000) + * 1. Redis setnx 防并发重入,锁有效期 120 秒 + * 2. 全量查询所有待分账记录(避免 ThreadLocal 分页在并行流中失效) + * 3. 使用自定义 ForkJoinPool 并行发起分账请求,并发度由 parallelism 控制 + * 4. 成功则更新 confirmed_split_amount 和 split_flag=SUCCESS,失败标记 FAILED + * jsowellTask.processUnsplitRecordBatch(wechatAppId, parallelism) + * 示例:jsowellTask.processUnsplitRecordBatch('wx_app_id', 10) */ - public void processUnsplitRecordBatch(String wechatAppId, Integer pageSize, Long requestIntervalMs) { - int size = (pageSize == null || pageSize <= 0) ? 500 : pageSize; - long intervalMs = (requestIntervalMs == null || requestIntervalMs < 0) ? 60_000L : requestIntervalMs; - // 锁有效期略大于间隔,防止任务还在跑时锁已过期被重入 - long lockTtlSeconds = intervalMs / 1000 + 65; + public void processUnsplitRecordBatch(String wechatAppId, Integer parallelism) { + int threads = (parallelism == null || parallelism <= 0) ? 10 : parallelism; - // Redis 分布式锁,防止定时任务并发执行 - Boolean acquired = redisCache.setnx(CacheConstants.PROCESS_UNSPLIT_ORDERS, Constants.ONE, lockTtlSeconds); + // Redis 分布式锁,防止并发执行 + Boolean acquired = redisCache.setnx(CacheConstants.PROCESS_UNSPLIT_ORDERS, Constants.ONE, 120L); if (Boolean.FALSE.equals(acquired)) { log.info("[processUnsplitRecordBatch] 上一批次仍在执行中,跳过本次调度"); return; } - int total = 0, success = 0, skipped = 0, failed = 0; - int pageNum = 1; - try { - while (true) { - PageUtils.startPage(pageNum, size); - List list = adapayUnsplitRecordService.queryList(); - if (CollectionUtils.isEmpty(list)) { - break; - } - log.info("[processUnsplitRecordBatch] pageNum:{}, 当前页:{}条", pageNum, list.size()); - - for (AdapayUnsplitRecordVO item : list) { - total++; - String paymentId = item.getPaymentId(); - String orderCode = item.getOrderCode(); - BigDecimal waitSplitAmount = parseAmount(item.getWaitSplitAmount()); - - if (StringUtils.isBlank(paymentId) || waitSplitAmount.compareTo(BigDecimal.ZERO) <= 0) { - skipped++; - continue; - } - - // 查询汇付最新可分账金额,以实际剩余可分账金额为准 - BigDecimal confirmAmt = getLatestConfirmAmount(waitSplitAmount, item.getPayAmount(), paymentId, wechatAppId); - if (confirmAmt.compareTo(BigDecimal.ZERO) <= 0) { - skipped++; - log.info("[processUnsplitRecordBatch] 可分账金额为0,跳过, paymentId:{}, orderCode:{}", paymentId, orderCode); - continue; - } - - // 发起汇付分账请求 - PaymentConfirmResponse response; - try { - DivMember divMember = new DivMember(); - divMember.setMemberId(Constants.ZERO); - divMember.setAmount(confirmAmt.setScale(2, BigDecimal.ROUND_HALF_UP).toPlainString()); - divMember.setFeeFlag(Constants.Y); - - PaymentConfirmParam param = PaymentConfirmParam.builder() - .paymentId(paymentId) - .divMemberList(Lists.newArrayList(divMember)) - .confirmAmt(confirmAmt) - .orderCode(orderCode) - .wechatAppId(wechatAppId) - .build(); - response = adapayService.createPaymentConfirmRequest(param); - } catch (Exception e) { - failed++; - log.error("[processUnsplitRecordBatch] 发起分账请求异常, paymentId:{}, orderCode:{}, confirmAmt:{}", - paymentId, orderCode, confirmAmt, e); - markSplitResult(paymentId, "FAILED"); - // 请求异常也需要等待,避免持续冲击汇付接口 - sleepForRateLimit(intervalMs); - continue; - } - - if (response != null && response.isSuccess()) { - success++; - updateConfirmedSplitAmount(item, confirmAmt, paymentId); - markSplitResult(paymentId, "SUCCESS"); - log.info("[processUnsplitRecordBatch] 分账成功, paymentId:{}, orderCode:{}, confirmAmt:{}", - paymentId, orderCode, confirmAmt); - } else { - failed++; - String errorCode = response == null ? "response_null" : response.getError_code(); - String errorMsg = response == null ? "response_is_null" : response.getError_msg(); - log.error("[processUnsplitRecordBatch] 分账失败, paymentId:{}, orderCode:{}, confirmAmt:{}, errorCode:{}, errorMsg:{}", - paymentId, orderCode, confirmAmt, errorCode, errorMsg); - markSplitResult(paymentId, "FAILED"); - } - - // 每次请求后按配置间隔等待,满足汇付速率限制(默认60秒/次) - sleepForRateLimit(intervalMs); - // 每次请求后续期锁,防止长时间运行时锁过期 - redisCache.setnx(CacheConstants.PROCESS_UNSPLIT_ORDERS, Constants.ONE, lockTtlSeconds); - } - - if (list.size() < size) { - break; - } - pageNum++; + // 全量查询待分账记录(queryList 已过滤 waitSplitAmount<=0 的记录) + // 注意:不使用 PageUtils.startPage,因为 ThreadLocal 分页在并行流中会失效 + List list = adapayUnsplitRecordService.queryList(); + if (CollectionUtils.isEmpty(list)) { + log.info("[processUnsplitRecordBatch] 无待分账记录,直接返回"); + return; } + log.info("[processUnsplitRecordBatch] 共查询到{}条待分账记录,并行线程数:{}", list.size(), threads); + + java.util.concurrent.atomic.AtomicInteger total = new java.util.concurrent.atomic.AtomicInteger(); + java.util.concurrent.atomic.AtomicInteger success = new java.util.concurrent.atomic.AtomicInteger(); + java.util.concurrent.atomic.AtomicInteger skipped = new java.util.concurrent.atomic.AtomicInteger(); + java.util.concurrent.atomic.AtomicInteger failed = new java.util.concurrent.atomic.AtomicInteger(); + + // 自定义 ForkJoinPool 控制并发度,避免占满公共池影响其他业务 + java.util.concurrent.ForkJoinPool forkJoinPool = new java.util.concurrent.ForkJoinPool(threads); + try { + forkJoinPool.submit(() -> + list.parallelStream().forEach(item -> { + total.incrementAndGet(); + String paymentId = item.getPaymentId(); + String orderCode = item.getOrderCode(); + BigDecimal waitSplitAmount = parseAmount(item.getWaitSplitAmount()); + + if (StringUtils.isBlank(paymentId) || waitSplitAmount.compareTo(BigDecimal.ZERO) <= 0) { + skipped.incrementAndGet(); + return; + } + + // 查询汇付最新可分账金额,以实际剩余可分账金额为准 + BigDecimal confirmAmt = getLatestConfirmAmount(waitSplitAmount, item.getPayAmount(), paymentId, wechatAppId); + if (confirmAmt.compareTo(BigDecimal.ZERO) <= 0) { + skipped.incrementAndGet(); + log.info("[processUnsplitRecordBatch] 可分账金额为0,跳过, paymentId:{}, orderCode:{}", paymentId, orderCode); + return; + } + + // 发起汇付分账请求 + PaymentConfirmResponse response; + try { + DivMember divMember = new DivMember(); + divMember.setMemberId(Constants.ZERO); + divMember.setAmount(confirmAmt.setScale(2, BigDecimal.ROUND_HALF_UP).toPlainString()); + divMember.setFeeFlag(Constants.Y); + + PaymentConfirmParam param = PaymentConfirmParam.builder() + .paymentId(paymentId) + .divMemberList(Lists.newArrayList(divMember)) + .confirmAmt(confirmAmt) + .orderCode(orderCode) + .wechatAppId(wechatAppId) + .build(); + response = adapayService.createPaymentConfirmRequest(param); + } catch (Exception e) { + failed.incrementAndGet(); + log.error("[processUnsplitRecordBatch] 发起分账请求异常, paymentId:{}, orderCode:{}, confirmAmt:{}", + paymentId, orderCode, confirmAmt, e); + markSplitResult(paymentId, "FAILED"); + return; + } + + if (response != null && response.isSuccess()) { + success.incrementAndGet(); + updateConfirmedSplitAmount(item, confirmAmt, paymentId); + markSplitResult(paymentId, "SUCCESS"); + log.info("[processUnsplitRecordBatch] 分账成功, paymentId:{}, orderCode:{}, confirmAmt:{}", + paymentId, orderCode, confirmAmt); + } else { + failed.incrementAndGet(); + String errorCode = response == null ? "response_null" : response.getError_code(); + String errorMsg = response == null ? "response_is_null" : response.getError_msg(); + log.error("[processUnsplitRecordBatch] 分账失败, paymentId:{}, orderCode:{}, confirmAmt:{}, errorCode:{}, errorMsg:{}", + paymentId, orderCode, confirmAmt, errorCode, errorMsg); + markSplitResult(paymentId, "FAILED"); + } + }) + ).get(); + } catch (Exception e) { + log.error("[processUnsplitRecordBatch] 并行执行异常", e); + } finally { + forkJoinPool.shutdown(); + } + + log.info("[processUnsplitRecordBatch] 执行结束, total:{}, success:{}, skipped:{}, failed:{}", + total.get(), success.get(), skipped.get(), failed.get()); } finally { redisCache.deleteObject(CacheConstants.PROCESS_UNSPLIT_ORDERS); } - - log.info("[processUnsplitRecordBatch] 执行结束, total:{}, success:{}, skipped:{}, failed:{}", - total, success, skipped, failed); } /** @@ -676,18 +672,6 @@ public class JsowellTask { log.info("补齐未分账数据缺失字段完成, startTime:{}, endTime:{}, 更新:{}条", startTime, endTime, updatedCount); } - private void sleepForRateLimit(long intervalMs) { - if (intervalMs <= 0) { - return; - } - try { - Thread.sleep(intervalMs); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - log.warn("[processUnsplitRecordBatch] sleep 被中断"); - } - } - /** * 从Excel导入adapay_unsplit_record,并补齐缺失字段 * 流程: