diff --git a/docker/Dockerfile-App b/docker/Dockerfile-App
index 144da60..f619b3d 100644
--- a/docker/Dockerfile-App
+++ b/docker/Dockerfile-App
@@ -35,6 +35,9 @@ RUN chmod a+x *.sh && mv start.sh /usr/bin
EXPOSE 8080 8080
+ENV APP_LOG_LEVEL=INFO
+ENV PROTOCOLS_LOG_LEVEL=INFO
+
CMD ["start.sh"]
diff --git a/docker/Dockerfile-Protocol b/docker/Dockerfile-Protocol
index 7aa342a..82aae02 100644
--- a/docker/Dockerfile-Protocol
+++ b/docker/Dockerfile-Protocol
@@ -35,6 +35,8 @@ RUN chmod a+x *.sh && mv start.sh /usr/bin
EXPOSE 8081 8081
+ENV PROTOCOLS_LOG_LEVEL=INFO
+
CMD ["start.sh"]
diff --git a/jcpp-app-bootstrap/src/main/resources/app-service.yml b/jcpp-app-bootstrap/src/main/resources/app-service.yml
index 3d41a68..8b791b5 100644
--- a/jcpp-app-bootstrap/src/main/resources/app-service.yml
+++ b/jcpp-app-bootstrap/src/main/resources/app-service.yml
@@ -27,7 +27,7 @@ spring:
password: "${SPRING_DATASOURCE_PASSWORD:postgres}"
hikari:
leak-detection-threshold: "${SPRING_DATASOURCE_HIKARI_LEAK_DETECTION_THRESHOLD:0}"
- maximum-pool-size: "${SPRING_DATASOURCE_MAXIMUM_POOL_SIZE:16}"
+ maximum-pool-size: "${SPRING_DATASOURCE_MAXIMUM_POOL_SIZE:64}"
register-mbeans: "${SPRING_DATASOURCE_HIKARI_REGISTER_MBEANS:false}"
mybatis-plus:
@@ -63,9 +63,10 @@ queue:
type: "${QUEUE_TYPE:memory}"
partitions:
hash_function_name: "${QUEUE_PARTITIONS_HASH_FUNCTION_NAME:murmur3_128}" # murmur3_32, murmur3_128 or sha256
- in_memory:
+ in-memory:
+ queue-capacity: "${QUEUE_IN_MEMORY_QUEUE_CAPACITY:100000}"
stats:
- print-interval-ms: "${QUEUE_IN_MEMORY_STATS_PRINT_INTERVAL_MS:60000}"
+ print-interval-ms: "${QUEUE_IN_MEMORY_STATS_PRINT_INTERVAL_MS:10000}"
kafka:
bootstrap-servers: "${KAFKA_SERVERS:kafka:9092}"
ssl:
@@ -93,7 +94,7 @@ queue:
auto-offset-reset: "${QUEUE_KAFKA_AUTO_OFFSET_RESET:earliest}"
other-inline: "${QUEUE_KAFKA_OTHER_PROPERTIES:}"
topic-properties:
- app: "${QUEUE_KAFKA_APP_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:52428800;retention.bytes:1048576000;partitions:1;min.insync.replicas:1}"
+ app: "${QUEUE_KAFKA_APP_TOPIC_PROPERTIES:retention.ms:86400000;segment.bytes:52428800;retention.bytes:1048576000;partitions:1;min.insync.replicas:1}"
consumer-stats:
enabled: "${QUEUE_KAFKA_CONSUMER_STATS_ENABLED:true}"
print-interval-ms: "${QUEUE_KAFKA_CONSUMER_STATS_MIN_PRINT_INTERVAL_MS:60000}"
@@ -115,8 +116,8 @@ cache:
type: "${CACHE_TYPE:caffeine}" # caffeine or redis
specs:
piles:
- timeToLiveInMinutes: "${CACHE_SPECS_PILES_TTL:15}"
- maxSize: "${CACHE_SPECS_PILES_MAX_SIZE:1000}"
+ timeToLiveInMinutes: "${CACHE_SPECS_PILES_TTL:1440}"
+ maxSize: "${CACHE_SPECS_PILES_MAX_SIZE:100000}"
pileSessions:
timeToLiveInMinutes: "${CACHE_SPECS_PILE_SESSIONS_TTL:1440}"
maxSize: "${CACHE_SPECS_PILE_SESSIONS_MAX_SIZE:100000}"
@@ -132,7 +133,7 @@ redis:
commandTimeout: "${REDIS_CLIENT_COMMAND_TIMEOUT:30000}"
shutdownTimeout: "${REDIS_CLIENT_SHUTDOWN_TIMEOUT:1000}"
readTimeout: "${REDIS_CLIENT_READ_TIMEOUT:60000}"
- usePoolConfig: "${REDIS_CLIENT_USE_POOL_CONFIG:false}"
+ usePoolConfig: "${REDIS_CLIENT_USE_POOL_CONFIG:true}"
cluster:
nodes: "${REDIS_NODES:redis-node-0:6379,redis-node-1:6379,redis-node-2:6379,redis-node-3:6379,redis-node-4:6379,redis-node-5:6379}"
max-redirects: "${REDIS_MAX_REDIRECTS:12}"
@@ -146,7 +147,7 @@ redis:
password: "${REDIS_PASSWORD:sanbing}"
pool_config:
maxTotal: "${REDIS_POOL_CONFIG_MAX_TOTAL:128}"
- maxIdle: "${REDIS_POOL_CONFIG_MAX_IDLE:128}"
+ maxIdle: "${REDIS_POOL_CONFIG_MAX_IDLE:64}"
minIdle: "${REDIS_POOL_CONFIG_MIN_IDLE:16}"
testOnBorrow: "${REDIS_POOL_CONFIG_TEST_ON_BORROW:false}"
testOnReturn: "${REDIS_POOL_CONFIG_TEST_ON_RETURN:false}"
diff --git a/jcpp-app-bootstrap/src/main/resources/log4j2.xml b/jcpp-app-bootstrap/src/main/resources/log4j2.xml
index 082044b..d5884d8 100644
--- a/jcpp-app-bootstrap/src/main/resources/log4j2.xml
+++ b/jcpp-app-bootstrap/src/main/resources/log4j2.xml
@@ -18,7 +18,8 @@
-
+
+
@@ -45,6 +46,12 @@
+
+
+
+
+
diff --git a/jcpp-app-bootstrap/src/test/java/sanbing/jcpp/app/dal/mapper/GunMapperIT.java b/jcpp-app-bootstrap/src/test/java/sanbing/jcpp/app/dal/mapper/GunMapperIT.java
index af650ff..d2c7099 100644
--- a/jcpp-app-bootstrap/src/test/java/sanbing/jcpp/app/dal/mapper/GunMapperIT.java
+++ b/jcpp-app-bootstrap/src/test/java/sanbing/jcpp/app/dal/mapper/GunMapperIT.java
@@ -12,9 +12,9 @@ import sanbing.jcpp.app.dal.config.ibatis.enums.GunOptStatusEnum;
import sanbing.jcpp.app.dal.config.ibatis.enums.GunRunStatusEnum;
import sanbing.jcpp.app.dal.config.ibatis.enums.OwnerTypeEnum;
import sanbing.jcpp.app.dal.entity.Gun;
-import sanbing.jcpp.app.dal.entity.Pile;
import sanbing.jcpp.infrastructure.util.jackson.JacksonUtil;
+import java.text.DecimalFormat;
import java.time.LocalDateTime;
import java.util.UUID;
@@ -42,9 +42,6 @@ class GunMapperIT extends AbstractTestBase {
@Resource
GunMapper gunMapper;
- @Resource
- PileMapper pileMapper;
-
@Test
void curdTest() {
gunMapper.delete(Wrappers.lambdaQuery());
@@ -53,15 +50,13 @@ class GunMapperIT extends AbstractTestBase {
UUID pileId = NORMAL_PILE_ID[i];
UUID gunId = NORMAL_GUN_ID[i];
- Pile pile = pileMapper.selectById(pileId);
-
Gun gun = Gun.builder()
.id(gunId)
.createdTime(LocalDateTime.now())
.additionalInfo(JacksonUtil.newObjectNode())
.gunNo("02")
- .gunName(pile.getPileName() + "的2号枪")
- .gunCode(pile.getPileCode() + "-02")
+ .gunName(String.format("三丙家的%d号充电桩", i + 1) + "的2号枪")
+ .gunCode("202312120000" + new DecimalFormat("00").format(i + 1) + "-02")
.stationId(NORMAL_STATION_ID)
.pileId(pileId)
.ownerId(NORMAL_USER_ID)
diff --git a/jcpp-app-bootstrap/src/test/java/sanbing/jcpp/app/dal/mapper/PileMapperIT.java b/jcpp-app-bootstrap/src/test/java/sanbing/jcpp/app/dal/mapper/PileMapperIT.java
index 46168eb..e00a37d 100644
--- a/jcpp-app-bootstrap/src/test/java/sanbing/jcpp/app/dal/mapper/PileMapperIT.java
+++ b/jcpp-app-bootstrap/src/test/java/sanbing/jcpp/app/dal/mapper/PileMapperIT.java
@@ -17,7 +17,10 @@ import sanbing.jcpp.infrastructure.util.jackson.JacksonUtil;
import java.text.DecimalFormat;
import java.time.LocalDateTime;
+import java.util.ArrayList;
+import java.util.List;
import java.util.UUID;
+import java.util.concurrent.atomic.AtomicInteger;
import static sanbing.jcpp.app.dal.mapper.StationMapperIT.NORMAL_STATION_ID;
import static sanbing.jcpp.app.dal.mapper.UserMapperIT.NORMAL_USER_ID;
@@ -68,7 +71,36 @@ class PileMapperIT extends AbstractTestBase {
pileMapper.insertOrUpdate(pile);
log.info("{}", pileMapper.selectById(pileId));
-
}
}
+
+ @Test
+ void generate100KPiles() {
+ AtomicInteger number = new AtomicInteger(1);
+ for (int i = 0; i < 100; i++) {
+ List piles = new ArrayList<>();
+ for (int j = 0; j < 1000; j++, number.incrementAndGet()) {
+ Pile pile = Pile.builder()
+ .id(UUID.randomUUID())
+ .createdTime(LocalDateTime.now())
+ .additionalInfo(JacksonUtil.newObjectNode())
+ .pileName(String.format("三丙家的%d号充电桩", number.get()))
+ .pileCode("20241015" + new DecimalFormat("000000").format(number.get()))
+ .protocol("yunkuaichongV150")
+ .stationId(NORMAL_STATION_ID)
+ .ownerId(NORMAL_USER_ID)
+ .ownerType(OwnerTypeEnum.C)
+ .brand("星星")
+ .model("10A")
+ .manufacturer("星星")
+ .status(PileStatusEnum.IDLE)
+ .type(PileTypeEnum.AC)
+ .build();
+ piles.add(pile);
+ }
+ pileMapper.insert(piles);
+ }
+
+ log.info("{}", pileMapper.selectCount(Wrappers.lambdaQuery()));
+ }
}
\ No newline at end of file
diff --git a/jcpp-app/src/main/java/sanbing/jcpp/app/service/cache/session/PileSessionCacheKey.java b/jcpp-app/src/main/java/sanbing/jcpp/app/service/cache/session/PileSessionCacheKey.java
index e18f0e4..8d6f5a6 100644
--- a/jcpp-app/src/main/java/sanbing/jcpp/app/service/cache/session/PileSessionCacheKey.java
+++ b/jcpp-app/src/main/java/sanbing/jcpp/app/service/cache/session/PileSessionCacheKey.java
@@ -7,35 +7,26 @@ package sanbing.jcpp.app.service.cache.session;
import lombok.Builder;
import lombok.EqualsAndHashCode;
import lombok.Getter;
-import lombok.RequiredArgsConstructor;
import java.io.Serializable;
-import java.util.Optional;
-import java.util.UUID;
/**
* @author baigod
*/
@Getter
@EqualsAndHashCode
-@RequiredArgsConstructor
@Builder
public class PileSessionCacheKey implements Serializable {
- private final UUID pileId;
private final String pileCode;
- public PileSessionCacheKey(UUID pileId) {
- this(pileId, null);
- }
-
public PileSessionCacheKey(String pileCode) {
- this(null, pileCode);
+ this.pileCode = pileCode;
}
@Override
public String toString() {
- return Optional.ofNullable(pileId).map(UUID::toString).orElse(pileCode);
+ return pileCode;
}
}
\ No newline at end of file
diff --git a/jcpp-app/src/main/java/sanbing/jcpp/app/service/impl/DefaultDownlinkCallService.java b/jcpp-app/src/main/java/sanbing/jcpp/app/service/impl/DefaultDownlinkCallService.java
index 90c1f76..62a6b87 100644
--- a/jcpp-app/src/main/java/sanbing/jcpp/app/service/impl/DefaultDownlinkCallService.java
+++ b/jcpp-app/src/main/java/sanbing/jcpp/app/service/impl/DefaultDownlinkCallService.java
@@ -6,6 +6,7 @@ package sanbing.jcpp.app.service.impl;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.factory.annotation.Value;
import org.springframework.http.HttpEntity;
import org.springframework.http.HttpHeaders;
import org.springframework.http.MediaType;
@@ -44,12 +45,15 @@ public class DefaultDownlinkCallService implements DownlinkCallService {
@Resource
TransactionalCache pileSessionCache;
+ @Value("${cache.type}")
+ private String cacheType;
+
@Override
public void sendDownlinkMessage(DownlinkRestMessage.Builder downlinkMessageBuilder, String pileCode) {
- if (serviceInfoProvider.isMonolith()) {
+ if (serviceInfoProvider.isMonolith() && "caffeine".equalsIgnoreCase(cacheType)) {
downlinkController.onDownlink(downlinkMessageBuilder.build())
- .setResultHandler(result -> log.info("下行消息发送完成"));
+ .setResultHandler(result -> log.debug("下行消息发送完成"));
} else {
try {
@@ -83,7 +87,7 @@ public class DefaultDownlinkCallService implements DownlinkCallService {
try {
ResponseEntity> response = downlinkRestTemplate.postForEntity("http://" + nodeWebapiIpPort + "/api/onDownlink",
entity, ResponseEntity.class);
- log.info("下行消息发送成功 {}", response);
+ log.debug("下行消息发送成功 {}", response);
} catch (RestClientException e) {
log.error("下行消息发送失败 {}", downlinkRestMessage, e);
throw new RuntimeException(e);
diff --git a/jcpp-app/src/main/java/sanbing/jcpp/app/service/impl/DefaultPileProtocolService.java b/jcpp-app/src/main/java/sanbing/jcpp/app/service/impl/DefaultPileProtocolService.java
index f8387d3..10fc0dd 100644
--- a/jcpp-app/src/main/java/sanbing/jcpp/app/service/impl/DefaultPileProtocolService.java
+++ b/jcpp-app/src/main/java/sanbing/jcpp/app/service/impl/DefaultPileProtocolService.java
@@ -48,7 +48,7 @@ public class DefaultPileProtocolService implements PileProtocolService {
@Override
public void pileLogin(UplinkQueueMessage uplinkQueueMessage, Callback callback) {
- log.info("接收到桩登录事件 {}", uplinkQueueMessage);
+ log.debug("接收到桩登录事件 {}", uplinkQueueMessage);
LoginRequest loginRequest = uplinkQueueMessage.getLoginRequest();
@@ -56,7 +56,7 @@ public class DefaultPileProtocolService implements PileProtocolService {
String pileCode = loginRequest.getPileCode();
- log.info("查询到充电桩信息 {}", pile);
+ log.debug("查询到充电桩信息 {}", pile);
// 构造下行回复
DownlinkRestMessage.Builder downlinkMessageBuilder = createDownlinkMessageBuilder(uplinkQueueMessage, loginRequest.getPileCode());
@@ -108,7 +108,6 @@ public class DefaultPileProtocolService implements PileProtocolService {
pileSession.setRemoteAddress(remoteAddress);
pileSession.setNodeId(nodeId);
pileSession.setNodeWebapiIpPort(nodeWebapiIpPort);
- pileSessionCache.put(new PileSessionCacheKey(pile.getId()), pileSession);
pileSessionCache.put(new PileSessionCacheKey(pile.getPileCode()), pileSession);
}
diff --git a/jcpp-app/src/main/java/sanbing/jcpp/app/service/queue/AbstractConsumerService.java b/jcpp-app/src/main/java/sanbing/jcpp/app/service/queue/AbstractConsumerService.java
index cff3ebd..0b91f94 100644
--- a/jcpp-app/src/main/java/sanbing/jcpp/app/service/queue/AbstractConsumerService.java
+++ b/jcpp-app/src/main/java/sanbing/jcpp/app/service/queue/AbstractConsumerService.java
@@ -31,7 +31,7 @@ public abstract class AbstractConsumerService extends JCPPApplicationEventListen
protected ScheduledExecutorService scheduler;
public void init(String prefix) {
- this.consumersExecutor = Executors.newCachedThreadPool(JCPPThreadFactory.forName(prefix + "-consumer"));
+ this.consumersExecutor = JCPPExecutors.newVirtualThreadPool(prefix + "-consumer-virtual");
this.mgmtExecutor = JCPPExecutors.newWorkStealingPool(getMgmtThreadPoolSize(), prefix + "-mgmt");
this.scheduler = Executors.newSingleThreadScheduledExecutor(JCPPThreadFactory.forName(prefix + "-consumer-scheduler"));
}
diff --git a/jcpp-app/src/main/java/sanbing/jcpp/app/service/queue/consumer/ProtocolUplinkConsumerService.java b/jcpp-app/src/main/java/sanbing/jcpp/app/service/queue/consumer/ProtocolUplinkConsumerService.java
index e7ab6d0..0316d58 100644
--- a/jcpp-app/src/main/java/sanbing/jcpp/app/service/queue/consumer/ProtocolUplinkConsumerService.java
+++ b/jcpp-app/src/main/java/sanbing/jcpp/app/service/queue/consumer/ProtocolUplinkConsumerService.java
@@ -35,16 +35,12 @@ import sanbing.jcpp.infrastructure.util.trace.TracerRunnable;
import sanbing.jcpp.proto.gen.ProtocolProto.UplinkQueueMessage;
import java.util.List;
-import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.*;
import java.util.stream.Collectors;
-import static sanbing.jcpp.infrastructure.queue.common.QueueConstants.MSG_MD_PREFIX;
-import static sanbing.jcpp.infrastructure.queue.common.QueueConstants.MSG_MD_TS;
-import static sanbing.jcpp.infrastructure.util.trace.TracerContextUtil.JCPP_TRACER_ID;
-import static sanbing.jcpp.infrastructure.util.trace.TracerContextUtil.JCPP_TRACER_ORIGIN;
+import static sanbing.jcpp.infrastructure.queue.common.QueueConstants.*;
/**
@@ -101,6 +97,7 @@ public class ProtocolUplinkConsumerService extends AbstractConsumerService imple
}
+ @Override
@PreDestroy
public void destroy() {
super.destroy();
@@ -132,80 +129,109 @@ public class ProtocolUplinkConsumerService extends AbstractConsumerService imple
processingTimeoutLatch, pendingMap, new ConcurrentHashMap<>());
PendingMsgHolder pendingMsgHolder = new PendingMsgHolder();
Future> packSubmitFuture = consumersExecutor.submit(new TracerRunnable(() ->
- orderedMsgList.forEach((element) -> {
+ orderedMsgList.forEach(element -> {
+
UUID id = element.getUuid();
+
ProtoQueueMsg msg = element.getMsg();
+
tracer(msg);
- log.trace("[{}] Creating main callback for message: {}", id, msg.getValue());
+
+ log.trace("[{}] Creating PackCallback for message: {}", id, msg.getValue());
+
Callback callback = new PackCallback<>(id, ctx);
+
try {
UplinkQueueMessage uplinkQueueMsg = msg.getValue();
- pendingMsgHolder.setUplinkQueueMessage(uplinkQueueMsg);
- if (uplinkQueueMsg.hasLoginRequest()) {
- pileProtocolService.pileLogin(uplinkQueueMsg, callback);
- } else if (uplinkQueueMsg.hasHeartBeatRequest()) {
- pileProtocolService.heartBeat(uplinkQueueMsg, callback);
- } else if (uplinkQueueMsg.hasVerifyPricingRequest()) {
- pileProtocolService.verifyPricing(uplinkQueueMsg, callback);
- } else if (uplinkQueueMsg.hasQueryPricingRequest()) {
- pileProtocolService.queryPricing(uplinkQueueMsg, callback);
- } else if (uplinkQueueMsg.hasGunRunStatusProto()) {
- pileProtocolService.postGunRunStatus(uplinkQueueMsg, callback);
- } else if (uplinkQueueMsg.hasChargingProgressProto()) {
- pileProtocolService.postChargingProgress(uplinkQueueMsg, callback);
- } else if (uplinkQueueMsg.hasSetPricingResponse()) {
- pileProtocolService.onSetPricingResponse(uplinkQueueMsg, callback);
- } else if (uplinkQueueMsg.hasRemoteStartChargingResponse()) {
- pileProtocolService.onRemoteStartChargingResponse(uplinkQueueMsg, callback);
- } else if (uplinkQueueMsg.hasRemoteStopChargingResponse()) {
- pileProtocolService.onRemoteStopChargingResponse(uplinkQueueMsg, callback);
- } else if(uplinkQueueMsg.hasTransactionRecord()){
- pileProtocolService.onTransactionRecord(uplinkQueueMsg, callback);
- }else {
- callback.onSuccess();
- }
if (statsEnabled) {
stats.log(uplinkQueueMsg);
}
+
+ pendingMsgHolder.setUplinkQueueMessage(uplinkQueueMsg);
+
+ if (uplinkQueueMsg.hasLoginRequest()) {
+
+ pileProtocolService.pileLogin(uplinkQueueMsg, callback);
+
+ } else if (uplinkQueueMsg.hasHeartBeatRequest()) {
+
+ pileProtocolService.heartBeat(uplinkQueueMsg, callback);
+
+ } else if (uplinkQueueMsg.hasVerifyPricingRequest()) {
+
+ pileProtocolService.verifyPricing(uplinkQueueMsg, callback);
+
+ } else if (uplinkQueueMsg.hasQueryPricingRequest()) {
+
+ pileProtocolService.queryPricing(uplinkQueueMsg, callback);
+
+ } else if (uplinkQueueMsg.hasGunRunStatusProto()) {
+
+ pileProtocolService.postGunRunStatus(uplinkQueueMsg, callback);
+
+ } else if (uplinkQueueMsg.hasChargingProgressProto()) {
+
+ pileProtocolService.postChargingProgress(uplinkQueueMsg, callback);
+
+ } else if (uplinkQueueMsg.hasSetPricingResponse()) {
+
+ pileProtocolService.onSetPricingResponse(uplinkQueueMsg, callback);
+
+ } else if (uplinkQueueMsg.hasRemoteStartChargingResponse()) {
+
+ pileProtocolService.onRemoteStartChargingResponse(uplinkQueueMsg, callback);
+
+ } else if (uplinkQueueMsg.hasRemoteStopChargingResponse()) {
+
+ pileProtocolService.onRemoteStopChargingResponse(uplinkQueueMsg, callback);
+
+ } else if (uplinkQueueMsg.hasTransactionRecord()) {
+
+ pileProtocolService.onTransactionRecord(uplinkQueueMsg, callback);
+
+ } else {
+
+ callback.onSuccess();
+ }
+
} catch (Throwable e) {
+
log.warn("[{}] Failed to process message: {}", id, msg, e);
+
callback.onFailure(e);
}
}))
);
- if (!processingTimeoutLatch.await(packProcessingTimeout, TimeUnit.MILLISECONDS)) {
+
+ if (!ctx.await(packProcessingTimeout, TimeUnit.MILLISECONDS)) {
+
if (!packSubmitFuture.isDone()) {
+
packSubmitFuture.cancel(true);
+
UplinkQueueMessage lastSubmitMsg = pendingMsgHolder.getUplinkQueueMessage();
+
log.warn("Timeout to process message: {}", lastSubmitMsg);
}
+
if (log.isDebugEnabled()) {
- ctx.getAckMap().forEach((id, msg) -> log.debug("[{}] Timeout to process message: {}", id, msg.getValue()));
+
+ ctx.getAckMap().forEach((id, msg) -> log.info("[{}] Timeout to process message: {}", id, msg.getValue()));
+
}
+
ctx.getFailedMap().forEach((id, msg) -> log.warn("[{}] Failed to process message: {}", id, msg.getValue()));
}
+
consumer.commit();
}
private void tracer(ProtoQueueMsg msg) {
- Optional.ofNullable(msg.getHeaders().get(MSG_MD_PREFIX + JCPP_TRACER_ID))
- .map(tracerId -> {
- String origin = null;
- byte[] tracerOrigin = msg.getHeaders().get(MSG_MD_PREFIX + JCPP_TRACER_ORIGIN);
- if (tracerOrigin != null) {
- origin = ByteUtil.bytesToString(tracerOrigin);
- }
- long ts = System.currentTimeMillis();
- byte[] tracerTs = msg.getHeaders().get(MSG_MD_PREFIX + MSG_MD_TS);
- if (tracerTs != null) {
- ts = ByteUtil.bytesToLong(tracerTs);
- }
-
- return TracerContextUtil.newTracer(ByteUtil.bytesToString(tracerId), origin, ts);
- })
- .orElseGet(TracerContextUtil::newTracer);
+ TracerContextUtil.newTracer(ByteUtil.bytesToString(msg.getHeaders().get(MSG_MD_TRACER_ID)),
+ ByteUtil.bytesToString(msg.getHeaders().get(MSG_MD_TRACER_ORIGIN)),
+ ByteUtil.bytesToLong(msg.getHeaders().get(MSG_MD_TRACER_TS)));
MDCUtils.recordTracer();
}
@@ -231,6 +257,6 @@ public class ProtocolUplinkConsumerService extends AbstractConsumerService imple
@Setter
@Getter
private static class PendingMsgHolder {
- private volatile UplinkQueueMessage uplinkQueueMessage;
+ private UplinkQueueMessage uplinkQueueMessage;
}
}
\ No newline at end of file
diff --git a/jcpp-infrastructure-queue/src/main/java/sanbing/jcpp/infrastructure/queue/common/QueueConstants.java b/jcpp-infrastructure-queue/src/main/java/sanbing/jcpp/infrastructure/queue/common/QueueConstants.java
index 509bc52..6017c93 100644
--- a/jcpp-infrastructure-queue/src/main/java/sanbing/jcpp/infrastructure/queue/common/QueueConstants.java
+++ b/jcpp-infrastructure-queue/src/main/java/sanbing/jcpp/infrastructure/queue/common/QueueConstants.java
@@ -4,6 +4,8 @@
*/
package sanbing.jcpp.infrastructure.queue.common;
+import static sanbing.jcpp.infrastructure.util.trace.TracerContextUtil.*;
+
/**
* @author baigod
*/
@@ -11,5 +13,10 @@ public final class QueueConstants {
public static final String MSG_MD_PREFIX = "jcpp_";
- public static final String MSG_MD_TS = "ts";
+ public static final String MSG_MD_TRACER_ID = MSG_MD_PREFIX + JCPP_TRACER_ID;
+
+ public static final String MSG_MD_TRACER_ORIGIN = MSG_MD_PREFIX + JCPP_TRACER_ORIGIN;
+
+ public static final String MSG_MD_TRACER_TS = MSG_MD_PREFIX + JCPP_TRACER_TS;
+
}
\ No newline at end of file
diff --git a/jcpp-infrastructure-queue/src/main/java/sanbing/jcpp/infrastructure/queue/memory/DefaultInMemoryStorage.java b/jcpp-infrastructure-queue/src/main/java/sanbing/jcpp/infrastructure/queue/memory/DefaultInMemoryStorage.java
index f61f905..fd06cff 100644
--- a/jcpp-infrastructure-queue/src/main/java/sanbing/jcpp/infrastructure/queue/memory/DefaultInMemoryStorage.java
+++ b/jcpp-infrastructure-queue/src/main/java/sanbing/jcpp/infrastructure/queue/memory/DefaultInMemoryStorage.java
@@ -5,6 +5,7 @@
package sanbing.jcpp.infrastructure.queue.memory;
import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import sanbing.jcpp.infrastructure.queue.QueueMsg;
@@ -18,15 +19,16 @@ import java.util.concurrent.LinkedBlockingQueue;
public final class DefaultInMemoryStorage implements InMemoryStorage {
private final ConcurrentHashMap> storage = new ConcurrentHashMap<>();
+ @Value("${queue.in-memory.queue-capacity:100000}")
+ private int queueCapacity;
+
@Override
public void printStats() {
- if (log.isDebugEnabled()) {
- storage.forEach((topic, queue) -> {
- if (!queue.isEmpty()) {
- log.debug("[{}] Queue Size [{}]", topic, queue.size());
- }
- });
- }
+ storage.forEach((topic, queue) -> {
+ if (!queue.isEmpty()) {
+ log.info("[{}] Queue Size [{}]", topic, queue.size());
+ }
+ });
}
@Override
@@ -41,11 +43,11 @@ public final class DefaultInMemoryStorage implements InMemoryStorage {
@Override
public boolean put(String topic, QueueMsg msg) {
- return storage.computeIfAbsent(topic, (t) -> new LinkedBlockingQueue<>()).add(msg);
+ return storage.computeIfAbsent(topic, t -> new LinkedBlockingQueue<>(queueCapacity)).add(msg);
}
@Override
- public List get(String topic) throws InterruptedException {
+ public List get(String topic) throws InterruptedException {
final BlockingQueue queue = storage.get(topic);
if (queue != null) {
final QueueMsg firstMsg = queue.poll();
diff --git a/jcpp-infrastructure-queue/src/main/java/sanbing/jcpp/infrastructure/queue/provider/InMemoryAppQueueFactory.java b/jcpp-infrastructure-queue/src/main/java/sanbing/jcpp/infrastructure/queue/provider/InMemoryAppQueueFactory.java
index 82cb1ad..21719f9 100644
--- a/jcpp-infrastructure-queue/src/main/java/sanbing/jcpp/infrastructure/queue/provider/InMemoryAppQueueFactory.java
+++ b/jcpp-infrastructure-queue/src/main/java/sanbing/jcpp/infrastructure/queue/provider/InMemoryAppQueueFactory.java
@@ -40,7 +40,7 @@ public class InMemoryAppQueueFactory implements AppQueueFactory {
return new InMemoryQueueProducer<>(storage, topic);
}
- @Scheduled(fixedRateString = "${queue.in_memory.stats.print-interval-ms:60000}")
+ @Scheduled(fixedRateString = "${queue.in-memory.stats.print-interval-ms:60000}")
private void printInMemoryStats() {
storage.printStats();
}
diff --git a/jcpp-infrastructure-queue/src/main/java/sanbing/jcpp/infrastructure/queue/provider/KafkaAppQueueFactory.java b/jcpp-infrastructure-queue/src/main/java/sanbing/jcpp/infrastructure/queue/provider/KafkaAppQueueFactory.java
index 155e15b..e884c93 100644
--- a/jcpp-infrastructure-queue/src/main/java/sanbing/jcpp/infrastructure/queue/provider/KafkaAppQueueFactory.java
+++ b/jcpp-infrastructure-queue/src/main/java/sanbing/jcpp/infrastructure/queue/provider/KafkaAppQueueFactory.java
@@ -41,7 +41,6 @@ public class KafkaAppQueueFactory implements AppQueueFactory {
this.appAdmin = new KafkaAdmin(kafkaSettings, kafkaTopicConfigs.getAppConfigs());
}
-
@Override
public QueueConsumer> createProtocolUplinkMsgConsumer() {
KafkaConsumerTemplate.KafkaConsumerTemplateBuilder> consumerBuilder = KafkaConsumerTemplate.builder();
diff --git a/jcpp-infrastructure-util/src/main/java/sanbing/jcpp/infrastructure/util/async/JCPPVirtualThreadFactory.java b/jcpp-infrastructure-util/src/main/java/sanbing/jcpp/infrastructure/util/async/JCPPVirtualThreadFactory.java
index 2c88144..4edcc68 100644
--- a/jcpp-infrastructure-util/src/main/java/sanbing/jcpp/infrastructure/util/async/JCPPVirtualThreadFactory.java
+++ b/jcpp-infrastructure-util/src/main/java/sanbing/jcpp/infrastructure/util/async/JCPPVirtualThreadFactory.java
@@ -4,8 +4,6 @@
*/
package sanbing.jcpp.infrastructure.util.async;
-import sanbing.jcpp.infrastructure.util.trace.TracerRunnable;
-
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicLong;
@@ -19,6 +17,6 @@ public class JCPPVirtualThreadFactory implements ThreadFactory {
@Override
public Thread newThread(Runnable r) {
- return Thread.ofVirtual().name(namePrefix + "-" + threadNumber.getAndIncrement()).unstarted(new TracerRunnable(r));
+ return Thread.ofVirtual().name(namePrefix + "-" + threadNumber.getAndIncrement()).unstarted(r);
}
}
\ No newline at end of file
diff --git a/jcpp-infrastructure-util/src/main/java/sanbing/jcpp/infrastructure/util/mdc/MDCUtils.java b/jcpp-infrastructure-util/src/main/java/sanbing/jcpp/infrastructure/util/mdc/MDCUtils.java
index 7f22912..2af32f0 100644
--- a/jcpp-infrastructure-util/src/main/java/sanbing/jcpp/infrastructure/util/mdc/MDCUtils.java
+++ b/jcpp-infrastructure-util/src/main/java/sanbing/jcpp/infrastructure/util/mdc/MDCUtils.java
@@ -34,7 +34,6 @@ public class MDCUtils {
}
return tracer.getTraceId();
-
}
public static void cleanTracer() {
diff --git a/jcpp-infrastructure-util/src/main/java/sanbing/jcpp/infrastructure/util/trace/TracerContextUtil.java b/jcpp-infrastructure-util/src/main/java/sanbing/jcpp/infrastructure/util/trace/TracerContextUtil.java
index 5921c7c..589fc13 100644
--- a/jcpp-infrastructure-util/src/main/java/sanbing/jcpp/infrastructure/util/trace/TracerContextUtil.java
+++ b/jcpp-infrastructure-util/src/main/java/sanbing/jcpp/infrastructure/util/trace/TracerContextUtil.java
@@ -40,7 +40,6 @@ public class TracerContextUtil {
tracer = new Tracer(traceId, origin, ts);
}
-
TRACE_ID_CONTAINER.set(tracer);
return tracer;
diff --git a/jcpp-infrastructure-util/src/test/java/sanbing/jcpp/infrastructure/util/async/JCPPExecutorsTest.java b/jcpp-infrastructure-util/src/test/java/sanbing/jcpp/infrastructure/util/async/JCPPExecutorsTest.java
new file mode 100644
index 0000000..418a630
--- /dev/null
+++ b/jcpp-infrastructure-util/src/test/java/sanbing/jcpp/infrastructure/util/async/JCPPExecutorsTest.java
@@ -0,0 +1,30 @@
+/**
+ * 抖音关注:程序员三丙
+ * 知识星球:https://t.zsxq.com/j9b21
+ */
+package sanbing.jcpp.infrastructure.util.async;
+
+import org.junit.jupiter.api.Test;
+import org.slf4j.MDC;
+import sanbing.jcpp.infrastructure.util.mdc.MDCUtils;
+import sanbing.jcpp.infrastructure.util.trace.TracerContextUtil;
+import sanbing.jcpp.infrastructure.util.trace.TracerRunnable;
+
+import java.util.concurrent.ExecutorService;
+
+class JCPPExecutorsTest {
+
+ @Test
+ void newVirtualThreadPool() {
+ ExecutorService executorService = JCPPExecutors.newVirtualThreadPool("test-consumer-virtual");
+
+ TracerContextUtil.newTracer();
+ MDCUtils.recordTracer();
+
+ System.out.println(MDC.get("TRACE_ID"));
+
+ executorService.submit(new TracerRunnable(() -> {
+ System.out.println(MDC.get("TRACE_ID"));
+ }));
+ }
+}
\ No newline at end of file
diff --git a/jcpp-protocol-api/src/main/java/sanbing/jcpp/protocol/adapter/DownlinkController.java b/jcpp-protocol-api/src/main/java/sanbing/jcpp/protocol/adapter/DownlinkController.java
index facf5ea..ce54453 100644
--- a/jcpp-protocol-api/src/main/java/sanbing/jcpp/protocol/adapter/DownlinkController.java
+++ b/jcpp-protocol-api/src/main/java/sanbing/jcpp/protocol/adapter/DownlinkController.java
@@ -38,7 +38,7 @@ public class DownlinkController {
@PostMapping(value = "/onDownlink", consumes = "application/x-protobuf", produces = "application/x-protobuf")
public DeferredResult> onDownlink(@RequestBody DownlinkRestMessage downlinkMsg) {
- log.info("收到REST下行请求 {}", downlinkMsg);
+ log.debug("收到REST下行请求 {}", downlinkMsg);
final DeferredResult> response = new DeferredResult<>(onDownlinkTimeout,
ResponseEntity.status(HttpStatus.REQUEST_TIMEOUT).build());
@@ -55,7 +55,7 @@ public class DownlinkController {
response.setResult(ResponseEntity.status(HttpStatus.OK).build());
} else {
- log.warn("下发报文时Session未找到 sessionId: {}", protocolSessionId);
+ log.info("下发报文时Session未找到 sessionId: {}", protocolSessionId);
response.setResult(ResponseEntity.status(HttpStatus.NOT_FOUND).body("Protocol Session not found for ID:" + protocolSessionId));
}
diff --git a/jcpp-protocol-api/src/main/java/sanbing/jcpp/protocol/domain/ProtocolUplinkMsg.java b/jcpp-protocol-api/src/main/java/sanbing/jcpp/protocol/domain/ProtocolUplinkMsg.java
index cdeabf8..c770fc4 100644
--- a/jcpp-protocol-api/src/main/java/sanbing/jcpp/protocol/domain/ProtocolUplinkMsg.java
+++ b/jcpp-protocol-api/src/main/java/sanbing/jcpp/protocol/domain/ProtocolUplinkMsg.java
@@ -13,8 +13,8 @@ public record ProtocolUplinkMsg(SocketAddress address, UUID id, T data, int s
@Override
public String toString() {
- if (data instanceof byte[]) {
- return ByteBufUtil.hexDump((byte[]) data);
+ if (data instanceof byte[] bytes) {
+ return ByteBufUtil.hexDump(bytes);
} else {
return data.toString();
}
diff --git a/jcpp-protocol-api/src/main/java/sanbing/jcpp/protocol/forwarder/Forwarder.java b/jcpp-protocol-api/src/main/java/sanbing/jcpp/protocol/forwarder/Forwarder.java
index 7cd2c22..b4d7fe8 100644
--- a/jcpp-protocol-api/src/main/java/sanbing/jcpp/protocol/forwarder/Forwarder.java
+++ b/jcpp-protocol-api/src/main/java/sanbing/jcpp/protocol/forwarder/Forwarder.java
@@ -25,10 +25,7 @@ import sanbing.jcpp.proto.gen.ProtocolProto.UplinkQueueMessage;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiConsumer;
-import static sanbing.jcpp.infrastructure.queue.common.QueueConstants.MSG_MD_PREFIX;
-import static sanbing.jcpp.infrastructure.queue.common.QueueConstants.MSG_MD_TS;
-import static sanbing.jcpp.infrastructure.util.trace.TracerContextUtil.JCPP_TRACER_ID;
-import static sanbing.jcpp.infrastructure.util.trace.TracerContextUtil.JCPP_TRACER_ORIGIN;
+import static sanbing.jcpp.infrastructure.queue.common.QueueConstants.*;
/**
* @author baigod
@@ -51,7 +48,7 @@ public abstract class Forwarder {
protected final boolean isMonolith;
protected QueueProducer> producer;
- public Forwarder(String protocolName, StatsFactory statsFactory, PartitionProvider partitionProvider, ServiceInfoProvider serviceInfoProvider) {
+ protected Forwarder(String protocolName, StatsFactory statsFactory, PartitionProvider partitionProvider, ServiceInfoProvider serviceInfoProvider) {
this.protocolName = protocolName;
this.partitionProvider = partitionProvider;
this.serviceInfoProvider = serviceInfoProvider;
@@ -66,12 +63,13 @@ public abstract class Forwarder {
public abstract void destroy();
protected void jcppForward(String topic, String key, UplinkQueueMessage msg, BiConsumer consumer) {
+ forwarderMessagesStats.incrementTotal();
QueueMsgHeaders headers = new DefaultQueueMsgHeaders();
Tracer currentTracer = TracerContextUtil.getCurrentTracer();
- headers.put(MSG_MD_PREFIX + JCPP_TRACER_ID, ByteUtil.stringToBytes(currentTracer.getTraceId()));
- headers.put(MSG_MD_PREFIX + JCPP_TRACER_ORIGIN, ByteUtil.stringToBytes(currentTracer.getOrigin()));
- headers.put(MSG_MD_PREFIX + MSG_MD_TS, ByteUtil.longToBytes(currentTracer.getTracerTs()));
+ headers.put(MSG_MD_TRACER_ID, ByteUtil.stringToBytes(currentTracer.getTraceId()));
+ headers.put(MSG_MD_TRACER_ORIGIN, ByteUtil.stringToBytes(currentTracer.getOrigin()));
+ headers.put(MSG_MD_TRACER_TS, ByteUtil.longToBytes(currentTracer.getTracerTs()));
TopicPartitionInfo tpi = partitionProvider.resolve(ServiceType.APP, topic, key);
producer.send(tpi, new ProtoQueueMsg<>(key, msg, headers), new QueueCallback() {
@@ -80,11 +78,13 @@ public abstract class Forwarder {
TracerContextUtil.newTracer(currentTracer.getTraceId(), currentTracer.getOrigin(), currentTracer.getTracerTs());
MDCUtils.recordTracer();
+
log.trace("单体消息转发成功 key:{}", key);
if (consumer != null) {
consumer.accept(true, JacksonUtil.newObjectNode());
}
+ forwarderMessagesStats.incrementSuccessful();
}
@Override
@@ -92,6 +92,7 @@ public abstract class Forwarder {
TracerContextUtil.newTracer(currentTracer.getTraceId(), currentTracer.getOrigin(), currentTracer.getTracerTs());
MDCUtils.recordTracer();
+
log.warn("单体消息转发异常", t);
if (consumer != null) {
@@ -99,6 +100,7 @@ public abstract class Forwarder {
objectNode.put(ERROR, t.getClass() + ": " + t.getMessage());
consumer.accept(true, objectNode);
}
+ forwarderMessagesStats.incrementFailed();
}
});
}
diff --git a/jcpp-protocol-api/src/main/java/sanbing/jcpp/protocol/forwarder/KafkaForwarder.java b/jcpp-protocol-api/src/main/java/sanbing/jcpp/protocol/forwarder/KafkaForwarder.java
index ae1ebf3..56d97a4 100644
--- a/jcpp-protocol-api/src/main/java/sanbing/jcpp/protocol/forwarder/KafkaForwarder.java
+++ b/jcpp-protocol-api/src/main/java/sanbing/jcpp/protocol/forwarder/KafkaForwarder.java
@@ -36,10 +36,7 @@ import java.util.Properties;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiConsumer;
-import static sanbing.jcpp.infrastructure.queue.common.QueueConstants.MSG_MD_PREFIX;
-import static sanbing.jcpp.infrastructure.queue.common.QueueConstants.MSG_MD_TS;
-import static sanbing.jcpp.infrastructure.util.trace.TracerContextUtil.JCPP_TRACER_ID;
-import static sanbing.jcpp.infrastructure.util.trace.TracerContextUtil.JCPP_TRACER_ORIGIN;
+import static sanbing.jcpp.infrastructure.queue.common.QueueConstants.*;
/**
* @author baigod
@@ -150,12 +147,13 @@ public class KafkaForwarder extends Forwarder {
}
private void kafkaForward(String topic, String key, UplinkQueueMessage msg, BiConsumer consumer) throws InvalidProtocolBufferException {
+ forwarderMessagesStats.incrementTotal();
Headers headers = new RecordHeaders();
Tracer currentTracer = TracerContextUtil.getCurrentTracer();
- headers.add(new RecordHeader(MSG_MD_PREFIX + JCPP_TRACER_ID, ByteUtil.stringToBytes(currentTracer.getTraceId())));
- headers.add(new RecordHeader(MSG_MD_PREFIX + JCPP_TRACER_ORIGIN, ByteUtil.stringToBytes(currentTracer.getOrigin())));
- headers.add(new RecordHeader(MSG_MD_PREFIX + MSG_MD_TS, ByteUtil.longToBytes(currentTracer.getTracerTs())));
+ headers.add(new RecordHeader(MSG_MD_TRACER_ID, ByteUtil.stringToBytes(currentTracer.getTraceId())));
+ headers.add(new RecordHeader(MSG_MD_TRACER_ORIGIN, ByteUtil.stringToBytes(currentTracer.getOrigin())));
+ headers.add(new RecordHeader(MSG_MD_TRACER_TS, ByteUtil.longToBytes(currentTracer.getTracerTs())));
if (kafkaCfg.getEncoder() == KafkaCfg.EncoderType.json) {
@@ -177,6 +175,7 @@ public class KafkaForwarder extends Forwarder {
private void logAndDoConsumer(BiConsumer consumer, RecordMetadata metadata, Exception e, Tracer currentTracer) {
TracerContextUtil.newTracer(currentTracer.getTraceId(), currentTracer.getOrigin(), currentTracer.getTracerTs());
MDCUtils.recordTracer();
+
log.debug("Kafka 消息转发完成, success:{}", e == null);
if (consumer != null) {
@@ -196,6 +195,9 @@ public class KafkaForwarder extends Forwarder {
if (e != null) {
objectNode.put(ERROR, e.getClass() + ": " + e.getMessage());
+ forwarderMessagesStats.incrementFailed();
+ } else {
+ forwarderMessagesStats.incrementSuccessful();
}
consumer.accept(e == null, objectNode);
diff --git a/jcpp-protocol-api/src/main/java/sanbing/jcpp/protocol/listener/tcp/TcpListener.java b/jcpp-protocol-api/src/main/java/sanbing/jcpp/protocol/listener/tcp/TcpListener.java
index 91a6cb4..5379903 100644
--- a/jcpp-protocol-api/src/main/java/sanbing/jcpp/protocol/listener/tcp/TcpListener.java
+++ b/jcpp-protocol-api/src/main/java/sanbing/jcpp/protocol/listener/tcp/TcpListener.java
@@ -41,8 +41,8 @@ public class TcpListener extends Listener {
}
private void tcpServerBootstrap(TcpCfg tcpCfg) throws InterruptedException {
- bossGroup = new NioEventLoopGroup(tcpCfg.getBossGroupThreadCount(), JCPPThreadFactory.forName("tcp-boss"));
- workerGroup = new NioEventLoopGroup(tcpCfg.getWorkerGroupThreadCount(), JCPPThreadFactory.forName("tcp-worker"));
+ bossGroup = new NioEventLoopGroup(tcpCfg.getBossGroupThreadCount(), JCPPThreadFactory.forName("tcp-boss-%d"));
+ workerGroup = new NioEventLoopGroup(tcpCfg.getWorkerGroupThreadCount(), JCPPThreadFactory.forName("tcp-worker-%d"));
ChannelHandlerInitializer channelHandler = ChannelHandlerInitializer.createTcpChannelHandler(tcpCfg, parameter);
diff --git a/jcpp-protocol-api/src/main/java/sanbing/jcpp/protocol/listener/tcp/handler/ConnectionLimitHandler.java b/jcpp-protocol-api/src/main/java/sanbing/jcpp/protocol/listener/tcp/handler/ConnectionLimitHandler.java
index 9508e26..6714e96 100644
--- a/jcpp-protocol-api/src/main/java/sanbing/jcpp/protocol/listener/tcp/handler/ConnectionLimitHandler.java
+++ b/jcpp-protocol-api/src/main/java/sanbing/jcpp/protocol/listener/tcp/handler/ConnectionLimitHandler.java
@@ -42,14 +42,14 @@ public class ConnectionLimitHandler extends ChannelInboundHandlerAdapter {
public void channelActive(ChannelHandlerContext ctx) throws Exception {
super.channelActive(ctx);
channelGroup.add(ctx.channel());
- log.info("[{}]{} channelActive 当前连接数管道数 {} / {}",protocolName, ctx.channel(), channelGroup.size(), maxConnections);
+ log.info("[{}]{} channelActive 当前连接数 {} / {}",protocolName, ctx.channel(), channelGroup.size(), maxConnections);
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
super.channelInactive(ctx);
channelGroup.remove(ctx.channel());
- log.info("[{}]{} channelInactive 当前连接数管道数 {} / {}",protocolName, ctx.channel(), channelGroup.size(), maxConnections);
+ log.info("[{}]{} channelInactive 当前连接数 {} / {}",protocolName, ctx.channel(), channelGroup.size(), maxConnections);
}
@Override
diff --git a/jcpp-protocol-bootstrap/src/main/resources/log4j2.xml b/jcpp-protocol-bootstrap/src/main/resources/log4j2.xml
index 34d9fdf..b077425 100644
--- a/jcpp-protocol-bootstrap/src/main/resources/log4j2.xml
+++ b/jcpp-protocol-bootstrap/src/main/resources/log4j2.xml
@@ -18,7 +18,8 @@
-
+
+
diff --git a/jcpp-protocol-bootstrap/src/main/resources/protocol-service.yml b/jcpp-protocol-bootstrap/src/main/resources/protocol-service.yml
index fd6422f..f11cb16 100644
--- a/jcpp-protocol-bootstrap/src/main/resources/protocol-service.yml
+++ b/jcpp-protocol-bootstrap/src/main/resources/protocol-service.yml
@@ -135,9 +135,6 @@ queue:
type: "${QUEUE_TYPE:kafka}"
partitions:
hash_function_name: "${QUEUE_PARTITIONS_HASH_FUNCTION_NAME:murmur3_128}" # murmur3_32, murmur3_128 or sha256
- in_memory:
- stats:
- print-interval-ms: "${QUEUE_IN_MEMORY_STATS_PRINT_INTERVAL_MS:60000}"
kafka:
bootstrap-servers: "${KAFKA_SERVERS:kafka:9092}"
ssl:
@@ -165,7 +162,7 @@ queue:
auto-offset-reset: "${QUEUE_KAFKA_AUTO_OFFSET_RESET:earliest}"
other-inline: "${QUEUE_KAFKA_OTHER_PROPERTIES:}"
topic-properties:
- app: "${QUEUE_KAFKA_APP_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:52428800;retention.bytes:1048576000;partitions:1;min.insync.replicas:1}"
+ app: "${QUEUE_KAFKA_APP_TOPIC_PROPERTIES:retention.ms:86400000;segment.bytes:52428800;retention.bytes:1048576000;partitions:1;min.insync.replicas:1}"
consumer-stats:
enabled: "${QUEUE_KAFKA_CONSUMER_STATS_ENABLED:true}"
print-interval-ms: "${QUEUE_KAFKA_CONSUMER_STATS_MIN_PRINT_INTERVAL_MS:60000}"
diff --git a/jcpp-protocol-yunkuaichong/READMD.me b/jcpp-protocol-yunkuaichong/READMD.md
similarity index 100%
rename from jcpp-protocol-yunkuaichong/READMD.me
rename to jcpp-protocol-yunkuaichong/READMD.md
diff --git a/jcpp-protocol-yunkuaichong/src/main/java/sanbing/jcpp/protocol/yunkuaichong/YunKuaiChongProtocolMessageProcessor.java b/jcpp-protocol-yunkuaichong/src/main/java/sanbing/jcpp/protocol/yunkuaichong/YunKuaiChongProtocolMessageProcessor.java
index 87f28ba..665b7b2 100644
--- a/jcpp-protocol-yunkuaichong/src/main/java/sanbing/jcpp/protocol/yunkuaichong/YunKuaiChongProtocolMessageProcessor.java
+++ b/jcpp-protocol-yunkuaichong/src/main/java/sanbing/jcpp/protocol/yunkuaichong/YunKuaiChongProtocolMessageProcessor.java
@@ -138,7 +138,7 @@ public class YunKuaiChongProtocolMessageProcessor extends ProtocolMessageProcess
csTemp.writeBytes(byCheckSum);
checkSum = csTemp.readUnsignedShortLE();
checkResult = checkCrcSum(checkData, checkSum);
- log.info("云快充检验和 第二次检查: checkResult:{}, checkSum:{}", checkResult, checkSum);
+ log.debug("云快充检验和 第二次检查: checkResult:{}, checkSum:{}", checkResult, checkSum);
}
if (Boolean.FALSE.equals(checkResult.getFirst())) {
diff --git a/jcpp-protocol-yunkuaichong/src/main/java/sanbing/jcpp/protocol/yunkuaichong/v150/YunKuaiChongV150LoginAckDLCmd.java b/jcpp-protocol-yunkuaichong/src/main/java/sanbing/jcpp/protocol/yunkuaichong/v150/YunKuaiChongV150LoginAckDLCmd.java
index d52e4d8..3c9a6a1 100644
--- a/jcpp-protocol-yunkuaichong/src/main/java/sanbing/jcpp/protocol/yunkuaichong/v150/YunKuaiChongV150LoginAckDLCmd.java
+++ b/jcpp-protocol-yunkuaichong/src/main/java/sanbing/jcpp/protocol/yunkuaichong/v150/YunKuaiChongV150LoginAckDLCmd.java
@@ -4,6 +4,7 @@
*/
package sanbing.jcpp.protocol.yunkuaichong.v150;
+import cn.hutool.core.util.RandomUtil;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import lombok.extern.slf4j.Slf4j;
@@ -42,7 +43,7 @@ public class YunKuaiChongV150LoginAckDLCmd extends YunKuaiChongDownlinkCmdExe {
@Override
public void execute(TcpSession tcpSession, YunKuaiChongDwonlinkMessage yunKuaiChongDwonlinkMessage, ProtocolContext ctx) {
- log.info("{} 云快充1.5.0登录认证应答", tcpSession);
+ log.debug("{} 云快充1.5.0登录认证应答", tcpSession);
if (!yunKuaiChongDwonlinkMessage.getMsg().hasLoginResponse()) {
return;
@@ -95,7 +96,7 @@ public class YunKuaiChongV150LoginAckDLCmd extends YunKuaiChongDownlinkCmdExe {
log.info("{} 云快充1.5.0开始注册定时对时任务", tcpSession);
return PROTOCOL_SESSION_SCHEDULED.scheduleAtFixedRate(() ->
syncTime(tcpSession, pileCodeBytes, requestData),
- 0, 8, TimeUnit.HOURS);
+ 0, RandomUtil.randomInt(420, 480), TimeUnit.MINUTES);
}
);
}
diff --git a/jcpp-protocol-yunkuaichong/src/main/java/sanbing/jcpp/protocol/yunkuaichong/v150/YunKuaiChongV150LoginULCmd.java b/jcpp-protocol-yunkuaichong/src/main/java/sanbing/jcpp/protocol/yunkuaichong/v150/YunKuaiChongV150LoginULCmd.java
index 5ef9cce..99fe876 100644
--- a/jcpp-protocol-yunkuaichong/src/main/java/sanbing/jcpp/protocol/yunkuaichong/v150/YunKuaiChongV150LoginULCmd.java
+++ b/jcpp-protocol-yunkuaichong/src/main/java/sanbing/jcpp/protocol/yunkuaichong/v150/YunKuaiChongV150LoginULCmd.java
@@ -29,7 +29,7 @@ public class YunKuaiChongV150LoginULCmd extends YunKuaiChongUplinkCmdExe {
@Override
public void execute(TcpSession tcpSession, YunKuaiChongUplinkMessage yunKuaiChongUplinkMessage, ProtocolContext ctx) {
- log.info("{} 云快充1.5.0登录认证请求", tcpSession);
+ log.debug("{} 云快充1.5.0登录认证请求", tcpSession);
ByteBuf byteBuf = Unpooled.copiedBuffer(yunKuaiChongUplinkMessage.getMsgBody());
ObjectNode additionalInfo = JacksonUtil.newObjectNode();