diff --git a/jcpp-app/src/main/java/sanbing/jcpp/app/service/DownlinkCallService.java b/jcpp-app/src/main/java/sanbing/jcpp/app/service/DownlinkCallService.java index 3dc6c78..15b97a9 100644 --- a/jcpp-app/src/main/java/sanbing/jcpp/app/service/DownlinkCallService.java +++ b/jcpp-app/src/main/java/sanbing/jcpp/app/service/DownlinkCallService.java @@ -4,12 +4,12 @@ */ package sanbing.jcpp.app.service; -import sanbing.jcpp.proto.gen.ProtocolProto.DownlinkRestMessage; +import sanbing.jcpp.proto.gen.ProtocolProto.DownlinkRequestMessage; /** * @author baigod */ public interface DownlinkCallService { - void sendDownlinkMessage(DownlinkRestMessage.Builder downlinkMessageBuilder, String pileCode); + void sendDownlinkMessage(DownlinkRequestMessage.Builder downlinkMessageBuilder, String pileCode); } \ No newline at end of file diff --git a/jcpp-app/src/main/java/sanbing/jcpp/app/service/impl/DefaultDownlinkCallService.java b/jcpp-app/src/main/java/sanbing/jcpp/app/service/impl/DefaultDownlinkCallService.java index 62a6b87..a29221f 100644 --- a/jcpp-app/src/main/java/sanbing/jcpp/app/service/impl/DefaultDownlinkCallService.java +++ b/jcpp-app/src/main/java/sanbing/jcpp/app/service/impl/DefaultDownlinkCallService.java @@ -21,7 +21,7 @@ import sanbing.jcpp.infrastructure.cache.CacheValueWrapper; import sanbing.jcpp.infrastructure.cache.TransactionalCache; import sanbing.jcpp.infrastructure.queue.discovery.ServiceInfoProvider; import sanbing.jcpp.infrastructure.util.trace.TracerContextUtil; -import sanbing.jcpp.proto.gen.ProtocolProto.DownlinkRestMessage; +import sanbing.jcpp.proto.gen.ProtocolProto.DownlinkRequestMessage; import sanbing.jcpp.protocol.adapter.DownlinkController; import static sanbing.jcpp.infrastructure.util.trace.TracerContextUtil.*; @@ -49,7 +49,7 @@ public class DefaultDownlinkCallService implements DownlinkCallService { private String cacheType; @Override - public void sendDownlinkMessage(DownlinkRestMessage.Builder downlinkMessageBuilder, String pileCode) { + public void sendDownlinkMessage(DownlinkRequestMessage.Builder downlinkMessageBuilder, String pileCode) { if (serviceInfoProvider.isMonolith() && "caffeine".equalsIgnoreCase(cacheType)) { downlinkController.onDownlink(downlinkMessageBuilder.build()) @@ -75,21 +75,21 @@ public class DefaultDownlinkCallService implements DownlinkCallService { } } - private void invokeDownlinkRestApi(DownlinkRestMessage downlinkRestMessage, String nodeWebapiIpPort) { + private void invokeDownlinkRestApi(DownlinkRequestMessage DownlinkRequestMessage, String nodeWebapiIpPort) { HttpHeaders headers = new HttpHeaders(); headers.add(JCPP_TRACER_ID, TracerContextUtil.getCurrentTracer().getTraceId()); headers.add(JCPP_TRACER_ORIGIN, TracerContextUtil.getCurrentTracer().getOrigin()); headers.add(JCPP_TRACER_TS, String.valueOf(TracerContextUtil.getCurrentTracer().getTracerTs())); headers.setContentType(MediaType.parseMediaType("application/x-protobuf")); - HttpEntity entity = new HttpEntity<>(downlinkRestMessage, headers); + HttpEntity entity = new HttpEntity<>(DownlinkRequestMessage, headers); try { ResponseEntity response = downlinkRestTemplate.postForEntity("http://" + nodeWebapiIpPort + "/api/onDownlink", entity, ResponseEntity.class); log.debug("下行消息发送成功 {}", response); } catch (RestClientException e) { - log.error("下行消息发送失败 {}", downlinkRestMessage, e); + log.error("下行消息发送失败 {}", DownlinkRequestMessage, e); throw new RuntimeException(e); } diff --git a/jcpp-app/src/main/java/sanbing/jcpp/app/service/impl/DefaultPileProtocolService.java b/jcpp-app/src/main/java/sanbing/jcpp/app/service/impl/DefaultPileProtocolService.java index 10fc0dd..f7ccab0 100644 --- a/jcpp-app/src/main/java/sanbing/jcpp/app/service/impl/DefaultPileProtocolService.java +++ b/jcpp-app/src/main/java/sanbing/jcpp/app/service/impl/DefaultPileProtocolService.java @@ -59,7 +59,7 @@ public class DefaultPileProtocolService implements PileProtocolService { log.debug("查询到充电桩信息 {}", pile); // 构造下行回复 - DownlinkRestMessage.Builder downlinkMessageBuilder = createDownlinkMessageBuilder(uplinkQueueMessage, loginRequest.getPileCode()); + DownlinkRequestMessage.Builder downlinkMessageBuilder = createDownlinkMessageBuilder(uplinkQueueMessage, loginRequest.getPileCode()); downlinkMessageBuilder.setDownlinkCmd(DownlinkCmdEnum.LOGIN_ACK.name()); if (pile != null) { @@ -122,7 +122,7 @@ public class DefaultPileProtocolService implements PileProtocolService { // todo 默认校验成功,后续查库校验 assert pricingId > 0; - DownlinkRestMessage.Builder downlinkMessageBuilder = createDownlinkMessageBuilder(uplinkQueueMessage, pileCode); + DownlinkRequestMessage.Builder downlinkMessageBuilder = createDownlinkMessageBuilder(uplinkQueueMessage, pileCode); downlinkMessageBuilder.setDownlinkCmd(DownlinkCmdEnum.VERIFY_PRICING_ACK.name()); downlinkMessageBuilder.setVerifyPricingResponse(VerifyPricingResponse.newBuilder() .setSuccess(true) @@ -167,7 +167,7 @@ public class DefaultPileProtocolService implements PileProtocolService { model.setPeriodsList(periods); // 构造下行计费 - DownlinkRestMessage.Builder downlinkMessageBuilder = createDownlinkMessageBuilder(uplinkQueueMessage, pileCode); + DownlinkRequestMessage.Builder downlinkMessageBuilder = createDownlinkMessageBuilder(uplinkQueueMessage, pileCode); downlinkMessageBuilder.setDownlinkCmd(DownlinkCmdEnum.QUERY_PRICING_ACK.name()); downlinkMessageBuilder.setQueryPricingResponse(QueryPricingResponse.newBuilder() .setPileCode(pileCode) @@ -236,7 +236,7 @@ public class DefaultPileProtocolService implements PileProtocolService { String pileCode = transactionRecord.getPileCode(); // 构造下行计费 - DownlinkRestMessage.Builder downlinkMessageBuilder = createDownlinkMessageBuilder(uplinkQueueMessage, pileCode); + DownlinkRequestMessage.Builder downlinkMessageBuilder = createDownlinkMessageBuilder(uplinkQueueMessage, pileCode); downlinkMessageBuilder.setDownlinkCmd(DownlinkCmdEnum.TRANSACTION_RECORD.name()); downlinkMessageBuilder.setTransactionRecordAck(TransactionRecordAck.newBuilder() .setTradeNo(tradeNo) @@ -257,9 +257,9 @@ public class DefaultPileProtocolService implements PileProtocolService { return period; } - private DownlinkRestMessage.Builder createDownlinkMessageBuilder(UplinkQueueMessage uplinkQueueMessage, String pileCode) { + private DownlinkRequestMessage.Builder createDownlinkMessageBuilder(UplinkQueueMessage uplinkQueueMessage, String pileCode) { UUID messageId = UUID.randomUUID(); - DownlinkRestMessage.Builder builder = DownlinkRestMessage.newBuilder(); + DownlinkRequestMessage.Builder builder = DownlinkRequestMessage.newBuilder(); builder.setMessageIdMSB(messageId.getLeastSignificantBits()); builder.setMessageIdLSB(messageId.getLeastSignificantBits()); builder.setPileCode(pileCode); diff --git a/jcpp-infrastructure-proto/pom.xml b/jcpp-infrastructure-proto/pom.xml index 2583d91..82e15a5 100644 --- a/jcpp-infrastructure-proto/pom.xml +++ b/jcpp-infrastructure-proto/pom.xml @@ -26,6 +26,10 @@ + + javax.annotation + javax.annotation-api + com.google.protobuf protobuf-java @@ -34,6 +38,21 @@ com.google.protobuf protobuf-java-util + + io.grpc + grpc-netty-shaded + provided + + + io.grpc + grpc-protobuf + provided + + + io.grpc + grpc-stub + provided + diff --git a/jcpp-infrastructure-proto/src/main/proto/protocol.proto b/jcpp-infrastructure-proto/src/main/proto/protocol.proto index 0a90306..2ad68f5 100644 --- a/jcpp-infrastructure-proto/src/main/proto/protocol.proto +++ b/jcpp-infrastructure-proto/src/main/proto/protocol.proto @@ -9,6 +9,10 @@ package infrastructureProto; option java_package = "sanbing.jcpp.proto.gen"; option java_outer_classname = "ProtocolProto"; +service ProtocolDownlinkInterface { + rpc onDownlink(stream DownlinkRequestMessage) returns (stream DownlinkResponseMessage) {} +} + message UplinkQueueMessage { int64 messageIdMSB = 1; int64 messageIdLSB = 2; @@ -29,7 +33,7 @@ message UplinkQueueMessage { TransactionRecord transactionRecord = 30; } -message DownlinkRestMessage { +message DownlinkRequestMessage { int64 messageIdMSB = 1; int64 messageIdLSB = 2; int64 sessionIdMSB = 3; @@ -49,6 +53,11 @@ message DownlinkRestMessage { TransactionRecordAck transactionRecordAck = 26; } +message DownlinkResponseMessage { + bool success = 1; + optional string error = 2; +} + message LoginRequest { string pileCode = 2; string credential = 3; diff --git a/jcpp-protocol-api/src/main/java/sanbing/jcpp/protocol/adapter/DownlinkController.java b/jcpp-protocol-api/src/main/java/sanbing/jcpp/protocol/adapter/DownlinkController.java index ce54453..ffa7948 100644 --- a/jcpp-protocol-api/src/main/java/sanbing/jcpp/protocol/adapter/DownlinkController.java +++ b/jcpp-protocol-api/src/main/java/sanbing/jcpp/protocol/adapter/DownlinkController.java @@ -15,7 +15,7 @@ import org.springframework.web.bind.annotation.RequestBody; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; import org.springframework.web.context.request.async.DeferredResult; -import sanbing.jcpp.proto.gen.ProtocolProto.DownlinkRestMessage; +import sanbing.jcpp.proto.gen.ProtocolProto.DownlinkRequestMessage; import sanbing.jcpp.protocol.domain.ProtocolSession; import sanbing.jcpp.protocol.provider.ProtocolSessionRegistryProvider; @@ -37,7 +37,7 @@ public class DownlinkController { ProtocolSessionRegistryProvider protocolSessionRegistryProvider; @PostMapping(value = "/onDownlink", consumes = "application/x-protobuf", produces = "application/x-protobuf") - public DeferredResult> onDownlink(@RequestBody DownlinkRestMessage downlinkMsg) { + public DeferredResult> onDownlink(@RequestBody DownlinkRequestMessage downlinkMsg) { log.debug("收到REST下行请求 {}", downlinkMsg); final DeferredResult> response = new DeferredResult<>(onDownlinkTimeout, diff --git a/jcpp-protocol-api/src/main/java/sanbing/jcpp/protocol/domain/ProtocolSession.java b/jcpp-protocol-api/src/main/java/sanbing/jcpp/protocol/domain/ProtocolSession.java index b8ead56..7b31620 100644 --- a/jcpp-protocol-api/src/main/java/sanbing/jcpp/protocol/domain/ProtocolSession.java +++ b/jcpp-protocol-api/src/main/java/sanbing/jcpp/protocol/domain/ProtocolSession.java @@ -9,7 +9,7 @@ import com.github.benmanes.caffeine.cache.Caffeine; import lombok.Getter; import lombok.Setter; import lombok.extern.slf4j.Slf4j; -import sanbing.jcpp.proto.gen.ProtocolProto.DownlinkRestMessage; +import sanbing.jcpp.proto.gen.ProtocolProto.DownlinkRequestMessage; import sanbing.jcpp.protocol.forwarder.Forwarder; import java.io.Closeable; @@ -52,14 +52,14 @@ public abstract class ProtocolSession implements Closeable { @Setter private Forwarder forwarder; - public ProtocolSession(String protocolName) { + protected ProtocolSession(String protocolName) { this.protocolName = protocolName; this.pileCodeSet = new LinkedHashSet<>(); this.id = UUID.randomUUID(); this.lastActivityTime = LocalDateTime.now(); } - public abstract void onDownlink(DownlinkRestMessage downlinkMsg); + public abstract void onDownlink(DownlinkRequestMessage downlinkMsg); public void close() { close(SessionCloseReason.DESTRUCTION); diff --git a/jcpp-protocol-api/src/main/java/sanbing/jcpp/protocol/domain/SessionToHandlerMsg.java b/jcpp-protocol-api/src/main/java/sanbing/jcpp/protocol/domain/SessionToHandlerMsg.java index fdf5a59..291a257 100644 --- a/jcpp-protocol-api/src/main/java/sanbing/jcpp/protocol/domain/SessionToHandlerMsg.java +++ b/jcpp-protocol-api/src/main/java/sanbing/jcpp/protocol/domain/SessionToHandlerMsg.java @@ -4,10 +4,10 @@ */ package sanbing.jcpp.protocol.domain; -import sanbing.jcpp.proto.gen.ProtocolProto.DownlinkRestMessage; +import sanbing.jcpp.proto.gen.ProtocolProto.DownlinkRequestMessage; /** * @author baigod */ -public record SessionToHandlerMsg(DownlinkRestMessage downlinkMsg, ProtocolSession session) { +public record SessionToHandlerMsg(DownlinkRequestMessage downlinkMsg, ProtocolSession session) { } \ No newline at end of file diff --git a/jcpp-protocol-api/src/main/java/sanbing/jcpp/protocol/listener/tcp/TcpChannelHandler.java b/jcpp-protocol-api/src/main/java/sanbing/jcpp/protocol/listener/tcp/TcpChannelHandler.java index c8819e1..abeb467 100644 --- a/jcpp-protocol-api/src/main/java/sanbing/jcpp/protocol/listener/tcp/TcpChannelHandler.java +++ b/jcpp-protocol-api/src/main/java/sanbing/jcpp/protocol/listener/tcp/TcpChannelHandler.java @@ -19,7 +19,7 @@ import sanbing.jcpp.infrastructure.stats.MessagesStats; import sanbing.jcpp.infrastructure.util.exception.DownlinkException; import sanbing.jcpp.infrastructure.util.jackson.JacksonUtil; import sanbing.jcpp.infrastructure.util.trace.TracerContextUtil; -import sanbing.jcpp.proto.gen.ProtocolProto.DownlinkRestMessage; +import sanbing.jcpp.proto.gen.ProtocolProto.DownlinkRequestMessage; import sanbing.jcpp.protocol.ProtocolMessageProcessor; import sanbing.jcpp.protocol.domain.ListenerToHandlerMsg; import sanbing.jcpp.protocol.domain.ProtocolUplinkMsg; @@ -121,7 +121,7 @@ public class TcpChannelHandler extends SimpleChannelInboundHandler sendDownlinkConsumer; + private final Consumer sendDownlinkConsumer; private final Consumer writeAndFlushConsumer; @@ -64,7 +64,7 @@ public class TcpSession extends ProtocolSession { } public TcpSession(String protocolName, - Consumer sendDownlinkConsumer, + Consumer sendDownlinkConsumer, Consumer writeAndFlushConsumer) { super(protocolName); this.sendDownlinkConsumer = sendDownlinkConsumer; @@ -72,7 +72,7 @@ public class TcpSession extends ProtocolSession { } @Override - public void onDownlink(DownlinkRestMessage downlinkMsg) { + public void onDownlink(DownlinkRequestMessage downlinkMsg) { sendDownlinkConsumer.accept(downlinkMsg); } diff --git a/jcpp-protocol-bootstrap/src/test/java/sanbing/jcpp/protocol/adapter/DownlinkControllerIT.java b/jcpp-protocol-bootstrap/src/test/java/sanbing/jcpp/protocol/adapter/DownlinkControllerIT.java index 8346abc..20ecd62 100644 --- a/jcpp-protocol-bootstrap/src/test/java/sanbing/jcpp/protocol/adapter/DownlinkControllerIT.java +++ b/jcpp-protocol-bootstrap/src/test/java/sanbing/jcpp/protocol/adapter/DownlinkControllerIT.java @@ -21,7 +21,7 @@ import org.springframework.http.HttpStatus; import sanbing.jcpp.infrastructure.util.jackson.JacksonUtil; import sanbing.jcpp.infrastructure.util.property.PropertyUtils; import sanbing.jcpp.proto.gen.ProtocolProto; -import sanbing.jcpp.proto.gen.ProtocolProto.DownlinkRestMessage; +import sanbing.jcpp.proto.gen.ProtocolProto.DownlinkRequestMessage; import sanbing.jcpp.protocol.AbstractProtocolTestBase; import sanbing.jcpp.protocol.domain.DownlinkCmdEnum; import sanbing.jcpp.protocol.domain.ProtocolSession; @@ -116,9 +116,9 @@ class DownlinkControllerIT extends AbstractProtocolTestBase { UUID messageId = UUID.randomUUID(); UUID requestId = UUID.randomUUID(); - // 创建 DownlinkRestMessage 实例 + // 创建 DownlinkRequestMessage 实例 String pileCode = "20231212000010"; - DownlinkRestMessage downlinkMsg = DownlinkRestMessage.newBuilder() + DownlinkRequestMessage downlinkMsg = DownlinkRequestMessage.newBuilder() .setMessageIdMSB(messageId.getMostSignificantBits()) .setMessageIdLSB(messageId.getLeastSignificantBits()) .setSessionIdMSB(sessionId.getMostSignificantBits()) diff --git a/jcpp-protocol-yunkuaichong/src/main/java/sanbing/jcpp/protocol/yunkuaichong/YunKuaiChongDwonlinkMessage.java b/jcpp-protocol-yunkuaichong/src/main/java/sanbing/jcpp/protocol/yunkuaichong/YunKuaiChongDwonlinkMessage.java index 83db1a7..0e7eb3c 100644 --- a/jcpp-protocol-yunkuaichong/src/main/java/sanbing/jcpp/protocol/yunkuaichong/YunKuaiChongDwonlinkMessage.java +++ b/jcpp-protocol-yunkuaichong/src/main/java/sanbing/jcpp/protocol/yunkuaichong/YunKuaiChongDwonlinkMessage.java @@ -7,7 +7,7 @@ package sanbing.jcpp.protocol.yunkuaichong; import lombok.Data; import lombok.NoArgsConstructor; import lombok.experimental.Accessors; -import sanbing.jcpp.proto.gen.ProtocolProto.DownlinkRestMessage; +import sanbing.jcpp.proto.gen.ProtocolProto.DownlinkRequestMessage; import java.io.Serializable; import java.util.UUID; @@ -32,7 +32,7 @@ public class YunKuaiChongDwonlinkMessage implements Serializable { private int cmd; // 消息体 - private DownlinkRestMessage msg; + private DownlinkRequestMessage msg; // 上行消息 private YunKuaiChongUplinkMessage requestData; diff --git a/jcpp-protocol-yunkuaichong/src/main/java/sanbing/jcpp/protocol/yunkuaichong/YunKuaiChongProtocolMessageProcessor.java b/jcpp-protocol-yunkuaichong/src/main/java/sanbing/jcpp/protocol/yunkuaichong/YunKuaiChongProtocolMessageProcessor.java index 665b7b2..7a7f424 100644 --- a/jcpp-protocol-yunkuaichong/src/main/java/sanbing/jcpp/protocol/yunkuaichong/YunKuaiChongProtocolMessageProcessor.java +++ b/jcpp-protocol-yunkuaichong/src/main/java/sanbing/jcpp/protocol/yunkuaichong/YunKuaiChongProtocolMessageProcessor.java @@ -10,7 +10,7 @@ import io.netty.buffer.Unpooled; import lombok.extern.slf4j.Slf4j; import sanbing.jcpp.infrastructure.util.JCPPPair; import sanbing.jcpp.infrastructure.util.jackson.JacksonUtil; -import sanbing.jcpp.proto.gen.ProtocolProto.DownlinkRestMessage; +import sanbing.jcpp.proto.gen.ProtocolProto.DownlinkRequestMessage; import sanbing.jcpp.protocol.ProtocolContext; import sanbing.jcpp.protocol.ProtocolMessageProcessor; import sanbing.jcpp.protocol.domain.ListenerToHandlerMsg; @@ -163,7 +163,7 @@ public class YunKuaiChongProtocolMessageProcessor extends ProtocolMessageProcess public void downlinkHandle(SessionToHandlerMsg sessionToHandlerMsg) { TcpSession session = (TcpSession) sessionToHandlerMsg.session(); - DownlinkRestMessage protocolDownlinkMsg = sessionToHandlerMsg.downlinkMsg(); + DownlinkRequestMessage protocolDownlinkMsg = sessionToHandlerMsg.downlinkMsg(); int cmd = YunKuaiChongDownlinkCmdEnum.valueOf(protocolDownlinkMsg.getDownlinkCmd()).getCmd(); diff --git a/pom.xml b/pom.xml index a0bb362..3a608a7 100644 --- a/pom.xml +++ b/pom.xml @@ -51,6 +51,7 @@ 6.6.2 3.9.2 3.8.16.Final + 1.3.2 @@ -180,6 +181,11 @@ xnio-api ${xnio-api.version} + + javax.annotation + javax.annotation-api + ${javax.annotation-api.version} + com.google.protobuf protobuf-java @@ -190,6 +196,21 @@ protobuf-java-util ${protobuf.version} + + io.grpc + grpc-netty-shaded + ${grpc.version} + + + io.grpc + grpc-protobuf + ${grpc.version} + + + io.grpc + grpc-stub + ${grpc.version} + org.glassfish jakarta.el