Files
JChargePointProtocol/jcpp-protocol-api/src/main/java/sanbing/jcpp/protocol/ProtocolBootstrap.java
2024-10-10 11:08:11 +08:00

125 lines
3.8 KiB
Java
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
/**
* 抖音关注:程序员三丙
* 知识星球https://t.zsxq.com/j9b21
*/
package sanbing.jcpp.protocol;
import jakarta.annotation.PostConstruct;
import jakarta.annotation.PreDestroy;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.actuate.health.Health;
import org.springframework.boot.actuate.health.HealthIndicator;
import sanbing.jcpp.protocol.cfg.ForwarderCfg;
import sanbing.jcpp.protocol.cfg.ProtocolCfg;
import sanbing.jcpp.protocol.cfg.TcpCfg;
import sanbing.jcpp.protocol.cfg.enums.ForwarderType;
import sanbing.jcpp.protocol.forwarder.Forwarder;
import sanbing.jcpp.protocol.forwarder.KafkaForwarder;
import sanbing.jcpp.protocol.forwarder.MemoryForwarder;
import sanbing.jcpp.protocol.listener.Listener;
import sanbing.jcpp.protocol.listener.tcp.TcpListener;
import static org.springframework.boot.actuate.health.Status.UP;
/**
* @author baigod
*/
@Slf4j
public abstract class ProtocolBootstrap implements HealthIndicator {
@Resource
protected ProtocolContext protocolContext;
protected ProtocolCfg protocolCfg;
protected Listener listener;
protected Forwarder forwarder;
@PostConstruct
public void init() throws InterruptedException {
String protocolName = getProtocolName();
log.info("Protocol Service [{}] Initializing...", protocolName);
protocolCfg = protocolContext.getProtocolsConfigProvider().loadConfig(protocolName);
ForwarderCfg forwarderCfg = protocolCfg.getForwarder();
if (protocolContext.getServiceInfoProvider().isMonolith() && forwarderCfg.getType() == ForwarderType.memory) {
forwarder = new MemoryForwarder(getProtocolName(), forwarderCfg,
protocolContext.getStatsFactory(),
protocolContext.getAppQueueFactory(),
protocolContext.getPartitionProvider(),
protocolContext.getServiceInfoProvider());
} else if (forwarderCfg.getType() == ForwarderType.kafka) {
forwarder = new KafkaForwarder(getProtocolName(), forwarderCfg,
protocolContext.getStatsFactory(),
protocolContext.getAppQueueFactory(),
protocolContext.getPartitionProvider(),
protocolContext.getServiceInfoProvider());
} else {
throw new IllegalArgumentException("Unknown Forwarder type: " + forwarderCfg.getType());
}
TcpCfg tcpCfg = protocolCfg.getListener().getTcp();
if (tcpCfg != null) {
listener = new TcpListener(protocolName, tcpCfg, messageProcessor(), protocolContext.getStatsFactory());
}
_init();
}
@PreDestroy
public void destroy() throws InterruptedException {
log.info("{} destroy...", getProtocolName());
if (listener != null) {
listener.destroy();
}
if (forwarder != null) {
forwarder.destroy();
}
_destroy();
}
@Override
public Health health() {
Health.Builder healthBuilder;
if (listener != null && listener.health().getStatus() == UP && forwarder != null && forwarder.health().getStatus() == UP) {
healthBuilder = Health.up();
} else {
healthBuilder = Health.down();
}
if (listener != null) {
healthBuilder.withDetail("listener", listener.health().getStatus());
}
if (forwarder != null) {
healthBuilder.withDetail("forwarder", forwarder.health().getStatus());
}
return healthBuilder.build();
}
protected abstract String getProtocolName();
protected abstract void _init();
protected abstract void _destroy();
protected abstract ProtocolMessageProcessor messageProcessor();
}