This commit is contained in:
Guoqs
2026-03-16 11:22:15 +08:00
parent 7c84f45e1f
commit 64d2faedfa
8 changed files with 659 additions and 0 deletions

View File

@@ -0,0 +1,567 @@
package com.jsowell.quartz.task;
import com.alibaba.fastjson2.JSON;
import com.alibaba.fastjson2.JSONObject;
import com.google.common.collect.Lists;
import com.jsowell.adapay.common.DivMember;
import com.jsowell.adapay.dto.PaymentConfirmParam;
import com.jsowell.adapay.response.PaymentConfirmResponse;
import com.jsowell.adapay.response.RefundResponse;
import com.jsowell.adapay.service.AdapayService;
import com.jsowell.common.YouDianUtils;
import com.jsowell.common.constant.CacheConstants;
import com.jsowell.common.constant.Constants;
import com.jsowell.common.core.redis.RedisCache;
import com.jsowell.common.util.DateUtils;
import com.jsowell.common.util.StringUtils;
import com.jsowell.common.util.http.HttpUtils;
import com.jsowell.pile.domain.AdapayUnsplitRecord;
import com.jsowell.pile.service.AdapayUnsplitRecordService;
import com.jsowell.pile.service.OrderBasicInfoService;
import com.jsowell.pile.vo.AdapayUnsplitRecordVO;
import com.jsowell.pile.vo.UnsplitOrderFieldsVO;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.poi.ss.usermodel.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import java.io.InputStream;
import java.math.BigDecimal;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.*;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
/**
* 汇付未分账订单处理 Task
*
* 三步流程(可独立调用,也可按顺序执行):
* Step1: adapayUnsplitTask.step1ImportExcel('文件路径') — 导入 Excel 到 adapay_unsplit_record
* Step2: adapayUnsplitTask.step2CompleteFields('startTime','endTime') — 调 PRE 接口补齐订单字段
* Step3: adapayUnsplitTask.step3ProcessSplit() — 分账 + 退款处理
*/
@Component("adapayUnsplitTask")
public class AdapayUnsplitTask {
private static final Logger log = LoggerFactory.getLogger(AdapayUnsplitTask.class);
/** 批量导入每批大小 */
private static final int IMPORT_BATCH_SIZE = 500;
/** 补齐字段分页大小 */
private static final int COMPLETE_PAGE_SIZE = 200;
/** 调 PRE 接口每批 orderCode 数量(避免 URL/Body 过长) */
private static final int PRE_QUERY_BATCH_SIZE = 100;
@Value("${pre.base-url:http://localhost:8080}")
private String preBaseUrl;
@Autowired
private AdapayUnsplitRecordService adapayUnsplitRecordService;
@Autowired
private AdapayService adapayService;
@Autowired
private RedisCache redisCache;
// =========================================================
// Step 1: 导入 Excel → adapay_unsplit_record
// =========================================================
/**
* 从默认路径导入 Excel
* adapayUnsplitTask.step1ImportExcel()
*/
public void step1ImportExcel() {
step1ImportExcel("doc/万车充小程序-未分账明细.xlsx");
}
/**
* 从指定路径导入 Excel 到 adapay_unsplit_record 表
* adapayUnsplitTask.step1ImportExcel('doc/万车充小程序-未分账明细.xlsx')
*/
public void step1ImportExcel(String filePath) {
Path path = Paths.get(filePath);
if (!path.isAbsolute()) {
path = Paths.get(System.getProperty("user.dir"), filePath);
}
if (!Files.exists(path)) {
log.error("[step1ImportExcel] 文件不存在: {}", path.toAbsolutePath());
return;
}
int total = 0, success = 0, skipped = 0, failed = 0;
DataFormatter formatter = new DataFormatter();
try (InputStream is = Files.newInputStream(path);
Workbook workbook = WorkbookFactory.create(is)) {
Sheet sheet = workbook.getSheetAt(0);
if (sheet == null) {
log.error("[step1ImportExcel] Excel 无 Sheet, file:{}", path.toAbsolutePath());
return;
}
Row headerRow = sheet.getRow(sheet.getFirstRowNum());
if (headerRow == null) {
log.error("[step1ImportExcel] Excel 无表头, file:{}", path.toAbsolutePath());
return;
}
Map<String, Integer> headerMap = buildHeaderMap(headerRow, formatter);
List<String> required = Arrays.asList(
"商户号", "支付时间", "交易流水号", "交易订单号",
"交易订单金额", "已确认分账金额", "已撤销金额", "支付确认撤销金额", "剩余未分账金额"
);
for (String h : required) {
if (!headerMap.containsKey(normalize(h))) {
log.error("[step1ImportExcel] 缺少必要列: {}", h);
return;
}
}
List<AdapayUnsplitRecord> batch = new ArrayList<>(IMPORT_BATCH_SIZE);
int firstData = sheet.getFirstRowNum() + 1;
int lastData = sheet.getLastRowNum();
for (int rowNum = firstData; rowNum <= lastData; rowNum++) {
Row row = sheet.getRow(rowNum);
if (row == null || isRowEmpty(row)) continue;
total++;
try {
AdapayUnsplitRecord record = rowToRecord(row, headerMap, formatter);
if (record == null) { skipped++; continue; }
batch.add(record);
if (batch.size() >= IMPORT_BATCH_SIZE) {
adapayUnsplitRecordService.batchInsertOrUpdateSelective(batch);
success += batch.size();
batch.clear();
}
} catch (Exception e) {
failed++;
log.error("[step1ImportExcel] 行解析失败, rowNum:{}", rowNum + 1, e);
}
}
if (!batch.isEmpty()) {
adapayUnsplitRecordService.batchInsertOrUpdateSelective(batch);
success += batch.size();
}
} catch (Exception e) {
log.error("[step1ImportExcel] 读取 Excel 失败, file:{}", path.toAbsolutePath(), e);
}
log.info("[step1ImportExcel] 完成, total:{}, success:{}, skipped:{}, failed:{}", total, success, skipped, failed);
}
// =========================================================
// Step 2: 调 PRE 接口补齐 order_code/pile_type/settle_amount/due_refund_amount
// =========================================================
/**
* 补齐指定时间范围内的未分账记录字段
* adapayUnsplitTask.step2CompleteFields('2024-01-01 00:00:00', '2025-12-31 23:59:59')
*/
public void step2CompleteFields(String startTime, String endTime) {
int pageNum = 1;
int updatedTotal = 0;
while (true) {
// 分页查询该时间段内的记录
com.jsowell.common.util.PageUtils.startPage(pageNum, COMPLETE_PAGE_SIZE);
List<AdapayUnsplitRecord> list = adapayUnsplitRecordService.queryUnsplitOrders(startTime, endTime);
if (CollectionUtils.isEmpty(list)) break;
// 1. 从 orderNo 中提取 orderCodeC 开头,下划线前的部分)
for (AdapayUnsplitRecord r : list) {
if (StringUtils.isBlank(r.getOrderCode())) {
r.setOrderCode(extractOrderCode(r.getOrderNo()));
}
}
// 2. 收集有效 orderCode批量调 PRE 接口查询订单字段
Set<String> orderCodes = list.stream()
.map(AdapayUnsplitRecord::getOrderCode)
.filter(c -> StringUtils.isNotBlank(c) && c.startsWith("C"))
.collect(Collectors.toSet());
Map<String, UnsplitOrderFieldsVO> orderFieldMap = new HashMap<>();
if (CollectionUtils.isNotEmpty(orderCodes)) {
orderFieldMap = queryOrderFieldsFromPre(orderCodes);
}
// 3. 逐条比对,有变化才加入更新列表
List<AdapayUnsplitRecord> updateList = new ArrayList<>();
Date now = DateUtils.getNowDate();
for (AdapayUnsplitRecord r : list) {
boolean changed = false;
// 补 orderCode
String orderCode = r.getOrderCode();
if (StringUtils.isBlank(orderCode)) {
orderCode = extractOrderCode(r.getOrderNo());
if (StringUtils.isNotBlank(orderCode)) {
r.setOrderCode(orderCode);
changed = true;
}
}
// 用 PRE 接口数据补齐其余字段
if (StringUtils.isNotBlank(orderCode)) {
UnsplitOrderFieldsVO fields = orderFieldMap.get(orderCode);
if (fields != null) {
// pile_type
String pileType = YouDianUtils.isEBikePileSn(fields.getPileSn()) ? "eBike" : "EV";
if (!StringUtils.equals(r.getPileType(), pileType)) {
r.setPileType(pileType);
changed = true;
}
// settle_amount
if (!isSameAmount(r.getSettleAmount(), fields.getSettleAmount())) {
r.setSettleAmount(fields.getSettleAmount());
changed = true;
}
// due_refund_amount
if (!isSameAmount(r.getDueRefundAmount(), fields.getRefundAmount())) {
r.setDueRefundAmount(fields.getRefundAmount());
changed = true;
}
}
}
if (changed) {
r.setUpdateTime(now);
updateList.add(r);
}
}
if (CollectionUtils.isNotEmpty(updateList)) {
adapayUnsplitRecordService.updateBatchSelective(updateList);
updatedTotal += updateList.size();
}
if (list.size() < COMPLETE_PAGE_SIZE) break;
pageNum++;
}
log.info("[step2CompleteFields] 完成, startTime:{}, endTime:{}, 更新:{}条", startTime, endTime, updatedTotal);
}
/**
* 批量调 PRE 环境 /internal/order/unsplit-fields 接口查询订单字段
* 每批最多 PRE_QUERY_BATCH_SIZE 个 orderCode并行发起请求
*/
private Map<String, UnsplitOrderFieldsVO> queryOrderFieldsFromPre(Set<String> orderCodes) {
Map<String, UnsplitOrderFieldsVO> result = new HashMap<>();
List<String> codeList = new ArrayList<>(orderCodes);
String url = preBaseUrl + "/internal/order/unsplit-fields";
// 分批请求,每批 PRE_QUERY_BATCH_SIZE 条
List<List<String>> partitions = new ArrayList<>();
for (int i = 0; i < codeList.size(); i += PRE_QUERY_BATCH_SIZE) {
partitions.add(codeList.subList(i, Math.min(i + PRE_QUERY_BATCH_SIZE, codeList.size())));
}
for (List<String> batch : partitions) {
try {
String body = JSON.toJSONString(batch);
String resp = HttpUtils.sendPostContentType(url, body, "application/json");
if (StringUtils.isBlank(resp)) continue;
JSONObject json = JSONObject.parseObject(resp);
if (json == null || json.getIntValue("code") != 200) {
log.warn("[step2CompleteFields] PRE 接口返回异常: {}", resp);
continue;
}
List<UnsplitOrderFieldsVO> list = json.getJSONArray("data")
.toJavaList(UnsplitOrderFieldsVO.class);
if (CollectionUtils.isNotEmpty(list)) {
list.forEach(v -> result.put(v.getOrderCode(), v));
}
} catch (Exception e) {
log.error("[step2CompleteFields] 调 PRE 接口失败, batch size:{}", batch.size(), e);
}
}
return result;
}
// =========================================================
// Step 3: 分账 + 退款处理
// =========================================================
/**
* 处理所有待分账记录(默认 appId10 线程并行)
* adapayUnsplitTask.step3ProcessSplit()
*/
public void step3ProcessSplit() {
step3ProcessSplit(Constants.DEFAULT_APP_ID, 10);
}
/**
* 处理所有待分账记录
* 逻辑:
* - remaining_split_amount > 0 时需要处理
* - due_refund_amount > refund_amount差额部分走退款
* - settle_amount > confirmed_split_amount差额部分走分账
* adapayUnsplitTask.step3ProcessSplit('wx_app_id', 10)
*/
public void step3ProcessSplit(String wechatAppId, Integer parallelism) {
int threads = (parallelism == null || parallelism <= 0) ? 10 : parallelism;
Boolean acquired = redisCache.setnx(CacheConstants.PROCESS_UNSPLIT_ORDERS, Constants.ONE, 120L);
if (Boolean.FALSE.equals(acquired)) {
log.info("[step3ProcessSplit] 上一批次仍在执行中,跳过");
return;
}
try {
List<AdapayUnsplitRecordVO> list = adapayUnsplitRecordService.queryList();
if (CollectionUtils.isEmpty(list)) {
log.info("[step3ProcessSplit] 无待处理记录");
return;
}
log.info("[step3ProcessSplit] 共{}条待处理记录,并行线程数:{}", list.size(), threads);
AtomicInteger splitSuccess = new AtomicInteger();
AtomicInteger splitFailed = new AtomicInteger();
AtomicInteger refundSuccess = new AtomicInteger();
AtomicInteger refundFailed = new AtomicInteger();
AtomicInteger skipped = new AtomicInteger();
ForkJoinPool pool = new ForkJoinPool(threads);
try {
pool.submit(() ->
list.parallelStream().forEach(item -> {
String paymentId = item.getPaymentId();
String orderCode = item.getOrderCode();
if (StringUtils.isBlank(paymentId)) {
skipped.incrementAndGet();
return;
}
// --- 退款部分 ---
// due_refund_amount - refund_amount = 应退但未退的金额
BigDecimal dueRefund = nvl(item.getRefundAmount()); // due_refund_amount
BigDecimal paidRefund = nvl(item.getPaidAmount()); // refund_amount已退
BigDecimal pendingRefund = dueRefund.subtract(paidRefund);
if (pendingRefund.compareTo(BigDecimal.ZERO) > 0) {
doRefund(paymentId, orderCode, pendingRefund, wechatAppId, refundSuccess, refundFailed);
}
// --- 分账部分 ---
// waitSplitAmount = settle_amount - confirmed_split_amount - payment_revoke_amount
BigDecimal waitSplit = nvl(item.getWaitSplitAmount());
if (waitSplit.compareTo(BigDecimal.ZERO) > 0) {
doSplit(item, paymentId, orderCode, waitSplit, wechatAppId, splitSuccess, splitFailed);
}
if (pendingRefund.compareTo(BigDecimal.ZERO) <= 0 && waitSplit.compareTo(BigDecimal.ZERO) <= 0) {
skipped.incrementAndGet();
}
})
).get();
} catch (Exception e) {
log.error("[step3ProcessSplit] 并行执行异常", e);
} finally {
pool.shutdown();
}
log.info("[step3ProcessSplit] 完成, splitSuccess:{}, splitFailed:{}, refundSuccess:{}, refundFailed:{}, skipped:{}",
splitSuccess.get(), splitFailed.get(), refundSuccess.get(), refundFailed.get(), skipped.get());
} finally {
redisCache.deleteObject(CacheConstants.PROCESS_UNSPLIT_ORDERS);
}
}
// =========================================================
// 私有方法
// =========================================================
private void doRefund(String paymentId, String orderCode, BigDecimal refundAmt,
String wechatAppId, AtomicInteger success, AtomicInteger failed) {
try {
RefundResponse resp = adapayService.createRefundRequest(
paymentId, refundAmt.setScale(2, BigDecimal.ROUND_HALF_UP),
wechatAppId, Constants.ZERO, "SPLIT", orderCode);
if (resp != null && resp.isSuccess()) {
success.incrementAndGet();
updateSplitResult(paymentId, "REFUND_SUCCESS",
"退款金额: " + refundAmt.toPlainString());
log.info("[step3ProcessSplit] 退款成功, paymentId:{}, orderCode:{}, refundAmt:{}", paymentId, orderCode, refundAmt);
} else {
failed.incrementAndGet();
String errCode = resp == null ? "null" : resp.getError_code();
String errMsg = resp == null ? "response_is_null" : resp.getError_msg();
updateSplitResult(paymentId, "REFUND_FAILED", errCode + ": " + errMsg);
log.error("[step3ProcessSplit] 退款失败, paymentId:{}, orderCode:{}, errCode:{}, errMsg:{}", paymentId, orderCode, errCode, errMsg);
}
} catch (Exception e) {
failed.incrementAndGet();
updateSplitResult(paymentId, "REFUND_FAILED", "异常: " + e.getMessage());
log.error("[step3ProcessSplit] 退款异常, paymentId:{}, orderCode:{}", paymentId, orderCode, e);
}
}
private void doSplit(AdapayUnsplitRecordVO item, String paymentId, String orderCode,
BigDecimal waitSplit, String wechatAppId,
AtomicInteger success, AtomicInteger failed) {
try {
DivMember divMember = new DivMember();
divMember.setMemberId(Constants.ZERO);
divMember.setAmount(waitSplit.setScale(2, BigDecimal.ROUND_HALF_UP).toPlainString());
divMember.setFeeFlag(Constants.Y);
PaymentConfirmParam param = PaymentConfirmParam.builder()
.paymentId(paymentId)
.divMemberList(Lists.newArrayList(divMember))
.confirmAmt(waitSplit)
.orderCode(orderCode)
.wechatAppId(wechatAppId)
.build();
PaymentConfirmResponse resp = adapayService.createPaymentConfirmRequest(param);
if (resp != null && resp.isSuccess()) {
success.incrementAndGet();
// 累加 confirmed_split_amount
BigDecimal newConfirmed = nvl(item.getConfirmedSplitAmount())
.add(waitSplit).setScale(2, BigDecimal.ROUND_HALF_UP);
AdapayUnsplitRecord update = new AdapayUnsplitRecord();
update.setPaymentId(paymentId);
update.setConfirmedSplitAmount(newConfirmed);
update.setSplitFlag("SUCCESS");
update.setSplitRemark("分账金额: " + waitSplit.toPlainString());
update.setUpdateTime(DateUtils.getNowDate());
adapayUnsplitRecordService.updateByPaymentIdSelective(update);
log.info("[step3ProcessSplit] 分账成功, paymentId:{}, orderCode:{}, amt:{}", paymentId, orderCode, waitSplit);
} else {
failed.incrementAndGet();
String errCode = resp == null ? "null" : resp.getError_code();
String errMsg = resp == null ? "response_is_null" : resp.getError_msg();
updateSplitResult(paymentId, "FAILED", errCode + ": " + errMsg);
log.error("[step3ProcessSplit] 分账失败, paymentId:{}, orderCode:{}, errCode:{}, errMsg:{}", paymentId, orderCode, errCode, errMsg);
}
} catch (Exception e) {
failed.incrementAndGet();
updateSplitResult(paymentId, "FAILED", "异常: " + e.getMessage());
log.error("[step3ProcessSplit] 分账异常, paymentId:{}, orderCode:{}", paymentId, orderCode, e);
}
}
private void updateSplitResult(String paymentId, String splitFlag, String splitRemark) {
AdapayUnsplitRecord r = new AdapayUnsplitRecord();
r.setPaymentId(paymentId);
r.setSplitFlag(splitFlag);
r.setSplitRemark(splitRemark);
r.setUpdateTime(DateUtils.getNowDate());
adapayUnsplitRecordService.updateByPaymentIdSelective(r);
}
private AdapayUnsplitRecord rowToRecord(Row row, Map<String, Integer> headerMap, DataFormatter formatter) {
String paymentId = cellStr(row, headerMap.get(normalize("交易流水号")), formatter);
if (StringUtils.isBlank(paymentId)) return null;
String orderNo = cellStr(row, headerMap.get(normalize("交易订单号")), formatter);
String orderCode = extractOrderCode(orderNo);
AdapayUnsplitRecord r = new AdapayUnsplitRecord();
r.setMerchantCode(cellStr(row, headerMap.get(normalize("商户号")), formatter));
r.setPayTime(parsePayTime(getCell(row, headerMap.get(normalize("支付时间"))), formatter));
r.setPaymentId(paymentId);
r.setOrderNo(orderNo);
r.setOrderCode(orderCode);
r.setPayAmount(cellDecimal(row, headerMap.get(normalize("交易订单金额")), formatter));
r.setConfirmedSplitAmount(cellDecimal(row, headerMap.get(normalize("已确认分账金额")), formatter));
r.setRefundAmount(cellDecimal(row, headerMap.get(normalize("已撤销金额")), formatter));
r.setPaymentRevokeAmount(cellDecimal(row, headerMap.get(normalize("支付确认撤销金额")), formatter));
r.setRemainingSplitAmount(cellDecimal(row, headerMap.get(normalize("剩余未分账金额")), formatter));
r.setUpdateTime(DateUtils.getNowDate());
return r;
}
private String extractOrderCode(String orderNo) {
if (StringUtils.isBlank(orderNo)) return null;
int idx = orderNo.indexOf("_");
String code = idx > 0 ? orderNo.substring(0, idx) : orderNo;
return code.length() <= 16 ? code : null;
}
private Map<String, Integer> buildHeaderMap(Row headerRow, DataFormatter formatter) {
Map<String, Integer> map = new HashMap<>();
for (int i = headerRow.getFirstCellNum(); i < headerRow.getLastCellNum(); i++) {
Cell cell = headerRow.getCell(i, Row.MissingCellPolicy.RETURN_BLANK_AS_NULL);
if (cell == null) continue;
String h = normalize(formatter.formatCellValue(cell));
if (StringUtils.isNotBlank(h)) map.put(h, i);
}
return map;
}
private String cellStr(Row row, Integer col, DataFormatter formatter) {
Cell cell = getCell(row, col);
if (cell == null) return null;
String v = formatter.formatCellValue(cell);
return StringUtils.isBlank(v) ? null : v.trim();
}
private Cell getCell(Row row, Integer col) {
if (row == null || col == null) return null;
return row.getCell(col, Row.MissingCellPolicy.RETURN_BLANK_AS_NULL);
}
private BigDecimal cellDecimal(Row row, Integer col, DataFormatter formatter) {
String v = cellStr(row, col, formatter);
if (StringUtils.isBlank(v)) return BigDecimal.ZERO;
try { return new BigDecimal(v.replace(",", "").trim()); }
catch (NumberFormatException e) { return BigDecimal.ZERO; }
}
private Date parsePayTime(Cell cell, DataFormatter formatter) {
if (cell == null) return null;
if (cell.getCellType() == CellType.NUMERIC) return DateUtil.getJavaDate(cell.getNumericCellValue());
if (cell.getCellType() == CellType.FORMULA
&& cell.getCachedFormulaResultType() == CellType.NUMERIC)
return DateUtil.getJavaDate(cell.getNumericCellValue());
String v = formatter.formatCellValue(cell).trim();
if (StringUtils.isBlank(v)) return null;
Date d = DateUtils.parseDate(v);
if (d != null) return d;
try { return DateUtil.getJavaDate(Double.parseDouble(v)); }
catch (Exception e) { return null; }
}
private boolean isRowEmpty(Row row) {
for (int i = row.getFirstCellNum(); i < row.getLastCellNum(); i++) {
Cell c = row.getCell(i, Row.MissingCellPolicy.RETURN_BLANK_AS_NULL);
if (c != null && c.getCellType() != CellType.BLANK) return false;
}
return true;
}
private String normalize(String s) {
return s == null ? "" : s.replace(" ", "").trim();
}
private boolean isSameAmount(BigDecimal a, BigDecimal b) {
if (a == null && b == null) return true;
if (a == null || b == null) return false;
return a.compareTo(b) == 0;
}
private BigDecimal nvl(BigDecimal v) {
return v == null ? BigDecimal.ZERO : v;
}
private BigDecimal nvl(String v) {
if (StringUtils.isBlank(v)) return BigDecimal.ZERO;
try { return new BigDecimal(v.replace(",", "").trim()); }
catch (Exception e) { return BigDecimal.ZERO; }
}
}