修改downlink protobuf

This commit is contained in:
三丙
2024-10-22 17:11:05 +08:00
parent 73de4b28b3
commit 7445d4e3f0
14 changed files with 83 additions and 34 deletions

View File

@@ -4,12 +4,12 @@
*/
package sanbing.jcpp.app.service;
import sanbing.jcpp.proto.gen.ProtocolProto.DownlinkRestMessage;
import sanbing.jcpp.proto.gen.ProtocolProto.DownlinkRequestMessage;
/**
* @author baigod
*/
public interface DownlinkCallService {
void sendDownlinkMessage(DownlinkRestMessage.Builder downlinkMessageBuilder, String pileCode);
void sendDownlinkMessage(DownlinkRequestMessage.Builder downlinkMessageBuilder, String pileCode);
}

View File

@@ -21,7 +21,7 @@ import sanbing.jcpp.infrastructure.cache.CacheValueWrapper;
import sanbing.jcpp.infrastructure.cache.TransactionalCache;
import sanbing.jcpp.infrastructure.queue.discovery.ServiceInfoProvider;
import sanbing.jcpp.infrastructure.util.trace.TracerContextUtil;
import sanbing.jcpp.proto.gen.ProtocolProto.DownlinkRestMessage;
import sanbing.jcpp.proto.gen.ProtocolProto.DownlinkRequestMessage;
import sanbing.jcpp.protocol.adapter.DownlinkController;
import static sanbing.jcpp.infrastructure.util.trace.TracerContextUtil.*;
@@ -49,7 +49,7 @@ public class DefaultDownlinkCallService implements DownlinkCallService {
private String cacheType;
@Override
public void sendDownlinkMessage(DownlinkRestMessage.Builder downlinkMessageBuilder, String pileCode) {
public void sendDownlinkMessage(DownlinkRequestMessage.Builder downlinkMessageBuilder, String pileCode) {
if (serviceInfoProvider.isMonolith() && "caffeine".equalsIgnoreCase(cacheType)) {
downlinkController.onDownlink(downlinkMessageBuilder.build())
@@ -75,21 +75,21 @@ public class DefaultDownlinkCallService implements DownlinkCallService {
}
}
private void invokeDownlinkRestApi(DownlinkRestMessage downlinkRestMessage, String nodeWebapiIpPort) {
private void invokeDownlinkRestApi(DownlinkRequestMessage DownlinkRequestMessage, String nodeWebapiIpPort) {
HttpHeaders headers = new HttpHeaders();
headers.add(JCPP_TRACER_ID, TracerContextUtil.getCurrentTracer().getTraceId());
headers.add(JCPP_TRACER_ORIGIN, TracerContextUtil.getCurrentTracer().getOrigin());
headers.add(JCPP_TRACER_TS, String.valueOf(TracerContextUtil.getCurrentTracer().getTracerTs()));
headers.setContentType(MediaType.parseMediaType("application/x-protobuf"));
HttpEntity<DownlinkRestMessage> entity = new HttpEntity<>(downlinkRestMessage, headers);
HttpEntity<DownlinkRequestMessage> entity = new HttpEntity<>(DownlinkRequestMessage, headers);
try {
ResponseEntity<?> response = downlinkRestTemplate.postForEntity("http://" + nodeWebapiIpPort + "/api/onDownlink",
entity, ResponseEntity.class);
log.debug("下行消息发送成功 {}", response);
} catch (RestClientException e) {
log.error("下行消息发送失败 {}", downlinkRestMessage, e);
log.error("下行消息发送失败 {}", DownlinkRequestMessage, e);
throw new RuntimeException(e);
}

View File

@@ -59,7 +59,7 @@ public class DefaultPileProtocolService implements PileProtocolService {
log.debug("查询到充电桩信息 {}", pile);
// 构造下行回复
DownlinkRestMessage.Builder downlinkMessageBuilder = createDownlinkMessageBuilder(uplinkQueueMessage, loginRequest.getPileCode());
DownlinkRequestMessage.Builder downlinkMessageBuilder = createDownlinkMessageBuilder(uplinkQueueMessage, loginRequest.getPileCode());
downlinkMessageBuilder.setDownlinkCmd(DownlinkCmdEnum.LOGIN_ACK.name());
if (pile != null) {
@@ -122,7 +122,7 @@ public class DefaultPileProtocolService implements PileProtocolService {
// todo 默认校验成功,后续查库校验
assert pricingId > 0;
DownlinkRestMessage.Builder downlinkMessageBuilder = createDownlinkMessageBuilder(uplinkQueueMessage, pileCode);
DownlinkRequestMessage.Builder downlinkMessageBuilder = createDownlinkMessageBuilder(uplinkQueueMessage, pileCode);
downlinkMessageBuilder.setDownlinkCmd(DownlinkCmdEnum.VERIFY_PRICING_ACK.name());
downlinkMessageBuilder.setVerifyPricingResponse(VerifyPricingResponse.newBuilder()
.setSuccess(true)
@@ -167,7 +167,7 @@ public class DefaultPileProtocolService implements PileProtocolService {
model.setPeriodsList(periods);
// 构造下行计费
DownlinkRestMessage.Builder downlinkMessageBuilder = createDownlinkMessageBuilder(uplinkQueueMessage, pileCode);
DownlinkRequestMessage.Builder downlinkMessageBuilder = createDownlinkMessageBuilder(uplinkQueueMessage, pileCode);
downlinkMessageBuilder.setDownlinkCmd(DownlinkCmdEnum.QUERY_PRICING_ACK.name());
downlinkMessageBuilder.setQueryPricingResponse(QueryPricingResponse.newBuilder()
.setPileCode(pileCode)
@@ -236,7 +236,7 @@ public class DefaultPileProtocolService implements PileProtocolService {
String pileCode = transactionRecord.getPileCode();
// 构造下行计费
DownlinkRestMessage.Builder downlinkMessageBuilder = createDownlinkMessageBuilder(uplinkQueueMessage, pileCode);
DownlinkRequestMessage.Builder downlinkMessageBuilder = createDownlinkMessageBuilder(uplinkQueueMessage, pileCode);
downlinkMessageBuilder.setDownlinkCmd(DownlinkCmdEnum.TRANSACTION_RECORD.name());
downlinkMessageBuilder.setTransactionRecordAck(TransactionRecordAck.newBuilder()
.setTradeNo(tradeNo)
@@ -257,9 +257,9 @@ public class DefaultPileProtocolService implements PileProtocolService {
return period;
}
private DownlinkRestMessage.Builder createDownlinkMessageBuilder(UplinkQueueMessage uplinkQueueMessage, String pileCode) {
private DownlinkRequestMessage.Builder createDownlinkMessageBuilder(UplinkQueueMessage uplinkQueueMessage, String pileCode) {
UUID messageId = UUID.randomUUID();
DownlinkRestMessage.Builder builder = DownlinkRestMessage.newBuilder();
DownlinkRequestMessage.Builder builder = DownlinkRequestMessage.newBuilder();
builder.setMessageIdMSB(messageId.getLeastSignificantBits());
builder.setMessageIdLSB(messageId.getLeastSignificantBits());
builder.setPileCode(pileCode);

View File

@@ -26,6 +26,10 @@
</properties>
<dependencies>
<dependency>
<groupId>javax.annotation</groupId>
<artifactId>javax.annotation-api</artifactId>
</dependency>
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
@@ -34,6 +38,21 @@
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java-util</artifactId>
</dependency>
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-netty-shaded</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-protobuf</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-stub</artifactId>
<scope>provided</scope>
</dependency>
</dependencies>
<build>
<plugins>

View File

@@ -9,6 +9,10 @@ package infrastructureProto;
option java_package = "sanbing.jcpp.proto.gen";
option java_outer_classname = "ProtocolProto";
service ProtocolDownlinkInterface {
rpc onDownlink(stream DownlinkRequestMessage) returns (stream DownlinkResponseMessage) {}
}
message UplinkQueueMessage {
int64 messageIdMSB = 1;
int64 messageIdLSB = 2;
@@ -29,7 +33,7 @@ message UplinkQueueMessage {
TransactionRecord transactionRecord = 30;
}
message DownlinkRestMessage {
message DownlinkRequestMessage {
int64 messageIdMSB = 1;
int64 messageIdLSB = 2;
int64 sessionIdMSB = 3;
@@ -49,6 +53,11 @@ message DownlinkRestMessage {
TransactionRecordAck transactionRecordAck = 26;
}
message DownlinkResponseMessage {
bool success = 1;
optional string error = 2;
}
message LoginRequest {
string pileCode = 2;
string credential = 3;

View File

@@ -15,7 +15,7 @@ import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.context.request.async.DeferredResult;
import sanbing.jcpp.proto.gen.ProtocolProto.DownlinkRestMessage;
import sanbing.jcpp.proto.gen.ProtocolProto.DownlinkRequestMessage;
import sanbing.jcpp.protocol.domain.ProtocolSession;
import sanbing.jcpp.protocol.provider.ProtocolSessionRegistryProvider;
@@ -37,7 +37,7 @@ public class DownlinkController {
ProtocolSessionRegistryProvider protocolSessionRegistryProvider;
@PostMapping(value = "/onDownlink", consumes = "application/x-protobuf", produces = "application/x-protobuf")
public DeferredResult<ResponseEntity<String>> onDownlink(@RequestBody DownlinkRestMessage downlinkMsg) {
public DeferredResult<ResponseEntity<String>> onDownlink(@RequestBody DownlinkRequestMessage downlinkMsg) {
log.debug("收到REST下行请求 {}", downlinkMsg);
final DeferredResult<ResponseEntity<String>> response = new DeferredResult<>(onDownlinkTimeout,

View File

@@ -9,7 +9,7 @@ import com.github.benmanes.caffeine.cache.Caffeine;
import lombok.Getter;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import sanbing.jcpp.proto.gen.ProtocolProto.DownlinkRestMessage;
import sanbing.jcpp.proto.gen.ProtocolProto.DownlinkRequestMessage;
import sanbing.jcpp.protocol.forwarder.Forwarder;
import java.io.Closeable;
@@ -52,14 +52,14 @@ public abstract class ProtocolSession implements Closeable {
@Setter
private Forwarder forwarder;
public ProtocolSession(String protocolName) {
protected ProtocolSession(String protocolName) {
this.protocolName = protocolName;
this.pileCodeSet = new LinkedHashSet<>();
this.id = UUID.randomUUID();
this.lastActivityTime = LocalDateTime.now();
}
public abstract void onDownlink(DownlinkRestMessage downlinkMsg);
public abstract void onDownlink(DownlinkRequestMessage downlinkMsg);
public void close() {
close(SessionCloseReason.DESTRUCTION);

View File

@@ -4,10 +4,10 @@
*/
package sanbing.jcpp.protocol.domain;
import sanbing.jcpp.proto.gen.ProtocolProto.DownlinkRestMessage;
import sanbing.jcpp.proto.gen.ProtocolProto.DownlinkRequestMessage;
/**
* @author baigod
*/
public record SessionToHandlerMsg(DownlinkRestMessage downlinkMsg, ProtocolSession session) {
public record SessionToHandlerMsg(DownlinkRequestMessage downlinkMsg, ProtocolSession session) {
}

View File

@@ -19,7 +19,7 @@ import sanbing.jcpp.infrastructure.stats.MessagesStats;
import sanbing.jcpp.infrastructure.util.exception.DownlinkException;
import sanbing.jcpp.infrastructure.util.jackson.JacksonUtil;
import sanbing.jcpp.infrastructure.util.trace.TracerContextUtil;
import sanbing.jcpp.proto.gen.ProtocolProto.DownlinkRestMessage;
import sanbing.jcpp.proto.gen.ProtocolProto.DownlinkRequestMessage;
import sanbing.jcpp.protocol.ProtocolMessageProcessor;
import sanbing.jcpp.protocol.domain.ListenerToHandlerMsg;
import sanbing.jcpp.protocol.domain.ProtocolUplinkMsg;
@@ -121,7 +121,7 @@ public class TcpChannelHandler<T> extends SimpleChannelInboundHandler<ProtocolUp
}
}
protected void onDownlink(DownlinkRestMessage downlinkMsg) throws DownlinkException {
protected void onDownlink(DownlinkRequestMessage downlinkMsg) throws DownlinkException {
protocolMessageProcessor.downlinkHandle(new SessionToHandlerMsg(downlinkMsg, tcpSession), downlinkMsgStats);
}

View File

@@ -9,7 +9,7 @@ import io.netty.channel.ChannelHandlerContext;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.Setter;
import sanbing.jcpp.proto.gen.ProtocolProto.DownlinkRestMessage;
import sanbing.jcpp.proto.gen.ProtocolProto.DownlinkRequestMessage;
import sanbing.jcpp.protocol.domain.ProtocolSession;
import sanbing.jcpp.protocol.domain.SessionCloseReason;
import sanbing.jcpp.protocol.listener.tcp.enums.SequenceNumberLength;
@@ -32,7 +32,7 @@ public class TcpSession extends ProtocolSession {
private ChannelHandlerContext ctx;
private final Consumer<DownlinkRestMessage> sendDownlinkConsumer;
private final Consumer<DownlinkRequestMessage> sendDownlinkConsumer;
private final Consumer<ByteBuf> writeAndFlushConsumer;
@@ -64,7 +64,7 @@ public class TcpSession extends ProtocolSession {
}
public TcpSession(String protocolName,
Consumer<DownlinkRestMessage> sendDownlinkConsumer,
Consumer<DownlinkRequestMessage> sendDownlinkConsumer,
Consumer<ByteBuf> writeAndFlushConsumer) {
super(protocolName);
this.sendDownlinkConsumer = sendDownlinkConsumer;
@@ -72,7 +72,7 @@ public class TcpSession extends ProtocolSession {
}
@Override
public void onDownlink(DownlinkRestMessage downlinkMsg) {
public void onDownlink(DownlinkRequestMessage downlinkMsg) {
sendDownlinkConsumer.accept(downlinkMsg);
}

View File

@@ -21,7 +21,7 @@ import org.springframework.http.HttpStatus;
import sanbing.jcpp.infrastructure.util.jackson.JacksonUtil;
import sanbing.jcpp.infrastructure.util.property.PropertyUtils;
import sanbing.jcpp.proto.gen.ProtocolProto;
import sanbing.jcpp.proto.gen.ProtocolProto.DownlinkRestMessage;
import sanbing.jcpp.proto.gen.ProtocolProto.DownlinkRequestMessage;
import sanbing.jcpp.protocol.AbstractProtocolTestBase;
import sanbing.jcpp.protocol.domain.DownlinkCmdEnum;
import sanbing.jcpp.protocol.domain.ProtocolSession;
@@ -116,9 +116,9 @@ class DownlinkControllerIT extends AbstractProtocolTestBase {
UUID messageId = UUID.randomUUID();
UUID requestId = UUID.randomUUID();
// 创建 DownlinkRestMessage 实例
// 创建 DownlinkRequestMessage 实例
String pileCode = "20231212000010";
DownlinkRestMessage downlinkMsg = DownlinkRestMessage.newBuilder()
DownlinkRequestMessage downlinkMsg = DownlinkRequestMessage.newBuilder()
.setMessageIdMSB(messageId.getMostSignificantBits())
.setMessageIdLSB(messageId.getLeastSignificantBits())
.setSessionIdMSB(sessionId.getMostSignificantBits())

View File

@@ -7,7 +7,7 @@ package sanbing.jcpp.protocol.yunkuaichong;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.experimental.Accessors;
import sanbing.jcpp.proto.gen.ProtocolProto.DownlinkRestMessage;
import sanbing.jcpp.proto.gen.ProtocolProto.DownlinkRequestMessage;
import java.io.Serializable;
import java.util.UUID;
@@ -32,7 +32,7 @@ public class YunKuaiChongDwonlinkMessage implements Serializable {
private int cmd;
// 消息体
private DownlinkRestMessage msg;
private DownlinkRequestMessage msg;
// 上行消息
private YunKuaiChongUplinkMessage requestData;

View File

@@ -10,7 +10,7 @@ import io.netty.buffer.Unpooled;
import lombok.extern.slf4j.Slf4j;
import sanbing.jcpp.infrastructure.util.JCPPPair;
import sanbing.jcpp.infrastructure.util.jackson.JacksonUtil;
import sanbing.jcpp.proto.gen.ProtocolProto.DownlinkRestMessage;
import sanbing.jcpp.proto.gen.ProtocolProto.DownlinkRequestMessage;
import sanbing.jcpp.protocol.ProtocolContext;
import sanbing.jcpp.protocol.ProtocolMessageProcessor;
import sanbing.jcpp.protocol.domain.ListenerToHandlerMsg;
@@ -163,7 +163,7 @@ public class YunKuaiChongProtocolMessageProcessor extends ProtocolMessageProcess
public void downlinkHandle(SessionToHandlerMsg sessionToHandlerMsg) {
TcpSession session = (TcpSession) sessionToHandlerMsg.session();
DownlinkRestMessage protocolDownlinkMsg = sessionToHandlerMsg.downlinkMsg();
DownlinkRequestMessage protocolDownlinkMsg = sessionToHandlerMsg.downlinkMsg();
int cmd = YunKuaiChongDownlinkCmdEnum.valueOf(protocolDownlinkMsg.getDownlinkCmd()).getCmd();

21
pom.xml
View File

@@ -51,6 +51,7 @@
<oshi-core.version>6.6.2</oshi-core.version>
<zookeeper.version>3.9.2</zookeeper.version>
<xnio-api.version>3.8.16.Final</xnio-api.version>
<javax.annotation-api.version>1.3.2</javax.annotation-api.version>
</properties>
<profiles>
@@ -180,6 +181,11 @@
<artifactId>xnio-api</artifactId>
<version>${xnio-api.version}</version>
</dependency>
<dependency>
<groupId>javax.annotation</groupId>
<artifactId>javax.annotation-api</artifactId>
<version>${javax.annotation-api.version}</version>
</dependency>
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
@@ -190,6 +196,21 @@
<artifactId>protobuf-java-util</artifactId>
<version>${protobuf.version}</version>
</dependency>
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-netty-shaded</artifactId>
<version>${grpc.version}</version>
</dependency>
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-protobuf</artifactId>
<version>${grpc.version}</version>
</dependency>
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-stub</artifactId>
<version>${grpc.version}</version>
</dependency>
<dependency>
<groupId>org.glassfish</groupId>
<artifactId>jakarta.el</artifactId>