mirror of
https://gitee.com/san-bing/JChargePointProtocol
synced 2026-05-06 19:09:57 +08:00
grpc 增加连接过程
This commit is contained in:
@@ -6,7 +6,6 @@ package sanbing.jcpp.app.service.grpc;
|
||||
|
||||
import com.google.common.net.HostAndPort;
|
||||
import io.grpc.CompressorRegistry;
|
||||
import io.grpc.ConnectivityState;
|
||||
import io.grpc.DecompressorRegistry;
|
||||
import io.grpc.ManagedChannel;
|
||||
import io.grpc.netty.shaded.io.grpc.netty.NettyChannelBuilder;
|
||||
@@ -18,21 +17,27 @@ import jakarta.annotation.PostConstruct;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.beans.factory.annotation.Value;
|
||||
import org.springframework.stereotype.Component;
|
||||
import sanbing.jcpp.infrastructure.queue.discovery.ServiceInfoProvider;
|
||||
import sanbing.jcpp.infrastructure.util.async.JCPPThreadFactory;
|
||||
import sanbing.jcpp.infrastructure.util.mdc.MDCUtils;
|
||||
import sanbing.jcpp.infrastructure.util.trace.TracerContextUtil;
|
||||
import sanbing.jcpp.infrastructure.util.trace.TracerRunnable;
|
||||
import sanbing.jcpp.proto.gen.ProtocolDownlinkInterfaceGrpc;
|
||||
import sanbing.jcpp.proto.gen.ProtocolDownlinkInterfaceGrpc.ProtocolDownlinkInterfaceStub;
|
||||
import sanbing.jcpp.proto.gen.ProtocolProto.DownlinkRequestMessage;
|
||||
import sanbing.jcpp.proto.gen.ProtocolProto.DownlinkResponseMessage;
|
||||
import sanbing.jcpp.proto.gen.ProtocolInterfaceGrpc;
|
||||
import sanbing.jcpp.proto.gen.ProtocolInterfaceGrpc.ProtocolInterfaceStub;
|
||||
import sanbing.jcpp.proto.gen.ProtocolProto.*;
|
||||
|
||||
import javax.annotation.PreDestroy;
|
||||
import javax.annotation.Resource;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
import java.util.concurrent.*;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import java.util.concurrent.locks.ReentrantLock;
|
||||
|
||||
import static sanbing.jcpp.infrastructure.proto.ProtoConverter.toTracerProto;
|
||||
|
||||
/**
|
||||
* @author baigod
|
||||
*/
|
||||
@@ -70,42 +75,39 @@ public class DownlinkGrpcClient {
|
||||
@Value("${downlink.rpc.grpc.records_ttl:600000}")
|
||||
private long recordsTtl;
|
||||
|
||||
private volatile boolean initialized = true;
|
||||
@Value("${downlink.rpc.grpc.max_reconnect_times:10}")
|
||||
private int maxReconnectTimes;
|
||||
|
||||
@Resource
|
||||
ServiceInfoProvider serviceInfoProvider;
|
||||
|
||||
private final Map<HostAndPort, ManagedChannel> channelMap = new ConcurrentHashMap<>();
|
||||
private final Map<HostAndPort, StreamObserver<DownlinkRequestMessage>> inputStreamMap = new ConcurrentHashMap<>();
|
||||
private final Map<HostAndPort, LinkedBlockingQueue<DownlinkRequestMessage>> queueMap = new ConcurrentHashMap<>();
|
||||
private final Map<HostAndPort, ReentrantLock> msgHandleLocks = new ConcurrentHashMap<>();
|
||||
private final Map<HostAndPort, ExecutorService> msgHandleExecutors = new ConcurrentHashMap<>();
|
||||
private ExecutorService grpcStarterExecutor;
|
||||
private ScheduledExecutorService grpcStateCheckExecutor;
|
||||
private final Map<HostAndPort, StreamObserver<RequestMsg>> inputStreamMap = new ConcurrentHashMap<>();
|
||||
private final Map<HostAndPort, LinkedBlockingQueue<RequestMsg>> queueMap = new ConcurrentHashMap<>();
|
||||
private final Map<HostAndPort, ReentrantLock> msgHandleLocksMap = new ConcurrentHashMap<>();
|
||||
private final Map<HostAndPort, ExecutorService> msgHandleExecutorMap = new ConcurrentHashMap<>();
|
||||
private final Map<HostAndPort, AtomicInteger> connectErrTimesMap = new ConcurrentHashMap<>();
|
||||
private final Map<HostAndPort, Boolean> initializedMap = new ConcurrentHashMap<>();
|
||||
private ScheduledExecutorService downlinkMsgsExecutor;
|
||||
|
||||
@PostConstruct
|
||||
public void init() {
|
||||
grpcStarterExecutor = Executors.newSingleThreadExecutor(JCPPThreadFactory.forName("grpc-starter-executor"));
|
||||
grpcStateCheckExecutor = Executors.newSingleThreadScheduledExecutor(JCPPThreadFactory.forName("grpc-check-executor"));
|
||||
downlinkMsgsExecutor = Executors.newSingleThreadScheduledExecutor(JCPPThreadFactory.forName("downlink-msgs-executor"));
|
||||
|
||||
// 每秒进行一次连接检查与线程初始化
|
||||
downlinkMsgsExecutor.scheduleWithFixedDelay(() -> {
|
||||
queueMap.forEach((key, queue) -> {
|
||||
|
||||
if (queue.isEmpty()) {
|
||||
return;
|
||||
}
|
||||
|
||||
ManagedChannel managedChannel = channelMap.get(key);
|
||||
|
||||
if (managedChannel == null || managedChannel.getState(false) != ConnectivityState.READY) {
|
||||
grpcStarterExecutor.submit(new TracerRunnable(() -> connect(key)));
|
||||
return;
|
||||
if (managedChannel == null) {
|
||||
connect(key);
|
||||
}
|
||||
|
||||
msgHandleExecutors.computeIfAbsent(key, hostAndPort ->
|
||||
msgHandleExecutorMap.computeIfAbsent(key, hostAndPort ->
|
||||
Executors.newFixedThreadPool(1, JCPPThreadFactory.forName("downlink-handle-threads-" + hostAndPort)))
|
||||
.execute(new TracerRunnable(() -> {
|
||||
while (initialized) {
|
||||
while (Boolean.TRUE.equals(initializedMap.computeIfAbsent(key, k -> Boolean.FALSE))) {
|
||||
try {
|
||||
handleMsgs(key, queue);
|
||||
} catch (Exception e) {
|
||||
@@ -116,32 +118,10 @@ public class DownlinkGrpcClient {
|
||||
});
|
||||
|
||||
}, 0, 1, TimeUnit.SECONDS);
|
||||
|
||||
grpcStateCheckExecutor.scheduleWithFixedDelay(() -> {
|
||||
channelMap.forEach((key, channel) -> {
|
||||
ConnectivityState state = channel.getState(true);
|
||||
|
||||
if (state == ConnectivityState.SHUTDOWN) {
|
||||
log.info("Grpc 客户端SHUTDOWN {} {}", key, state);
|
||||
|
||||
LinkedBlockingQueue<DownlinkRequestMessage> queue = queueMap.get(key);
|
||||
if (queue != null) {
|
||||
queue.clear();
|
||||
queueMap.remove(key);
|
||||
}
|
||||
|
||||
ExecutorService executorService = msgHandleExecutors.get(key);
|
||||
if (executorService != null) {
|
||||
executorService.shutdownNow();
|
||||
msgHandleExecutors.remove(key);
|
||||
}
|
||||
}
|
||||
});
|
||||
}, 0, 1, TimeUnit.SECONDS);
|
||||
}
|
||||
|
||||
private void handleMsgs(HostAndPort key, LinkedBlockingQueue<DownlinkRequestMessage> queue) {
|
||||
StreamObserver<DownlinkRequestMessage> inputStream = inputStreamMap.get(key);
|
||||
private void handleMsgs(HostAndPort key, LinkedBlockingQueue<RequestMsg> queue) {
|
||||
StreamObserver<RequestMsg> inputStream = inputStreamMap.get(key);
|
||||
|
||||
if (inputStream == null) {
|
||||
return;
|
||||
@@ -149,13 +129,13 @@ public class DownlinkGrpcClient {
|
||||
|
||||
long acceptTs = System.currentTimeMillis() - recordsTtl;
|
||||
|
||||
List<DownlinkRequestMessage> downlinkMsgs = new ArrayList<>(batchRecordsCount);
|
||||
List<RequestMsg> downlinkMsgs = new ArrayList<>(batchRecordsCount);
|
||||
|
||||
queue.drainTo(downlinkMsgs, batchRecordsCount);
|
||||
|
||||
for (DownlinkRequestMessage msg : downlinkMsgs) {
|
||||
for (RequestMsg msg : downlinkMsgs) {
|
||||
|
||||
long ts = msg.getTracer().getTs();
|
||||
long ts = msg.getTs();
|
||||
|
||||
if (ts > 0 && ts < acceptTs) {
|
||||
|
||||
@@ -165,7 +145,7 @@ public class DownlinkGrpcClient {
|
||||
}
|
||||
|
||||
|
||||
ReentrantLock lock = msgHandleLocks.computeIfAbsent(key, hostAndPort -> new ReentrantLock());
|
||||
ReentrantLock lock = msgHandleLocksMap.computeIfAbsent(key, hostAndPort -> new ReentrantLock());
|
||||
|
||||
lock.lock();
|
||||
try {
|
||||
@@ -175,7 +155,6 @@ public class DownlinkGrpcClient {
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
if (downlinkMsgs.isEmpty()) {
|
||||
try {
|
||||
Thread.sleep(noRecordsSleepInterval);
|
||||
@@ -191,17 +170,12 @@ public class DownlinkGrpcClient {
|
||||
public void destroy() {
|
||||
log.info("Starting Grpc destroying process");
|
||||
|
||||
initialized = false;
|
||||
initializedMap.replaceAll((hostAndPort, aBoolean) -> Boolean.FALSE);
|
||||
|
||||
try {
|
||||
|
||||
grpcStarterExecutor.shutdownNow();
|
||||
|
||||
grpcStateCheckExecutor.shutdownNow();
|
||||
|
||||
downlinkMsgsExecutor.shutdownNow();
|
||||
|
||||
msgHandleExecutors.values().forEach(ExecutorService::shutdownNow);
|
||||
msgHandleExecutorMap.values().forEach(ExecutorService::shutdownNow);
|
||||
|
||||
inputStreamMap.values().forEach(StreamObserver::onCompleted);
|
||||
|
||||
@@ -214,10 +188,6 @@ public class DownlinkGrpcClient {
|
||||
|
||||
public void connect(HostAndPort hostAndPort) {
|
||||
|
||||
if (channelMap.get(hostAndPort) != null && channelMap.get(hostAndPort).getState(true).ordinal() < 2) {
|
||||
return;
|
||||
}
|
||||
|
||||
log.info("[{}] Create new Grpc Client Channel!", hostAndPort);
|
||||
|
||||
ManagedChannel managedChannel = NettyChannelBuilder.forAddress(hostAndPort.getHost(), hostAndPort.getPort())
|
||||
@@ -238,8 +208,6 @@ public class DownlinkGrpcClient {
|
||||
.defaultLoadBalancingPolicy("round_robin")
|
||||
.build();
|
||||
|
||||
log.info("Grpc 客户端READY {} {}", hostAndPort, managedChannel.getState(true));
|
||||
|
||||
ManagedChannel remove = channelMap.remove(hostAndPort);
|
||||
|
||||
if (remove != null) {
|
||||
@@ -248,17 +216,55 @@ public class DownlinkGrpcClient {
|
||||
|
||||
channelMap.put(hostAndPort, managedChannel);
|
||||
|
||||
ProtocolDownlinkInterfaceStub stub = ProtocolDownlinkInterfaceGrpc.newStub(managedChannel);
|
||||
ProtocolInterfaceStub stub = ProtocolInterfaceGrpc.newStub(managedChannel);
|
||||
|
||||
StreamObserver<DownlinkRequestMessage> streamObserver = stub.onDownlink(new StreamObserver<>() {
|
||||
StreamObserver<RequestMsg> streamObserver = stub.onDownlink(new StreamObserver<>() {
|
||||
@Override
|
||||
public void onNext(DownlinkResponseMessage value) {
|
||||
log.info("Grpc 接收到通信层反向回复 {}", value);
|
||||
public void onNext(ResponseMsg responseMsg) {
|
||||
TracerProto tracerProto = responseMsg.getTracer();
|
||||
TracerContextUtil.newTracer(tracerProto.getId(), tracerProto.getOrigin(), tracerProto.getTs());
|
||||
MDCUtils.recordTracer();
|
||||
|
||||
if (responseMsg.hasConnectResponseMsg()) {
|
||||
log.info("[{}] Grpc 接收到通信层连接反馈 {}", hostAndPort, responseMsg.getConnectResponseMsg());
|
||||
if (ConnectResponseCode.ACCEPTED.equals(responseMsg.getConnectResponseMsg().getResponseCode())) {
|
||||
|
||||
initializedMap.put(hostAndPort, Boolean.TRUE);
|
||||
|
||||
} else {
|
||||
onError(new RuntimeException(responseMsg.getConnectResponseMsg().getErrorMsg()));
|
||||
}
|
||||
}
|
||||
|
||||
if (responseMsg.hasDownlinkResponseMsg()) {
|
||||
DownlinkResponseMessage downlinkResponseMsg = responseMsg.getDownlinkResponseMsg();
|
||||
if (!downlinkResponseMsg.getSuccess()) {
|
||||
log.info("[{}] Grpc 下行数据发生错误回复 {}", hostAndPort, downlinkResponseMsg);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onError(Throwable t) {
|
||||
log.warn("Grpc 客户端异常 {}", t.getMessage());
|
||||
log.warn("[{}] Grpc 客户端异常 {}", hostAndPort, t.getMessage());
|
||||
|
||||
ExecutorService executorService = msgHandleExecutorMap.get(hostAndPort);
|
||||
if (executorService != null) {
|
||||
executorService.shutdownNow();
|
||||
msgHandleExecutorMap.remove(hostAndPort);
|
||||
}
|
||||
|
||||
ManagedChannel remove = channelMap.remove(hostAndPort);
|
||||
|
||||
if (remove != null) {
|
||||
remove.shutdownNow();
|
||||
}
|
||||
|
||||
if (connectErrTimesMap.computeIfAbsent(hostAndPort, k -> new AtomicInteger()).incrementAndGet() >= maxReconnectTimes) {
|
||||
queueMap.remove(hostAndPort);
|
||||
connectErrTimesMap.remove(hostAndPort);
|
||||
log.info("[{}] Grpc 客户端重连异常超过{}次,不再重连", hostAndPort, maxReconnectTimes);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
@@ -267,6 +273,13 @@ public class DownlinkGrpcClient {
|
||||
}
|
||||
});
|
||||
|
||||
streamObserver.onNext(RequestMsg.newBuilder()
|
||||
.setTracer(toTracerProto())
|
||||
.setConnectRequestMsg(ConnectRequestMsg.newBuilder()
|
||||
.setNodeId(serviceInfoProvider.getServiceId())
|
||||
.build())
|
||||
.build());
|
||||
|
||||
inputStreamMap.put(hostAndPort, streamObserver);
|
||||
|
||||
}
|
||||
@@ -274,7 +287,7 @@ public class DownlinkGrpcClient {
|
||||
/**
|
||||
* 发送下行请求
|
||||
*/
|
||||
public void sendDownlinkRequest(HostAndPort hostAndPort, DownlinkRequestMessage downlinkRequestMessage) {
|
||||
queueMap.computeIfAbsent(hostAndPort, k -> new LinkedBlockingQueue<>(maxRecordsSize)).add(downlinkRequestMessage);
|
||||
public void sendDownlinkRequest(HostAndPort hostAndPort, RequestMsg requestMsg) {
|
||||
queueMap.computeIfAbsent(hostAndPort, k -> new LinkedBlockingQueue<>(maxRecordsSize)).add(requestMsg);
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user