grpc 实现

This commit is contained in:
三丙
2024-10-23 17:07:57 +08:00
parent 7445d4e3f0
commit 5a1b4f8303
21 changed files with 715 additions and 141 deletions

View File

@@ -29,7 +29,11 @@ public class PileSession implements Serializable {
private String nodeId;
private String nodeWebapiIpPort;
private String nodeIp;
private int nodeRestPort;
private int nodeGrpcPort;
public PileSession(UUID pileId, String pileCode, String protocolName) {
this.pileId = pileId;
@@ -45,13 +49,17 @@ public class PileSession implements Serializable {
@JsonProperty("protocolSessionId") UUID protocolSessionId,
@JsonProperty("remoteAddress") String remoteAddress,
@JsonProperty("nodeId") String nodeId,
@JsonProperty("nodeWebapiIpPort") String nodeWebapiIpPort) {
@JsonProperty("nodeIp") String nodeIp,
@JsonProperty("nodeRestPort") int nodeRestPort,
@JsonProperty("nodeGrpcPort") int nodeGrpcPort) {
this.pileId = pileId;
this.pileCode = pileCode;
this.protocolName = protocolName;
this.protocolSessionId = protocolSessionId;
this.remoteAddress = remoteAddress;
this.nodeId = nodeId;
this.nodeWebapiIpPort = nodeWebapiIpPort;
this.nodeIp = nodeIp;
this.nodeRestPort = nodeRestPort;
this.nodeGrpcPort = nodeGrpcPort;
}
}

View File

@@ -4,12 +4,68 @@
*/
package sanbing.jcpp.app.service;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import sanbing.jcpp.app.data.PileSession;
import sanbing.jcpp.app.service.cache.session.PileSessionCacheKey;
import sanbing.jcpp.infrastructure.cache.CacheValueWrapper;
import sanbing.jcpp.infrastructure.cache.TransactionalCache;
import sanbing.jcpp.infrastructure.queue.discovery.ServiceInfoProvider;
import sanbing.jcpp.infrastructure.util.trace.Tracer;
import sanbing.jcpp.infrastructure.util.trace.TracerContextUtil;
import sanbing.jcpp.proto.gen.ProtocolProto;
import sanbing.jcpp.proto.gen.ProtocolProto.DownlinkRequestMessage;
import sanbing.jcpp.protocol.adapter.DownlinkController;
/**
* @author baigod
*/
public interface DownlinkCallService {
@Slf4j
public abstract class DownlinkCallService {
void sendDownlinkMessage(DownlinkRequestMessage.Builder downlinkMessageBuilder, String pileCode);
@Resource
protected ServiceInfoProvider serviceInfoProvider;
@Resource
protected DownlinkController downlinkController;
@Resource
protected TransactionalCache<PileSessionCacheKey, PileSession> pileSessionCache;
@Value("${cache.type}")
protected String cacheType;
public void sendDownlinkMessage(DownlinkRequestMessage.Builder downlinkMessageBuilder, String pileCode) {
CacheValueWrapper<PileSession> pileSessionCacheValueWrapper = pileSessionCache.get(new PileSessionCacheKey(pileCode));
if (pileSessionCacheValueWrapper == null) {
log.warn("充电桩会话不存在 {}", pileCode);
return;
}
PileSession pileSession = pileSessionCacheValueWrapper.get();
if (serviceInfoProvider.isMonolith() &&
("caffeine".equalsIgnoreCase(cacheType)) || serviceInfoProvider.getServiceId().equalsIgnoreCase(pileSession.getNodeId())) {
downlinkController.onDownlink(downlinkMessageBuilder.build())
.setResultHandler(result -> log.debug("下行消息发送完成"));
} else {
Tracer currentTracer = TracerContextUtil.getCurrentTracer();
downlinkMessageBuilder.setTracer(ProtocolProto.TracerProto.newBuilder()
.setId(currentTracer.getTraceId())
.setOrigin(currentTracer.getOrigin())
.setTs(currentTracer.getTracerTs())
.build());
_sendDownlinkMessage(downlinkMessageBuilder.build(), pileSession);
}
}
protected abstract void _sendDownlinkMessage(DownlinkRequestMessage downlinkMessage, PileSession pileSession);
}

View File

@@ -0,0 +1,280 @@
/**
* 抖音关注:程序员三丙
* 知识星球https://t.zsxq.com/j9b21
*/
package sanbing.jcpp.app.service.grpc;
import com.google.common.net.HostAndPort;
import io.grpc.CompressorRegistry;
import io.grpc.ConnectivityState;
import io.grpc.DecompressorRegistry;
import io.grpc.ManagedChannel;
import io.grpc.netty.shaded.io.grpc.netty.NettyChannelBuilder;
import io.grpc.netty.shaded.io.netty.channel.ChannelOption;
import io.grpc.netty.shaded.io.netty.channel.nio.NioEventLoopGroup;
import io.grpc.netty.shaded.io.netty.channel.socket.nio.NioSocketChannel;
import io.grpc.stub.StreamObserver;
import jakarta.annotation.PostConstruct;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import sanbing.jcpp.infrastructure.util.async.JCPPThreadFactory;
import sanbing.jcpp.infrastructure.util.trace.TracerRunnable;
import sanbing.jcpp.proto.gen.ProtocolDownlinkInterfaceGrpc;
import sanbing.jcpp.proto.gen.ProtocolDownlinkInterfaceGrpc.ProtocolDownlinkInterfaceStub;
import sanbing.jcpp.proto.gen.ProtocolProto.DownlinkRequestMessage;
import sanbing.jcpp.proto.gen.ProtocolProto.DownlinkResponseMessage;
import javax.annotation.PreDestroy;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.*;
import java.util.concurrent.locks.ReentrantLock;
/**
* @author baigod
*/
@Component
@Slf4j
public class DownlinkGrpcClient {
@Value("${downlink.rpc.grpc.netty.event_loop:}")
private Integer rpcNettyEventLoop;
@Value("${downlink.rpc.grpc.netty.so_sndbuf:65535}")
private Integer rpcNettySoSndbuf;
@Value("${downlink.rpc.grpc.netty.so_rcvbuf:65535}")
private Integer rpcNettySoRcvbuf;
@Value("${downlink.rpc.grpc.netty.no_delay:true}")
private boolean rpcNoDelay;
@Value("${downlink.rpc.grpc.netty.max_inbound_message_size:33554432}")
private Integer rpcMaxInboundMessageSize;
@Value("${downlink.rpc.grpc.keep_alive_time_sec:300}")
private int keepAliveTimeSec;
@Value("${downlink.rpc.grpc.max_records_size:102400}")
private int maxRecordsSize;
@Value("${downlink.rpc.grpc.batch_records_count:1024}")
private int batchRecordsCount;
@Value("${downlink.rpc.grpc.no_read_records_sleep:25}")
private long noRecordsSleepInterval;
@Value("${downlink.rpc.grpc.records_ttl:600000}")
private long recordsTtl;
private volatile boolean initialized = true;
private final Map<HostAndPort, ManagedChannel> channelMap = new ConcurrentHashMap<>();
private final Map<HostAndPort, StreamObserver<DownlinkRequestMessage>> inputStreamMap = new ConcurrentHashMap<>();
private final Map<HostAndPort, LinkedBlockingQueue<DownlinkRequestMessage>> queueMap = new ConcurrentHashMap<>();
private final Map<HostAndPort, ReentrantLock> msgHandleLocks = new ConcurrentHashMap<>();
private final Map<HostAndPort, ExecutorService> msgHandleExecutors = new ConcurrentHashMap<>();
private ExecutorService grpcStarterExecutor;
private ScheduledExecutorService grpcStateCheckExecutor;
private ScheduledExecutorService downlinkMsgsExecutor;
@PostConstruct
public void init() {
grpcStarterExecutor = Executors.newSingleThreadExecutor(JCPPThreadFactory.forName("grpc-starter-executor"));
grpcStateCheckExecutor = Executors.newSingleThreadScheduledExecutor(JCPPThreadFactory.forName("grpc-check-executor"));
downlinkMsgsExecutor = Executors.newSingleThreadScheduledExecutor(JCPPThreadFactory.forName("downlink-msgs-executor"));
// 每秒进行一次连接检查与线程初始化
downlinkMsgsExecutor.scheduleWithFixedDelay(() -> {
queueMap.forEach((key, queue) -> {
if (queue.isEmpty()) {
return;
}
ManagedChannel managedChannel = channelMap.get(key);
if (managedChannel == null || managedChannel.getState(false) != ConnectivityState.READY) {
grpcStarterExecutor.submit(new TracerRunnable(() -> connect(key)));
return;
}
msgHandleExecutors.computeIfAbsent(key, hostAndPort ->
Executors.newFixedThreadPool(1, JCPPThreadFactory.forName("downlink-handle-threads-" + hostAndPort)))
.execute(new TracerRunnable(() -> {
while (initialized) {
try {
handleMsgs(key, queue);
} catch (Exception e) {
log.error("Failed to process messages handling!", e);
}
}
}));
});
}, 0, 1, TimeUnit.SECONDS);
grpcStateCheckExecutor.scheduleWithFixedDelay(() -> {
channelMap.forEach((key, channel) -> {
ConnectivityState state = channel.getState(true);
if (state == ConnectivityState.SHUTDOWN) {
log.info("Grpc 客户端SHUTDOWN {} {}", key, state);
LinkedBlockingQueue<DownlinkRequestMessage> queue = queueMap.get(key);
if (queue != null) {
queue.clear();
queueMap.remove(key);
}
ExecutorService executorService = msgHandleExecutors.get(key);
if (executorService != null) {
executorService.shutdownNow();
msgHandleExecutors.remove(key);
}
}
});
}, 0, 1, TimeUnit.SECONDS);
}
private void handleMsgs(HostAndPort key, LinkedBlockingQueue<DownlinkRequestMessage> queue) {
StreamObserver<DownlinkRequestMessage> inputStream = inputStreamMap.get(key);
if (inputStream == null) {
return;
}
long acceptTs = System.currentTimeMillis() - recordsTtl;
List<DownlinkRequestMessage> downlinkMsgs = new ArrayList<>(batchRecordsCount);
queue.drainTo(downlinkMsgs, batchRecordsCount);
for (DownlinkRequestMessage msg : downlinkMsgs) {
long ts = msg.getTracer().getTs();
if (ts > 0 && ts < acceptTs) {
log.warn("[{}] 消息过期,直接丢弃 {}", key, ts);
continue;
}
ReentrantLock lock = msgHandleLocks.computeIfAbsent(key, hostAndPort -> new ReentrantLock());
lock.lock();
try {
inputStream.onNext(msg);
} finally {
lock.unlock();
}
}
if (downlinkMsgs.isEmpty()) {
try {
Thread.sleep(noRecordsSleepInterval);
} catch (InterruptedException e) {
log.warn("Sleep interrupted!", e);
}
} else {
downlinkMsgs.clear();
}
}
@PreDestroy
public void destroy() {
log.info("Starting Grpc destroying process");
initialized = false;
try {
grpcStarterExecutor.shutdownNow();
grpcStateCheckExecutor.shutdownNow();
downlinkMsgsExecutor.shutdownNow();
msgHandleExecutors.values().forEach(ExecutorService::shutdownNow);
inputStreamMap.values().forEach(StreamObserver::onCompleted);
channelMap.values().forEach(ManagedChannel::shutdownNow);
} catch (Exception e) {
log.error("Exception during disconnect", e);
}
}
public void connect(HostAndPort hostAndPort) {
if (channelMap.get(hostAndPort) != null && channelMap.get(hostAndPort).getState(true).ordinal() < 2) {
return;
}
log.info("[{}] Create new Grpc Client Channel!", hostAndPort);
ManagedChannel managedChannel = NettyChannelBuilder.forAddress(hostAndPort.getHost(), hostAndPort.getPort())
.eventLoopGroup(new NioEventLoopGroup(Optional.ofNullable(rpcNettyEventLoop).orElse(Runtime.getRuntime().availableProcessors() * 2)))
.compressorRegistry(CompressorRegistry.getDefaultInstance())
.decompressorRegistry(DecompressorRegistry.getDefaultInstance())
.withOption(ChannelOption.SO_SNDBUF, rpcNettySoSndbuf)
.withOption(ChannelOption.SO_RCVBUF, rpcNettySoRcvbuf)
.withOption(ChannelOption.TCP_NODELAY, rpcNoDelay)
.maxInboundMessageSize(rpcMaxInboundMessageSize)
.channelType(NioSocketChannel.class)
.directExecutor()
.keepAliveTime(keepAliveTimeSec, TimeUnit.SECONDS)
.usePlaintext()
.keepAliveTime(5, TimeUnit.MINUTES) // Change to a larger value, e.g. 5min.
.keepAliveTimeout(10, TimeUnit.SECONDS) // Change to a larger value, e.g. 10s.
.keepAliveWithoutCalls(true)// You should normally avoid enabling this.
.defaultLoadBalancingPolicy("round_robin")
.build();
log.info("Grpc 客户端READY {} {}", hostAndPort, managedChannel.getState(true));
ManagedChannel remove = channelMap.remove(hostAndPort);
if (remove != null) {
channelMap.get(hostAndPort).shutdownNow();
}
channelMap.put(hostAndPort, managedChannel);
ProtocolDownlinkInterfaceStub stub = ProtocolDownlinkInterfaceGrpc.newStub(managedChannel);
StreamObserver<DownlinkRequestMessage> streamObserver = stub.onDownlink(new StreamObserver<>() {
@Override
public void onNext(DownlinkResponseMessage value) {
log.info("Grpc 接收到通信层反向回复 {}", value);
}
@Override
public void onError(Throwable t) {
log.warn("Grpc 客户端异常 {}", t.getMessage());
}
@Override
public void onCompleted() {
log.info("[{}] The Grpc connection was closed!", hostAndPort);
}
});
inputStreamMap.put(hostAndPort, streamObserver);
}
/**
* 发送下行请求
*/
public void sendDownlinkRequest(HostAndPort hostAndPort, DownlinkRequestMessage downlinkRequestMessage) {
queueMap.computeIfAbsent(hostAndPort, k -> new LinkedBlockingQueue<>(maxRecordsSize)).add(downlinkRequestMessage);
}
}

View File

@@ -1,97 +0,0 @@
/**
* 抖音关注:程序员三丙
* 知识星球https://t.zsxq.com/j9b21
*/
package sanbing.jcpp.app.service.impl;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.http.HttpEntity;
import org.springframework.http.HttpHeaders;
import org.springframework.http.MediaType;
import org.springframework.http.ResponseEntity;
import org.springframework.stereotype.Service;
import org.springframework.web.client.RestClientException;
import org.springframework.web.client.RestTemplate;
import sanbing.jcpp.app.data.PileSession;
import sanbing.jcpp.app.service.DownlinkCallService;
import sanbing.jcpp.app.service.cache.session.PileSessionCacheKey;
import sanbing.jcpp.infrastructure.cache.CacheValueWrapper;
import sanbing.jcpp.infrastructure.cache.TransactionalCache;
import sanbing.jcpp.infrastructure.queue.discovery.ServiceInfoProvider;
import sanbing.jcpp.infrastructure.util.trace.TracerContextUtil;
import sanbing.jcpp.proto.gen.ProtocolProto.DownlinkRequestMessage;
import sanbing.jcpp.protocol.adapter.DownlinkController;
import static sanbing.jcpp.infrastructure.util.trace.TracerContextUtil.*;
/**
* @author baigod
*/
@Service
@Slf4j
public class DefaultDownlinkCallService implements DownlinkCallService {
@Resource
RestTemplate downlinkRestTemplate;
@Resource
ServiceInfoProvider serviceInfoProvider;
@Resource
DownlinkController downlinkController;
@Resource
TransactionalCache<PileSessionCacheKey, PileSession> pileSessionCache;
@Value("${cache.type}")
private String cacheType;
@Override
public void sendDownlinkMessage(DownlinkRequestMessage.Builder downlinkMessageBuilder, String pileCode) {
if (serviceInfoProvider.isMonolith() && "caffeine".equalsIgnoreCase(cacheType)) {
downlinkController.onDownlink(downlinkMessageBuilder.build())
.setResultHandler(result -> log.debug("下行消息发送完成"));
} else {
try {
CacheValueWrapper<PileSession> pileSessionCacheValueWrapper = pileSessionCache.get(new PileSessionCacheKey(pileCode));
if (pileSessionCacheValueWrapper == null) {
log.warn("充电桩会话不存在 {}", pileCode);
return;
}
PileSession pileSession = pileSessionCacheValueWrapper.get();
invokeDownlinkRestApi(downlinkMessageBuilder.build(), pileSession.getNodeWebapiIpPort());
} catch (RestClientException e) {
log.error("下行消息发送异常", e);
}
}
}
private void invokeDownlinkRestApi(DownlinkRequestMessage DownlinkRequestMessage, String nodeWebapiIpPort) {
HttpHeaders headers = new HttpHeaders();
headers.add(JCPP_TRACER_ID, TracerContextUtil.getCurrentTracer().getTraceId());
headers.add(JCPP_TRACER_ORIGIN, TracerContextUtil.getCurrentTracer().getOrigin());
headers.add(JCPP_TRACER_TS, String.valueOf(TracerContextUtil.getCurrentTracer().getTracerTs()));
headers.setContentType(MediaType.parseMediaType("application/x-protobuf"));
HttpEntity<DownlinkRequestMessage> entity = new HttpEntity<>(DownlinkRequestMessage, headers);
try {
ResponseEntity<?> response = downlinkRestTemplate.postForEntity("http://" + nodeWebapiIpPort + "/api/onDownlink",
entity, ResponseEntity.class);
log.debug("下行消息发送成功 {}", response);
} catch (RestClientException e) {
log.error("下行消息发送失败 {}", DownlinkRequestMessage, e);
throw new RuntimeException(e);
}
}
}

View File

@@ -67,7 +67,9 @@ public class DefaultPileProtocolService implements PileProtocolService {
cacheSession(uplinkQueueMessage, pile,
loginRequest.getRemoteAddress(),
loginRequest.getNodeId(),
loginRequest.getNodeWebapiIpPort());
loginRequest.getNodeHostAddress(),
loginRequest.getNodeRestPort(),
loginRequest.getNodeGrpcPort());
downlinkMessageBuilder.setLoginResponse(LoginResponse.newBuilder()
.setSuccess(true)
@@ -98,16 +100,26 @@ public class DefaultPileProtocolService implements PileProtocolService {
cacheSession(uplinkQueueMessage, pile,
heartBeatRequest.getRemoteAddress(),
heartBeatRequest.getNodeId(),
heartBeatRequest.getNodeWebapiIpPort());
heartBeatRequest.getNodeHostAddress(),
heartBeatRequest.getNodeRestPort(),
heartBeatRequest.getNodeGrpcPort());
}
}
private void cacheSession(UplinkQueueMessage uplinkQueueMessage, Pile pile, String remoteAddress, String nodeId, String nodeWebapiIpPort) {
private void cacheSession(UplinkQueueMessage uplinkQueueMessage,
Pile pile,
String remoteAddress,
String nodeId,
String nodeIp,
int restPort,
int grpcPort) {
PileSession pileSession = new PileSession(pile.getId(), pile.getPileCode(), uplinkQueueMessage.getProtocolName());
pileSession.setProtocolSessionId(new UUID(uplinkQueueMessage.getSessionIdMSB(), uplinkQueueMessage.getSessionIdLSB()));
pileSession.setRemoteAddress(remoteAddress);
pileSession.setNodeId(nodeId);
pileSession.setNodeWebapiIpPort(nodeWebapiIpPort);
pileSession.setNodeIp(nodeIp);
pileSession.setNodeRestPort(restPort);
pileSession.setNodeGrpcPort(grpcPort);
pileSessionCache.put(new PileSessionCacheKey(pile.getPileCode()), pileSession);
}

View File

@@ -0,0 +1,39 @@
/**
* 抖音关注:程序员三丙
* 知识星球https://t.zsxq.com/j9b21
*/
package sanbing.jcpp.app.service.impl;
import com.google.common.net.HostAndPort;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
import org.springframework.stereotype.Service;
import sanbing.jcpp.app.data.PileSession;
import sanbing.jcpp.app.service.DownlinkCallService;
import sanbing.jcpp.app.service.grpc.DownlinkGrpcClient;
import sanbing.jcpp.proto.gen.ProtocolProto.DownlinkRequestMessage;
/**
* @author baigod
*/
@Service
@Slf4j
@ConditionalOnExpression("'${downlink.rpc.type:null}'=='grpc'")
public class GrpcDownlinkCallService extends DownlinkCallService {
@Resource
DownlinkGrpcClient downlinkGrpcClient;
@Override
protected void _sendDownlinkMessage(DownlinkRequestMessage downlinkMessage, PileSession pileSession) {
try {
downlinkGrpcClient.sendDownlinkRequest(HostAndPort.fromParts(pileSession.getNodeIp(), pileSession.getNodeGrpcPort()),
downlinkMessage);
} catch (Exception e) {
log.error("下行消息发送异常", e);
}
}
}

View File

@@ -0,0 +1,64 @@
/**
* 抖音关注:程序员三丙
* 知识星球https://t.zsxq.com/j9b21
*/
package sanbing.jcpp.app.service.impl;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
import org.springframework.http.HttpEntity;
import org.springframework.http.HttpHeaders;
import org.springframework.http.MediaType;
import org.springframework.http.ResponseEntity;
import org.springframework.stereotype.Service;
import org.springframework.web.client.RestClientException;
import org.springframework.web.client.RestTemplate;
import sanbing.jcpp.app.data.PileSession;
import sanbing.jcpp.app.service.DownlinkCallService;
import sanbing.jcpp.infrastructure.util.trace.TracerContextUtil;
import sanbing.jcpp.proto.gen.ProtocolProto.DownlinkRequestMessage;
import static sanbing.jcpp.infrastructure.util.trace.TracerContextUtil.*;
/**
* @author baigod
*/
@Service
@Slf4j
@ConditionalOnExpression("'${downlink.rpc.type:null}'=='rest'")
public class RestDownlinkCallService extends DownlinkCallService {
@Resource
RestTemplate downlinkRestTemplate;
@Override
protected void _sendDownlinkMessage(DownlinkRequestMessage downlinkMessage, PileSession pileSession) {
try {
invokeDownlinkRestApi(downlinkMessage, pileSession.getNodeIp(), pileSession.getNodeRestPort());
} catch (RestClientException e) {
log.error("下行消息发送异常", e);
}
}
private void invokeDownlinkRestApi(DownlinkRequestMessage downlinkRequestMessage, String nodeWebapiIpPort, int nodeRestPort) {
HttpHeaders headers = new HttpHeaders();
headers.add(JCPP_TRACER_ID, TracerContextUtil.getCurrentTracer().getTraceId());
headers.add(JCPP_TRACER_ORIGIN, TracerContextUtil.getCurrentTracer().getOrigin());
headers.add(JCPP_TRACER_TS, String.valueOf(TracerContextUtil.getCurrentTracer().getTracerTs()));
headers.setContentType(MediaType.parseMediaType("application/x-protobuf"));
HttpEntity<DownlinkRequestMessage> entity = new HttpEntity<>(downlinkRequestMessage, headers);
try {
ResponseEntity<?> response = downlinkRestTemplate.postForEntity("http://" + nodeWebapiIpPort + ":" + nodeRestPort + "/api/onDownlink",
entity, ResponseEntity.class);
log.debug("下行消息发送成功 {}", response);
} catch (RestClientException e) {
log.error("下行消息发送失败 {}", downlinkRequestMessage, e);
throw new RuntimeException(e);
}
}
}