diff --git a/jsowell-quartz/src/main/java/com/jsowell/quartz/service/impl/AdapayUnsplitRecordHandleServiceImpl.java b/jsowell-quartz/src/main/java/com/jsowell/quartz/service/impl/AdapayUnsplitRecordHandleServiceImpl.java index aa0540c45..ac53c8fea 100644 --- a/jsowell-quartz/src/main/java/com/jsowell/quartz/service/impl/AdapayUnsplitRecordHandleServiceImpl.java +++ b/jsowell-quartz/src/main/java/com/jsowell/quartz/service/impl/AdapayUnsplitRecordHandleServiceImpl.java @@ -12,7 +12,6 @@ import com.jsowell.adapay.response.QueryPaymentConfirmDetailResponse; import com.jsowell.adapay.service.AdapayService; import com.jsowell.common.YouDianUtils; import com.jsowell.common.constant.Constants; -import com.jsowell.common.enums.adapay.AdapayStatusEnum; import com.jsowell.common.util.DateUtils; import com.jsowell.common.util.PageUtils; import com.jsowell.common.util.StringUtils; @@ -22,7 +21,6 @@ import com.jsowell.pile.dto.ApplyRefundDTO; import com.jsowell.pile.service.AdapayUnsplitRecordService; import com.jsowell.pile.service.OrderBasicInfoService; import com.jsowell.pile.vo.AdapayUnsplitRecordVO; -import com.jsowell.pile.vo.web.OrderDetailInfoVO; import com.jsowell.quartz.service.AdapayUnsplitRecordHandleService; import org.apache.commons.collections4.CollectionUtils; import org.apache.poi.ss.usermodel.Cell; @@ -51,6 +49,10 @@ import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Collectors; @Service @@ -59,10 +61,13 @@ public class AdapayUnsplitRecordHandleServiceImpl implements AdapayUnsplitRecord private static final int IMPORT_BATCH_SIZE = 500; private static final int REFUND_WAIT_MAX_ATTEMPTS = 12; private static final long REFUND_WAIT_INTERVAL_MILLIS = 5000L; + private static final int SYNC_THREAD_POOL_SIZE = 10; private static final String HANDLE_FLAG_SUCCESS = "SUCCESS"; private static final String HANDLE_FLAG_PROCESSING = "PROCESSING"; private static final String HANDLE_FLAG_FAILED = "FAILED"; + private final ExecutorService syncExecutor = Executors.newFixedThreadPool(SYNC_THREAD_POOL_SIZE); + private final Logger log = LoggerFactory.getLogger(AdapayUnsplitRecordHandleServiceImpl.class); @Autowired @@ -91,10 +96,10 @@ public class AdapayUnsplitRecordHandleServiceImpl implements AdapayUnsplitRecord public void processUnsplitRecordToDefaultMember(String wechatAppId, Integer pageSize) { int size = pageSize == null || pageSize <= 0 ? 500 : pageSize; int pageNum = 1; - int total = 0; - int success = 0; - int skipped = 0; - int failed = 0; + AtomicInteger total = new AtomicInteger(0); + AtomicInteger success = new AtomicInteger(0); + AtomicInteger skipped = new AtomicInteger(0); + AtomicInteger failed = new AtomicInteger(0); while (true) { PageUtils.startPage(pageNum, size); @@ -104,74 +109,32 @@ public class AdapayUnsplitRecordHandleServiceImpl implements AdapayUnsplitRecord } log.info("处理未分账数据到默认账户, pageNum:{}, pageSize:{}, 当前页:{}条", pageNum, size, list.size()); - for (AdapayUnsplitRecordVO item : list) { - total++; - String paymentId = item.getPaymentId(); - String orderCode = item.getOrderCode(); - // queryList 中 refundAmount 映射的是 due_refund_amount,即订单应退款金额。 - BigDecimal dueRefundAmount = parseAmount(item.getRefundAmount()); - BigDecimal waitSplitAmount = parseAmount(item.getWaitSplitAmount()); - if (StringUtils.isBlank(paymentId) || StringUtils.isBlank(orderCode)) { - skipped++; - continue; - } + List> futures = list.stream() + .map(item -> CompletableFuture.supplyAsync( + () -> processOneItem(item, wechatAppId), syncExecutor)) + .collect(Collectors.toList()); - // 有应退款金额时,必须先确认退款已足额成功;未退足会先发起差额退款并等待,未足额则本轮不分账。 - if (dueRefundAmount.compareTo(BigDecimal.ZERO) > 0 && !ensureRefundBeforeSplit(item, wechatAppId)) { - syncAndRefreshFlagsFromAdapay(paymentId, wechatAppId); - skipped++; - continue; - } - - if (waitSplitAmount.compareTo(BigDecimal.ZERO) <= 0) { - syncAndRefreshFlagsFromAdapay(paymentId, wechatAppId); - skipped++; - continue; - } - - BigDecimal confirmAmt = getLatestConfirmAmount(waitSplitAmount, item.getPayAmount(), item.getRefundAmount(), paymentId, wechatAppId); - if (confirmAmt.compareTo(BigDecimal.ZERO) <= 0) { - syncAndRefreshFlagsFromAdapay(paymentId, wechatAppId); - skipped++; - continue; - } - - PaymentConfirmResponse response; + for (CompletableFuture future : futures) { try { - DivMember divMember = new DivMember(); - divMember.setMemberId(Constants.ZERO); - divMember.setAmount(confirmAmt.setScale(2, BigDecimal.ROUND_HALF_UP).toPlainString()); - divMember.setFeeFlag(Constants.Y); - - PaymentConfirmParam param = PaymentConfirmParam.builder() - .paymentId(paymentId) - .divMemberList(Lists.newArrayList(divMember)) - .confirmAmt(confirmAmt) - .orderCode(orderCode) - .wechatAppId(wechatAppId) - .build(); - response = adapayService.createPaymentConfirmRequest(param); + ItemResult result = future.get(); + total.incrementAndGet(); + switch (result) { + case SUCCESS: + success.incrementAndGet(); + break; + case SKIPPED: + skipped.incrementAndGet(); + break; + case FAILED: + failed.incrementAndGet(); + break; + } } catch (Exception e) { - failed++; - log.error("处理未分账数据到默认账户异常, paymentId:{}, orderCode:{}, confirmAmt:{}", - paymentId, orderCode, confirmAmt, e); - syncAndRefreshFlagsFromAdapay(paymentId, wechatAppId); - continue; + failed.incrementAndGet(); + total.incrementAndGet(); + log.warn("处理未分账数据异常", e); } - - if (response != null && response.isSuccess()) { - success++; - log.info("处理未分账数据成功, paymentId:{}, orderCode:{}, 本次分账金额:{}, response:{}", - paymentId, orderCode, confirmAmt, JSON.toJSONString(response)); - } else { - failed++; - String errorCode = response == null ? "response_null" : response.getError_code(); - String errorMsg = response == null ? "response_is_null" : response.getError_msg(); - log.error("处理未分账数据失败, paymentId:{}, orderCode:{}, confirmAmt:{}, errorCode:{}, errorMsg:{}", - paymentId, orderCode, confirmAmt, errorCode, errorMsg); - } - syncAndRefreshFlagsFromAdapay(paymentId, wechatAppId); } if (list.size() < size) { @@ -181,7 +144,69 @@ public class AdapayUnsplitRecordHandleServiceImpl implements AdapayUnsplitRecord } log.info("处理未分账数据到默认账户结束, total:{}, success:{}, skipped:{}, failed:{}", - total, success, skipped, failed); + total.get(), success.get(), skipped.get(), failed.get()); + } + + private enum ItemResult {SUCCESS, SKIPPED, FAILED} + + private ItemResult processOneItem(AdapayUnsplitRecordVO item, String wechatAppId) { + String paymentId = item.getPaymentId(); + String orderCode = item.getOrderCode(); + BigDecimal dueRefundAmount = parseAmount(item.getRefundAmount()); + BigDecimal waitSplitAmount = parseAmount(item.getWaitSplitAmount()); + + if (StringUtils.isBlank(paymentId) || StringUtils.isBlank(orderCode)) { + return ItemResult.SKIPPED; + } + + if (dueRefundAmount.compareTo(BigDecimal.ZERO) > 0 && !ensureRefundBeforeSplit(item, wechatAppId)) { + return ItemResult.SKIPPED; + } + + if (waitSplitAmount.compareTo(BigDecimal.ZERO) <= 0) { + return ItemResult.SKIPPED; + } + + BigDecimal confirmAmt = getLatestConfirmAmount(waitSplitAmount, item.getPayAmount(), item.getRefundAmount(), paymentId, wechatAppId); + if (confirmAmt.compareTo(BigDecimal.ZERO) <= 0) { + return ItemResult.SKIPPED; + } + + PaymentConfirmResponse response; + try { + DivMember divMember = new DivMember(); + divMember.setMemberId(Constants.ZERO); + divMember.setAmount(confirmAmt.setScale(2, BigDecimal.ROUND_HALF_UP).toPlainString()); + divMember.setFeeFlag(Constants.Y); + + PaymentConfirmParam param = PaymentConfirmParam.builder() + .paymentId(paymentId) + .divMemberList(Lists.newArrayList(divMember)) + .confirmAmt(confirmAmt) + .orderCode(orderCode) + .wechatAppId(wechatAppId) + .build(); + response = adapayService.createPaymentConfirmRequest(param); + } catch (Exception e) { + log.error("处理未分账数据到默认账户异常, paymentId:{}, orderCode:{}, confirmAmt:{}", + paymentId, orderCode, confirmAmt, e); + syncAndRefreshFlagsFromAdapay(paymentId, wechatAppId); + return ItemResult.FAILED; + } + + if (response != null && response.isSuccess()) { + log.info("处理未分账数据成功, paymentId:{}, orderCode:{}, 本次分账金额:{}, response:{}", + paymentId, orderCode, confirmAmt, JSON.toJSONString(response)); + syncAndRefreshFlagsFromAdapay(paymentId, wechatAppId); + return ItemResult.SUCCESS; + } else { + String errorCode = response == null ? "response_null" : response.getError_code(); + String errorMsg = response == null ? "response_is_null" : response.getError_msg(); + log.error("处理未分账数据失败, paymentId:{}, orderCode:{}, confirmAmt:{}, errorCode:{}, errorMsg:{}", + paymentId, orderCode, confirmAmt, errorCode, errorMsg); + syncAndRefreshFlagsFromAdapay(paymentId, wechatAppId); + return ItemResult.FAILED; + } } @Override @@ -242,7 +267,7 @@ public class AdapayUnsplitRecordHandleServiceImpl implements AdapayUnsplitRecord public int syncAndRefreshFlagsFromAdapay(String startTime, String endTime, String wechatAppId, Integer pageSize) { int size = pageSize == null || pageSize <= 0 ? 1000 : pageSize; int pageNum = 1; - int updatedCount = 0; + AtomicInteger updatedCount = new AtomicInteger(0); String appId = StringUtils.isBlank(wechatAppId) ? Constants.DEFAULT_APP_ID : wechatAppId; while (true) { @@ -252,13 +277,20 @@ public class AdapayUnsplitRecordHandleServiceImpl implements AdapayUnsplitRecord break; } - int batchUpdated = 0; - for (AdapayUnsplitRecord record : list) { - if (doSyncAndRefresh(record, appId)) { - batchUpdated++; + List> futures = list.stream() + .map(record -> CompletableFuture.supplyAsync( + () -> doSyncAndRefresh(record, appId), syncExecutor)) + .collect(Collectors.toList()); + + for (CompletableFuture future : futures) { + try { + if (Boolean.TRUE.equals(future.get())) { + updatedCount.incrementAndGet(); + } + } catch (Exception e) { + log.warn("同步刷新未分账记录异常", e); } } - updatedCount += batchUpdated; if (list.size() < size) { break; @@ -266,8 +298,9 @@ public class AdapayUnsplitRecordHandleServiceImpl implements AdapayUnsplitRecord pageNum++; } - log.info("同步刷新未分账记录完成, startTime:{}, endTime:{}, 更新:{}条", startTime, endTime, updatedCount); - return updatedCount; + int total = updatedCount.get(); + log.info("同步刷新未分账记录完成, startTime:{}, endTime:{}, 更新:{}条", startTime, endTime, total); + return total; } private boolean doSyncAndRefresh(AdapayUnsplitRecord record, String wechatAppId) {