diff --git a/jsowell-admin/src/test/java/PaymentTestController.java b/jsowell-admin/src/test/java/PaymentTestController.java index 54ba3e22a..2952b781e 100644 --- a/jsowell-admin/src/test/java/PaymentTestController.java +++ b/jsowell-admin/src/test/java/PaymentTestController.java @@ -4,6 +4,8 @@ import com.alibaba.fastjson2.JSONObject; import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.huifu.adapay.core.exception.BaseAdaPayException; +import com.huifu.adapay.model.CorpMember; +import com.huifu.adapay.model.Member; import com.huifu.adapay.model.PaymentReverse; import com.huifu.adapay.model.Refund; import com.jsowell.JsowellApplication; @@ -16,6 +18,7 @@ import com.jsowell.adapay.response.PaymentConfirmResponse; import com.jsowell.adapay.response.PaymentReverseResponse; import com.jsowell.adapay.response.QueryPaymentConfirmDetailResponse; import com.jsowell.adapay.service.AdapayService; +import com.jsowell.adapay.vo.AdapayCorpMemberVO; import com.jsowell.adapay.vo.OrderSplitResult; import com.jsowell.common.constant.CacheConstants; import com.jsowell.common.constant.Constants; @@ -28,6 +31,7 @@ import com.jsowell.pile.domain.AdapayMemberAccount; import com.jsowell.pile.domain.OrderUnsplitRecord; import com.jsowell.pile.service.OrderBasicInfoService; import com.jsowell.pile.service.OrderUnsplitRecordService; +import com.jsowell.pile.service.PileMerchantInfoService; import org.apache.commons.collections4.CollectionUtils; import org.junit.Test; import org.junit.runner.RunWith; @@ -43,10 +47,7 @@ import java.io.BufferedReader; import java.io.FileReader; import java.io.IOException; import java.math.BigDecimal; -import java.util.ArrayList; -import java.util.Collection; -import java.util.List; -import java.util.Map; +import java.util.*; import java.util.concurrent.TimeUnit; /** @@ -75,6 +76,9 @@ public class PaymentTestController { @Autowired private OrderBasicInfoService orderBasicInfoService; + @Autowired + private PileMerchantInfoService pileMerchantInfoService; + public List getPaymentIdList() { List resultList = Lists.newArrayList(); // List paymentIdList1 = getPaymentIdList1(); @@ -578,4 +582,227 @@ public class PaymentTestController { QueryPaymentConfirmDetailResponse response = adapayService.queryPaymentConfirmList(dto); System.out.println(JSON.toJSONString(response)); } + + private List getAdapayMemberIds() { + List list = new ArrayList<>(); + list.add("ACM25743626"); + list.add("ACM69424215"); + list.add("ACM27437238"); + list.add("ACM84442005"); + list.add("ACM42810916"); + list.add("ACM48576257"); + list.add("ACM67580043"); + list.add("ACM67738893"); + list.add("ACM23489463"); + list.add("ACM40991242"); + list.add("ACM48720983"); + list.add("ACM82507085"); + list.add("ACM25511707"); + list.add("ACM48152528"); + list.add("ACM84659934"); + list.add("ACM29123898"); + list.add("ACM82792551"); + list.add("ACM44398405"); + list.add("ACM61290230"); + list.add("ACM84693947"); + list.add("ACM25158725"); + list.add("ACM25530730"); + list.add("ACM48910890"); + list.add("ACM88601280"); + list.add("ACM61634334"); + list.add("ACM80895702"); + list.add("ACM63995722"); + list.add("ACM46639968"); + list.add("ACM61026608"); + list.add("ACM21943395"); + list.add("ACM42242307"); + list.add("ACM46086128"); + list.add("ACM46272826"); + list.add("ACM82731955"); + list.add("ACM69232006"); + list.add("ACM65039881"); + list.add("ACM84868091"); + list.add("ACM65835351"); + list.add("ACM84007115"); + list.add("ACM82710624"); + list.add("ACM63191244"); + list.add("ACM80665158"); + list.add("ACM88877099"); + list.add("ACM23253592"); + list.add("ACM40744704"); + list.add("ACM82585598"); + list.add("ACM61630517"); + list.add("ACM86381094"); + list.add("ACM44105905"); + list.add("ACM42282057"); + list.add("ACM46270169"); + list.add("ACM48532303"); + list.add("ACM69867986"); + list.add("ACM25576542"); + list.add("ACM80057656"); + list.add("ACM42459200"); + list.add("ACM46046815"); + list.add("ACM61085115"); + list.add("ACM25916276"); + list.add("ACM69804831"); + list.add("ACM65455320"); + list.add("ACM84845205"); + list.add("ACM48173891"); + list.add("ACM84486182"); + list.add("ACM46460819"); + list.add("ACM63740826"); + list.add("ACM63191096"); + list.add("ACM48918479"); + list.add("ACM29522241"); + list.add("ACM27416361"); + list.add("ACM48956267"); + list.add("ACM65098379"); + list.add("ACM46251134"); + list.add("ACM42491105"); + list.add("ACM27859047"); + list.add("ACM86788208"); + list.add("ACM46692941"); + list.add("ACM80410442"); + list.add("ACM27036391"); + list.add("ACM27853386"); + list.add("ACM23654452"); + list.add("ACM40782726"); + list.add("ACM63571448"); + list.add("ACM29566538"); + list.add("ACM46846737"); + list.add("ACM29393542"); + list.add("ACM23485623"); + list.add("ACM82526496"); + list.add("ACM27663450"); + list.add("ACM88831475"); + list.add("ACM69048024"); + list.add("ACM65854572"); + list.add("ACM88261510"); + list.add("ACM42497015"); + list.add("ACM61807959"); + list.add("ACM25116331"); + list.add("ACM82399802"); + list.add("ACM88079897"); + list.add("ACM61630935"); + list.add("ACM44799744"); + list.add("ACM40326146"); + list.add("ACM88603166"); + list.add("ACM86592402"); + list.add("ACM61254324"); + list.add("ACM69882166"); + list.add("ACM44124955"); + list.add("ACM65683299"); + list.add("ACM82102763"); + list.add("ACM48321794"); + list.add("ACM69673479"); + list.add("ACM67500600"); + list.add("ACM86172597"); + list.add("ACM42491905"); + list.add("ACM67751788"); + list.add("ACM42854259"); + list.add("ACM61465505"); + list.add("ACM25576943"); + list.add("ACM65622214"); + list.add("ACM88035944"); + list.add("ACM44582427"); + list.add("ACM61296391"); + list.add("ACM69426703"); + list.add("ACM44924010"); + list.add("ACM23673274"); + list.add("ACM84045137"); + list.add("ACM80205384"); + list.add("ACM44398211"); + list.add("ACM29908562"); + list.add("ACM23698348"); + list.add("ACM80266727"); + list.add("ACM63556694"); + list.add("ACM29560453"); + list.add("ACM63706872"); + list.add("ACM46652838"); + list.add("ACM65854139"); + list.add("ACM67183933"); + list.add("ACM69802535"); + list.add("ACM46829664"); + list.add("ACM21143070"); + list.add("ACM61425221"); + list.add("ACM42244668"); + list.add("ACM40326509"); + list.add("ACM21949831"); + list.add("ACM63383478"); + list.add("ACM40700033"); + list.add("ACM42056483"); + list.add("ACM61862227"); + list.add("ACM65681312"); + list.add("ACM21181689"); + list.add("ACM80876335"); + list.add("ACM63552660"); + list.add("ACM69082641"); + list.add("ACM23447202"); + list.add("ACM61024342"); + list.add("ACM46046494"); + list.add("ACM88607781"); + list.add("ACM86579709"); + list.add("ACM65858167"); + list.add("ACM42261148"); + list.add("ACM42077124"); + list.add("ACM48994471"); + list.add("ACM88873084"); + list.add("ACM84250847"); + list.add("ACM88073310"); + list.add("ACM65818689"); + list.add("ACM42810992"); + list.add("ACM21968250"); + list.add("ACM42666015"); + list.add("ACM42227741"); + list.add("ACM44968744"); + list.add("ACM65202789"); + list.add("ACM25576713"); + list.add("ACM25196954"); + list.add("ACM48530698"); + list.add("ACM25380172"); + list.add("ACM23468349"); + list.add("ACM65227435"); + list.add("ACM23270264"); + list.add("ACM82944298"); + list.add("ACM65056931"); + list.add("ACM25720476"); + list.add("ACM82948898"); + list.add("ACM46449658"); + list.add("ACM69844311"); + list.add("ACM82794891"); + list.add("ACM42474874"); + list.add("ACM48557876"); + list.add("ACM69061582"); + list.add("ACM84062890"); + list.add("ACM21981947"); + list.add("ACM23447601"); + list.add("ACM46000454"); + list.add("ACM80897090"); + list.add("ACM40594777"); + return list; + } + + @Test + public void queryAdapayMember() throws BaseAdaPayException { + // 查询所有运营商的adapayMemberId + // String adapayMemberId = "ACM25158725"; + List nullInfoList = new ArrayList<>(); + List targetList = new ArrayList<>(); + + List adapayMemberIds = getAdapayMemberIds(); + for (String memberId : adapayMemberIds) { + AdapayCorpMemberVO adapayCorpMemberVO = adapayService.queryCorpAdapayMemberInfo(memberId, wechatAppId1); + if (adapayCorpMemberVO == null) { + nullInfoList.add(memberId); + continue; + } + String settleAccountId = adapayCorpMemberVO.getSettleAccountId(); + if (settleAccountId == null) { + targetList.add(memberId); + } + } + logger.info("无信息运营商:{}", nullInfoList); + logger.info("无结算账户运营商:{}", targetList); + + } } diff --git a/jsowell-common/src/main/java/com/jsowell/common/enums/ykc/PileChannelEntity.java b/jsowell-common/src/main/java/com/jsowell/common/enums/ykc/PileChannelEntity.java index 8e81d95ef..d51604c02 100644 --- a/jsowell-common/src/main/java/com/jsowell/common/enums/ykc/PileChannelEntity.java +++ b/jsowell-common/src/main/java/com/jsowell/common/enums/ykc/PileChannelEntity.java @@ -1,11 +1,17 @@ package com.jsowell.common.enums.ykc; +import com.google.common.collect.Lists; import com.jsowell.common.util.StringUtils; +import io.netty.buffer.ByteBuf; import io.netty.channel.Channel; import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelId; import lombok.extern.slf4j.Slf4j; +import org.apache.commons.collections4.CollectionUtils; +import java.util.ArrayList; import java.util.HashMap; +import java.util.List; import java.util.concurrent.ConcurrentHashMap; /** @@ -18,6 +24,9 @@ public class PileChannelEntity { * 管理一个全局map,保存连接进服务端的通道数量 */ private static final ConcurrentHashMap manager = new ConcurrentHashMap<>(); + + // 桩号--channelId 一对多 + public static final ConcurrentHashMap> pileMap = new ConcurrentHashMap<>(); /** * 校验channel是否保存 @@ -49,6 +58,38 @@ public class PileChannelEntity { } } + public static void checkChannelV2(String pileSn, ChannelHandlerContext ctx) { + List list = pileMap.get(pileSn); + // 如果该桩当前没有保存过该channel,则进行保存 + if (!list.contains(ctx)) { + list.add(ctx); + pileMap.put(pileSn, list); + } + } + + /** + * 删除某台桩的某个channel连接 + * @param pileSn + * @param ctx + */ + public static void deleteChannel(String pileSn, ChannelHandlerContext ctx) { + String channelId = ctx.channel().id().asLongText(); + // 从map中删除该ctx + List list = pileMap.get(pileSn); + if (CollectionUtils.isEmpty(list)) { + return; + } + for (ChannelHandlerContext channelHandlerContext : list) { + String id = channelHandlerContext.channel().id().asLongText(); + if (StringUtils.equals(id, channelId)) { + // 传来的channelId与已保存的channel中有一致的,进行删除 + list.remove(channelHandlerContext); + // 同时关闭连接 + channelHandlerContext.close(); + } + } + } + /** * 通过桩编号获取channel链接信息 * @param pileSn diff --git a/jsowell-netty/src/main/java/com/jsowell/netty/server/yunkuaichong/NettyServerHandler.java b/jsowell-netty/src/main/java/com/jsowell/netty/server/yunkuaichong/NettyServerHandler.java index a049504ee..ce1bd6964 100644 --- a/jsowell-netty/src/main/java/com/jsowell/netty/server/yunkuaichong/NettyServerHandler.java +++ b/jsowell-netty/src/main/java/com/jsowell/netty/server/yunkuaichong/NettyServerHandler.java @@ -149,6 +149,8 @@ public class NettyServerHandler extends ChannelInboundHandlerAdapter { // 响应客户端 ByteBuf buffer = ctx.alloc().buffer().writeBytes(response); this.channelWrite(channel.id(), buffer); + // 批量响应客户端 + // this.channelWriteBatch(ctx, buffer); if (!CollectionUtils.containsAny(notPrintFrameTypeList, frameType)) { // 应答帧类型 byte[] responseFrameTypeBytes = YKCFrameTypeCode.PlatformAnswersRelation.getResponseFrameTypeBytes(frameTypeBytes); @@ -164,6 +166,29 @@ public class NettyServerHandler extends ChannelInboundHandlerAdapter { } } + /** + * 批量发送信息 + * @param pileSn + * @param ctx + * @param msg + */ + private void channelWriteBatch(String pileSn, ChannelHandlerContext ctx, Object msg) { + // 获取该桩下的所有channel + List list = PileChannelEntity.pileMap.get(pileSn); + if(CollectionUtils.isEmpty(list)) { + return; + } + // 批量写入 + for (ChannelHandlerContext context : list) { + context.write(msg); + //刷新缓存区 + context.flush(); + } + + // 如果通道不存在,则将连接删除 + + } + // @Override // protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception { // log.info("channelRead0=== channelId:" + ctx.channel().id() + ", msg:" + msg); @@ -179,7 +204,7 @@ public class NettyServerHandler extends ChannelInboundHandlerAdapter { ChannelId channelId = ctx.channel().id(); //包含此客户端才去删除 if (CHANNEL_MAP.containsKey(channelId)) { - ykcService.exit(channelId); + ykcService.exit(ctx); //删除连接 CHANNEL_MAP.remove(channelId); log.info("客户端【{}】, 退出netty服务器【IP:{}, PORT:{}】, 连接通道数量: {}", channelId, clientIp, insocket.getPort(), CHANNEL_MAP.size()); @@ -278,6 +303,8 @@ public class NettyServerHandler extends ChannelInboundHandlerAdapter { log.trace("Connection timeout 【{}】", ctx.channel().remoteAddress()); } log.error("【{}】发生了错误, pileSn:【{}】此连接被关闭, 此时连通数量: {}", channelId, pileSn, CHANNEL_MAP.size()); + // 删除连接 + // PileChannelEntity.deleteChannel(pileSn, ctx); ctx.channel().close(); } } finally { diff --git a/jsowell-netty/src/main/java/com/jsowell/netty/service/yunkuaichong/YKCBusinessService.java b/jsowell-netty/src/main/java/com/jsowell/netty/service/yunkuaichong/YKCBusinessService.java index 1bf85a02b..573c182ff 100644 --- a/jsowell-netty/src/main/java/com/jsowell/netty/service/yunkuaichong/YKCBusinessService.java +++ b/jsowell-netty/src/main/java/com/jsowell/netty/service/yunkuaichong/YKCBusinessService.java @@ -22,7 +22,7 @@ public interface YKCBusinessService { /** * 桩退出 - * @param channelId channelId + * @param ctx channelId */ - void exit(ChannelId channelId); + void exit(ChannelHandlerContext ctx); } diff --git a/jsowell-netty/src/main/java/com/jsowell/netty/service/yunkuaichong/impl/YKCBusinessServiceImpl.java b/jsowell-netty/src/main/java/com/jsowell/netty/service/yunkuaichong/impl/YKCBusinessServiceImpl.java index d4149fa8f..c1abd438b 100644 --- a/jsowell-netty/src/main/java/com/jsowell/netty/service/yunkuaichong/impl/YKCBusinessServiceImpl.java +++ b/jsowell-netty/src/main/java/com/jsowell/netty/service/yunkuaichong/impl/YKCBusinessServiceImpl.java @@ -66,9 +66,9 @@ public class YKCBusinessServiceImpl implements YKCBusinessService { } @Override - public void exit(ChannelId channelId) { + public void exit(ChannelHandlerContext ctx) { // 获取桩编号 - String pileSn = PileChannelEntity.getPileSnByChannelId(channelId.asLongText()); + String pileSn = PileChannelEntity.getPileSnByChannelId(ctx.channel().id().asLongText()); if (StringUtils.isBlank(pileSn)) { return; } @@ -86,6 +86,9 @@ public class YKCBusinessServiceImpl implements YKCBusinessService { String jsonMsg = YKCFrameTypeCode.PILE_LOG_OUT.getValue() + ": 充电桩主动断开链接"; pileMsgRecordService.save(pileSn, pileSn, type, jsonMsg, ""); + // 删除连接 + // PileChannelEntity.deleteChannel(pileSn, ctx); + // 删除桩编号和channel的关系 // PileChannelEntity.removeByPileSn(pileSn); } diff --git a/jsowell-pile/src/main/java/com/jsowell/pile/service/impl/YKCPushCommandServiceImpl.java b/jsowell-pile/src/main/java/com/jsowell/pile/service/impl/YKCPushCommandServiceImpl.java index b3852c339..24aa11377 100644 --- a/jsowell-pile/src/main/java/com/jsowell/pile/service/impl/YKCPushCommandServiceImpl.java +++ b/jsowell-pile/src/main/java/com/jsowell/pile/service/impl/YKCPushCommandServiceImpl.java @@ -10,6 +10,7 @@ import com.jsowell.common.exception.BusinessException; import com.jsowell.common.protocol.SyncPromise; import com.jsowell.common.util.*; import com.jsowell.common.util.Cp56Time2a.Cp56Time2aUtil; +import com.jsowell.common.util.spring.SpringUtils; import com.jsowell.pile.domain.ykcCommond.*; import com.jsowell.pile.service.*; import com.jsowell.pile.vo.web.BillingTemplateVO; @@ -22,6 +23,7 @@ import io.netty.channel.ChannelHandlerContext; import lombok.extern.slf4j.Slf4j; import org.apache.commons.collections4.CollectionUtils; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import org.springframework.stereotype.Service; import java.math.BigDecimal; @@ -29,6 +31,7 @@ import java.time.LocalTime; import java.util.Date; import java.util.List; import java.util.Objects; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; /** @@ -53,6 +56,9 @@ public class YKCPushCommandServiceImpl implements YKCPushCommandService { @Autowired private PileConnectorInfoService pileConnectorInfoService; + // 引入线程池 + private ThreadPoolTaskExecutor executor = SpringUtils.getBean("threadPoolTaskExecutor"); + // 需要记录报文的数据帧类型 private final List frameTypeList = Lists.newArrayList( YKCUtils.frameType2Str(YKCFrameTypeCode.REMOTE_RESTART_CODE.getBytes()), @@ -118,6 +124,16 @@ public class YKCPushCommandServiceImpl implements YKCPushCommandService { log.info("【push结果===>失败】, pileSn:{}, remoteAddress:{}, channelId:{}, 帧类型:{}, 报文:{}", pileSn, ctx.channel().remoteAddress(), ctx.channel().id(), value, wholeMsg); log.error("push发送命令失败, pileSn:{}", pileSn, cause); + + // 异步删除Map中的保存关系(带关闭连接) + // CompletableFuture.runAsync(() -> { + // try { + // // 删除Map中的保存关系 + // PileChannelEntity.deleteChannel(pileSn, ctx); + // } catch (Exception e) { + // log.error("异步删除、关闭连接 error", e); + // } + // }, executor); } }); @@ -213,6 +229,16 @@ public class YKCPushCommandServiceImpl implements YKCPushCommandService { log.info("【push结果===>失败】, pileSn:{}, remoteAddress:{}, channelId:{}, 帧类型:{}, 报文:{}", pileSn, ctx.channel().remoteAddress(), ctx.channel().id(), frameTypeName, wholeMsg); log.error("push发送命令失败, pileSn:{}", pileSn, cause); + + // 异步删除Map中的保存关系(带关闭连接) + // CompletableFuture.runAsync(() -> { + // try { + // // 删除Map中的保存关系 + // PileChannelEntity.deleteChannel(pileSn, ctx); + // } catch (Exception e) { + // log.error("异步删除、关闭连接 error", e); + // } + // }, executor); } });