云快充1.5.0 初始化

This commit is contained in:
3god
2024-10-08 09:38:54 +08:00
parent dea6774942
commit cb19b45919
297 changed files with 18020 additions and 28 deletions

View File

@@ -0,0 +1,66 @@
/**
* 抖音关注:程序员三丙
* 知识星球https://t.zsxq.com/j9b21
*/
package sanbing.jcpp.app.service.queue;
import jakarta.annotation.PreDestroy;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.ApplicationEventPublisher;
import sanbing.jcpp.infrastructure.queue.discovery.PartitionProvider;
import sanbing.jcpp.infrastructure.queue.discovery.event.JCPPApplicationEventListener;
import sanbing.jcpp.infrastructure.queue.discovery.event.PartitionChangeEvent;
import sanbing.jcpp.infrastructure.util.annotation.AfterStartUp;
import sanbing.jcpp.infrastructure.util.async.JCPPExecutors;
import sanbing.jcpp.infrastructure.util.async.JCPPThreadFactory;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
@Slf4j
@RequiredArgsConstructor
public abstract class AbstractConsumerService extends JCPPApplicationEventListener<PartitionChangeEvent> {
protected final PartitionProvider partitionProvider;
protected final ApplicationEventPublisher eventPublisher;
protected ExecutorService consumersExecutor;
protected ExecutorService mgmtExecutor;
protected ScheduledExecutorService scheduler;
public void init(String prefix) {
this.consumersExecutor = Executors.newCachedThreadPool(JCPPThreadFactory.forName(prefix + "-consumer"));
this.mgmtExecutor = JCPPExecutors.newWorkStealingPool(getMgmtThreadPoolSize(), prefix + "-mgmt");
this.scheduler = Executors.newSingleThreadScheduledExecutor(JCPPThreadFactory.forName(prefix + "-consumer-scheduler"));
}
@AfterStartUp(order = AfterStartUp.REGULAR_SERVICE)
public void afterStartUp() {
startConsumers();
}
protected void startConsumers() {
}
protected void stopConsumers() {
}
protected abstract int getMgmtThreadPoolSize();
@PreDestroy
public void destroy() {
stopConsumers();
if (consumersExecutor != null) {
consumersExecutor.shutdownNow();
}
if (mgmtExecutor != null) {
mgmtExecutor.shutdownNow();
}
if (scheduler != null) {
scheduler.shutdownNow();
}
}
}

View File

@@ -0,0 +1,85 @@
/**
* 抖音关注:程序员三丙
* 知识星球https://t.zsxq.com/j9b21
*/
package sanbing.jcpp.app.service.queue;
import io.micrometer.core.instrument.Timer;
import lombok.extern.slf4j.Slf4j;
import sanbing.jcpp.infrastructure.stats.StatsCounter;
import sanbing.jcpp.infrastructure.stats.StatsFactory;
import sanbing.jcpp.infrastructure.util.trace.TracerContextUtil;
import sanbing.jcpp.proto.gen.ProtocolProto.UplinkQueueMessage;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
@Slf4j
public class AppConsumerStats {
public static final String TOTAL_MSGS = "totalMsgs";
public static final String LOGIN_EVENTS = "loginEvents";
public static final String HEARTBEAT_EVENTS = "heartBeatEvents";
public static final String GUN_RUN_STATUS_EVENTS = "gunRunStatusEvents";
public static final String CHARGING_PROGRESS_EVENTS = "chargingProgressEvents";
public static final String TRANSACTION_RECORD_EVENTS = "transactionRecordEvents";
private final StatsCounter totalCounter;
private final StatsCounter loginCounter;
private final StatsCounter heartBeatCounter;
private final StatsCounter gunRunStatusCounter;
private final StatsCounter chargingProgressCounter;
private final StatsCounter transactionRecordCounter;
private final Timer appConsumerTimer;
private final List<StatsCounter> counters = new ArrayList<>();
public AppConsumerStats(StatsFactory statsFactory) {
String statsKey = "appConsumer";
this.totalCounter = register(statsFactory.createStatsCounter(statsKey, TOTAL_MSGS));
this.loginCounter = register(statsFactory.createStatsCounter(statsKey, LOGIN_EVENTS));
this.heartBeatCounter = register(statsFactory.createStatsCounter(statsKey, HEARTBEAT_EVENTS));
this.gunRunStatusCounter = register(statsFactory.createStatsCounter(statsKey, GUN_RUN_STATUS_EVENTS));
this.chargingProgressCounter = register(statsFactory.createStatsCounter(statsKey, CHARGING_PROGRESS_EVENTS));
this.transactionRecordCounter = register(statsFactory.createStatsCounter(statsKey, TRANSACTION_RECORD_EVENTS));
this.appConsumerTimer = statsFactory.createTimer(statsKey);
}
private StatsCounter register(StatsCounter counter) {
counters.add(counter);
return counter;
}
public void log(UplinkQueueMessage msg) {
totalCounter.increment();
if (msg.hasLoginRequest()) {
loginCounter.increment();
} else if (msg.hasHeartBeatRequest()) {
heartBeatCounter.increment();
} else if (msg.hasGunRunStatusProto()) {
gunRunStatusCounter.increment();
} else if (msg.hasChargingProgressProto()) {
chargingProgressCounter.increment();
} else if (msg.hasTransactionRecord()) {
transactionRecordCounter.increment();
}
appConsumerTimer.record(Duration.ofMillis(System.currentTimeMillis() - TracerContextUtil.getCurrentTracer().getTracerTs()));
}
public void printStats() {
int total = totalCounter.get();
if (total > 0) {
StringBuilder stats = new StringBuilder();
counters.forEach(counter -> {
stats.append(counter.getName()).append(" = [").append(counter.get()).append("] ");
});
log.info("App Queue Consumer Stats: {}", stats);
}
}
public void reset() {
counters.forEach(StatsCounter::clear);
}
}

View File

@@ -0,0 +1,296 @@
/**
* 抖音关注:程序员三丙
* 知识星球https://t.zsxq.com/j9b21
*/
package sanbing.jcpp.app.service.queue;
import lombok.Builder;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import sanbing.jcpp.infrastructure.queue.QueueConsumer;
import sanbing.jcpp.infrastructure.queue.QueueMsg;
import sanbing.jcpp.infrastructure.queue.common.QueueConfig;
import sanbing.jcpp.infrastructure.queue.common.TopicPartitionInfo;
import sanbing.jcpp.infrastructure.util.async.JCPPThreadFactory;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.BiFunction;
import java.util.stream.Collectors;
@Slf4j
public class AppQueueConsumerManager<M extends QueueMsg, C extends QueueConfig> {
protected final String queueName;
@Getter
protected C config;
protected final MsgPackProcessor<M, C> msgPackProcessor;
protected final BiFunction<C, Integer, QueueConsumer<M>> consumerCreator;
protected final ExecutorService consumerExecutor;
protected final ScheduledExecutorService scheduler;
protected final ExecutorService taskExecutor;
private final Queue<QueueConsumerManagerTask> tasks = new ConcurrentLinkedQueue<>();
private final ReentrantLock lock = new ReentrantLock();
@Getter
private volatile Set<TopicPartitionInfo> partitions;
protected volatile ConsumerWrapper<M> consumerWrapper;
protected volatile boolean stopped;
@Builder
public AppQueueConsumerManager(String queueName, C config,
MsgPackProcessor<M, C> msgPackProcessor,
BiFunction<C, Integer, QueueConsumer<M>> consumerCreator,
ExecutorService consumerExecutor,
ScheduledExecutorService scheduler,
ExecutorService taskExecutor) {
this.queueName = queueName;
this.config = config;
this.msgPackProcessor = msgPackProcessor;
this.consumerCreator = consumerCreator;
this.consumerExecutor = consumerExecutor;
this.scheduler = scheduler;
this.taskExecutor = taskExecutor;
if (config != null) {
init(config);
}
}
public void init(C config) {
this.config = config;
if (config.isConsumerPerPartition()) {
this.consumerWrapper = new ConsumerPerPartitionWrapper();
} else {
this.consumerWrapper = new SingleConsumerWrapper();
}
log.debug("[{}] Initialized consumer for queue: {}", queueName, config);
}
public void update(C config) {
addTask(QueueConsumerManagerTask.configUpdate(config));
}
public void update(Set<TopicPartitionInfo> partitions) {
addTask(QueueConsumerManagerTask.partitionChange(partitions));
}
protected void addTask(QueueConsumerManagerTask todo) {
if (stopped) {
return;
}
tasks.add(todo);
log.info("[{}] Added task: {}", queueName, todo);
tryProcessTasks();
}
@SuppressWarnings("unchecked")
private void tryProcessTasks() {
taskExecutor.submit(() -> {
if (lock.tryLock()) {
try {
C newConfig = null;
Set<TopicPartitionInfo> newPartitions = null;
while (!stopped) {
QueueConsumerManagerTask task = tasks.poll();
if (task == null) {
break;
}
log.info("[{}] Processing task: {}", queueName, task);
if (task.getEvent() == QueueEvent.PARTITION_CHANGE) {
newPartitions = task.getPartitions();
} else if (task.getEvent() == QueueEvent.CONFIG_UPDATE) {
newConfig = (C) task.getConfig();
} else {
processTask(task);
}
}
if (stopped) {
return;
}
if (newConfig != null) {
doUpdate(newConfig);
}
if (newPartitions != null) {
doUpdate(newPartitions);
}
} catch (Exception e) {
log.error("[{}] Failed to process tasks", queueName, e);
} finally {
lock.unlock();
}
} else {
log.trace("[{}] Failed to acquire lock", queueName);
scheduler.schedule(this::tryProcessTasks, 1, TimeUnit.SECONDS);
}
});
}
protected void processTask(QueueConsumerManagerTask task) {
}
private void doUpdate(C newConfig) {
log.info("[{}] Processing queue update: {}", queueName, newConfig);
var oldConfig = this.config;
this.config = newConfig;
if (log.isTraceEnabled()) {
log.trace("[{}] Old queue configuration: {}", queueName, oldConfig);
log.trace("[{}] New queue configuration: {}", queueName, newConfig);
}
if (oldConfig == null) {
init(config);
} else if (newConfig.isConsumerPerPartition() != oldConfig.isConsumerPerPartition()) {
consumerWrapper.getConsumers().forEach(QueueConsumerTask::initiateStop);
consumerWrapper.getConsumers().forEach(QueueConsumerTask::awaitCompletion);
init(config);
if (partitions != null) {
doUpdate(partitions);
}
} else {
log.trace("[{}] Silently applied new config, because consumer-per-partition not changed", queueName);
}
}
private void doUpdate(Set<TopicPartitionInfo> partitions) {
this.partitions = partitions;
consumerWrapper.updatePartitions(partitions);
}
private void launchConsumer(QueueConsumerTask<M> consumerTask) {
log.info("[{}] Launching consumer", consumerTask.getKey());
Future<?> consumerLoop = consumerExecutor.submit(() -> {
JCPPThreadFactory.updateCurrentThreadName(consumerTask.getKey().toString());
try {
consumerLoop(consumerTask.getConsumer());
} catch (Throwable e) {
log.error("Failure in consumer loop", e);
}
log.info("[{}] Consumer stopped", consumerTask.getKey());
});
consumerTask.setTask(consumerLoop);
}
private void consumerLoop(QueueConsumer<M> consumer) {
while (!stopped && !consumer.isStopped()) {
try {
List<M> msgs = consumer.poll(config.getPollInterval());
if (msgs.isEmpty()) {
continue;
}
processMsgs(msgs, consumer, config);
} catch (Exception e) {
if (!consumer.isStopped()) {
log.warn("Failed to process messages from queue", e);
try {
Thread.sleep(config.getPollInterval());
} catch (InterruptedException e2) {
log.trace("Failed to wait until the server has capacity to handle new requests", e2);
}
}
}
}
if (consumer.isStopped()) {
consumer.unsubscribe();
}
}
protected void processMsgs(List<M> msgs, QueueConsumer<M> consumer, C config) throws Exception {
msgPackProcessor.process(msgs, consumer, config);
}
public void stop() {
log.debug("[{}] Stopping consumers", queueName);
consumerWrapper.getConsumers().forEach(QueueConsumerTask::initiateStop);
stopped = true;
}
public void awaitStop() {
log.debug("[{}] Waiting for consumers to stop", queueName);
consumerWrapper.getConsumers().forEach(QueueConsumerTask::awaitCompletion);
log.debug("[{}] Unsubscribed and stopped consumers", queueName);
}
private static String partitionsToString(Collection<TopicPartitionInfo> partitions) {
return partitions.stream().map(TopicPartitionInfo::getFullTopicName).collect(Collectors.joining(", ", "[", "]"));
}
public interface MsgPackProcessor<M extends QueueMsg, C extends QueueConfig> {
void process(List<M> msgs, QueueConsumer<M> consumer, C config) throws Exception;
}
public interface ConsumerWrapper<M extends QueueMsg> {
void updatePartitions(Set<TopicPartitionInfo> partitions);
Collection<QueueConsumerTask<M>> getConsumers();
}
class ConsumerPerPartitionWrapper implements ConsumerWrapper<M> {
private final Map<TopicPartitionInfo, QueueConsumerTask<M>> consumers = new HashMap<>();
@Override
public void updatePartitions(Set<TopicPartitionInfo> partitions) {
Set<TopicPartitionInfo> addedPartitions = new HashSet<>(partitions);
addedPartitions.removeAll(consumers.keySet());
Set<TopicPartitionInfo> removedPartitions = new HashSet<>(consumers.keySet());
removedPartitions.removeAll(partitions);
log.info("[{}] Added partitions: {}, removed partitions: {}", queueName, partitionsToString(addedPartitions), partitionsToString(removedPartitions));
removedPartitions.forEach((tpi) -> consumers.get(tpi).initiateStop());
removedPartitions.forEach((tpi) -> consumers.remove(tpi).awaitCompletion());
addedPartitions.forEach((tpi) -> {
Integer partitionId = tpi.getPartition().orElse(-1);
String key = queueName + "-" + partitionId;
QueueConsumerTask<M> consumer = new QueueConsumerTask<>(key, () -> consumerCreator.apply(config, partitionId));
consumers.put(tpi, consumer);
consumer.subscribe(Set.of(tpi));
launchConsumer(consumer);
});
}
@Override
public Collection<QueueConsumerTask<M>> getConsumers() {
return consumers.values();
}
}
class SingleConsumerWrapper implements ConsumerWrapper<M> {
private QueueConsumerTask<M> consumer;
@Override
public void updatePartitions(Set<TopicPartitionInfo> partitions) {
log.info("[{}] New partitions: {}", queueName, partitionsToString(partitions));
if (partitions.isEmpty()) {
if (consumer != null && consumer.isRunning()) {
consumer.initiateStop();
consumer.awaitCompletion();
}
consumer = null;
return;
}
if (consumer == null) {
consumer = new QueueConsumerTask<>(queueName, () -> consumerCreator.apply(config, null)); // no partitionId passed
}
consumer.subscribe(partitions);
if (!consumer.isRunning()) {
launchConsumer(consumer);
}
}
@Override
public Collection<QueueConsumerTask<M>> getConsumers() {
if (consumer == null) {
return Collections.emptyList();
}
return List.of(consumer);
}
}
}

View File

@@ -0,0 +1,37 @@
/**
* 抖音关注:程序员三丙
* 知识星球https://t.zsxq.com/j9b21
*/
package sanbing.jcpp.app.service.queue;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.ToString;
import sanbing.jcpp.infrastructure.queue.common.QueueConfig;
import sanbing.jcpp.infrastructure.queue.common.TopicPartitionInfo;
import java.util.Set;
@Getter
@ToString
@AllArgsConstructor
public class QueueConsumerManagerTask {
private final QueueEvent event;
private QueueConfig config;
private Set<TopicPartitionInfo> partitions;
private boolean drainQueue;
public static QueueConsumerManagerTask delete(boolean drainQueue) {
return new QueueConsumerManagerTask(QueueEvent.DELETE, null, null, drainQueue);
}
public static QueueConsumerManagerTask configUpdate(QueueConfig config) {
return new QueueConsumerManagerTask(QueueEvent.CONFIG_UPDATE, config, null, false);
}
public static QueueConsumerManagerTask partitionChange(Set<TopicPartitionInfo> partitions) {
return new QueueConsumerManagerTask(QueueEvent.PARTITION_CHANGE, null, partitions, false);
}
}

View File

@@ -0,0 +1,78 @@
/**
* 抖音关注:程序员三丙
* 知识星球https://t.zsxq.com/j9b21
*/
package sanbing.jcpp.app.service.queue;
import lombok.Getter;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import sanbing.jcpp.infrastructure.queue.QueueConsumer;
import sanbing.jcpp.infrastructure.queue.QueueMsg;
import sanbing.jcpp.infrastructure.queue.common.TopicPartitionInfo;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
@Slf4j
public class QueueConsumerTask<M extends QueueMsg> {
@Getter
private final Object key;
private volatile QueueConsumer<M> consumer;
private volatile Supplier<QueueConsumer<M>> consumerSupplier;
@Setter
private Future<?> task;
public QueueConsumerTask(Object key, Supplier<QueueConsumer<M>> consumerSupplier) {
this.key = key;
this.consumer = null;
this.consumerSupplier = consumerSupplier;
}
public QueueConsumer<M> getConsumer() {
if (consumer == null) {
synchronized (this) {
if (consumer == null) {
Objects.requireNonNull(consumerSupplier, "consumerSupplier for key [" + key + "] is null");
consumer = consumerSupplier.get();
Objects.requireNonNull(consumer, "consumer for key [" + key + "] is null");
consumerSupplier = null;
}
}
}
return consumer;
}
public void subscribe(Set<TopicPartitionInfo> partitions) {
log.info("[{}] Subscribing to partitions: {}", key, partitions);
getConsumer().subscribe(partitions);
}
public void initiateStop() {
log.debug("[{}] Initiating stop", key);
getConsumer().stop();
}
public void awaitCompletion() {
log.trace("[{}] Awaiting finish", key);
if (isRunning()) {
try {
task.get(30, TimeUnit.SECONDS);
log.trace("[{}] Awaited finish", key);
} catch (Exception e) {
log.warn("[{}] Failed to await for consumer to stop", key, e);
}
task = null;
}
}
public boolean isRunning() {
return task != null;
}
}

View File

@@ -0,0 +1,13 @@
/**
* 抖音关注:程序员三丙
* 知识星球https://t.zsxq.com/j9b21
*/
package sanbing.jcpp.app.service.queue;
import java.io.Serializable;
public enum QueueEvent implements Serializable {
PARTITION_CHANGE, CONFIG_UPDATE, DELETE
}

View File

@@ -0,0 +1,236 @@
/**
* 抖音关注:程序员三丙
* 知识星球https://t.zsxq.com/j9b21
*/
package sanbing.jcpp.app.service.queue.consumer;
import jakarta.annotation.PostConstruct;
import jakarta.annotation.PreDestroy;
import lombok.Data;
import lombok.Getter;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.context.ApplicationListener;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;
import sanbing.jcpp.app.service.PileProtocolService;
import sanbing.jcpp.app.service.queue.AbstractConsumerService;
import sanbing.jcpp.app.service.queue.AppConsumerStats;
import sanbing.jcpp.app.service.queue.AppQueueConsumerManager;
import sanbing.jcpp.infrastructure.queue.*;
import sanbing.jcpp.infrastructure.queue.common.QueueConfig;
import sanbing.jcpp.infrastructure.queue.common.TopicPartitionInfo;
import sanbing.jcpp.infrastructure.queue.discovery.PartitionProvider;
import sanbing.jcpp.infrastructure.queue.discovery.event.PartitionChangeEvent;
import sanbing.jcpp.infrastructure.queue.processing.IdMsgPair;
import sanbing.jcpp.infrastructure.queue.provider.AppQueueFactory;
import sanbing.jcpp.infrastructure.stats.StatsFactory;
import sanbing.jcpp.infrastructure.util.annotation.AppComponent;
import sanbing.jcpp.infrastructure.util.codec.ByteUtil;
import sanbing.jcpp.infrastructure.util.mdc.MDCUtils;
import sanbing.jcpp.infrastructure.util.trace.TracerContextUtil;
import sanbing.jcpp.infrastructure.util.trace.TracerRunnable;
import sanbing.jcpp.proto.gen.ProtocolProto.UplinkQueueMessage;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.*;
import java.util.stream.Collectors;
import static sanbing.jcpp.infrastructure.queue.common.QueueConstants.MSG_MD_PREFIX;
import static sanbing.jcpp.infrastructure.queue.common.QueueConstants.MSG_MD_TS;
import static sanbing.jcpp.infrastructure.util.trace.TracerContextUtil.JCPP_TRACER_ID;
import static sanbing.jcpp.infrastructure.util.trace.TracerContextUtil.JCPP_TRACER_ORIGIN;
/**
* @author baigod
*/
@Service
@AppComponent
@Slf4j
public class ProtocolUplinkConsumerService extends AbstractConsumerService implements ApplicationListener<PartitionChangeEvent> {
@Value("${queue.app.poll-interval}")
private int pollInterval;
@Value("${queue.app.pack-processing-timeout}")
private long packProcessingTimeout;
@Value("${queue.app.consumer-per-partition}")
private boolean consumerPerPartition;
@Value("${queue.app.stats.enabled}")
private boolean statsEnabled;
private final PileProtocolService pileProtocolService;
private final AppQueueFactory appQueueFactory;
private AppQueueConsumerManager<ProtoQueueMsg<UplinkQueueMessage>, AppQueueConfig> appConsumer;
private final AppConsumerStats stats;
public ProtocolUplinkConsumerService(PartitionProvider partitionProvider,
ApplicationEventPublisher eventPublisher,
PileProtocolService pileProtocolService,
AppQueueFactory appQueueFactory,
StatsFactory statsFactory) {
super(partitionProvider, eventPublisher);
this.pileProtocolService = pileProtocolService;
this.appQueueFactory = appQueueFactory;
this.stats = new AppConsumerStats(statsFactory);
}
@PostConstruct
public void init() {
super.init("jcpp-app");
log.info("Initializing Protocol Uplink Messages Queue Subscriptions.");
this.appConsumer = AppQueueConsumerManager.<ProtoQueueMsg<UplinkQueueMessage>, AppQueueConfig>builder()
.queueName("protocol uplink")
.config(AppQueueConfig.of(consumerPerPartition, pollInterval))
.msgPackProcessor(this::processMsgs)
.consumerCreator((config, partitionId) -> appQueueFactory.createProtocolUplinkMsgConsumer())
.consumerExecutor(consumersExecutor)
.scheduler(scheduler)
.taskExecutor(mgmtExecutor)
.build();
}
@PreDestroy
public void destroy() {
super.destroy();
}
@Override
protected void stopConsumers() {
super.stopConsumers();
appConsumer.stop();
appConsumer.awaitStop();
}
@Scheduled(fixedDelayString = "${queue.app.stats.print-interval-ms}")
public void printStats() {
if (statsEnabled) {
stats.printStats();
stats.reset();
}
}
private void processMsgs(List<ProtoQueueMsg<UplinkQueueMessage>> msgs, QueueConsumer<ProtoQueueMsg<UplinkQueueMessage>> consumer, AppQueueConfig config) throws Exception {
List<IdMsgPair<UplinkQueueMessage>> orderedMsgList = msgs.stream().map(msg -> new IdMsgPair<>(UUID.randomUUID(), msg)).toList();
ConcurrentMap<UUID, ProtoQueueMsg<UplinkQueueMessage>> pendingMap = orderedMsgList.stream().collect(
Collectors.toConcurrentMap(IdMsgPair::getUuid, IdMsgPair::getMsg));
CountDownLatch processingTimeoutLatch = new CountDownLatch(1);
PackProcessingContext<ProtoQueueMsg<UplinkQueueMessage>> ctx = new PackProcessingContext<>(
processingTimeoutLatch, pendingMap, new ConcurrentHashMap<>());
PendingMsgHolder pendingMsgHolder = new PendingMsgHolder();
Future<?> packSubmitFuture = consumersExecutor.submit(new TracerRunnable(() ->
orderedMsgList.forEach((element) -> {
UUID id = element.getUuid();
ProtoQueueMsg<UplinkQueueMessage> msg = element.getMsg();
tracer(msg);
log.trace("[{}] Creating main callback for message: {}", id, msg.getValue());
Callback callback = new PackCallback<>(id, ctx);
try {
UplinkQueueMessage uplinkQueueMsg = msg.getValue();
pendingMsgHolder.setUplinkQueueMessage(uplinkQueueMsg);
if (uplinkQueueMsg.hasLoginRequest()) {
pileProtocolService.pileLogin(uplinkQueueMsg, callback);
} else if (uplinkQueueMsg.hasHeartBeatRequest()) {
pileProtocolService.heartBeat(uplinkQueueMsg, callback);
} else if (uplinkQueueMsg.hasVerifyPricingRequest()) {
pileProtocolService.verifyPricing(uplinkQueueMsg, callback);
} else if (uplinkQueueMsg.hasQueryPricingRequest()) {
pileProtocolService.queryPricing(uplinkQueueMsg, callback);
} else if (uplinkQueueMsg.hasGunRunStatusProto()) {
pileProtocolService.postGunRunStatus(uplinkQueueMsg, callback);
} else if (uplinkQueueMsg.hasChargingProgressProto()) {
pileProtocolService.postChargingProgress(uplinkQueueMsg, callback);
} else if (uplinkQueueMsg.hasSetPricingResponse()) {
pileProtocolService.onSetPricingResponse(uplinkQueueMsg, callback);
} else if (uplinkQueueMsg.hasRemoteStartChargingResponse()) {
pileProtocolService.onRemoteStartChargingResponse(uplinkQueueMsg, callback);
} else if (uplinkQueueMsg.hasRemoteStopChargingResponse()) {
pileProtocolService.onRemoteStopChargingResponse(uplinkQueueMsg, callback);
} else if(uplinkQueueMsg.hasTransactionRecord()){
pileProtocolService.onTransactionRecord(uplinkQueueMsg, callback);
}else {
callback.onSuccess();
}
if (statsEnabled) {
stats.log(uplinkQueueMsg);
}
} catch (Throwable e) {
log.warn("[{}] Failed to process message: {}", id, msg, e);
callback.onFailure(e);
}
}))
);
if (!processingTimeoutLatch.await(packProcessingTimeout, TimeUnit.MILLISECONDS)) {
if (!packSubmitFuture.isDone()) {
packSubmitFuture.cancel(true);
UplinkQueueMessage lastSubmitMsg = pendingMsgHolder.getUplinkQueueMessage();
log.warn("Timeout to process message: {}", lastSubmitMsg);
}
if (log.isDebugEnabled()) {
ctx.getAckMap().forEach((id, msg) -> log.debug("[{}] Timeout to process message: {}", id, msg.getValue()));
}
ctx.getFailedMap().forEach((id, msg) -> log.warn("[{}] Failed to process message: {}", id, msg.getValue()));
}
consumer.commit();
}
private void tracer(ProtoQueueMsg<UplinkQueueMessage> msg) {
Optional.ofNullable(msg.getHeaders().get(MSG_MD_PREFIX + JCPP_TRACER_ID))
.map(tracerId -> {
String origin = null;
byte[] tracerOrigin = msg.getHeaders().get(MSG_MD_PREFIX + JCPP_TRACER_ORIGIN);
if (tracerOrigin != null) {
origin = ByteUtil.bytesToString(tracerOrigin);
}
long ts = System.currentTimeMillis();
byte[] tracerTs = msg.getHeaders().get(MSG_MD_PREFIX + MSG_MD_TS);
if (tracerTs != null) {
ts = ByteUtil.bytesToLong(tracerTs);
}
return TracerContextUtil.newTracer(ByteUtil.bytesToString(tracerId), origin, ts);
})
.orElseGet(TracerContextUtil::newTracer);
MDCUtils.recordTracer();
}
@Override
protected int getMgmtThreadPoolSize() {
return Math.max(Runtime.getRuntime().availableProcessors(), 4);
}
@Override
protected void onJCPPApplicationEvent(PartitionChangeEvent event) {
Set<TopicPartitionInfo> appPartitions = event.getAppPartitions();
log.info("Subscribing to partitions: {}", appPartitions);
appConsumer.update(appPartitions);
}
@Data(staticConstructor = "of")
public static class AppQueueConfig implements QueueConfig {
private final boolean consumerPerPartition;
private final int pollInterval;
}
@Setter
@Getter
private static class PendingMsgHolder {
private volatile UplinkQueueMessage uplinkQueueMessage;
}
}