This commit is contained in:
jsowell
2026-05-28 15:31:56 +08:00
parent 672fb3787b
commit 3c68e331fc

View File

@@ -12,7 +12,6 @@ import com.jsowell.adapay.response.QueryPaymentConfirmDetailResponse;
import com.jsowell.adapay.service.AdapayService; import com.jsowell.adapay.service.AdapayService;
import com.jsowell.common.YouDianUtils; import com.jsowell.common.YouDianUtils;
import com.jsowell.common.constant.Constants; import com.jsowell.common.constant.Constants;
import com.jsowell.common.enums.adapay.AdapayStatusEnum;
import com.jsowell.common.util.DateUtils; import com.jsowell.common.util.DateUtils;
import com.jsowell.common.util.PageUtils; import com.jsowell.common.util.PageUtils;
import com.jsowell.common.util.StringUtils; import com.jsowell.common.util.StringUtils;
@@ -22,7 +21,6 @@ import com.jsowell.pile.dto.ApplyRefundDTO;
import com.jsowell.pile.service.AdapayUnsplitRecordService; import com.jsowell.pile.service.AdapayUnsplitRecordService;
import com.jsowell.pile.service.OrderBasicInfoService; import com.jsowell.pile.service.OrderBasicInfoService;
import com.jsowell.pile.vo.AdapayUnsplitRecordVO; import com.jsowell.pile.vo.AdapayUnsplitRecordVO;
import com.jsowell.pile.vo.web.OrderDetailInfoVO;
import com.jsowell.quartz.service.AdapayUnsplitRecordHandleService; import com.jsowell.quartz.service.AdapayUnsplitRecordHandleService;
import org.apache.commons.collections4.CollectionUtils; import org.apache.commons.collections4.CollectionUtils;
import org.apache.poi.ss.usermodel.Cell; import org.apache.poi.ss.usermodel.Cell;
@@ -51,6 +49,10 @@ import java.util.LinkedHashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors; import java.util.stream.Collectors;
@Service @Service
@@ -59,10 +61,13 @@ public class AdapayUnsplitRecordHandleServiceImpl implements AdapayUnsplitRecord
private static final int IMPORT_BATCH_SIZE = 500; private static final int IMPORT_BATCH_SIZE = 500;
private static final int REFUND_WAIT_MAX_ATTEMPTS = 12; private static final int REFUND_WAIT_MAX_ATTEMPTS = 12;
private static final long REFUND_WAIT_INTERVAL_MILLIS = 5000L; private static final long REFUND_WAIT_INTERVAL_MILLIS = 5000L;
private static final int SYNC_THREAD_POOL_SIZE = 10;
private static final String HANDLE_FLAG_SUCCESS = "SUCCESS"; private static final String HANDLE_FLAG_SUCCESS = "SUCCESS";
private static final String HANDLE_FLAG_PROCESSING = "PROCESSING"; private static final String HANDLE_FLAG_PROCESSING = "PROCESSING";
private static final String HANDLE_FLAG_FAILED = "FAILED"; private static final String HANDLE_FLAG_FAILED = "FAILED";
private final ExecutorService syncExecutor = Executors.newFixedThreadPool(SYNC_THREAD_POOL_SIZE);
private final Logger log = LoggerFactory.getLogger(AdapayUnsplitRecordHandleServiceImpl.class); private final Logger log = LoggerFactory.getLogger(AdapayUnsplitRecordHandleServiceImpl.class);
@Autowired @Autowired
@@ -91,10 +96,10 @@ public class AdapayUnsplitRecordHandleServiceImpl implements AdapayUnsplitRecord
public void processUnsplitRecordToDefaultMember(String wechatAppId, Integer pageSize) { public void processUnsplitRecordToDefaultMember(String wechatAppId, Integer pageSize) {
int size = pageSize == null || pageSize <= 0 ? 500 : pageSize; int size = pageSize == null || pageSize <= 0 ? 500 : pageSize;
int pageNum = 1; int pageNum = 1;
int total = 0; AtomicInteger total = new AtomicInteger(0);
int success = 0; AtomicInteger success = new AtomicInteger(0);
int skipped = 0; AtomicInteger skipped = new AtomicInteger(0);
int failed = 0; AtomicInteger failed = new AtomicInteger(0);
while (true) { while (true) {
PageUtils.startPage(pageNum, size); PageUtils.startPage(pageNum, size);
@@ -104,37 +109,67 @@ public class AdapayUnsplitRecordHandleServiceImpl implements AdapayUnsplitRecord
} }
log.info("处理未分账数据到默认账户, pageNum:{}, pageSize:{}, 当前页:{}条", pageNum, size, list.size()); log.info("处理未分账数据到默认账户, pageNum:{}, pageSize:{}, 当前页:{}条", pageNum, size, list.size());
for (AdapayUnsplitRecordVO item : list) {
total++; List<CompletableFuture<ItemResult>> futures = list.stream()
.map(item -> CompletableFuture.supplyAsync(
() -> processOneItem(item, wechatAppId), syncExecutor))
.collect(Collectors.toList());
for (CompletableFuture<ItemResult> future : futures) {
try {
ItemResult result = future.get();
total.incrementAndGet();
switch (result) {
case SUCCESS:
success.incrementAndGet();
break;
case SKIPPED:
skipped.incrementAndGet();
break;
case FAILED:
failed.incrementAndGet();
break;
}
} catch (Exception e) {
failed.incrementAndGet();
total.incrementAndGet();
log.warn("处理未分账数据异常", e);
}
}
if (list.size() < size) {
break;
}
pageNum++;
}
log.info("处理未分账数据到默认账户结束, total:{}, success:{}, skipped:{}, failed:{}",
total.get(), success.get(), skipped.get(), failed.get());
}
private enum ItemResult {SUCCESS, SKIPPED, FAILED}
private ItemResult processOneItem(AdapayUnsplitRecordVO item, String wechatAppId) {
String paymentId = item.getPaymentId(); String paymentId = item.getPaymentId();
String orderCode = item.getOrderCode(); String orderCode = item.getOrderCode();
// queryList 中 refundAmount 映射的是 due_refund_amount即订单应退款金额。
BigDecimal dueRefundAmount = parseAmount(item.getRefundAmount()); BigDecimal dueRefundAmount = parseAmount(item.getRefundAmount());
BigDecimal waitSplitAmount = parseAmount(item.getWaitSplitAmount()); BigDecimal waitSplitAmount = parseAmount(item.getWaitSplitAmount());
if (StringUtils.isBlank(paymentId) || StringUtils.isBlank(orderCode)) { if (StringUtils.isBlank(paymentId) || StringUtils.isBlank(orderCode)) {
skipped++; return ItemResult.SKIPPED;
continue;
} }
// 有应退款金额时,必须先确认退款已足额成功;未退足会先发起差额退款并等待,未足额则本轮不分账。
if (dueRefundAmount.compareTo(BigDecimal.ZERO) > 0 && !ensureRefundBeforeSplit(item, wechatAppId)) { if (dueRefundAmount.compareTo(BigDecimal.ZERO) > 0 && !ensureRefundBeforeSplit(item, wechatAppId)) {
syncAndRefreshFlagsFromAdapay(paymentId, wechatAppId); return ItemResult.SKIPPED;
skipped++;
continue;
} }
if (waitSplitAmount.compareTo(BigDecimal.ZERO) <= 0) { if (waitSplitAmount.compareTo(BigDecimal.ZERO) <= 0) {
syncAndRefreshFlagsFromAdapay(paymentId, wechatAppId); return ItemResult.SKIPPED;
skipped++;
continue;
} }
BigDecimal confirmAmt = getLatestConfirmAmount(waitSplitAmount, item.getPayAmount(), item.getRefundAmount(), paymentId, wechatAppId); BigDecimal confirmAmt = getLatestConfirmAmount(waitSplitAmount, item.getPayAmount(), item.getRefundAmount(), paymentId, wechatAppId);
if (confirmAmt.compareTo(BigDecimal.ZERO) <= 0) { if (confirmAmt.compareTo(BigDecimal.ZERO) <= 0) {
syncAndRefreshFlagsFromAdapay(paymentId, wechatAppId); return ItemResult.SKIPPED;
skipped++;
continue;
} }
PaymentConfirmResponse response; PaymentConfirmResponse response;
@@ -153,35 +188,25 @@ public class AdapayUnsplitRecordHandleServiceImpl implements AdapayUnsplitRecord
.build(); .build();
response = adapayService.createPaymentConfirmRequest(param); response = adapayService.createPaymentConfirmRequest(param);
} catch (Exception e) { } catch (Exception e) {
failed++;
log.error("处理未分账数据到默认账户异常, paymentId:{}, orderCode:{}, confirmAmt:{}", log.error("处理未分账数据到默认账户异常, paymentId:{}, orderCode:{}, confirmAmt:{}",
paymentId, orderCode, confirmAmt, e); paymentId, orderCode, confirmAmt, e);
syncAndRefreshFlagsFromAdapay(paymentId, wechatAppId); syncAndRefreshFlagsFromAdapay(paymentId, wechatAppId);
continue; return ItemResult.FAILED;
} }
if (response != null && response.isSuccess()) { if (response != null && response.isSuccess()) {
success++;
log.info("处理未分账数据成功, paymentId:{}, orderCode:{}, 本次分账金额:{}, response:{}", log.info("处理未分账数据成功, paymentId:{}, orderCode:{}, 本次分账金额:{}, response:{}",
paymentId, orderCode, confirmAmt, JSON.toJSONString(response)); paymentId, orderCode, confirmAmt, JSON.toJSONString(response));
syncAndRefreshFlagsFromAdapay(paymentId, wechatAppId);
return ItemResult.SUCCESS;
} else { } else {
failed++;
String errorCode = response == null ? "response_null" : response.getError_code(); String errorCode = response == null ? "response_null" : response.getError_code();
String errorMsg = response == null ? "response_is_null" : response.getError_msg(); String errorMsg = response == null ? "response_is_null" : response.getError_msg();
log.error("处理未分账数据失败, paymentId:{}, orderCode:{}, confirmAmt:{}, errorCode:{}, errorMsg:{}", log.error("处理未分账数据失败, paymentId:{}, orderCode:{}, confirmAmt:{}, errorCode:{}, errorMsg:{}",
paymentId, orderCode, confirmAmt, errorCode, errorMsg); paymentId, orderCode, confirmAmt, errorCode, errorMsg);
}
syncAndRefreshFlagsFromAdapay(paymentId, wechatAppId); syncAndRefreshFlagsFromAdapay(paymentId, wechatAppId);
return ItemResult.FAILED;
} }
if (list.size() < size) {
break;
}
pageNum++;
}
log.info("处理未分账数据到默认账户结束, total:{}, success:{}, skipped:{}, failed:{}",
total, success, skipped, failed);
} }
@Override @Override
@@ -242,7 +267,7 @@ public class AdapayUnsplitRecordHandleServiceImpl implements AdapayUnsplitRecord
public int syncAndRefreshFlagsFromAdapay(String startTime, String endTime, String wechatAppId, Integer pageSize) { public int syncAndRefreshFlagsFromAdapay(String startTime, String endTime, String wechatAppId, Integer pageSize) {
int size = pageSize == null || pageSize <= 0 ? 1000 : pageSize; int size = pageSize == null || pageSize <= 0 ? 1000 : pageSize;
int pageNum = 1; int pageNum = 1;
int updatedCount = 0; AtomicInteger updatedCount = new AtomicInteger(0);
String appId = StringUtils.isBlank(wechatAppId) ? Constants.DEFAULT_APP_ID : wechatAppId; String appId = StringUtils.isBlank(wechatAppId) ? Constants.DEFAULT_APP_ID : wechatAppId;
while (true) { while (true) {
@@ -252,13 +277,20 @@ public class AdapayUnsplitRecordHandleServiceImpl implements AdapayUnsplitRecord
break; break;
} }
int batchUpdated = 0; List<CompletableFuture<Boolean>> futures = list.stream()
for (AdapayUnsplitRecord record : list) { .map(record -> CompletableFuture.supplyAsync(
if (doSyncAndRefresh(record, appId)) { () -> doSyncAndRefresh(record, appId), syncExecutor))
batchUpdated++; .collect(Collectors.toList());
for (CompletableFuture<Boolean> future : futures) {
try {
if (Boolean.TRUE.equals(future.get())) {
updatedCount.incrementAndGet();
}
} catch (Exception e) {
log.warn("同步刷新未分账记录异常", e);
} }
} }
updatedCount += batchUpdated;
if (list.size() < size) { if (list.size() < size) {
break; break;
@@ -266,8 +298,9 @@ public class AdapayUnsplitRecordHandleServiceImpl implements AdapayUnsplitRecord
pageNum++; pageNum++;
} }
log.info("同步刷新未分账记录完成, startTime:{}, endTime:{}, 更新:{}条", startTime, endTime, updatedCount); int total = updatedCount.get();
return updatedCount; log.info("同步刷新未分账记录完成, startTime:{}, endTime:{}, 更新:{}条", startTime, endTime, total);
return total;
} }
private boolean doSyncAndRefresh(AdapayUnsplitRecord record, String wechatAppId) { private boolean doSyncAndRefresh(AdapayUnsplitRecord record, String wechatAppId) {