From 76f9d5d3dcaea45636151a2c9f4719aca4199c9e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E4=B8=89=E4=B8=99?= <10604541+sanbing-os@user.noreply.gitee.com> Date: Thu, 24 Oct 2024 15:41:26 +0800 Subject: [PATCH] =?UTF-8?q?grpc=20=E5=A2=9E=E5=8A=A0=E8=BF=9E=E6=8E=A5?= =?UTF-8?q?=E8=BF=87=E7=A8=8B?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../jcpp/app/adapter/TestController.java | 31 ++++ .../jcpp/app/service/DownlinkCallService.java | 25 +-- .../jcpp/app/service/PileProtocolService.java | 7 + .../app/service/grpc/DownlinkGrpcClient.java | 157 ++++++++++-------- .../impl/DefaultPileProtocolService.java | 25 +++ .../service/impl/GrpcDownlinkCallService.java | 11 +- jcpp-infrastructure-proto/pom.xml | 4 + .../infrastructure/proto/ProtoConverter.java | 11 ++ .../src/main/proto/protocol.proto | 34 +++- .../util/trace/TracerContextUtil.java | 8 + .../protocol/adapter/DownlinkGrpcService.java | 67 +++++--- .../adapter/DownlinkControllerIT.java | 2 +- .../YunKuaiChongV150RemoteStartDLCmd.java | 6 +- ...KuaiChongV160RemoteParallelStartDLCmd.java | 5 +- 14 files changed, 278 insertions(+), 115 deletions(-) create mode 100644 jcpp-app/src/main/java/sanbing/jcpp/app/adapter/TestController.java diff --git a/jcpp-app/src/main/java/sanbing/jcpp/app/adapter/TestController.java b/jcpp-app/src/main/java/sanbing/jcpp/app/adapter/TestController.java new file mode 100644 index 0000000..074fcdf --- /dev/null +++ b/jcpp-app/src/main/java/sanbing/jcpp/app/adapter/TestController.java @@ -0,0 +1,31 @@ +/** + * 抖音关注:程序员三丙 + * 知识星球:https://t.zsxq.com/j9b21 + */ +package sanbing.jcpp.app.adapter; + +import jakarta.annotation.Resource; +import org.springframework.http.ResponseEntity; +import org.springframework.web.bind.annotation.GetMapping; +import org.springframework.web.bind.annotation.RestController; +import sanbing.jcpp.app.service.PileProtocolService; + +import java.math.BigDecimal; + +/** + * @author baigod + */ +@RestController +public class TestController { + + @Resource + private PileProtocolService pileProtocolService; + + @GetMapping("/api/startCharge") + public ResponseEntity startCharge() { + + pileProtocolService.startCharge("20231212000010", "01", new BigDecimal("50"), "12345678901234567890"); + + return ResponseEntity.ok("success"); + } +} \ No newline at end of file diff --git a/jcpp-app/src/main/java/sanbing/jcpp/app/service/DownlinkCallService.java b/jcpp-app/src/main/java/sanbing/jcpp/app/service/DownlinkCallService.java index 2162c56..61990d1 100644 --- a/jcpp-app/src/main/java/sanbing/jcpp/app/service/DownlinkCallService.java +++ b/jcpp-app/src/main/java/sanbing/jcpp/app/service/DownlinkCallService.java @@ -12,12 +12,11 @@ import sanbing.jcpp.app.service.cache.session.PileSessionCacheKey; import sanbing.jcpp.infrastructure.cache.CacheValueWrapper; import sanbing.jcpp.infrastructure.cache.TransactionalCache; import sanbing.jcpp.infrastructure.queue.discovery.ServiceInfoProvider; -import sanbing.jcpp.infrastructure.util.trace.Tracer; -import sanbing.jcpp.infrastructure.util.trace.TracerContextUtil; -import sanbing.jcpp.proto.gen.ProtocolProto; import sanbing.jcpp.proto.gen.ProtocolProto.DownlinkRequestMessage; import sanbing.jcpp.protocol.adapter.DownlinkController; +import java.util.UUID; + /** * @author baigod */ @@ -46,6 +45,18 @@ public abstract class DownlinkCallService { PileSession pileSession = pileSessionCacheValueWrapper.get(); + UUID protocolSessionId = pileSession.getProtocolSessionId(); + + if (downlinkMessageBuilder.getSessionIdMSB() == 0) { + downlinkMessageBuilder.setSessionIdMSB(protocolSessionId.getMostSignificantBits()); + } + if (downlinkMessageBuilder.getSessionIdLSB() == 0) { + downlinkMessageBuilder.setSessionIdLSB(protocolSessionId.getLeastSignificantBits()); + } + if(downlinkMessageBuilder.getProtocolName() == null){ + downlinkMessageBuilder.setProtocolName(pileSession.getProtocolName()); + } + if (serviceInfoProvider.isMonolith() && ("caffeine".equalsIgnoreCase(cacheType)) || serviceInfoProvider.getServiceId().equalsIgnoreCase(pileSession.getNodeId())) { @@ -53,14 +64,6 @@ public abstract class DownlinkCallService { .setResultHandler(result -> log.debug("下行消息发送完成")); } else { - Tracer currentTracer = TracerContextUtil.getCurrentTracer(); - - downlinkMessageBuilder.setTracer(ProtocolProto.TracerProto.newBuilder() - .setId(currentTracer.getTraceId()) - .setOrigin(currentTracer.getOrigin()) - .setTs(currentTracer.getTracerTs()) - .build()); - _sendDownlinkMessage(downlinkMessageBuilder.build(), pileSession); } diff --git a/jcpp-app/src/main/java/sanbing/jcpp/app/service/PileProtocolService.java b/jcpp-app/src/main/java/sanbing/jcpp/app/service/PileProtocolService.java index 76c34ed..3290270 100644 --- a/jcpp-app/src/main/java/sanbing/jcpp/app/service/PileProtocolService.java +++ b/jcpp-app/src/main/java/sanbing/jcpp/app/service/PileProtocolService.java @@ -7,6 +7,8 @@ package sanbing.jcpp.app.service; import sanbing.jcpp.infrastructure.queue.Callback; import sanbing.jcpp.proto.gen.ProtocolProto.UplinkQueueMessage; +import java.math.BigDecimal; + /** * @author baigod */ @@ -63,4 +65,9 @@ public interface PileProtocolService { * 交易记录上报 */ void onTransactionRecord(UplinkQueueMessage uplinkQueueMessage, Callback callback); + + /** + * 启动充电 + */ + void startCharge(String pileCode, String gunCode, BigDecimal limitYuan, String orderNo); } \ No newline at end of file diff --git a/jcpp-app/src/main/java/sanbing/jcpp/app/service/grpc/DownlinkGrpcClient.java b/jcpp-app/src/main/java/sanbing/jcpp/app/service/grpc/DownlinkGrpcClient.java index 647fd49..dee6bc3 100644 --- a/jcpp-app/src/main/java/sanbing/jcpp/app/service/grpc/DownlinkGrpcClient.java +++ b/jcpp-app/src/main/java/sanbing/jcpp/app/service/grpc/DownlinkGrpcClient.java @@ -6,7 +6,6 @@ package sanbing.jcpp.app.service.grpc; import com.google.common.net.HostAndPort; import io.grpc.CompressorRegistry; -import io.grpc.ConnectivityState; import io.grpc.DecompressorRegistry; import io.grpc.ManagedChannel; import io.grpc.netty.shaded.io.grpc.netty.NettyChannelBuilder; @@ -18,21 +17,27 @@ import jakarta.annotation.PostConstruct; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Component; +import sanbing.jcpp.infrastructure.queue.discovery.ServiceInfoProvider; import sanbing.jcpp.infrastructure.util.async.JCPPThreadFactory; +import sanbing.jcpp.infrastructure.util.mdc.MDCUtils; +import sanbing.jcpp.infrastructure.util.trace.TracerContextUtil; import sanbing.jcpp.infrastructure.util.trace.TracerRunnable; -import sanbing.jcpp.proto.gen.ProtocolDownlinkInterfaceGrpc; -import sanbing.jcpp.proto.gen.ProtocolDownlinkInterfaceGrpc.ProtocolDownlinkInterfaceStub; -import sanbing.jcpp.proto.gen.ProtocolProto.DownlinkRequestMessage; -import sanbing.jcpp.proto.gen.ProtocolProto.DownlinkResponseMessage; +import sanbing.jcpp.proto.gen.ProtocolInterfaceGrpc; +import sanbing.jcpp.proto.gen.ProtocolInterfaceGrpc.ProtocolInterfaceStub; +import sanbing.jcpp.proto.gen.ProtocolProto.*; import javax.annotation.PreDestroy; +import javax.annotation.Resource; import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.Optional; import java.util.concurrent.*; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.locks.ReentrantLock; +import static sanbing.jcpp.infrastructure.proto.ProtoConverter.toTracerProto; + /** * @author baigod */ @@ -70,42 +75,39 @@ public class DownlinkGrpcClient { @Value("${downlink.rpc.grpc.records_ttl:600000}") private long recordsTtl; - private volatile boolean initialized = true; + @Value("${downlink.rpc.grpc.max_reconnect_times:10}") + private int maxReconnectTimes; + + @Resource + ServiceInfoProvider serviceInfoProvider; private final Map channelMap = new ConcurrentHashMap<>(); - private final Map> inputStreamMap = new ConcurrentHashMap<>(); - private final Map> queueMap = new ConcurrentHashMap<>(); - private final Map msgHandleLocks = new ConcurrentHashMap<>(); - private final Map msgHandleExecutors = new ConcurrentHashMap<>(); - private ExecutorService grpcStarterExecutor; - private ScheduledExecutorService grpcStateCheckExecutor; + private final Map> inputStreamMap = new ConcurrentHashMap<>(); + private final Map> queueMap = new ConcurrentHashMap<>(); + private final Map msgHandleLocksMap = new ConcurrentHashMap<>(); + private final Map msgHandleExecutorMap = new ConcurrentHashMap<>(); + private final Map connectErrTimesMap = new ConcurrentHashMap<>(); + private final Map initializedMap = new ConcurrentHashMap<>(); private ScheduledExecutorService downlinkMsgsExecutor; @PostConstruct public void init() { - grpcStarterExecutor = Executors.newSingleThreadExecutor(JCPPThreadFactory.forName("grpc-starter-executor")); - grpcStateCheckExecutor = Executors.newSingleThreadScheduledExecutor(JCPPThreadFactory.forName("grpc-check-executor")); downlinkMsgsExecutor = Executors.newSingleThreadScheduledExecutor(JCPPThreadFactory.forName("downlink-msgs-executor")); // 每秒进行一次连接检查与线程初始化 downlinkMsgsExecutor.scheduleWithFixedDelay(() -> { queueMap.forEach((key, queue) -> { - if (queue.isEmpty()) { - return; - } - ManagedChannel managedChannel = channelMap.get(key); - if (managedChannel == null || managedChannel.getState(false) != ConnectivityState.READY) { - grpcStarterExecutor.submit(new TracerRunnable(() -> connect(key))); - return; + if (managedChannel == null) { + connect(key); } - msgHandleExecutors.computeIfAbsent(key, hostAndPort -> + msgHandleExecutorMap.computeIfAbsent(key, hostAndPort -> Executors.newFixedThreadPool(1, JCPPThreadFactory.forName("downlink-handle-threads-" + hostAndPort))) .execute(new TracerRunnable(() -> { - while (initialized) { + while (Boolean.TRUE.equals(initializedMap.computeIfAbsent(key, k -> Boolean.FALSE))) { try { handleMsgs(key, queue); } catch (Exception e) { @@ -116,32 +118,10 @@ public class DownlinkGrpcClient { }); }, 0, 1, TimeUnit.SECONDS); - - grpcStateCheckExecutor.scheduleWithFixedDelay(() -> { - channelMap.forEach((key, channel) -> { - ConnectivityState state = channel.getState(true); - - if (state == ConnectivityState.SHUTDOWN) { - log.info("Grpc 客户端SHUTDOWN {} {}", key, state); - - LinkedBlockingQueue queue = queueMap.get(key); - if (queue != null) { - queue.clear(); - queueMap.remove(key); - } - - ExecutorService executorService = msgHandleExecutors.get(key); - if (executorService != null) { - executorService.shutdownNow(); - msgHandleExecutors.remove(key); - } - } - }); - }, 0, 1, TimeUnit.SECONDS); } - private void handleMsgs(HostAndPort key, LinkedBlockingQueue queue) { - StreamObserver inputStream = inputStreamMap.get(key); + private void handleMsgs(HostAndPort key, LinkedBlockingQueue queue) { + StreamObserver inputStream = inputStreamMap.get(key); if (inputStream == null) { return; @@ -149,13 +129,13 @@ public class DownlinkGrpcClient { long acceptTs = System.currentTimeMillis() - recordsTtl; - List downlinkMsgs = new ArrayList<>(batchRecordsCount); + List downlinkMsgs = new ArrayList<>(batchRecordsCount); queue.drainTo(downlinkMsgs, batchRecordsCount); - for (DownlinkRequestMessage msg : downlinkMsgs) { + for (RequestMsg msg : downlinkMsgs) { - long ts = msg.getTracer().getTs(); + long ts = msg.getTs(); if (ts > 0 && ts < acceptTs) { @@ -165,7 +145,7 @@ public class DownlinkGrpcClient { } - ReentrantLock lock = msgHandleLocks.computeIfAbsent(key, hostAndPort -> new ReentrantLock()); + ReentrantLock lock = msgHandleLocksMap.computeIfAbsent(key, hostAndPort -> new ReentrantLock()); lock.lock(); try { @@ -175,7 +155,6 @@ public class DownlinkGrpcClient { } } - if (downlinkMsgs.isEmpty()) { try { Thread.sleep(noRecordsSleepInterval); @@ -191,17 +170,12 @@ public class DownlinkGrpcClient { public void destroy() { log.info("Starting Grpc destroying process"); - initialized = false; + initializedMap.replaceAll((hostAndPort, aBoolean) -> Boolean.FALSE); try { - - grpcStarterExecutor.shutdownNow(); - - grpcStateCheckExecutor.shutdownNow(); - downlinkMsgsExecutor.shutdownNow(); - msgHandleExecutors.values().forEach(ExecutorService::shutdownNow); + msgHandleExecutorMap.values().forEach(ExecutorService::shutdownNow); inputStreamMap.values().forEach(StreamObserver::onCompleted); @@ -214,10 +188,6 @@ public class DownlinkGrpcClient { public void connect(HostAndPort hostAndPort) { - if (channelMap.get(hostAndPort) != null && channelMap.get(hostAndPort).getState(true).ordinal() < 2) { - return; - } - log.info("[{}] Create new Grpc Client Channel!", hostAndPort); ManagedChannel managedChannel = NettyChannelBuilder.forAddress(hostAndPort.getHost(), hostAndPort.getPort()) @@ -238,8 +208,6 @@ public class DownlinkGrpcClient { .defaultLoadBalancingPolicy("round_robin") .build(); - log.info("Grpc 客户端READY {} {}", hostAndPort, managedChannel.getState(true)); - ManagedChannel remove = channelMap.remove(hostAndPort); if (remove != null) { @@ -248,17 +216,55 @@ public class DownlinkGrpcClient { channelMap.put(hostAndPort, managedChannel); - ProtocolDownlinkInterfaceStub stub = ProtocolDownlinkInterfaceGrpc.newStub(managedChannel); + ProtocolInterfaceStub stub = ProtocolInterfaceGrpc.newStub(managedChannel); - StreamObserver streamObserver = stub.onDownlink(new StreamObserver<>() { + StreamObserver streamObserver = stub.onDownlink(new StreamObserver<>() { @Override - public void onNext(DownlinkResponseMessage value) { - log.info("Grpc 接收到通信层反向回复 {}", value); + public void onNext(ResponseMsg responseMsg) { + TracerProto tracerProto = responseMsg.getTracer(); + TracerContextUtil.newTracer(tracerProto.getId(), tracerProto.getOrigin(), tracerProto.getTs()); + MDCUtils.recordTracer(); + + if (responseMsg.hasConnectResponseMsg()) { + log.info("[{}] Grpc 接收到通信层连接反馈 {}", hostAndPort, responseMsg.getConnectResponseMsg()); + if (ConnectResponseCode.ACCEPTED.equals(responseMsg.getConnectResponseMsg().getResponseCode())) { + + initializedMap.put(hostAndPort, Boolean.TRUE); + + } else { + onError(new RuntimeException(responseMsg.getConnectResponseMsg().getErrorMsg())); + } + } + + if (responseMsg.hasDownlinkResponseMsg()) { + DownlinkResponseMessage downlinkResponseMsg = responseMsg.getDownlinkResponseMsg(); + if (!downlinkResponseMsg.getSuccess()) { + log.info("[{}] Grpc 下行数据发生错误回复 {}", hostAndPort, downlinkResponseMsg); + } + } } @Override public void onError(Throwable t) { - log.warn("Grpc 客户端异常 {}", t.getMessage()); + log.warn("[{}] Grpc 客户端异常 {}", hostAndPort, t.getMessage()); + + ExecutorService executorService = msgHandleExecutorMap.get(hostAndPort); + if (executorService != null) { + executorService.shutdownNow(); + msgHandleExecutorMap.remove(hostAndPort); + } + + ManagedChannel remove = channelMap.remove(hostAndPort); + + if (remove != null) { + remove.shutdownNow(); + } + + if (connectErrTimesMap.computeIfAbsent(hostAndPort, k -> new AtomicInteger()).incrementAndGet() >= maxReconnectTimes) { + queueMap.remove(hostAndPort); + connectErrTimesMap.remove(hostAndPort); + log.info("[{}] Grpc 客户端重连异常超过{}次,不再重连", hostAndPort, maxReconnectTimes); + } } @Override @@ -267,6 +273,13 @@ public class DownlinkGrpcClient { } }); + streamObserver.onNext(RequestMsg.newBuilder() + .setTracer(toTracerProto()) + .setConnectRequestMsg(ConnectRequestMsg.newBuilder() + .setNodeId(serviceInfoProvider.getServiceId()) + .build()) + .build()); + inputStreamMap.put(hostAndPort, streamObserver); } @@ -274,7 +287,7 @@ public class DownlinkGrpcClient { /** * 发送下行请求 */ - public void sendDownlinkRequest(HostAndPort hostAndPort, DownlinkRequestMessage downlinkRequestMessage) { - queueMap.computeIfAbsent(hostAndPort, k -> new LinkedBlockingQueue<>(maxRecordsSize)).add(downlinkRequestMessage); + public void sendDownlinkRequest(HostAndPort hostAndPort, RequestMsg requestMsg) { + queueMap.computeIfAbsent(hostAndPort, k -> new LinkedBlockingQueue<>(maxRecordsSize)).add(requestMsg); } } \ No newline at end of file diff --git a/jcpp-app/src/main/java/sanbing/jcpp/app/service/impl/DefaultPileProtocolService.java b/jcpp-app/src/main/java/sanbing/jcpp/app/service/impl/DefaultPileProtocolService.java index d6207f3..760b373 100644 --- a/jcpp-app/src/main/java/sanbing/jcpp/app/service/impl/DefaultPileProtocolService.java +++ b/jcpp-app/src/main/java/sanbing/jcpp/app/service/impl/DefaultPileProtocolService.java @@ -260,6 +260,31 @@ public class DefaultPileProtocolService implements PileProtocolService { callback.onSuccess(); } + @Override + public void startCharge(String pileCode, String gunCode, BigDecimal limitYuan, String orderNo) { + + + UUID messageId = UUID.randomUUID(); + UUID requestId = UUID.randomUUID(); + + DownlinkRequestMessage.Builder downlinkRequestMessageBuilder = DownlinkRequestMessage.newBuilder() + .setMessageIdMSB(messageId.getMostSignificantBits()) + .setMessageIdLSB(messageId.getLeastSignificantBits()) + .setPileCode(pileCode) + .setRequestIdMSB(requestId.getMostSignificantBits()) + .setRequestIdLSB(requestId.getLeastSignificantBits()) + .setDownlinkCmd(DownlinkCmdEnum.REMOTE_START_CHARGING.name()) + .setRemoteStartChargingRequest(RemoteStartChargingRequest.newBuilder() + .setPileCode(pileCode) + .setGunCode(gunCode) + .setLimitYuan(limitYuan.toPlainString()) + .setTradeNo(orderNo) + .build()); + + + downlinkCallService.sendDownlinkMessage(downlinkRequestMessageBuilder, pileCode); + } + private static Period createPeriod(int sn, LocalTime beginTime, LocalTime endTime, PricingModelFlag flag) { Period period = new Period(); period.setSn(sn); diff --git a/jcpp-app/src/main/java/sanbing/jcpp/app/service/impl/GrpcDownlinkCallService.java b/jcpp-app/src/main/java/sanbing/jcpp/app/service/impl/GrpcDownlinkCallService.java index 59c6e90..2152b24 100644 --- a/jcpp-app/src/main/java/sanbing/jcpp/app/service/impl/GrpcDownlinkCallService.java +++ b/jcpp-app/src/main/java/sanbing/jcpp/app/service/impl/GrpcDownlinkCallService.java @@ -13,6 +13,9 @@ import sanbing.jcpp.app.data.PileSession; import sanbing.jcpp.app.service.DownlinkCallService; import sanbing.jcpp.app.service.grpc.DownlinkGrpcClient; import sanbing.jcpp.proto.gen.ProtocolProto.DownlinkRequestMessage; +import sanbing.jcpp.proto.gen.ProtocolProto.RequestMsg; + +import static sanbing.jcpp.infrastructure.proto.ProtoConverter.toTracerProto; /** * @author baigod @@ -29,8 +32,14 @@ public class GrpcDownlinkCallService extends DownlinkCallService { protected void _sendDownlinkMessage(DownlinkRequestMessage downlinkMessage, PileSession pileSession) { try { + RequestMsg requestMsg = RequestMsg.newBuilder() + .setTs(System.currentTimeMillis()) + .setTracer(toTracerProto()) + .setDownlinkRequestMessage(downlinkMessage) + .build(); + downlinkGrpcClient.sendDownlinkRequest(HostAndPort.fromParts(pileSession.getNodeIp(), pileSession.getNodeGrpcPort()), - downlinkMessage); + requestMsg); } catch (Exception e) { log.error("下行消息发送异常", e); diff --git a/jcpp-infrastructure-proto/pom.xml b/jcpp-infrastructure-proto/pom.xml index b625d3a..e704e3e 100644 --- a/jcpp-infrastructure-proto/pom.xml +++ b/jcpp-infrastructure-proto/pom.xml @@ -26,6 +26,10 @@ + + sanbing + jcpp-infrastructure-util + javax.annotation javax.annotation-api diff --git a/jcpp-infrastructure-proto/src/main/java/sanbing/jcpp/infrastructure/proto/ProtoConverter.java b/jcpp-infrastructure-proto/src/main/java/sanbing/jcpp/infrastructure/proto/ProtoConverter.java index def0edb..f9bd59d 100644 --- a/jcpp-infrastructure-proto/src/main/java/sanbing/jcpp/infrastructure/proto/ProtoConverter.java +++ b/jcpp-infrastructure-proto/src/main/java/sanbing/jcpp/infrastructure/proto/ProtoConverter.java @@ -8,6 +8,8 @@ package sanbing.jcpp.infrastructure.proto; import sanbing.jcpp.infrastructure.proto.model.PricingModel; import sanbing.jcpp.infrastructure.proto.model.PricingModel.FlagPrice; import sanbing.jcpp.infrastructure.proto.model.PricingModel.Period; +import sanbing.jcpp.infrastructure.util.trace.Tracer; +import sanbing.jcpp.infrastructure.util.trace.TracerContextUtil; import sanbing.jcpp.proto.gen.ProtocolProto.*; import java.util.Map; @@ -17,6 +19,15 @@ import java.util.Map; */ public class ProtoConverter { + public static TracerProto toTracerProto() { + Tracer currentTracer = TracerContextUtil.getCurrentTracer(); + return TracerProto.newBuilder() + .setId(currentTracer.getTraceId()) + .setOrigin(currentTracer.getOrigin()) + .setTs(currentTracer.getTracerTs()) + .build(); + } + public static PricingModelProto toPricingModel(PricingModel pricingModel) { // 创建 PricingModelProto 实例 PricingModelProto.Builder builder = PricingModelProto.newBuilder(); diff --git a/jcpp-infrastructure-proto/src/main/proto/protocol.proto b/jcpp-infrastructure-proto/src/main/proto/protocol.proto index a34aa90..93195fc 100644 --- a/jcpp-infrastructure-proto/src/main/proto/protocol.proto +++ b/jcpp-infrastructure-proto/src/main/proto/protocol.proto @@ -9,8 +9,35 @@ package infrastructureProto; option java_package = "sanbing.jcpp.proto.gen"; option java_outer_classname = "ProtocolProto"; -service ProtocolDownlinkInterface { - rpc onDownlink(stream DownlinkRequestMessage) returns (stream DownlinkResponseMessage) {} +service ProtocolInterface { + rpc onDownlink(stream RequestMsg) returns (stream ResponseMsg) {} +} + +message RequestMsg { + int64 ts = 1; + TracerProto tracer = 2; + ConnectRequestMsg connectRequestMsg = 10; + DownlinkRequestMessage downlinkRequestMessage = 11; +} + +message ResponseMsg { + TracerProto tracer = 2; + ConnectResponseMsg connectResponseMsg = 12; + DownlinkResponseMessage downlinkResponseMsg = 13; +} + +message ConnectResponseMsg { + ConnectResponseCode responseCode = 1; + string errorMsg = 2; +} + +message ConnectRequestMsg { + string nodeId = 1; +} + +enum ConnectResponseCode { + ACCEPTED = 0; + REFUSE = 1; } message TracerProto { @@ -49,7 +76,6 @@ message DownlinkRequestMessage { optional int64 requestIdMSB = 8; optional int64 requestIdLSB = 9; optional bytes requestData = 10; - TracerProto tracer = 12; string downlinkCmd = 20; LoginResponse loginResponse = 21; VerifyPricingResponse verifyPricingResponse = 22; @@ -206,7 +232,7 @@ message RemoteStartChargingRequest { string pileCode = 4; string gunCode = 5; string tradeNo = 6; - int32 limitYuan = 7; + string limitYuan = 7; optional string additionalInfo = 20; } diff --git a/jcpp-infrastructure-util/src/main/java/sanbing/jcpp/infrastructure/util/trace/TracerContextUtil.java b/jcpp-infrastructure-util/src/main/java/sanbing/jcpp/infrastructure/util/trace/TracerContextUtil.java index 589fc13..4b2597e 100644 --- a/jcpp-infrastructure-util/src/main/java/sanbing/jcpp/infrastructure/util/trace/TracerContextUtil.java +++ b/jcpp-infrastructure-util/src/main/java/sanbing/jcpp/infrastructure/util/trace/TracerContextUtil.java @@ -6,11 +6,15 @@ package sanbing.jcpp.infrastructure.util.trace; import org.apache.commons.lang3.StringUtils; +import java.util.Optional; + /** * Tracer上下文工具类 */ public class TracerContextUtil { + public static final String DEFAULT_ORIGIN = "jcpp"; + public static final String JCPP_TRACER_ID = "jcpp_tracer_id"; public static final String JCPP_TRACER_ORIGIN = "jcpp_tracer_origin"; public static final String JCPP_TRACER_TS = "jcpp_tracer_ts"; @@ -20,6 +24,8 @@ public class TracerContextUtil { public static Tracer newTracer(String traceId, String origin) { Tracer tracer; + origin = Optional.ofNullable(origin).orElse(DEFAULT_ORIGIN); + if (StringUtils.isEmpty(traceId)) { tracer = new Tracer(TraceIdGenerator.generate(), origin); } else { @@ -34,6 +40,8 @@ public class TracerContextUtil { public static Tracer newTracer(String traceId, String origin, long ts) { final Tracer tracer; + origin = Optional.ofNullable(origin).orElse(DEFAULT_ORIGIN); + if (StringUtils.isEmpty(traceId)) { tracer = new Tracer(TraceIdGenerator.generate(), origin, ts); } else { diff --git a/jcpp-protocol-api/src/main/java/sanbing/jcpp/protocol/adapter/DownlinkGrpcService.java b/jcpp-protocol-api/src/main/java/sanbing/jcpp/protocol/adapter/DownlinkGrpcService.java index 82107d5..e9a7d0e 100644 --- a/jcpp-protocol-api/src/main/java/sanbing/jcpp/protocol/adapter/DownlinkGrpcService.java +++ b/jcpp-protocol-api/src/main/java/sanbing/jcpp/protocol/adapter/DownlinkGrpcService.java @@ -21,17 +21,17 @@ import org.springframework.stereotype.Service; import sanbing.jcpp.infrastructure.util.mdc.MDCUtils; import sanbing.jcpp.infrastructure.util.trace.TracerContextUtil; import sanbing.jcpp.infrastructure.util.trace.TracerRunnable; -import sanbing.jcpp.proto.gen.ProtocolDownlinkInterfaceGrpc.ProtocolDownlinkInterfaceImplBase; -import sanbing.jcpp.proto.gen.ProtocolProto.DownlinkRequestMessage; -import sanbing.jcpp.proto.gen.ProtocolProto.DownlinkResponseMessage; -import sanbing.jcpp.proto.gen.ProtocolProto.TracerProto; +import sanbing.jcpp.proto.gen.ProtocolInterfaceGrpc.ProtocolInterfaceImplBase; +import sanbing.jcpp.proto.gen.ProtocolProto.*; import sanbing.jcpp.protocol.domain.ProtocolSession; import sanbing.jcpp.protocol.provider.ProtocolSessionRegistryProvider; import javax.annotation.PreDestroy; import java.util.UUID; import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.ReentrantLock; +import static sanbing.jcpp.infrastructure.proto.ProtoConverter.toTracerProto; import static sanbing.jcpp.infrastructure.util.config.ThreadPoolConfiguration.JCPP_COMMON_THREAD_POOL; /** @@ -40,7 +40,7 @@ import static sanbing.jcpp.infrastructure.util.config.ThreadPoolConfiguration.JC @Service @Slf4j @ConditionalOnExpression("'${service.type:null}'=='monolith' || '${service.type:null}'=='protocol'") -public class DownlinkGrpcService extends ProtocolDownlinkInterfaceImplBase { +public class DownlinkGrpcService extends ProtocolInterfaceImplBase { @Value("${service.protocol.rpc.port}") private int rpcPort; @Value("${service.protocol.rpc.boss}") @@ -64,6 +64,7 @@ public class DownlinkGrpcService extends ProtocolDownlinkInterfaceImplBase { ProtocolSessionRegistryProvider protocolSessionRegistryProvider; private Server server; + private static final ReentrantLock replyLock = new ReentrantLock(); @PostConstruct public void init() throws Exception { @@ -108,37 +109,59 @@ public class DownlinkGrpcService extends ProtocolDownlinkInterfaceImplBase { } @Override - public StreamObserver onDownlink(StreamObserver responseObserver) { + public StreamObserver onDownlink(StreamObserver responseObserver) { return new StreamObserver<>() { + @Override - public void onNext(DownlinkRequestMessage downlinkMsg) { - TracerProto tracerProto = downlinkMsg.getTracer(); + public void onNext(RequestMsg requestMsg) { + TracerProto tracerProto = requestMsg.getTracer(); TracerContextUtil.newTracer(tracerProto.getId(), tracerProto.getOrigin(), tracerProto.getTs()); MDCUtils.recordTracer(); - log.debug("收到Grpc下行请求 {}", downlinkMsg); - - JCPP_COMMON_THREAD_POOL.execute(new TracerRunnable(() -> { - UUID protocolSessionId = new UUID(downlinkMsg.getSessionIdMSB(), downlinkMsg.getSessionIdLSB()); - - ProtocolSession protocolSession = protocolSessionRegistryProvider.get(protocolSessionId); + log.debug("通信层收到Grpc下行请求 {}", requestMsg); + if (requestMsg.hasConnectRequestMsg()) { + replyLock.lock(); try { - if (protocolSession != null) { + responseObserver.onNext( + ResponseMsg.newBuilder() + .setTracer(toTracerProto()) + .setConnectResponseMsg(ConnectResponseMsg.newBuilder() + .setResponseCode(ConnectResponseCode.ACCEPTED) + .setErrorMsg("") + .build()) + .build()); + } finally { + replyLock.unlock(); + } + } - protocolSession.onDownlink(downlinkMsg); + if(requestMsg.hasDownlinkRequestMessage()){ + DownlinkRequestMessage downlinkMsg = requestMsg.getDownlinkRequestMessage(); + JCPP_COMMON_THREAD_POOL.execute(new TracerRunnable(() -> { + UUID protocolSessionId = new UUID(downlinkMsg.getSessionIdMSB(), downlinkMsg.getSessionIdLSB()); - } else { + ProtocolSession protocolSession = protocolSessionRegistryProvider.get(protocolSessionId); - log.info("下发报文时Session未找到 sessionId: {}", protocolSessionId); + try { + if (protocolSession != null) { + + protocolSession.onDownlink(downlinkMsg); + + } else { + + log.info("下发报文时Session未找到 sessionId: {}", protocolSessionId); + + } + } catch (Exception e) { + + log.warn("下发报文时处理失败 sessionId: {}", protocolSessionId, e); } - } catch (Exception e) { + })); + } - log.warn("下发报文时处理失败 sessionId: {}", protocolSessionId, e); - } - })); } @Override diff --git a/jcpp-protocol-bootstrap/src/test/java/sanbing/jcpp/protocol/adapter/DownlinkControllerIT.java b/jcpp-protocol-bootstrap/src/test/java/sanbing/jcpp/protocol/adapter/DownlinkControllerIT.java index 20ecd62..1e4bc8e 100644 --- a/jcpp-protocol-bootstrap/src/test/java/sanbing/jcpp/protocol/adapter/DownlinkControllerIT.java +++ b/jcpp-protocol-bootstrap/src/test/java/sanbing/jcpp/protocol/adapter/DownlinkControllerIT.java @@ -131,7 +131,7 @@ class DownlinkControllerIT extends AbstractProtocolTestBase { .setRemoteStartChargingRequest(ProtocolProto.RemoteStartChargingRequest.newBuilder() .setPileCode(pileCode) .setGunCode("01") - .setLimitYuan(100) + .setLimitYuan("100") .setTradeNo("12345678901234567890") .build()) .build(); diff --git a/jcpp-protocol-yunkuaichong/src/main/java/sanbing/jcpp/protocol/yunkuaichong/v150/YunKuaiChongV150RemoteStartDLCmd.java b/jcpp-protocol-yunkuaichong/src/main/java/sanbing/jcpp/protocol/yunkuaichong/v150/YunKuaiChongV150RemoteStartDLCmd.java index 9088b49..d5cc7d4 100644 --- a/jcpp-protocol-yunkuaichong/src/main/java/sanbing/jcpp/protocol/yunkuaichong/v150/YunKuaiChongV150RemoteStartDLCmd.java +++ b/jcpp-protocol-yunkuaichong/src/main/java/sanbing/jcpp/protocol/yunkuaichong/v150/YunKuaiChongV150RemoteStartDLCmd.java @@ -16,6 +16,8 @@ import sanbing.jcpp.protocol.yunkuaichong.YunKuaiChongDownlinkCmdExe; import sanbing.jcpp.protocol.yunkuaichong.YunKuaiChongDwonlinkMessage; import sanbing.jcpp.protocol.yunkuaichong.annotation.YunKuaiChongCmd; +import java.math.BigDecimal; + import static sanbing.jcpp.protocol.yunkuaichong.enums.YunKuaiChongDownlinkCmdEnum.REMOTE_START_CHARGING; /** @@ -39,7 +41,7 @@ public class YunKuaiChongV150RemoteStartDLCmd extends YunKuaiChongDownlinkCmdExe String pileCode = remoteStartChargingRequest.getPileCode(); String gunCode = remoteStartChargingRequest.getGunCode(); String tradeNo = remoteStartChargingRequest.getTradeNo(); - int limitYuan = remoteStartChargingRequest.getLimitYuan(); + String limitYuan = remoteStartChargingRequest.getLimitYuan(); byte[] cardNo = encodeCardNo(tradeNo); @@ -55,7 +57,7 @@ public class YunKuaiChongV150RemoteStartDLCmd extends YunKuaiChongDownlinkCmdExe // 物理卡号 msgBody.writeBytes(cardNo); // 账户余额 - msgBody.writeIntLE(limitYuan); + msgBody.writeIntLE(new BigDecimal(limitYuan).intValue()); encodeAndWriteFlush(REMOTE_START_CHARGING, msgBody, diff --git a/jcpp-protocol-yunkuaichong/src/main/java/sanbing/jcpp/protocol/yunkuaichong/v160/cmd/YunKuaiChongV160RemoteParallelStartDLCmd.java b/jcpp-protocol-yunkuaichong/src/main/java/sanbing/jcpp/protocol/yunkuaichong/v160/cmd/YunKuaiChongV160RemoteParallelStartDLCmd.java index 4a2449a..cd7abca 100644 --- a/jcpp-protocol-yunkuaichong/src/main/java/sanbing/jcpp/protocol/yunkuaichong/v160/cmd/YunKuaiChongV160RemoteParallelStartDLCmd.java +++ b/jcpp-protocol-yunkuaichong/src/main/java/sanbing/jcpp/protocol/yunkuaichong/v160/cmd/YunKuaiChongV160RemoteParallelStartDLCmd.java @@ -16,6 +16,7 @@ import sanbing.jcpp.protocol.yunkuaichong.YunKuaiChongDownlinkCmdExe; import sanbing.jcpp.protocol.yunkuaichong.YunKuaiChongDwonlinkMessage; import sanbing.jcpp.protocol.yunkuaichong.annotation.YunKuaiChongCmd; +import java.math.BigDecimal; import java.time.LocalDateTime; import java.time.format.DateTimeFormatter; @@ -44,7 +45,7 @@ public class YunKuaiChongV160RemoteParallelStartDLCmd extends YunKuaiChongDownli String pileCode = remoteStartChargingRequest.getPileCode(); String gunCode = remoteStartChargingRequest.getGunCode(); String tradeNo = remoteStartChargingRequest.getTradeNo(); - int limitYuan = remoteStartChargingRequest.getLimitYuan(); + String limitYuan = remoteStartChargingRequest.getLimitYuan(); byte[] cardNo = encodeCardNo(tradeNo); @@ -60,7 +61,7 @@ public class YunKuaiChongV160RemoteParallelStartDLCmd extends YunKuaiChongDownli // 物理卡号 msgBody.writeBytes(cardNo); // 账户余额 - msgBody.writeIntLE(limitYuan); + msgBody.writeIntLE(new BigDecimal(limitYuan).intValue()); // 并充序号 msgBody.writeBytes(BCDUtil.toBytes(LocalDateTime.now().format(dateTimeFormatter)));