拆分protobuf

This commit is contained in:
三丙
2025-09-12 14:40:18 +08:00
parent bc5411eb4b
commit 38548b5230
87 changed files with 558 additions and 467 deletions

View File

@@ -17,7 +17,7 @@ import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.context.request.async.DeferredResult;
import sanbing.jcpp.proto.gen.ProtocolProto.DownlinkRequestMessage;
import sanbing.jcpp.proto.gen.DownlinkProto.DownlinkRequestMessage;
import sanbing.jcpp.protocol.domain.ProtocolSession;
import sanbing.jcpp.protocol.provider.ProtocolSessionRegistryProvider;

View File

@@ -24,8 +24,9 @@ import org.springframework.stereotype.Service;
import sanbing.jcpp.infrastructure.util.mdc.MDCUtils;
import sanbing.jcpp.infrastructure.util.trace.TracerContextUtil;
import sanbing.jcpp.infrastructure.util.trace.TracerRunnable;
import sanbing.jcpp.proto.gen.DownlinkProto.DownlinkRequestMessage;
import sanbing.jcpp.proto.gen.GrpcProto.*;
import sanbing.jcpp.proto.gen.ProtocolInterfaceGrpc.ProtocolInterfaceImplBase;
import sanbing.jcpp.proto.gen.ProtocolProto.*;
import sanbing.jcpp.protocol.domain.ProtocolSession;
import sanbing.jcpp.protocol.provider.ProtocolSessionRegistryProvider;

View File

@@ -11,8 +11,10 @@ import com.github.benmanes.caffeine.cache.Caffeine;
import lombok.Getter;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import sanbing.jcpp.proto.gen.ProtocolProto;
import sanbing.jcpp.proto.gen.ProtocolProto.DownlinkRequestMessage;
import sanbing.jcpp.proto.gen.DownlinkProto.DownlinkRequestMessage;
import sanbing.jcpp.proto.gen.UplinkProto.SessionCloseEventProto;
import sanbing.jcpp.proto.gen.UplinkProto.SessionCloseReason;
import sanbing.jcpp.proto.gen.UplinkProto.UplinkQueueMessage;
import sanbing.jcpp.protocol.forwarder.Forwarder;
import java.io.Closeable;
@@ -66,10 +68,10 @@ public abstract class ProtocolSession implements Closeable {
@Override
public void close() {
close(ProtocolProto.SessionCloseReason.SESSION_CLOSE_DESTRUCTION);
close(SessionCloseReason.SESSION_CLOSE_DESTRUCTION);
}
public void close(ProtocolProto.SessionCloseReason reason) {
public void close(SessionCloseReason reason) {
log.info("[{}] Protocol会话关闭原因: {}", this, reason);
scheduledFutures.values().forEach(scheduledFuture -> scheduledFuture.cancel(true));
@@ -79,13 +81,13 @@ public abstract class ProtocolSession implements Closeable {
if (forwarder != null && !pileCodeSet.isEmpty()) {
for (String pileCode : pileCodeSet) {
ProtocolProto.SessionCloseEventProto sessionCloseEvent = ProtocolProto.SessionCloseEventProto.newBuilder()
SessionCloseEventProto sessionCloseEvent = SessionCloseEventProto.newBuilder()
.setPileCode(pileCode)
.setReason(reason)
.setAdditionalInfo("Session closed: " + reason)
.build();
ProtocolProto.UplinkQueueMessage uplinkQueueMessage = ProtocolProto.UplinkQueueMessage.newBuilder()
UplinkQueueMessage uplinkQueueMessage = UplinkQueueMessage.newBuilder()
.setMessageIdMSB(UUID.randomUUID().getMostSignificantBits())
.setMessageIdLSB(UUID.randomUUID().getLeastSignificantBits())
.setSessionIdMSB(id.getMostSignificantBits())

View File

@@ -6,7 +6,7 @@
*/
package sanbing.jcpp.protocol.domain;
import sanbing.jcpp.proto.gen.ProtocolProto.DownlinkRequestMessage;
import sanbing.jcpp.proto.gen.DownlinkProto.DownlinkRequestMessage;
/**
* @author 九筒

View File

@@ -22,7 +22,7 @@ import sanbing.jcpp.infrastructure.util.jackson.JacksonUtil;
import sanbing.jcpp.infrastructure.util.mdc.MDCUtils;
import sanbing.jcpp.infrastructure.util.trace.Tracer;
import sanbing.jcpp.infrastructure.util.trace.TracerContextUtil;
import sanbing.jcpp.proto.gen.ProtocolProto.UplinkQueueMessage;
import sanbing.jcpp.proto.gen.UplinkProto.UplinkQueueMessage;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiConsumer;

View File

@@ -30,7 +30,7 @@ import sanbing.jcpp.infrastructure.util.jackson.JacksonUtil;
import sanbing.jcpp.infrastructure.util.mdc.MDCUtils;
import sanbing.jcpp.infrastructure.util.trace.Tracer;
import sanbing.jcpp.infrastructure.util.trace.TracerContextUtil;
import sanbing.jcpp.proto.gen.ProtocolProto.UplinkQueueMessage;
import sanbing.jcpp.proto.gen.UplinkProto.UplinkQueueMessage;
import sanbing.jcpp.protocol.cfg.ForwarderCfg;
import sanbing.jcpp.protocol.cfg.KafkaCfg;

View File

@@ -13,7 +13,7 @@ import sanbing.jcpp.infrastructure.queue.discovery.PartitionProvider;
import sanbing.jcpp.infrastructure.queue.discovery.ServiceInfoProvider;
import sanbing.jcpp.infrastructure.queue.provider.AppQueueFactory;
import sanbing.jcpp.infrastructure.stats.StatsFactory;
import sanbing.jcpp.proto.gen.ProtocolProto.UplinkQueueMessage;
import sanbing.jcpp.proto.gen.UplinkProto.UplinkQueueMessage;
import sanbing.jcpp.protocol.cfg.ForwarderCfg;
import sanbing.jcpp.protocol.cfg.MemoryCfg;

View File

@@ -20,8 +20,8 @@ import sanbing.jcpp.infrastructure.stats.MessagesStats;
import sanbing.jcpp.infrastructure.util.exception.DownlinkException;
import sanbing.jcpp.infrastructure.util.jackson.JacksonUtil;
import sanbing.jcpp.infrastructure.util.trace.TracerContextUtil;
import sanbing.jcpp.proto.gen.ProtocolProto.DownlinkRequestMessage;
import sanbing.jcpp.proto.gen.ProtocolProto.SessionCloseReason;
import sanbing.jcpp.proto.gen.DownlinkProto.DownlinkRequestMessage;
import sanbing.jcpp.proto.gen.UplinkProto.SessionCloseReason;
import sanbing.jcpp.protocol.ProtocolMessageProcessor;
import sanbing.jcpp.protocol.domain.ListenerToHandlerMsg;
import sanbing.jcpp.protocol.domain.ProtocolUplinkMsg;

View File

@@ -11,9 +11,8 @@ import io.netty.channel.ChannelHandlerContext;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.Setter;
import sanbing.jcpp.proto.gen.ProtocolProto;
import sanbing.jcpp.proto.gen.ProtocolProto.DownlinkRequestMessage;
import sanbing.jcpp.proto.gen.ProtocolProto.SessionCloseReason;
import sanbing.jcpp.proto.gen.DownlinkProto.DownlinkRequestMessage;
import sanbing.jcpp.proto.gen.UplinkProto.SessionCloseReason;
import sanbing.jcpp.protocol.domain.ProtocolSession;
import sanbing.jcpp.protocol.listener.tcp.enums.SequenceNumberLength;
@@ -83,7 +82,7 @@ public class TcpSession extends ProtocolSession {
}
@Override
public void close(ProtocolProto.SessionCloseReason reason) {
public void close(SessionCloseReason reason) {
super.close(reason);
ctx.flush();

View File

@@ -16,7 +16,7 @@ import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import sanbing.jcpp.infrastructure.util.async.JCPPThreadFactory;
import sanbing.jcpp.infrastructure.util.config.ThreadPoolConfiguration;
import sanbing.jcpp.proto.gen.ProtocolProto.SessionCloseReason;
import sanbing.jcpp.proto.gen.UplinkProto.SessionCloseReason;
import sanbing.jcpp.protocol.domain.ProtocolSession;
import sanbing.jcpp.protocol.provider.ProtocolSessionRegistryProvider;