diff --git a/jcpp-app-bootstrap/src/main/resources/app-service.yml b/jcpp-app-bootstrap/src/main/resources/app-service.yml index 187cab9..67cfc18 100644 --- a/jcpp-app-bootstrap/src/main/resources/app-service.yml +++ b/jcpp-app-bootstrap/src/main/resources/app-service.yml @@ -164,10 +164,23 @@ service: type: "${SERVICE_TYPE:monolith}" # 可自定义的服务ID,如果不指定,则默认为HOSTNAME id: "${SERVICE_ID:}" - protocols: + protocol: sessions: default-inactivity-timeout-in-sec: "${PROTOCOLS_SESSIONS_DEFAULT_INACTIVITY_TIMEOUT_IN_SEC:600}" default-state-check-interval-in-sec: "${PROTOCOLS_SESSIONS_DEFAULT_STATE_CHECK_INTERVAL_IN_SEC:60}" + rpc: + enabled: "${SERVICE_PROTOCOL_RPC_ENABLED:true}" + port: "${SERVICE_PROTOCOL_RPC_PORT:9090}" + boss: "${SERVICE_PROTOCOL_RPC_BOSS:4}" + worker: "${SERVICE_PROTOCOL_RPC_WORKER:64}" + so-rcvbuf: "${SERVICE_PROTOCOL_RPC_SO_RCVBUF:65535}" + so-sndbuf: "${SERVICE_PROTOCOL_RPC_SO_SNDBUF:65535}" + no-delay: "${SERVICE_PROTOCOL_RPC_NO_DELAY:true}" + user-thread-pool-size: "${SERVICE_PROTOCOL_RPC_USER_THREAD_POOL_SIZE:1024}" + max-inbound-message-size: "${SERVICE_PROTOCOL_RPC_MAX_INBOUND_MESSAGE_SIZE:33554432}" + max-concurrent-calls-per-connection: "${SERVICE_PROTOCOL_MAX_CONCURRENT_CALLS_PER_CONNECTION:4}" + client-max-keep-alive-time-sec: "${SERVICE_PROTOCOL_RPC_CLIENT_MAX_KEEP_ALIVE_TIME_SEC:30}" + protocols: yunkuaichongV150: enabled: "${PROTOCOLS_YUNKUAICHONGV150_ENABLED:true}" listener: @@ -252,3 +265,7 @@ thread-pool: hash_function_name: "${THREAD_POOL_SHARDING_HASH_FUNCTION_NAME:murmur3_128}" # murmur3_32, murmur3_128 or sha256 parallelism: "${THREAD_POOL_SHARDING_PARALLELISM:8}" stats-print-interval-ms: "${THREAD_POOL_SHARDING_STATS_PRINT_INTERVAL_MS:10000}" + +downlink: + rpc: + type: "${DOWNLINK_RPC_TYPE:grpc}" # rest or grpc \ No newline at end of file diff --git a/jcpp-app-bootstrap/src/test/resources/app-service-test.properties b/jcpp-app-bootstrap/src/test/resources/app-service-test.properties index d050d7f..ce335d7 100644 --- a/jcpp-app-bootstrap/src/test/resources/app-service-test.properties +++ b/jcpp-app-bootstrap/src/test/resources/app-service-test.properties @@ -1,3 +1,4 @@ spring.datasource.url=jdbc:postgresql://testenv:30135/jcpp service.protocols.yunkuaichongV150.listener.tcp.bind-port=0 -service.protocols.yunkuaichongV160.listener.tcp.bind-port=0 \ No newline at end of file +service.protocols.yunkuaichongV160.listener.tcp.bind-port=0 +service.protocol.rpc.port=0 \ No newline at end of file diff --git a/jcpp-app/src/main/java/sanbing/jcpp/app/data/PileSession.java b/jcpp-app/src/main/java/sanbing/jcpp/app/data/PileSession.java index 1dbb5cc..0efd3d6 100644 --- a/jcpp-app/src/main/java/sanbing/jcpp/app/data/PileSession.java +++ b/jcpp-app/src/main/java/sanbing/jcpp/app/data/PileSession.java @@ -29,7 +29,11 @@ public class PileSession implements Serializable { private String nodeId; - private String nodeWebapiIpPort; + private String nodeIp; + + private int nodeRestPort; + + private int nodeGrpcPort; public PileSession(UUID pileId, String pileCode, String protocolName) { this.pileId = pileId; @@ -45,13 +49,17 @@ public class PileSession implements Serializable { @JsonProperty("protocolSessionId") UUID protocolSessionId, @JsonProperty("remoteAddress") String remoteAddress, @JsonProperty("nodeId") String nodeId, - @JsonProperty("nodeWebapiIpPort") String nodeWebapiIpPort) { + @JsonProperty("nodeIp") String nodeIp, + @JsonProperty("nodeRestPort") int nodeRestPort, + @JsonProperty("nodeGrpcPort") int nodeGrpcPort) { this.pileId = pileId; this.pileCode = pileCode; this.protocolName = protocolName; this.protocolSessionId = protocolSessionId; this.remoteAddress = remoteAddress; this.nodeId = nodeId; - this.nodeWebapiIpPort = nodeWebapiIpPort; + this.nodeIp = nodeIp; + this.nodeRestPort = nodeRestPort; + this.nodeGrpcPort = nodeGrpcPort; } } \ No newline at end of file diff --git a/jcpp-app/src/main/java/sanbing/jcpp/app/service/DownlinkCallService.java b/jcpp-app/src/main/java/sanbing/jcpp/app/service/DownlinkCallService.java index 15b97a9..2162c56 100644 --- a/jcpp-app/src/main/java/sanbing/jcpp/app/service/DownlinkCallService.java +++ b/jcpp-app/src/main/java/sanbing/jcpp/app/service/DownlinkCallService.java @@ -4,12 +4,68 @@ */ package sanbing.jcpp.app.service; +import jakarta.annotation.Resource; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Value; +import sanbing.jcpp.app.data.PileSession; +import sanbing.jcpp.app.service.cache.session.PileSessionCacheKey; +import sanbing.jcpp.infrastructure.cache.CacheValueWrapper; +import sanbing.jcpp.infrastructure.cache.TransactionalCache; +import sanbing.jcpp.infrastructure.queue.discovery.ServiceInfoProvider; +import sanbing.jcpp.infrastructure.util.trace.Tracer; +import sanbing.jcpp.infrastructure.util.trace.TracerContextUtil; +import sanbing.jcpp.proto.gen.ProtocolProto; import sanbing.jcpp.proto.gen.ProtocolProto.DownlinkRequestMessage; +import sanbing.jcpp.protocol.adapter.DownlinkController; /** * @author baigod */ -public interface DownlinkCallService { +@Slf4j +public abstract class DownlinkCallService { - void sendDownlinkMessage(DownlinkRequestMessage.Builder downlinkMessageBuilder, String pileCode); + @Resource + protected ServiceInfoProvider serviceInfoProvider; + + @Resource + protected DownlinkController downlinkController; + + @Resource + protected TransactionalCache pileSessionCache; + + @Value("${cache.type}") + protected String cacheType; + + public void sendDownlinkMessage(DownlinkRequestMessage.Builder downlinkMessageBuilder, String pileCode) { + CacheValueWrapper pileSessionCacheValueWrapper = pileSessionCache.get(new PileSessionCacheKey(pileCode)); + + if (pileSessionCacheValueWrapper == null) { + log.warn("充电桩会话不存在 {}", pileCode); + return; + } + + PileSession pileSession = pileSessionCacheValueWrapper.get(); + + if (serviceInfoProvider.isMonolith() && + ("caffeine".equalsIgnoreCase(cacheType)) || serviceInfoProvider.getServiceId().equalsIgnoreCase(pileSession.getNodeId())) { + + downlinkController.onDownlink(downlinkMessageBuilder.build()) + .setResultHandler(result -> log.debug("下行消息发送完成")); + + } else { + Tracer currentTracer = TracerContextUtil.getCurrentTracer(); + + downlinkMessageBuilder.setTracer(ProtocolProto.TracerProto.newBuilder() + .setId(currentTracer.getTraceId()) + .setOrigin(currentTracer.getOrigin()) + .setTs(currentTracer.getTracerTs()) + .build()); + + + _sendDownlinkMessage(downlinkMessageBuilder.build(), pileSession); + } + } + + + protected abstract void _sendDownlinkMessage(DownlinkRequestMessage downlinkMessage, PileSession pileSession); } \ No newline at end of file diff --git a/jcpp-app/src/main/java/sanbing/jcpp/app/service/grpc/DownlinkGrpcClient.java b/jcpp-app/src/main/java/sanbing/jcpp/app/service/grpc/DownlinkGrpcClient.java new file mode 100644 index 0000000..647fd49 --- /dev/null +++ b/jcpp-app/src/main/java/sanbing/jcpp/app/service/grpc/DownlinkGrpcClient.java @@ -0,0 +1,280 @@ +/** + * 抖音关注:程序员三丙 + * 知识星球:https://t.zsxq.com/j9b21 + */ +package sanbing.jcpp.app.service.grpc; + +import com.google.common.net.HostAndPort; +import io.grpc.CompressorRegistry; +import io.grpc.ConnectivityState; +import io.grpc.DecompressorRegistry; +import io.grpc.ManagedChannel; +import io.grpc.netty.shaded.io.grpc.netty.NettyChannelBuilder; +import io.grpc.netty.shaded.io.netty.channel.ChannelOption; +import io.grpc.netty.shaded.io.netty.channel.nio.NioEventLoopGroup; +import io.grpc.netty.shaded.io.netty.channel.socket.nio.NioSocketChannel; +import io.grpc.stub.StreamObserver; +import jakarta.annotation.PostConstruct; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.stereotype.Component; +import sanbing.jcpp.infrastructure.util.async.JCPPThreadFactory; +import sanbing.jcpp.infrastructure.util.trace.TracerRunnable; +import sanbing.jcpp.proto.gen.ProtocolDownlinkInterfaceGrpc; +import sanbing.jcpp.proto.gen.ProtocolDownlinkInterfaceGrpc.ProtocolDownlinkInterfaceStub; +import sanbing.jcpp.proto.gen.ProtocolProto.DownlinkRequestMessage; +import sanbing.jcpp.proto.gen.ProtocolProto.DownlinkResponseMessage; + +import javax.annotation.PreDestroy; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.*; +import java.util.concurrent.locks.ReentrantLock; + +/** + * @author baigod + */ +@Component +@Slf4j +public class DownlinkGrpcClient { + + @Value("${downlink.rpc.grpc.netty.event_loop:}") + private Integer rpcNettyEventLoop; + + @Value("${downlink.rpc.grpc.netty.so_sndbuf:65535}") + private Integer rpcNettySoSndbuf; + + @Value("${downlink.rpc.grpc.netty.so_rcvbuf:65535}") + private Integer rpcNettySoRcvbuf; + + @Value("${downlink.rpc.grpc.netty.no_delay:true}") + private boolean rpcNoDelay; + + @Value("${downlink.rpc.grpc.netty.max_inbound_message_size:33554432}") + private Integer rpcMaxInboundMessageSize; + + @Value("${downlink.rpc.grpc.keep_alive_time_sec:300}") + private int keepAliveTimeSec; + + @Value("${downlink.rpc.grpc.max_records_size:102400}") + private int maxRecordsSize; + + @Value("${downlink.rpc.grpc.batch_records_count:1024}") + private int batchRecordsCount; + + @Value("${downlink.rpc.grpc.no_read_records_sleep:25}") + private long noRecordsSleepInterval; + + @Value("${downlink.rpc.grpc.records_ttl:600000}") + private long recordsTtl; + + private volatile boolean initialized = true; + + private final Map channelMap = new ConcurrentHashMap<>(); + private final Map> inputStreamMap = new ConcurrentHashMap<>(); + private final Map> queueMap = new ConcurrentHashMap<>(); + private final Map msgHandleLocks = new ConcurrentHashMap<>(); + private final Map msgHandleExecutors = new ConcurrentHashMap<>(); + private ExecutorService grpcStarterExecutor; + private ScheduledExecutorService grpcStateCheckExecutor; + private ScheduledExecutorService downlinkMsgsExecutor; + + @PostConstruct + public void init() { + grpcStarterExecutor = Executors.newSingleThreadExecutor(JCPPThreadFactory.forName("grpc-starter-executor")); + grpcStateCheckExecutor = Executors.newSingleThreadScheduledExecutor(JCPPThreadFactory.forName("grpc-check-executor")); + downlinkMsgsExecutor = Executors.newSingleThreadScheduledExecutor(JCPPThreadFactory.forName("downlink-msgs-executor")); + + // 每秒进行一次连接检查与线程初始化 + downlinkMsgsExecutor.scheduleWithFixedDelay(() -> { + queueMap.forEach((key, queue) -> { + + if (queue.isEmpty()) { + return; + } + + ManagedChannel managedChannel = channelMap.get(key); + + if (managedChannel == null || managedChannel.getState(false) != ConnectivityState.READY) { + grpcStarterExecutor.submit(new TracerRunnable(() -> connect(key))); + return; + } + + msgHandleExecutors.computeIfAbsent(key, hostAndPort -> + Executors.newFixedThreadPool(1, JCPPThreadFactory.forName("downlink-handle-threads-" + hostAndPort))) + .execute(new TracerRunnable(() -> { + while (initialized) { + try { + handleMsgs(key, queue); + } catch (Exception e) { + log.error("Failed to process messages handling!", e); + } + } + })); + }); + + }, 0, 1, TimeUnit.SECONDS); + + grpcStateCheckExecutor.scheduleWithFixedDelay(() -> { + channelMap.forEach((key, channel) -> { + ConnectivityState state = channel.getState(true); + + if (state == ConnectivityState.SHUTDOWN) { + log.info("Grpc 客户端SHUTDOWN {} {}", key, state); + + LinkedBlockingQueue queue = queueMap.get(key); + if (queue != null) { + queue.clear(); + queueMap.remove(key); + } + + ExecutorService executorService = msgHandleExecutors.get(key); + if (executorService != null) { + executorService.shutdownNow(); + msgHandleExecutors.remove(key); + } + } + }); + }, 0, 1, TimeUnit.SECONDS); + } + + private void handleMsgs(HostAndPort key, LinkedBlockingQueue queue) { + StreamObserver inputStream = inputStreamMap.get(key); + + if (inputStream == null) { + return; + } + + long acceptTs = System.currentTimeMillis() - recordsTtl; + + List downlinkMsgs = new ArrayList<>(batchRecordsCount); + + queue.drainTo(downlinkMsgs, batchRecordsCount); + + for (DownlinkRequestMessage msg : downlinkMsgs) { + + long ts = msg.getTracer().getTs(); + + if (ts > 0 && ts < acceptTs) { + + log.warn("[{}] 消息过期,直接丢弃 {}", key, ts); + + continue; + } + + + ReentrantLock lock = msgHandleLocks.computeIfAbsent(key, hostAndPort -> new ReentrantLock()); + + lock.lock(); + try { + inputStream.onNext(msg); + } finally { + lock.unlock(); + } + } + + + if (downlinkMsgs.isEmpty()) { + try { + Thread.sleep(noRecordsSleepInterval); + } catch (InterruptedException e) { + log.warn("Sleep interrupted!", e); + } + } else { + downlinkMsgs.clear(); + } + } + + @PreDestroy + public void destroy() { + log.info("Starting Grpc destroying process"); + + initialized = false; + + try { + + grpcStarterExecutor.shutdownNow(); + + grpcStateCheckExecutor.shutdownNow(); + + downlinkMsgsExecutor.shutdownNow(); + + msgHandleExecutors.values().forEach(ExecutorService::shutdownNow); + + inputStreamMap.values().forEach(StreamObserver::onCompleted); + + channelMap.values().forEach(ManagedChannel::shutdownNow); + + } catch (Exception e) { + log.error("Exception during disconnect", e); + } + } + + public void connect(HostAndPort hostAndPort) { + + if (channelMap.get(hostAndPort) != null && channelMap.get(hostAndPort).getState(true).ordinal() < 2) { + return; + } + + log.info("[{}] Create new Grpc Client Channel!", hostAndPort); + + ManagedChannel managedChannel = NettyChannelBuilder.forAddress(hostAndPort.getHost(), hostAndPort.getPort()) + .eventLoopGroup(new NioEventLoopGroup(Optional.ofNullable(rpcNettyEventLoop).orElse(Runtime.getRuntime().availableProcessors() * 2))) + .compressorRegistry(CompressorRegistry.getDefaultInstance()) + .decompressorRegistry(DecompressorRegistry.getDefaultInstance()) + .withOption(ChannelOption.SO_SNDBUF, rpcNettySoSndbuf) + .withOption(ChannelOption.SO_RCVBUF, rpcNettySoRcvbuf) + .withOption(ChannelOption.TCP_NODELAY, rpcNoDelay) + .maxInboundMessageSize(rpcMaxInboundMessageSize) + .channelType(NioSocketChannel.class) + .directExecutor() + .keepAliveTime(keepAliveTimeSec, TimeUnit.SECONDS) + .usePlaintext() + .keepAliveTime(5, TimeUnit.MINUTES) // Change to a larger value, e.g. 5min. + .keepAliveTimeout(10, TimeUnit.SECONDS) // Change to a larger value, e.g. 10s. + .keepAliveWithoutCalls(true)// You should normally avoid enabling this. + .defaultLoadBalancingPolicy("round_robin") + .build(); + + log.info("Grpc 客户端READY {} {}", hostAndPort, managedChannel.getState(true)); + + ManagedChannel remove = channelMap.remove(hostAndPort); + + if (remove != null) { + channelMap.get(hostAndPort).shutdownNow(); + } + + channelMap.put(hostAndPort, managedChannel); + + ProtocolDownlinkInterfaceStub stub = ProtocolDownlinkInterfaceGrpc.newStub(managedChannel); + + StreamObserver streamObserver = stub.onDownlink(new StreamObserver<>() { + @Override + public void onNext(DownlinkResponseMessage value) { + log.info("Grpc 接收到通信层反向回复 {}", value); + } + + @Override + public void onError(Throwable t) { + log.warn("Grpc 客户端异常 {}", t.getMessage()); + } + + @Override + public void onCompleted() { + log.info("[{}] The Grpc connection was closed!", hostAndPort); + } + }); + + inputStreamMap.put(hostAndPort, streamObserver); + + } + + /** + * 发送下行请求 + */ + public void sendDownlinkRequest(HostAndPort hostAndPort, DownlinkRequestMessage downlinkRequestMessage) { + queueMap.computeIfAbsent(hostAndPort, k -> new LinkedBlockingQueue<>(maxRecordsSize)).add(downlinkRequestMessage); + } +} \ No newline at end of file diff --git a/jcpp-app/src/main/java/sanbing/jcpp/app/service/impl/DefaultDownlinkCallService.java b/jcpp-app/src/main/java/sanbing/jcpp/app/service/impl/DefaultDownlinkCallService.java deleted file mode 100644 index a29221f..0000000 --- a/jcpp-app/src/main/java/sanbing/jcpp/app/service/impl/DefaultDownlinkCallService.java +++ /dev/null @@ -1,97 +0,0 @@ -/** - * 抖音关注:程序员三丙 - * 知识星球:https://t.zsxq.com/j9b21 - */ -package sanbing.jcpp.app.service.impl; - -import jakarta.annotation.Resource; -import lombok.extern.slf4j.Slf4j; -import org.springframework.beans.factory.annotation.Value; -import org.springframework.http.HttpEntity; -import org.springframework.http.HttpHeaders; -import org.springframework.http.MediaType; -import org.springframework.http.ResponseEntity; -import org.springframework.stereotype.Service; -import org.springframework.web.client.RestClientException; -import org.springframework.web.client.RestTemplate; -import sanbing.jcpp.app.data.PileSession; -import sanbing.jcpp.app.service.DownlinkCallService; -import sanbing.jcpp.app.service.cache.session.PileSessionCacheKey; -import sanbing.jcpp.infrastructure.cache.CacheValueWrapper; -import sanbing.jcpp.infrastructure.cache.TransactionalCache; -import sanbing.jcpp.infrastructure.queue.discovery.ServiceInfoProvider; -import sanbing.jcpp.infrastructure.util.trace.TracerContextUtil; -import sanbing.jcpp.proto.gen.ProtocolProto.DownlinkRequestMessage; -import sanbing.jcpp.protocol.adapter.DownlinkController; - -import static sanbing.jcpp.infrastructure.util.trace.TracerContextUtil.*; - -/** - * @author baigod - */ -@Service -@Slf4j -public class DefaultDownlinkCallService implements DownlinkCallService { - - @Resource - RestTemplate downlinkRestTemplate; - - @Resource - ServiceInfoProvider serviceInfoProvider; - - @Resource - DownlinkController downlinkController; - - @Resource - TransactionalCache pileSessionCache; - - @Value("${cache.type}") - private String cacheType; - - @Override - public void sendDownlinkMessage(DownlinkRequestMessage.Builder downlinkMessageBuilder, String pileCode) { - if (serviceInfoProvider.isMonolith() && "caffeine".equalsIgnoreCase(cacheType)) { - - downlinkController.onDownlink(downlinkMessageBuilder.build()) - .setResultHandler(result -> log.debug("下行消息发送完成")); - - } else { - try { - CacheValueWrapper pileSessionCacheValueWrapper = pileSessionCache.get(new PileSessionCacheKey(pileCode)); - - if (pileSessionCacheValueWrapper == null) { - log.warn("充电桩会话不存在 {}", pileCode); - return; - } - - PileSession pileSession = pileSessionCacheValueWrapper.get(); - - invokeDownlinkRestApi(downlinkMessageBuilder.build(), pileSession.getNodeWebapiIpPort()); - - - } catch (RestClientException e) { - log.error("下行消息发送异常", e); - } - } - } - - private void invokeDownlinkRestApi(DownlinkRequestMessage DownlinkRequestMessage, String nodeWebapiIpPort) { - HttpHeaders headers = new HttpHeaders(); - headers.add(JCPP_TRACER_ID, TracerContextUtil.getCurrentTracer().getTraceId()); - headers.add(JCPP_TRACER_ORIGIN, TracerContextUtil.getCurrentTracer().getOrigin()); - headers.add(JCPP_TRACER_TS, String.valueOf(TracerContextUtil.getCurrentTracer().getTracerTs())); - headers.setContentType(MediaType.parseMediaType("application/x-protobuf")); - - HttpEntity entity = new HttpEntity<>(DownlinkRequestMessage, headers); - - try { - ResponseEntity response = downlinkRestTemplate.postForEntity("http://" + nodeWebapiIpPort + "/api/onDownlink", - entity, ResponseEntity.class); - log.debug("下行消息发送成功 {}", response); - } catch (RestClientException e) { - log.error("下行消息发送失败 {}", DownlinkRequestMessage, e); - throw new RuntimeException(e); - } - - } -} \ No newline at end of file diff --git a/jcpp-app/src/main/java/sanbing/jcpp/app/service/impl/DefaultPileProtocolService.java b/jcpp-app/src/main/java/sanbing/jcpp/app/service/impl/DefaultPileProtocolService.java index f7ccab0..d6207f3 100644 --- a/jcpp-app/src/main/java/sanbing/jcpp/app/service/impl/DefaultPileProtocolService.java +++ b/jcpp-app/src/main/java/sanbing/jcpp/app/service/impl/DefaultPileProtocolService.java @@ -67,7 +67,9 @@ public class DefaultPileProtocolService implements PileProtocolService { cacheSession(uplinkQueueMessage, pile, loginRequest.getRemoteAddress(), loginRequest.getNodeId(), - loginRequest.getNodeWebapiIpPort()); + loginRequest.getNodeHostAddress(), + loginRequest.getNodeRestPort(), + loginRequest.getNodeGrpcPort()); downlinkMessageBuilder.setLoginResponse(LoginResponse.newBuilder() .setSuccess(true) @@ -98,16 +100,26 @@ public class DefaultPileProtocolService implements PileProtocolService { cacheSession(uplinkQueueMessage, pile, heartBeatRequest.getRemoteAddress(), heartBeatRequest.getNodeId(), - heartBeatRequest.getNodeWebapiIpPort()); + heartBeatRequest.getNodeHostAddress(), + heartBeatRequest.getNodeRestPort(), + heartBeatRequest.getNodeGrpcPort()); } } - private void cacheSession(UplinkQueueMessage uplinkQueueMessage, Pile pile, String remoteAddress, String nodeId, String nodeWebapiIpPort) { + private void cacheSession(UplinkQueueMessage uplinkQueueMessage, + Pile pile, + String remoteAddress, + String nodeId, + String nodeIp, + int restPort, + int grpcPort) { PileSession pileSession = new PileSession(pile.getId(), pile.getPileCode(), uplinkQueueMessage.getProtocolName()); pileSession.setProtocolSessionId(new UUID(uplinkQueueMessage.getSessionIdMSB(), uplinkQueueMessage.getSessionIdLSB())); pileSession.setRemoteAddress(remoteAddress); pileSession.setNodeId(nodeId); - pileSession.setNodeWebapiIpPort(nodeWebapiIpPort); + pileSession.setNodeIp(nodeIp); + pileSession.setNodeRestPort(restPort); + pileSession.setNodeGrpcPort(grpcPort); pileSessionCache.put(new PileSessionCacheKey(pile.getPileCode()), pileSession); } diff --git a/jcpp-app/src/main/java/sanbing/jcpp/app/service/impl/GrpcDownlinkCallService.java b/jcpp-app/src/main/java/sanbing/jcpp/app/service/impl/GrpcDownlinkCallService.java new file mode 100644 index 0000000..59c6e90 --- /dev/null +++ b/jcpp-app/src/main/java/sanbing/jcpp/app/service/impl/GrpcDownlinkCallService.java @@ -0,0 +1,39 @@ +/** + * 抖音关注:程序员三丙 + * 知识星球:https://t.zsxq.com/j9b21 + */ +package sanbing.jcpp.app.service.impl; + +import com.google.common.net.HostAndPort; +import jakarta.annotation.Resource; +import lombok.extern.slf4j.Slf4j; +import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression; +import org.springframework.stereotype.Service; +import sanbing.jcpp.app.data.PileSession; +import sanbing.jcpp.app.service.DownlinkCallService; +import sanbing.jcpp.app.service.grpc.DownlinkGrpcClient; +import sanbing.jcpp.proto.gen.ProtocolProto.DownlinkRequestMessage; + +/** + * @author baigod + */ +@Service +@Slf4j +@ConditionalOnExpression("'${downlink.rpc.type:null}'=='grpc'") +public class GrpcDownlinkCallService extends DownlinkCallService { + + @Resource + DownlinkGrpcClient downlinkGrpcClient; + + @Override + protected void _sendDownlinkMessage(DownlinkRequestMessage downlinkMessage, PileSession pileSession) { + try { + + downlinkGrpcClient.sendDownlinkRequest(HostAndPort.fromParts(pileSession.getNodeIp(), pileSession.getNodeGrpcPort()), + downlinkMessage); + + } catch (Exception e) { + log.error("下行消息发送异常", e); + } + } +} \ No newline at end of file diff --git a/jcpp-app/src/main/java/sanbing/jcpp/app/service/impl/RestDownlinkCallService.java b/jcpp-app/src/main/java/sanbing/jcpp/app/service/impl/RestDownlinkCallService.java new file mode 100644 index 0000000..795c949 --- /dev/null +++ b/jcpp-app/src/main/java/sanbing/jcpp/app/service/impl/RestDownlinkCallService.java @@ -0,0 +1,64 @@ +/** + * 抖音关注:程序员三丙 + * 知识星球:https://t.zsxq.com/j9b21 + */ +package sanbing.jcpp.app.service.impl; + +import jakarta.annotation.Resource; +import lombok.extern.slf4j.Slf4j; +import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression; +import org.springframework.http.HttpEntity; +import org.springframework.http.HttpHeaders; +import org.springframework.http.MediaType; +import org.springframework.http.ResponseEntity; +import org.springframework.stereotype.Service; +import org.springframework.web.client.RestClientException; +import org.springframework.web.client.RestTemplate; +import sanbing.jcpp.app.data.PileSession; +import sanbing.jcpp.app.service.DownlinkCallService; +import sanbing.jcpp.infrastructure.util.trace.TracerContextUtil; +import sanbing.jcpp.proto.gen.ProtocolProto.DownlinkRequestMessage; + +import static sanbing.jcpp.infrastructure.util.trace.TracerContextUtil.*; + +/** + * @author baigod + */ +@Service +@Slf4j +@ConditionalOnExpression("'${downlink.rpc.type:null}'=='rest'") +public class RestDownlinkCallService extends DownlinkCallService { + + @Resource + RestTemplate downlinkRestTemplate; + + @Override + protected void _sendDownlinkMessage(DownlinkRequestMessage downlinkMessage, PileSession pileSession) { + try { + + invokeDownlinkRestApi(downlinkMessage, pileSession.getNodeIp(), pileSession.getNodeRestPort()); + + } catch (RestClientException e) { + log.error("下行消息发送异常", e); + } + } + + private void invokeDownlinkRestApi(DownlinkRequestMessage downlinkRequestMessage, String nodeWebapiIpPort, int nodeRestPort) { + HttpHeaders headers = new HttpHeaders(); + headers.add(JCPP_TRACER_ID, TracerContextUtil.getCurrentTracer().getTraceId()); + headers.add(JCPP_TRACER_ORIGIN, TracerContextUtil.getCurrentTracer().getOrigin()); + headers.add(JCPP_TRACER_TS, String.valueOf(TracerContextUtil.getCurrentTracer().getTracerTs())); + headers.setContentType(MediaType.parseMediaType("application/x-protobuf")); + + HttpEntity entity = new HttpEntity<>(downlinkRequestMessage, headers); + + try { + ResponseEntity response = downlinkRestTemplate.postForEntity("http://" + nodeWebapiIpPort + ":" + nodeRestPort + "/api/onDownlink", + entity, ResponseEntity.class); + log.debug("下行消息发送成功 {}", response); + } catch (RestClientException e) { + log.error("下行消息发送失败 {}", downlinkRequestMessage, e); + throw new RuntimeException(e); + } + } +} \ No newline at end of file diff --git a/jcpp-infrastructure-proto/pom.xml b/jcpp-infrastructure-proto/pom.xml index 82e15a5..b625d3a 100644 --- a/jcpp-infrastructure-proto/pom.xml +++ b/jcpp-infrastructure-proto/pom.xml @@ -41,17 +41,14 @@ io.grpc grpc-netty-shaded - provided io.grpc grpc-protobuf - provided io.grpc grpc-stub - provided diff --git a/jcpp-infrastructure-proto/src/main/proto/protocol.proto b/jcpp-infrastructure-proto/src/main/proto/protocol.proto index 2ad68f5..a34aa90 100644 --- a/jcpp-infrastructure-proto/src/main/proto/protocol.proto +++ b/jcpp-infrastructure-proto/src/main/proto/protocol.proto @@ -13,6 +13,12 @@ service ProtocolDownlinkInterface { rpc onDownlink(stream DownlinkRequestMessage) returns (stream DownlinkResponseMessage) {} } +message TracerProto { + string id = 1; + string origin = 2; + int64 ts = 3; +} + message UplinkQueueMessage { int64 messageIdMSB = 1; int64 messageIdLSB = 2; @@ -43,14 +49,15 @@ message DownlinkRequestMessage { optional int64 requestIdMSB = 8; optional int64 requestIdLSB = 9; optional bytes requestData = 10; - string downlinkCmd = 11; - LoginResponse loginResponse = 20; - VerifyPricingResponse verifyPricingResponse = 21; - QueryPricingResponse queryPricingResponse = 22; - SetPricingRequest setPricingRequest = 23; - RemoteStartChargingRequest remoteStartChargingRequest = 24; - RemoteStopChargingRequest remoteStopChargingRequest = 25; - TransactionRecordAck transactionRecordAck = 26; + TracerProto tracer = 12; + string downlinkCmd = 20; + LoginResponse loginResponse = 21; + VerifyPricingResponse verifyPricingResponse = 22; + QueryPricingResponse queryPricingResponse = 23; + SetPricingRequest setPricingRequest = 24; + RemoteStartChargingRequest remoteStartChargingRequest = 25; + RemoteStopChargingRequest remoteStopChargingRequest = 26; + TransactionRecordAck transactionRecordAck = 27; } message DownlinkResponseMessage { @@ -63,7 +70,9 @@ message LoginRequest { string credential = 3; string remoteAddress = 4; string nodeId = 10; - string nodeWebapiIpPort = 11; + string nodeHostAddress = 11; + int32 nodeRestPort = 12; + int32 nodeGrpcPort = 13; optional string additionalInfo = 20; } @@ -76,7 +85,9 @@ message HeartBeatRequest { string pileCode = 3; string remoteAddress = 4; string nodeId = 10; - string nodeWebapiIpPort = 11; + string nodeHostAddress = 11; + int32 nodeRestPort = 12; + int32 nodeGrpcPort = 13; optional string additionalInfo = 20; } diff --git a/jcpp-infrastructure-queue/src/main/java/sanbing/jcpp/infrastructure/queue/ProtoQueueMsg.java b/jcpp-infrastructure-queue/src/main/java/sanbing/jcpp/infrastructure/queue/ProtoQueueMsg.java index 3667dfc..da7d47d 100644 --- a/jcpp-infrastructure-queue/src/main/java/sanbing/jcpp/infrastructure/queue/ProtoQueueMsg.java +++ b/jcpp-infrastructure-queue/src/main/java/sanbing/jcpp/infrastructure/queue/ProtoQueueMsg.java @@ -4,10 +4,11 @@ */ package sanbing.jcpp.infrastructure.queue; +import com.google.protobuf.GeneratedMessage; import lombok.Data; @Data -public class ProtoQueueMsg implements QueueMsg { +public class ProtoQueueMsg implements QueueMsg { private final String key; protected final T value; diff --git a/jcpp-infrastructure-queue/src/main/java/sanbing/jcpp/infrastructure/queue/discovery/DefaultServiceInfoProvider.java b/jcpp-infrastructure-queue/src/main/java/sanbing/jcpp/infrastructure/queue/discovery/DefaultServiceInfoProvider.java index 055610f..2c8a1f9 100644 --- a/jcpp-infrastructure-queue/src/main/java/sanbing/jcpp/infrastructure/queue/discovery/DefaultServiceInfoProvider.java +++ b/jcpp-infrastructure-queue/src/main/java/sanbing/jcpp/infrastructure/queue/discovery/DefaultServiceInfoProvider.java @@ -19,7 +19,6 @@ import java.net.InetAddress; import java.net.UnknownHostException; import java.util.Collections; import java.util.List; -import java.util.stream.Collectors; /** @@ -42,15 +41,18 @@ public class DefaultServiceInfoProvider implements ServiceInfoProvider { private ServiceInfo serviceInfo; @Getter - private String serviceWebapiEndpoint; + private String hostAddress; + @Getter @Value("${server.port}") - private String webapiPort; + private int restPort; + + @Getter + @Value("${service.protocol.rpc.port:9090}") + private int grpcPort; @PostConstruct public void init() throws UnknownHostException { - - if (!StringUtils.hasText(this.serviceId)) { try { this.serviceId = InetAddress.getLocalHost().getHostName(); @@ -58,10 +60,11 @@ public class DefaultServiceInfoProvider implements ServiceInfoProvider { this.serviceId = RandomStringUtils.randomAlphabetic(10); } } - log.info("Current Service ID: {}", this.serviceId); + log.info("Current Service ID: {}", serviceId); - serviceWebapiEndpoint = InetAddress.getLocalHost().getHostAddress() + ":" + webapiPort; - log.info("Current Service HostAddress: {}", this.serviceWebapiEndpoint); + hostAddress = InetAddress.getLocalHost().getHostAddress(); + + log.info("Current Service HostAddress: {}, RestPort:{}, GrpcPort:{}", hostAddress, restPort, grpcPort); if (serviceType.equalsIgnoreCase("monolith")) { serviceTypes = List.of(ServiceType.values()); } else { @@ -86,7 +89,7 @@ public class DefaultServiceInfoProvider implements ServiceInfoProvider { public ServiceInfo generateNewServiceInfoWithCurrentSystemInfo() { ServiceInfo.Builder builder = ServiceInfo.newBuilder() .setServiceId(serviceId) - .addAllServiceTypes(serviceTypes.stream().map(ServiceType::name).collect(Collectors.toList())) + .addAllServiceTypes(serviceTypes.stream().map(ServiceType::name).toList()) .setSystemInfo(getCurrentSystemInfoProto()); return serviceInfo = builder.build(); } diff --git a/jcpp-infrastructure-queue/src/main/java/sanbing/jcpp/infrastructure/queue/discovery/ServiceInfoProvider.java b/jcpp-infrastructure-queue/src/main/java/sanbing/jcpp/infrastructure/queue/discovery/ServiceInfoProvider.java index 294e2a6..dc22196 100644 --- a/jcpp-infrastructure-queue/src/main/java/sanbing/jcpp/infrastructure/queue/discovery/ServiceInfoProvider.java +++ b/jcpp-infrastructure-queue/src/main/java/sanbing/jcpp/infrastructure/queue/discovery/ServiceInfoProvider.java @@ -13,7 +13,11 @@ import sanbing.jcpp.proto.gen.ClusterProto; public interface ServiceInfoProvider { String getServiceId(); - String getServiceWebapiEndpoint(); + String getHostAddress(); + + int getRestPort(); + + int getGrpcPort(); String getServiceType(); diff --git a/jcpp-infrastructure-queue/src/main/java/sanbing/jcpp/infrastructure/queue/processing/IdMsgPair.java b/jcpp-infrastructure-queue/src/main/java/sanbing/jcpp/infrastructure/queue/processing/IdMsgPair.java index 8a6eec5..a2ea23f 100644 --- a/jcpp-infrastructure-queue/src/main/java/sanbing/jcpp/infrastructure/queue/processing/IdMsgPair.java +++ b/jcpp-infrastructure-queue/src/main/java/sanbing/jcpp/infrastructure/queue/processing/IdMsgPair.java @@ -4,12 +4,13 @@ */ package sanbing.jcpp.infrastructure.queue.processing; +import com.google.protobuf.GeneratedMessage; import lombok.Getter; import sanbing.jcpp.infrastructure.queue.ProtoQueueMsg; import java.util.UUID; -public class IdMsgPair { +public class IdMsgPair { @Getter final UUID uuid; @Getter diff --git a/jcpp-protocol-api/src/main/java/sanbing/jcpp/protocol/adapter/DownlinkGrpcService.java b/jcpp-protocol-api/src/main/java/sanbing/jcpp/protocol/adapter/DownlinkGrpcService.java new file mode 100644 index 0000000..82107d5 --- /dev/null +++ b/jcpp-protocol-api/src/main/java/sanbing/jcpp/protocol/adapter/DownlinkGrpcService.java @@ -0,0 +1,159 @@ +/** + * 抖音关注:程序员三丙 + * 知识星球:https://t.zsxq.com/j9b21 + */ +package sanbing.jcpp.protocol.adapter; + +import io.grpc.CompressorRegistry; +import io.grpc.DecompressorRegistry; +import io.grpc.Server; +import io.grpc.netty.shaded.io.grpc.netty.NettyServerBuilder; +import io.grpc.netty.shaded.io.netty.channel.ChannelOption; +import io.grpc.netty.shaded.io.netty.channel.nio.NioEventLoopGroup; +import io.grpc.netty.shaded.io.netty.channel.socket.nio.NioServerSocketChannel; +import io.grpc.stub.StreamObserver; +import jakarta.annotation.PostConstruct; +import jakarta.annotation.Resource; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression; +import org.springframework.stereotype.Service; +import sanbing.jcpp.infrastructure.util.mdc.MDCUtils; +import sanbing.jcpp.infrastructure.util.trace.TracerContextUtil; +import sanbing.jcpp.infrastructure.util.trace.TracerRunnable; +import sanbing.jcpp.proto.gen.ProtocolDownlinkInterfaceGrpc.ProtocolDownlinkInterfaceImplBase; +import sanbing.jcpp.proto.gen.ProtocolProto.DownlinkRequestMessage; +import sanbing.jcpp.proto.gen.ProtocolProto.DownlinkResponseMessage; +import sanbing.jcpp.proto.gen.ProtocolProto.TracerProto; +import sanbing.jcpp.protocol.domain.ProtocolSession; +import sanbing.jcpp.protocol.provider.ProtocolSessionRegistryProvider; + +import javax.annotation.PreDestroy; +import java.util.UUID; +import java.util.concurrent.TimeUnit; + +import static sanbing.jcpp.infrastructure.util.config.ThreadPoolConfiguration.JCPP_COMMON_THREAD_POOL; + +/** + * @author baigod + */ +@Service +@Slf4j +@ConditionalOnExpression("'${service.type:null}'=='monolith' || '${service.type:null}'=='protocol'") +public class DownlinkGrpcService extends ProtocolDownlinkInterfaceImplBase { + @Value("${service.protocol.rpc.port}") + private int rpcPort; + @Value("${service.protocol.rpc.boss}") + private int rpcBoss; + @Value("${service.protocol.rpc.worker}") + private int rpcWorker; + @Value("${service.protocol.rpc.so-rcvbuf}") + private int rpcNettySoRcvbuf; + @Value("${service.protocol.rpc.so-sndbuf}") + private int rpcNettySoSndbuf; + @Value("${service.protocol.rpc.no-delay}") + private boolean rpcNettyNoDelay; + @Value("${service.protocol.rpc.max-inbound-message-size}") + private int maxInboundMessageSize; + @Value("${service.protocol.rpc.max-concurrent-calls-per-connection}") + private int maxConcurrentCallsPerConnection; + @Value("${service.protocol.rpc.client-max-keep-alive-time-sec}") + private int clientMaxKeepAliveTimeSec; + + @Resource + ProtocolSessionRegistryProvider protocolSessionRegistryProvider; + + private Server server; + + @PostConstruct + public void init() throws Exception { + log.info("Initializing Protocol Downlink Grpc service!"); + + NettyServerBuilder builder = NettyServerBuilder.forPort(this.rpcPort) + .bossEventLoopGroup(new NioEventLoopGroup(this.rpcBoss)) + .workerEventLoopGroup(new NioEventLoopGroup(this.rpcWorker)) + .withOption(ChannelOption.SO_RCVBUF, rpcNettySoRcvbuf) + .withChildOption(ChannelOption.SO_SNDBUF, rpcNettySoSndbuf) + .withChildOption(ChannelOption.TCP_NODELAY, rpcNettyNoDelay) + .compressorRegistry(CompressorRegistry.getDefaultInstance()) + .decompressorRegistry(DecompressorRegistry.getDefaultInstance()) + .channelType(NioServerSocketChannel.class) + .permitKeepAliveTime(this.clientMaxKeepAliveTimeSec, TimeUnit.SECONDS) + .maxInboundMessageSize(maxInboundMessageSize) + .maxConcurrentCallsPerConnection(maxConcurrentCallsPerConnection) + .directExecutor() + .keepAliveTime(5, TimeUnit.MINUTES) + .keepAliveTimeout(10, TimeUnit.SECONDS) + .permitKeepAliveWithoutCalls(true) + .addService(this); + + this.server = builder.build(); + log.info("Going to start RPC server using port: {}", this.rpcPort); + + try { + this.server.start(); + } catch (Exception e) { + log.error("Failed to start RPC server!", e); + throw e; + } + + log.info("Protocol Downlink Grpc service initialized!"); + } + + @PreDestroy + public void destroy() { + if (this.server != null) { + this.server.shutdownNow(); + } + } + + @Override + public StreamObserver onDownlink(StreamObserver responseObserver) { + return new StreamObserver<>() { + @Override + public void onNext(DownlinkRequestMessage downlinkMsg) { + TracerProto tracerProto = downlinkMsg.getTracer(); + TracerContextUtil.newTracer(tracerProto.getId(), tracerProto.getOrigin(), tracerProto.getTs()); + MDCUtils.recordTracer(); + + log.debug("收到Grpc下行请求 {}", downlinkMsg); + + JCPP_COMMON_THREAD_POOL.execute(new TracerRunnable(() -> { + UUID protocolSessionId = new UUID(downlinkMsg.getSessionIdMSB(), downlinkMsg.getSessionIdLSB()); + + ProtocolSession protocolSession = protocolSessionRegistryProvider.get(protocolSessionId); + + try { + if (protocolSession != null) { + + protocolSession.onDownlink(downlinkMsg); + + } else { + + log.info("下发报文时Session未找到 sessionId: {}", protocolSessionId); + + } + } catch (Exception e) { + + log.warn("下发报文时处理失败 sessionId: {}", protocolSessionId, e); + + } + })); + } + + @Override + public void onError(Throwable t) { + log.error("Failed to deliver message from client!", t); + } + + @Override + public void onCompleted() { + try { + responseObserver.onCompleted(); + } catch (Exception e) { + log.error("onCompleted error ", e); + } + } + }; + } +} \ No newline at end of file diff --git a/jcpp-protocol-api/src/main/java/sanbing/jcpp/protocol/provider/impl/DefaultProtocolSessionRegistryProvider.java b/jcpp-protocol-api/src/main/java/sanbing/jcpp/protocol/provider/impl/DefaultProtocolSessionRegistryProvider.java index ae823b0..65df293 100644 --- a/jcpp-protocol-api/src/main/java/sanbing/jcpp/protocol/provider/impl/DefaultProtocolSessionRegistryProvider.java +++ b/jcpp-protocol-api/src/main/java/sanbing/jcpp/protocol/provider/impl/DefaultProtocolSessionRegistryProvider.java @@ -34,10 +34,10 @@ public class DefaultProtocolSessionRegistryProvider implements ProtocolSessionRe private static final int INIT_CACHE_LIMIT = 100_000; private static final int MAXIMUM_SIZE = 1_000_000; - @Value("${service.protocols.sessions.default-inactivity-timeout-in-sec}") + @Value("${service.protocol.sessions.default-inactivity-timeout-in-sec}") private int defaultInactivityTimeoutInSec; - @Value("${service.protocols.sessions.default-state-check-interval-in-sec}") + @Value("${service.protocol.sessions.default-state-check-interval-in-sec}") private int defaultStateCheckIntervalInSec; @Getter diff --git a/jcpp-protocol-bootstrap/src/main/resources/protocol-service.yml b/jcpp-protocol-bootstrap/src/main/resources/protocol-service.yml index f01a893..7d14108 100644 --- a/jcpp-protocol-bootstrap/src/main/resources/protocol-service.yml +++ b/jcpp-protocol-bootstrap/src/main/resources/protocol-service.yml @@ -40,10 +40,23 @@ service: type: "${SERVICE_TYPE:protocol}" # 可自定义的服务ID,如果不指定,则默认为HOSTNAME id: "${SERVICE_ID:}" - protocols: + protocol: sessions: default-inactivity-timeout-in-sec: "${PROTOCOLS_SESSIONS_DEFAULT_INACTIVITY_TIMEOUT_IN_SEC:600}" default-state-check-interval-in-sec: "${PROTOCOLS_SESSIONS_DEFAULT_STATE_CHECK_INTERVAL_IN_SEC:60}" + rpc: + enabled: "${SERVICE_PROTOCOL_RPC_ENABLED:true}" + port: "${SERVICE_PROTOCOL_RPC_PORT:9090}" + boss: "${SERVICE_PROTOCOL_RPC_BOSS:4}" + worker: "${SERVICE_PROTOCOL_RPC_WORKER:64}" + so-rcvbuf: "${SERVICE_PROTOCOL_RPC_SO_RCVBUF:65535}" + so-sndbuf: "${SERVICE_PROTOCOL_RPC_SO_SNDBUF:65535}" + no-delay: "${SERVICE_PROTOCOL_RPC_NO_DELAY:true}" + user-thread-pool-size: "${SERVICE_PROTOCOL_RPC_USER_THREAD_POOL_SIZE:1024}" + max-inbound-message-size: "${SERVICE_PROTOCOL_RPC_MAX_INBOUND_MESSAGE_SIZE:33554432}" + max-concurrent-calls-per-connection: "${SERVICE_PROTOCOL_MAX_CONCURRENT_CALLS_PER_CONNECTION:4}" + client-max-keep-alive-time-sec: "${SERVICE_PROTOCOL_RPC_CLIENT_MAX_KEEP_ALIVE_TIME_SEC:30}" + protocols: yunkuaichongV150: enabled: "${PROTOCOLS_YUNKUAICHONGV150_ENABLED:true}" listener: @@ -184,3 +197,4 @@ thread-pool: hash_function_name: "${THREAD_POOL_SHARDING_HASH_FUNCTION_NAME:murmur3_128}" # murmur3_32, murmur3_128 or sha256 parallelism: "${THREAD_POOL_SHARDING_PARALLELISM:8}" stats-print-interval-ms: "${THREAD_POOL_SHARDING_STATS_PRINT_INTERVAL_MS:10000}" + diff --git a/jcpp-protocol-yunkuaichong/src/main/java/sanbing/jcpp/protocol/yunkuaichong/v150/YunKuaiChongV150HeartbeatULCmd.java b/jcpp-protocol-yunkuaichong/src/main/java/sanbing/jcpp/protocol/yunkuaichong/v150/YunKuaiChongV150HeartbeatULCmd.java index 40fc271..183c3fb 100644 --- a/jcpp-protocol-yunkuaichong/src/main/java/sanbing/jcpp/protocol/yunkuaichong/v150/YunKuaiChongV150HeartbeatULCmd.java +++ b/jcpp-protocol-yunkuaichong/src/main/java/sanbing/jcpp/protocol/yunkuaichong/v150/YunKuaiChongV150HeartbeatULCmd.java @@ -54,7 +54,9 @@ public class YunKuaiChongV150HeartbeatULCmd extends YunKuaiChongUplinkCmdExe { .setPileCode(pileCode) .setRemoteAddress(tcpSession.getAddress().toString()) .setNodeId(ctx.getServiceInfoProvider().getServiceId()) - .setNodeWebapiIpPort(ctx.getServiceInfoProvider().getServiceWebapiEndpoint()) + .setNodeHostAddress(ctx.getServiceInfoProvider().getHostAddress()) + .setNodeRestPort(ctx.getServiceInfoProvider().getRestPort()) + .setNodeGrpcPort(ctx.getServiceInfoProvider().getGrpcPort()) .setAdditionalInfo(additionalInfo.toString()) .build(); UplinkQueueMessage uplinkQueueMessage = uplinkMessageBuilder(heartBeatRequest.getPileCode(), tcpSession, yunKuaiChongUplinkMessage) diff --git a/jcpp-protocol-yunkuaichong/src/main/java/sanbing/jcpp/protocol/yunkuaichong/v150/YunKuaiChongV150LoginULCmd.java b/jcpp-protocol-yunkuaichong/src/main/java/sanbing/jcpp/protocol/yunkuaichong/v150/YunKuaiChongV150LoginULCmd.java index 99fe876..e311860 100644 --- a/jcpp-protocol-yunkuaichong/src/main/java/sanbing/jcpp/protocol/yunkuaichong/v150/YunKuaiChongV150LoginULCmd.java +++ b/jcpp-protocol-yunkuaichong/src/main/java/sanbing/jcpp/protocol/yunkuaichong/v150/YunKuaiChongV150LoginULCmd.java @@ -66,7 +66,9 @@ public class YunKuaiChongV150LoginULCmd extends YunKuaiChongUplinkCmdExe { .setCredential(pileCode) .setRemoteAddress(tcpSession.getAddress().toString()) .setNodeId(ctx.getServiceInfoProvider().getServiceId()) - .setNodeWebapiIpPort(ctx.getServiceInfoProvider().getServiceWebapiEndpoint()) + .setNodeHostAddress(ctx.getServiceInfoProvider().getHostAddress()) + .setNodeRestPort(ctx.getServiceInfoProvider().getRestPort()) + .setNodeGrpcPort(ctx.getServiceInfoProvider().getGrpcPort()) .setAdditionalInfo(additionalInfo.toString()) .build(); UplinkQueueMessage uplinkQueueMessage = uplinkMessageBuilder(loginRequest.getPileCode(), tcpSession, yunKuaiChongUplinkMessage) diff --git a/pom.xml b/pom.xml index 3a608a7..3fcf2fd 100644 --- a/pom.xml +++ b/pom.xml @@ -40,10 +40,10 @@ 1.7.0 3.4.4 - 3.21.9 4.0.2 - 1.56.1 - 0.5.1 + 4.28.2 + 1.68.0 + 0.6.1 3.13.0 5.8.32 3.5.7