Files
JChargePointProtocol/jcpp-protocol-api/src/main/java/sanbing/jcpp/protocol/ProtocolBootstrap.java

125 lines
3.8 KiB
Java
Raw Normal View History

2024-10-08 09:38:54 +08:00
/**
* 抖音关注程序员三丙
* 知识星球https://t.zsxq.com/j9b21
*/
package sanbing.jcpp.protocol;
import jakarta.annotation.PostConstruct;
import jakarta.annotation.PreDestroy;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.actuate.health.Health;
import org.springframework.boot.actuate.health.HealthIndicator;
import sanbing.jcpp.protocol.cfg.ForwarderCfg;
import sanbing.jcpp.protocol.cfg.ProtocolCfg;
import sanbing.jcpp.protocol.cfg.TcpCfg;
import sanbing.jcpp.protocol.cfg.enums.ForwarderType;
import sanbing.jcpp.protocol.forwarder.Forwarder;
import sanbing.jcpp.protocol.forwarder.KafkaForwarder;
import sanbing.jcpp.protocol.forwarder.MemoryForwarder;
import sanbing.jcpp.protocol.listener.Listener;
import sanbing.jcpp.protocol.listener.tcp.TcpListener;
import static org.springframework.boot.actuate.health.Status.UP;
/**
* @author baigod
*/
@Slf4j
public abstract class ProtocolBootstrap implements HealthIndicator {
@Resource
protected ProtocolContext protocolContext;
protected ProtocolCfg protocolCfg;
protected Listener listener;
protected Forwarder forwarder;
@PostConstruct
public void init() throws InterruptedException {
String protocolName = getProtocolName();
log.info("Protocol Service [{}] Initializing...", protocolName);
protocolCfg = protocolContext.getProtocolsConfigProvider().loadConfig(protocolName);
ForwarderCfg forwarderCfg = protocolCfg.getForwarder();
if (protocolContext.getServiceInfoProvider().isMonolith() && forwarderCfg.getType() == ForwarderType.memory) {
forwarder = new MemoryForwarder(getProtocolName(), forwarderCfg,
protocolContext.getStatsFactory(),
protocolContext.getAppQueueFactory(),
protocolContext.getPartitionProvider(),
protocolContext.getServiceInfoProvider());
} else if (forwarderCfg.getType() == ForwarderType.kafka) {
forwarder = new KafkaForwarder(getProtocolName(), forwarderCfg,
protocolContext.getStatsFactory(),
protocolContext.getAppQueueFactory(),
protocolContext.getPartitionProvider(),
protocolContext.getServiceInfoProvider());
} else {
throw new IllegalArgumentException("Unknown Forwarder type: " + forwarderCfg.getType());
}
TcpCfg tcpCfg = protocolCfg.getListener().getTcp();
if (tcpCfg != null) {
2024-10-10 11:08:11 +08:00
listener = new TcpListener(protocolName, tcpCfg, messageProcessor(), protocolContext.getStatsFactory());
2024-10-08 09:38:54 +08:00
}
_init();
}
@PreDestroy
public void destroy() throws InterruptedException {
log.info("{} destroy...", getProtocolName());
if (listener != null) {
listener.destroy();
}
if (forwarder != null) {
forwarder.destroy();
}
_destroy();
}
@Override
public Health health() {
Health.Builder healthBuilder;
if (listener != null && listener.health().getStatus() == UP && forwarder != null && forwarder.health().getStatus() == UP) {
healthBuilder = Health.up();
} else {
healthBuilder = Health.down();
}
if (listener != null) {
healthBuilder.withDetail("listener", listener.health().getStatus());
}
if (forwarder != null) {
healthBuilder.withDetail("forwarder", forwarder.health().getStatus());
}
return healthBuilder.build();
}
protected abstract String getProtocolName();
protected abstract void _init();
protected abstract void _destroy();
protected abstract ProtocolMessageProcessor messageProcessor();
}