From 107a0b9b3a38340ce17dffcf089028f9a812379d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E4=B8=89=E4=B8=99?= Date: Tue, 15 Oct 2024 09:50:55 +0800 Subject: [PATCH 01/27] typo --- jcpp-protocol-yunkuaichong/{READMD.me => READMD.md} | 0 1 file changed, 0 insertions(+), 0 deletions(-) rename jcpp-protocol-yunkuaichong/{READMD.me => READMD.md} (100%) diff --git a/jcpp-protocol-yunkuaichong/READMD.me b/jcpp-protocol-yunkuaichong/READMD.md similarity index 100% rename from jcpp-protocol-yunkuaichong/READMD.me rename to jcpp-protocol-yunkuaichong/READMD.md From b6193d2d7a11abde0181c06b73b6da80d9794722 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E4=B8=89=E4=B8=99?= Date: Tue, 15 Oct 2024 15:35:03 +0800 Subject: [PATCH 02/27] npe --- .../java/sanbing/jcpp/app/dal/mapper/GunMapperIT.java | 11 +++-------- 1 file changed, 3 insertions(+), 8 deletions(-) diff --git a/jcpp-app-bootstrap/src/test/java/sanbing/jcpp/app/dal/mapper/GunMapperIT.java b/jcpp-app-bootstrap/src/test/java/sanbing/jcpp/app/dal/mapper/GunMapperIT.java index af650ff..d2c7099 100644 --- a/jcpp-app-bootstrap/src/test/java/sanbing/jcpp/app/dal/mapper/GunMapperIT.java +++ b/jcpp-app-bootstrap/src/test/java/sanbing/jcpp/app/dal/mapper/GunMapperIT.java @@ -12,9 +12,9 @@ import sanbing.jcpp.app.dal.config.ibatis.enums.GunOptStatusEnum; import sanbing.jcpp.app.dal.config.ibatis.enums.GunRunStatusEnum; import sanbing.jcpp.app.dal.config.ibatis.enums.OwnerTypeEnum; import sanbing.jcpp.app.dal.entity.Gun; -import sanbing.jcpp.app.dal.entity.Pile; import sanbing.jcpp.infrastructure.util.jackson.JacksonUtil; +import java.text.DecimalFormat; import java.time.LocalDateTime; import java.util.UUID; @@ -42,9 +42,6 @@ class GunMapperIT extends AbstractTestBase { @Resource GunMapper gunMapper; - @Resource - PileMapper pileMapper; - @Test void curdTest() { gunMapper.delete(Wrappers.lambdaQuery()); @@ -53,15 +50,13 @@ class GunMapperIT extends AbstractTestBase { UUID pileId = NORMAL_PILE_ID[i]; UUID gunId = NORMAL_GUN_ID[i]; - Pile pile = pileMapper.selectById(pileId); - Gun gun = Gun.builder() .id(gunId) .createdTime(LocalDateTime.now()) .additionalInfo(JacksonUtil.newObjectNode()) .gunNo("02") - .gunName(pile.getPileName() + "的2号枪") - .gunCode(pile.getPileCode() + "-02") + .gunName(String.format("三丙家的%d号充电桩", i + 1) + "的2号枪") + .gunCode("202312120000" + new DecimalFormat("00").format(i + 1) + "-02") .stationId(NORMAL_STATION_ID) .pileId(pileId) .ownerId(NORMAL_USER_ID) From 26588112a371dba54f4ac1a4f4e6891a45080108 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E4=B8=89=E4=B8=99?= Date: Tue, 15 Oct 2024 16:18:01 +0800 Subject: [PATCH 03/27] =?UTF-8?q?=E7=94=9F=E6=88=9010=E4=B8=87=E6=A0=B9?= =?UTF-8?q?=E6=A1=A9=E5=81=9A=E5=8E=8B=E6=B5=8B?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../jcpp/app/dal/mapper/PileMapperIT.java | 34 ++++++++++++++++++- 1 file changed, 33 insertions(+), 1 deletion(-) diff --git a/jcpp-app-bootstrap/src/test/java/sanbing/jcpp/app/dal/mapper/PileMapperIT.java b/jcpp-app-bootstrap/src/test/java/sanbing/jcpp/app/dal/mapper/PileMapperIT.java index 46168eb..e00a37d 100644 --- a/jcpp-app-bootstrap/src/test/java/sanbing/jcpp/app/dal/mapper/PileMapperIT.java +++ b/jcpp-app-bootstrap/src/test/java/sanbing/jcpp/app/dal/mapper/PileMapperIT.java @@ -17,7 +17,10 @@ import sanbing.jcpp.infrastructure.util.jackson.JacksonUtil; import java.text.DecimalFormat; import java.time.LocalDateTime; +import java.util.ArrayList; +import java.util.List; import java.util.UUID; +import java.util.concurrent.atomic.AtomicInteger; import static sanbing.jcpp.app.dal.mapper.StationMapperIT.NORMAL_STATION_ID; import static sanbing.jcpp.app.dal.mapper.UserMapperIT.NORMAL_USER_ID; @@ -68,7 +71,36 @@ class PileMapperIT extends AbstractTestBase { pileMapper.insertOrUpdate(pile); log.info("{}", pileMapper.selectById(pileId)); - } } + + @Test + void generate100KPiles() { + AtomicInteger number = new AtomicInteger(1); + for (int i = 0; i < 100; i++) { + List piles = new ArrayList<>(); + for (int j = 0; j < 1000; j++, number.incrementAndGet()) { + Pile pile = Pile.builder() + .id(UUID.randomUUID()) + .createdTime(LocalDateTime.now()) + .additionalInfo(JacksonUtil.newObjectNode()) + .pileName(String.format("三丙家的%d号充电桩", number.get())) + .pileCode("20241015" + new DecimalFormat("000000").format(number.get())) + .protocol("yunkuaichongV150") + .stationId(NORMAL_STATION_ID) + .ownerId(NORMAL_USER_ID) + .ownerType(OwnerTypeEnum.C) + .brand("星星") + .model("10A") + .manufacturer("星星") + .status(PileStatusEnum.IDLE) + .type(PileTypeEnum.AC) + .build(); + piles.add(pile); + } + pileMapper.insert(piles); + } + + log.info("{}", pileMapper.selectCount(Wrappers.lambdaQuery())); + } } \ No newline at end of file From 2780298b60d0e284571ac55a16d3597365162edb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E4=B8=89=E4=B8=99?= Date: Tue, 15 Oct 2024 17:40:45 +0800 Subject: [PATCH 04/27] cleanup --- .../java/sanbing/jcpp/protocol/domain/ProtocolUplinkMsg.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/jcpp-protocol-api/src/main/java/sanbing/jcpp/protocol/domain/ProtocolUplinkMsg.java b/jcpp-protocol-api/src/main/java/sanbing/jcpp/protocol/domain/ProtocolUplinkMsg.java index cdeabf8..c770fc4 100644 --- a/jcpp-protocol-api/src/main/java/sanbing/jcpp/protocol/domain/ProtocolUplinkMsg.java +++ b/jcpp-protocol-api/src/main/java/sanbing/jcpp/protocol/domain/ProtocolUplinkMsg.java @@ -13,8 +13,8 @@ public record ProtocolUplinkMsg(SocketAddress address, UUID id, T data, int s @Override public String toString() { - if (data instanceof byte[]) { - return ByteBufUtil.hexDump((byte[]) data); + if (data instanceof byte[] bytes) { + return ByteBufUtil.hexDump(bytes); } else { return data.toString(); } From 72baea61e50df881b943409e2e39eaff575fdf2b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E4=B8=89=E4=B8=99?= Date: Wed, 16 Oct 2024 09:37:36 +0800 Subject: [PATCH 05/27] =?UTF-8?q?=E4=BF=AE=E6=94=B9kafka=20topic=E5=8F=82?= =?UTF-8?q?=E6=95=B0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../src/main/resources/app-service.yml | 2 +- .../ProtocolUplinkConsumerService.java | 18 +++++++++++------- .../src/main/resources/protocol-service.yml | 2 +- 3 files changed, 13 insertions(+), 9 deletions(-) diff --git a/jcpp-app-bootstrap/src/main/resources/app-service.yml b/jcpp-app-bootstrap/src/main/resources/app-service.yml index c5f1cc4..9bb5108 100644 --- a/jcpp-app-bootstrap/src/main/resources/app-service.yml +++ b/jcpp-app-bootstrap/src/main/resources/app-service.yml @@ -93,7 +93,7 @@ queue: auto-offset-reset: "${QUEUE_KAFKA_AUTO_OFFSET_RESET:earliest}" other-inline: "${QUEUE_KAFKA_OTHER_PROPERTIES:}" topic-properties: - app: "${QUEUE_KAFKA_APP_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:52428800;retention.bytes:1048576000;partitions:1;min.insync.replicas:1}" + app: "${QUEUE_KAFKA_APP_TOPIC_PROPERTIES:retention.ms:86400000;segment.bytes:52428800;retention.bytes:1048576000;partitions:1;min.insync.replicas:1}" consumer-stats: enabled: "${QUEUE_KAFKA_CONSUMER_STATS_ENABLED:true}" print-interval-ms: "${QUEUE_KAFKA_CONSUMER_STATS_MIN_PRINT_INTERVAL_MS:60000}" diff --git a/jcpp-app/src/main/java/sanbing/jcpp/app/service/queue/consumer/ProtocolUplinkConsumerService.java b/jcpp-app/src/main/java/sanbing/jcpp/app/service/queue/consumer/ProtocolUplinkConsumerService.java index e7ab6d0..c3c4014 100644 --- a/jcpp-app/src/main/java/sanbing/jcpp/app/service/queue/consumer/ProtocolUplinkConsumerService.java +++ b/jcpp-app/src/main/java/sanbing/jcpp/app/service/queue/consumer/ProtocolUplinkConsumerService.java @@ -101,6 +101,7 @@ public class ProtocolUplinkConsumerService extends AbstractConsumerService imple } + @Override @PreDestroy public void destroy() { super.destroy(); @@ -132,7 +133,7 @@ public class ProtocolUplinkConsumerService extends AbstractConsumerService imple processingTimeoutLatch, pendingMap, new ConcurrentHashMap<>()); PendingMsgHolder pendingMsgHolder = new PendingMsgHolder(); Future packSubmitFuture = consumersExecutor.submit(new TracerRunnable(() -> - orderedMsgList.forEach((element) -> { + orderedMsgList.forEach(element -> { UUID id = element.getUuid(); ProtoQueueMsg msg = element.getMsg(); tracer(msg); @@ -157,11 +158,11 @@ public class ProtocolUplinkConsumerService extends AbstractConsumerService imple pileProtocolService.onSetPricingResponse(uplinkQueueMsg, callback); } else if (uplinkQueueMsg.hasRemoteStartChargingResponse()) { pileProtocolService.onRemoteStartChargingResponse(uplinkQueueMsg, callback); - } else if (uplinkQueueMsg.hasRemoteStopChargingResponse()) { + } else if (uplinkQueueMsg.hasRemoteStopChargingResponse()) { pileProtocolService.onRemoteStopChargingResponse(uplinkQueueMsg, callback); - } else if(uplinkQueueMsg.hasTransactionRecord()){ + } else if (uplinkQueueMsg.hasTransactionRecord()) { pileProtocolService.onTransactionRecord(uplinkQueueMsg, callback); - }else { + } else { callback.onSuccess(); } @@ -189,7 +190,7 @@ public class ProtocolUplinkConsumerService extends AbstractConsumerService imple } private void tracer(ProtoQueueMsg msg) { - Optional.ofNullable(msg.getHeaders().get(MSG_MD_PREFIX + JCPP_TRACER_ID)) + if (Optional.ofNullable(msg.getHeaders().get(MSG_MD_PREFIX + JCPP_TRACER_ID)) .map(tracerId -> { String origin = null; byte[] tracerOrigin = msg.getHeaders().get(MSG_MD_PREFIX + JCPP_TRACER_ORIGIN); @@ -205,7 +206,10 @@ public class ProtocolUplinkConsumerService extends AbstractConsumerService imple return TracerContextUtil.newTracer(ByteUtil.bytesToString(tracerId), origin, ts); }) - .orElseGet(TracerContextUtil::newTracer); + .isEmpty()) { + + TracerContextUtil.newTracer(); + } MDCUtils.recordTracer(); } @@ -231,6 +235,6 @@ public class ProtocolUplinkConsumerService extends AbstractConsumerService imple @Setter @Getter private static class PendingMsgHolder { - private volatile UplinkQueueMessage uplinkQueueMessage; + private UplinkQueueMessage uplinkQueueMessage; } } \ No newline at end of file diff --git a/jcpp-protocol-bootstrap/src/main/resources/protocol-service.yml b/jcpp-protocol-bootstrap/src/main/resources/protocol-service.yml index 2b57f32..f417977 100644 --- a/jcpp-protocol-bootstrap/src/main/resources/protocol-service.yml +++ b/jcpp-protocol-bootstrap/src/main/resources/protocol-service.yml @@ -128,7 +128,7 @@ queue: auto-offset-reset: "${QUEUE_KAFKA_AUTO_OFFSET_RESET:earliest}" other-inline: "${QUEUE_KAFKA_OTHER_PROPERTIES:}" topic-properties: - app: "${QUEUE_KAFKA_APP_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:52428800;retention.bytes:1048576000;partitions:1;min.insync.replicas:1}" + app: "${QUEUE_KAFKA_APP_TOPIC_PROPERTIES:retention.ms:86400000;segment.bytes:52428800;retention.bytes:1048576000;partitions:1;min.insync.replicas:1}" consumer-stats: enabled: "${QUEUE_KAFKA_CONSUMER_STATS_ENABLED:true}" print-interval-ms: "${QUEUE_KAFKA_CONSUMER_STATS_MIN_PRINT_INTERVAL_MS:60000}" From 65084e3269dd961ace3ba5c405687d29cf291c96 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E4=B8=89=E4=B8=99?= Date: Wed, 16 Oct 2024 09:44:46 +0800 Subject: [PATCH 06/27] =?UTF-8?q?=E4=BF=AE=E6=94=B9log4j2=E9=85=8D?= =?UTF-8?q?=E7=BD=AE?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- jcpp-app-bootstrap/src/main/resources/log4j2.xml | 3 ++- jcpp-protocol-bootstrap/src/main/resources/log4j2.xml | 3 ++- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/jcpp-app-bootstrap/src/main/resources/log4j2.xml b/jcpp-app-bootstrap/src/main/resources/log4j2.xml index 082044b..f53a4fd 100644 --- a/jcpp-app-bootstrap/src/main/resources/log4j2.xml +++ b/jcpp-app-bootstrap/src/main/resources/log4j2.xml @@ -18,7 +18,8 @@ - + + diff --git a/jcpp-protocol-bootstrap/src/main/resources/log4j2.xml b/jcpp-protocol-bootstrap/src/main/resources/log4j2.xml index 34d9fdf..b077425 100644 --- a/jcpp-protocol-bootstrap/src/main/resources/log4j2.xml +++ b/jcpp-protocol-bootstrap/src/main/resources/log4j2.xml @@ -18,7 +18,8 @@ - + + From 0122ecf7cd863bcf80589a3e37798763ea4b56fa Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E4=B8=89=E4=B8=99?= Date: Wed, 16 Oct 2024 09:53:19 +0800 Subject: [PATCH 07/27] =?UTF-8?q?=E6=94=BE=E5=BC=80memory=E9=98=9F?= =?UTF-8?q?=E5=88=97=E6=8C=87=E6=A0=87=E6=89=93=E5=8D=B0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../queue/memory/DefaultInMemoryStorage.java | 14 ++++++-------- 1 file changed, 6 insertions(+), 8 deletions(-) diff --git a/jcpp-infrastructure-queue/src/main/java/sanbing/jcpp/infrastructure/queue/memory/DefaultInMemoryStorage.java b/jcpp-infrastructure-queue/src/main/java/sanbing/jcpp/infrastructure/queue/memory/DefaultInMemoryStorage.java index f61f905..fb02920 100644 --- a/jcpp-infrastructure-queue/src/main/java/sanbing/jcpp/infrastructure/queue/memory/DefaultInMemoryStorage.java +++ b/jcpp-infrastructure-queue/src/main/java/sanbing/jcpp/infrastructure/queue/memory/DefaultInMemoryStorage.java @@ -20,13 +20,11 @@ public final class DefaultInMemoryStorage implements InMemoryStorage { @Override public void printStats() { - if (log.isDebugEnabled()) { - storage.forEach((topic, queue) -> { - if (!queue.isEmpty()) { - log.debug("[{}] Queue Size [{}]", topic, queue.size()); - } - }); - } + storage.forEach((topic, queue) -> { + if (!queue.isEmpty()) { + log.info("[{}] Queue Size [{}]", topic, queue.size()); + } + }); } @Override @@ -45,7 +43,7 @@ public final class DefaultInMemoryStorage implements InMemoryStorage { } @Override - public List get(String topic) throws InterruptedException { + public List get(String topic) throws InterruptedException { final BlockingQueue queue = storage.get(topic); if (queue != null) { final QueueMsg firstMsg = queue.poll(); From 3741273f9b542d88ff8108a36b91304ab68d6a75 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E4=B8=89=E4=B8=99?= Date: Wed, 16 Oct 2024 10:09:35 +0800 Subject: [PATCH 08/27] =?UTF-8?q?=E6=B6=88=E8=B4=B9=E5=A4=84=E7=90=86?= =?UTF-8?q?=E7=BA=BF=E7=A8=8B=E6=B1=A0=E6=9B=B4=E6=8D=A2=E4=B8=BA=E8=99=9A?= =?UTF-8?q?=E6=8B=9F=E7=BA=BF=E7=A8=8B?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../sanbing/jcpp/app/service/queue/AbstractConsumerService.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/jcpp-app/src/main/java/sanbing/jcpp/app/service/queue/AbstractConsumerService.java b/jcpp-app/src/main/java/sanbing/jcpp/app/service/queue/AbstractConsumerService.java index cff3ebd..0b91f94 100644 --- a/jcpp-app/src/main/java/sanbing/jcpp/app/service/queue/AbstractConsumerService.java +++ b/jcpp-app/src/main/java/sanbing/jcpp/app/service/queue/AbstractConsumerService.java @@ -31,7 +31,7 @@ public abstract class AbstractConsumerService extends JCPPApplicationEventListen protected ScheduledExecutorService scheduler; public void init(String prefix) { - this.consumersExecutor = Executors.newCachedThreadPool(JCPPThreadFactory.forName(prefix + "-consumer")); + this.consumersExecutor = JCPPExecutors.newVirtualThreadPool(prefix + "-consumer-virtual"); this.mgmtExecutor = JCPPExecutors.newWorkStealingPool(getMgmtThreadPoolSize(), prefix + "-mgmt"); this.scheduler = Executors.newSingleThreadScheduledExecutor(JCPPThreadFactory.forName(prefix + "-consumer-scheduler")); } From a90c94a40c55b2271bdfc2df5f6ad315f305b74e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E4=B8=89=E4=B8=99?= Date: Wed, 16 Oct 2024 10:20:35 +0800 Subject: [PATCH 09/27] =?UTF-8?q?=E6=B6=88=E8=B4=B9=E5=A4=84=E7=90=86?= =?UTF-8?q?=E7=BA=BF=E7=A8=8B=E6=B1=A0=E6=9B=B4=E6=8D=A2=E4=B8=BA=E8=99=9A?= =?UTF-8?q?=E6=8B=9F=E7=BA=BF=E7=A8=8B?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../util/async/JCPPVirtualThreadFactory.java | 4 +-- .../util/async/JCPPExecutorsTest.java | 30 +++++++++++++++++++ 2 files changed, 31 insertions(+), 3 deletions(-) create mode 100644 jcpp-infrastructure-util/src/test/java/sanbing/jcpp/infrastructure/util/async/JCPPExecutorsTest.java diff --git a/jcpp-infrastructure-util/src/main/java/sanbing/jcpp/infrastructure/util/async/JCPPVirtualThreadFactory.java b/jcpp-infrastructure-util/src/main/java/sanbing/jcpp/infrastructure/util/async/JCPPVirtualThreadFactory.java index 2c88144..4edcc68 100644 --- a/jcpp-infrastructure-util/src/main/java/sanbing/jcpp/infrastructure/util/async/JCPPVirtualThreadFactory.java +++ b/jcpp-infrastructure-util/src/main/java/sanbing/jcpp/infrastructure/util/async/JCPPVirtualThreadFactory.java @@ -4,8 +4,6 @@ */ package sanbing.jcpp.infrastructure.util.async; -import sanbing.jcpp.infrastructure.util.trace.TracerRunnable; - import java.util.concurrent.ThreadFactory; import java.util.concurrent.atomic.AtomicLong; @@ -19,6 +17,6 @@ public class JCPPVirtualThreadFactory implements ThreadFactory { @Override public Thread newThread(Runnable r) { - return Thread.ofVirtual().name(namePrefix + "-" + threadNumber.getAndIncrement()).unstarted(new TracerRunnable(r)); + return Thread.ofVirtual().name(namePrefix + "-" + threadNumber.getAndIncrement()).unstarted(r); } } \ No newline at end of file diff --git a/jcpp-infrastructure-util/src/test/java/sanbing/jcpp/infrastructure/util/async/JCPPExecutorsTest.java b/jcpp-infrastructure-util/src/test/java/sanbing/jcpp/infrastructure/util/async/JCPPExecutorsTest.java new file mode 100644 index 0000000..418a630 --- /dev/null +++ b/jcpp-infrastructure-util/src/test/java/sanbing/jcpp/infrastructure/util/async/JCPPExecutorsTest.java @@ -0,0 +1,30 @@ +/** + * 抖音关注:程序员三丙 + * 知识星球:https://t.zsxq.com/j9b21 + */ +package sanbing.jcpp.infrastructure.util.async; + +import org.junit.jupiter.api.Test; +import org.slf4j.MDC; +import sanbing.jcpp.infrastructure.util.mdc.MDCUtils; +import sanbing.jcpp.infrastructure.util.trace.TracerContextUtil; +import sanbing.jcpp.infrastructure.util.trace.TracerRunnable; + +import java.util.concurrent.ExecutorService; + +class JCPPExecutorsTest { + + @Test + void newVirtualThreadPool() { + ExecutorService executorService = JCPPExecutors.newVirtualThreadPool("test-consumer-virtual"); + + TracerContextUtil.newTracer(); + MDCUtils.recordTracer(); + + System.out.println(MDC.get("TRACE_ID")); + + executorService.submit(new TracerRunnable(() -> { + System.out.println(MDC.get("TRACE_ID")); + })); + } +} \ No newline at end of file From 74eeb256e274fb125776d4dd63cd06fb278bf4b8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E4=B8=89=E4=B8=99?= Date: Wed, 16 Oct 2024 10:57:47 +0800 Subject: [PATCH 10/27] =?UTF-8?q?=E9=99=90=E5=AE=9A=E6=9C=AC=E5=9C=B0?= =?UTF-8?q?=E9=98=9F=E5=88=97=E5=AE=B9=E9=87=8F=E4=B8=8A=E9=99=90?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../infrastructure/queue/memory/DefaultInMemoryStorage.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/jcpp-infrastructure-queue/src/main/java/sanbing/jcpp/infrastructure/queue/memory/DefaultInMemoryStorage.java b/jcpp-infrastructure-queue/src/main/java/sanbing/jcpp/infrastructure/queue/memory/DefaultInMemoryStorage.java index fb02920..05a42ba 100644 --- a/jcpp-infrastructure-queue/src/main/java/sanbing/jcpp/infrastructure/queue/memory/DefaultInMemoryStorage.java +++ b/jcpp-infrastructure-queue/src/main/java/sanbing/jcpp/infrastructure/queue/memory/DefaultInMemoryStorage.java @@ -39,7 +39,7 @@ public final class DefaultInMemoryStorage implements InMemoryStorage { @Override public boolean put(String topic, QueueMsg msg) { - return storage.computeIfAbsent(topic, (t) -> new LinkedBlockingQueue<>()).add(msg); + return storage.computeIfAbsent(topic, t -> new LinkedBlockingQueue<>(100000)).add(msg); } @Override From 5b6583defa44f0adf2120431fae28581362f25fe Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E4=B8=89=E4=B8=99?= Date: Wed, 16 Oct 2024 11:09:40 +0800 Subject: [PATCH 11/27] =?UTF-8?q?NIO=E7=BA=BF=E7=A8=8B=E6=A0=BC=E5=BC=8F?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../java/sanbing/jcpp/protocol/listener/tcp/TcpListener.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/jcpp-protocol-api/src/main/java/sanbing/jcpp/protocol/listener/tcp/TcpListener.java b/jcpp-protocol-api/src/main/java/sanbing/jcpp/protocol/listener/tcp/TcpListener.java index 91a6cb4..5379903 100644 --- a/jcpp-protocol-api/src/main/java/sanbing/jcpp/protocol/listener/tcp/TcpListener.java +++ b/jcpp-protocol-api/src/main/java/sanbing/jcpp/protocol/listener/tcp/TcpListener.java @@ -41,8 +41,8 @@ public class TcpListener extends Listener { } private void tcpServerBootstrap(TcpCfg tcpCfg) throws InterruptedException { - bossGroup = new NioEventLoopGroup(tcpCfg.getBossGroupThreadCount(), JCPPThreadFactory.forName("tcp-boss")); - workerGroup = new NioEventLoopGroup(tcpCfg.getWorkerGroupThreadCount(), JCPPThreadFactory.forName("tcp-worker")); + bossGroup = new NioEventLoopGroup(tcpCfg.getBossGroupThreadCount(), JCPPThreadFactory.forName("tcp-boss-%d")); + workerGroup = new NioEventLoopGroup(tcpCfg.getWorkerGroupThreadCount(), JCPPThreadFactory.forName("tcp-worker-%d")); ChannelHandlerInitializer channelHandler = ChannelHandlerInitializer.createTcpChannelHandler(tcpCfg, parameter); From e4921faf80d2f685bd2deeced20009bc6d99fe7b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E4=B8=89=E4=B8=99?= Date: Wed, 16 Oct 2024 11:36:20 +0800 Subject: [PATCH 12/27] =?UTF-8?q?=E6=97=A5=E5=BF=97=E9=99=8D=E7=BA=A7?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- jcpp-app-bootstrap/src/main/resources/app-service.yml | 6 +++--- .../v150/YunKuaiChongV15ProtocolMessageProcessor.java | 2 +- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/jcpp-app-bootstrap/src/main/resources/app-service.yml b/jcpp-app-bootstrap/src/main/resources/app-service.yml index 9bb5108..8d93a32 100644 --- a/jcpp-app-bootstrap/src/main/resources/app-service.yml +++ b/jcpp-app-bootstrap/src/main/resources/app-service.yml @@ -127,12 +127,12 @@ redis: standalone: host: "${REDIS_HOST:redis}" port: "${REDIS_PORT:6379}" - useDefaultClientConfig: "${REDIS_USE_DEFAULT_CLIENT_CONFIG:true}" + useDefaultClientConfig: "${REDIS_USE_DEFAULT_CLIENT_CONFIG:false}" clientName: "${REDIS_CLIENT_NAME:standalone}" commandTimeout: "${REDIS_CLIENT_COMMAND_TIMEOUT:30000}" shutdownTimeout: "${REDIS_CLIENT_SHUTDOWN_TIMEOUT:1000}" readTimeout: "${REDIS_CLIENT_READ_TIMEOUT:60000}" - usePoolConfig: "${REDIS_CLIENT_USE_POOL_CONFIG:false}" + usePoolConfig: "${REDIS_CLIENT_USE_POOL_CONFIG:true}" cluster: nodes: "${REDIS_NODES:redis-node-0:6379,redis-node-1:6379,redis-node-2:6379,redis-node-3:6379,redis-node-4:6379,redis-node-5:6379}" max-redirects: "${REDIS_MAX_REDIRECTS:12}" @@ -145,7 +145,7 @@ redis: db: "${REDIS_DB:0}" password: "${REDIS_PASSWORD:sanbing}" pool_config: - maxTotal: "${REDIS_POOL_CONFIG_MAX_TOTAL:128}" + maxTotal: "${REDIS_POOL_CONFIG_MAX_TOTAL:256}" maxIdle: "${REDIS_POOL_CONFIG_MAX_IDLE:128}" minIdle: "${REDIS_POOL_CONFIG_MIN_IDLE:16}" testOnBorrow: "${REDIS_POOL_CONFIG_TEST_ON_BORROW:false}" diff --git a/jcpp-protocol-yunkuaichong/src/main/java/sanbing/jcpp/protocol/yunkuaichong/v150/YunKuaiChongV15ProtocolMessageProcessor.java b/jcpp-protocol-yunkuaichong/src/main/java/sanbing/jcpp/protocol/yunkuaichong/v150/YunKuaiChongV15ProtocolMessageProcessor.java index c73ad66..e1719ef 100644 --- a/jcpp-protocol-yunkuaichong/src/main/java/sanbing/jcpp/protocol/yunkuaichong/v150/YunKuaiChongV15ProtocolMessageProcessor.java +++ b/jcpp-protocol-yunkuaichong/src/main/java/sanbing/jcpp/protocol/yunkuaichong/v150/YunKuaiChongV15ProtocolMessageProcessor.java @@ -142,7 +142,7 @@ public class YunKuaiChongV15ProtocolMessageProcessor extends ProtocolMessageProc csTemp.writeBytes(byCheckSum); checkSum = csTemp.readUnsignedShortLE(); checkResult = checkCrcSum(checkData, checkSum); - log.info("云快充V1.5检验和 第二次检查: checkResult:{}, checkSum:{}", checkResult, checkSum); + log.debug("云快充V1.5检验和 第二次检查: checkResult:{}, checkSum:{}", checkResult, checkSum); } if (!checkResult.getFirst()) { From 8bde50009202c06a5304b113a620cbd49087ad2e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E4=B8=89=E4=B8=99?= Date: Wed, 16 Oct 2024 13:56:37 +0800 Subject: [PATCH 13/27] =?UTF-8?q?=E6=8F=90=E9=AB=98DB=E8=BF=9E=E6=8E=A5?= =?UTF-8?q?=E6=95=B0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- jcpp-app-bootstrap/src/main/resources/app-service.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/jcpp-app-bootstrap/src/main/resources/app-service.yml b/jcpp-app-bootstrap/src/main/resources/app-service.yml index 8d93a32..f8e82f0 100644 --- a/jcpp-app-bootstrap/src/main/resources/app-service.yml +++ b/jcpp-app-bootstrap/src/main/resources/app-service.yml @@ -27,7 +27,7 @@ spring: password: "${SPRING_DATASOURCE_PASSWORD:postgres}" hikari: leak-detection-threshold: "${SPRING_DATASOURCE_HIKARI_LEAK_DETECTION_THRESHOLD:0}" - maximum-pool-size: "${SPRING_DATASOURCE_MAXIMUM_POOL_SIZE:16}" + maximum-pool-size: "${SPRING_DATASOURCE_MAXIMUM_POOL_SIZE:32}" register-mbeans: "${SPRING_DATASOURCE_HIKARI_REGISTER_MBEANS:false}" mybatis-plus: From 95039219c9c4bcf8018c4a9eafbecf080b281163 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E4=B8=89=E4=B8=99?= Date: Wed, 16 Oct 2024 13:58:55 +0800 Subject: [PATCH 14/27] =?UTF-8?q?=E6=A1=A9=E4=BF=A1=E6=81=AF=E7=BC=93?= =?UTF-8?q?=E5=AD=981=E5=A4=A9?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- jcpp-app-bootstrap/src/main/resources/app-service.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/jcpp-app-bootstrap/src/main/resources/app-service.yml b/jcpp-app-bootstrap/src/main/resources/app-service.yml index f8e82f0..5d31b18 100644 --- a/jcpp-app-bootstrap/src/main/resources/app-service.yml +++ b/jcpp-app-bootstrap/src/main/resources/app-service.yml @@ -115,7 +115,7 @@ cache: type: "${CACHE_TYPE:caffeine}" # caffeine or redis specs: piles: - timeToLiveInMinutes: "${CACHE_SPECS_PILES_TTL:15}" + timeToLiveInMinutes: "${CACHE_SPECS_PILES_TTL:1440}" maxSize: "${CACHE_SPECS_PILES_MAX_SIZE:1000}" pileSessions: timeToLiveInMinutes: "${CACHE_SPECS_PILE_SESSIONS_TTL:1440}" From f707705bbb9857c65c8d2d02fc5959eac1e8eed3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E4=B8=89=E4=B8=99?= Date: Wed, 16 Oct 2024 13:59:17 +0800 Subject: [PATCH 15/27] =?UTF-8?q?=E6=A1=A9=E4=BF=A1=E6=81=AF=E7=BC=93?= =?UTF-8?q?=E5=AD=981=E5=A4=A9?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- jcpp-app-bootstrap/src/main/resources/app-service.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/jcpp-app-bootstrap/src/main/resources/app-service.yml b/jcpp-app-bootstrap/src/main/resources/app-service.yml index 5d31b18..058b6c1 100644 --- a/jcpp-app-bootstrap/src/main/resources/app-service.yml +++ b/jcpp-app-bootstrap/src/main/resources/app-service.yml @@ -116,7 +116,7 @@ cache: specs: piles: timeToLiveInMinutes: "${CACHE_SPECS_PILES_TTL:1440}" - maxSize: "${CACHE_SPECS_PILES_MAX_SIZE:1000}" + maxSize: "${CACHE_SPECS_PILES_MAX_SIZE:100000}" pileSessions: timeToLiveInMinutes: "${CACHE_SPECS_PILE_SESSIONS_TTL:1440}" maxSize: "${CACHE_SPECS_PILE_SESSIONS_MAX_SIZE:100000}" From 586c338f36f49934961e8ba857adaf0de90de98b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E4=B8=89=E4=B8=99?= Date: Wed, 16 Oct 2024 14:27:10 +0800 Subject: [PATCH 16/27] =?UTF-8?q?=E8=B0=83=E6=95=B4=E8=BF=9E=E6=8E=A5?= =?UTF-8?q?=E6=95=B0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- jcpp-app-bootstrap/src/main/resources/app-service.yml | 4 ++-- .../infrastructure/queue/memory/DefaultInMemoryStorage.java | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/jcpp-app-bootstrap/src/main/resources/app-service.yml b/jcpp-app-bootstrap/src/main/resources/app-service.yml index 058b6c1..7fbfb91 100644 --- a/jcpp-app-bootstrap/src/main/resources/app-service.yml +++ b/jcpp-app-bootstrap/src/main/resources/app-service.yml @@ -145,8 +145,8 @@ redis: db: "${REDIS_DB:0}" password: "${REDIS_PASSWORD:sanbing}" pool_config: - maxTotal: "${REDIS_POOL_CONFIG_MAX_TOTAL:256}" - maxIdle: "${REDIS_POOL_CONFIG_MAX_IDLE:128}" + maxTotal: "${REDIS_POOL_CONFIG_MAX_TOTAL:128}" + maxIdle: "${REDIS_POOL_CONFIG_MAX_IDLE:64}" minIdle: "${REDIS_POOL_CONFIG_MIN_IDLE:16}" testOnBorrow: "${REDIS_POOL_CONFIG_TEST_ON_BORROW:false}" testOnReturn: "${REDIS_POOL_CONFIG_TEST_ON_RETURN:false}" diff --git a/jcpp-infrastructure-queue/src/main/java/sanbing/jcpp/infrastructure/queue/memory/DefaultInMemoryStorage.java b/jcpp-infrastructure-queue/src/main/java/sanbing/jcpp/infrastructure/queue/memory/DefaultInMemoryStorage.java index 05a42ba..347c88a 100644 --- a/jcpp-infrastructure-queue/src/main/java/sanbing/jcpp/infrastructure/queue/memory/DefaultInMemoryStorage.java +++ b/jcpp-infrastructure-queue/src/main/java/sanbing/jcpp/infrastructure/queue/memory/DefaultInMemoryStorage.java @@ -39,7 +39,7 @@ public final class DefaultInMemoryStorage implements InMemoryStorage { @Override public boolean put(String topic, QueueMsg msg) { - return storage.computeIfAbsent(topic, t -> new LinkedBlockingQueue<>(100000)).add(msg); + return storage.computeIfAbsent(topic, t -> new LinkedBlockingQueue<>(10000)).add(msg); } @Override From 7022dec3d6ec2d6e800ebc9a639d0c8ac187d0df Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E4=B8=89=E4=B8=99?= Date: Wed, 16 Oct 2024 14:41:47 +0800 Subject: [PATCH 17/27] log --- .../protocol/listener/tcp/handler/ConnectionLimitHandler.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/jcpp-protocol-api/src/main/java/sanbing/jcpp/protocol/listener/tcp/handler/ConnectionLimitHandler.java b/jcpp-protocol-api/src/main/java/sanbing/jcpp/protocol/listener/tcp/handler/ConnectionLimitHandler.java index 9508e26..6714e96 100644 --- a/jcpp-protocol-api/src/main/java/sanbing/jcpp/protocol/listener/tcp/handler/ConnectionLimitHandler.java +++ b/jcpp-protocol-api/src/main/java/sanbing/jcpp/protocol/listener/tcp/handler/ConnectionLimitHandler.java @@ -42,14 +42,14 @@ public class ConnectionLimitHandler extends ChannelInboundHandlerAdapter { public void channelActive(ChannelHandlerContext ctx) throws Exception { super.channelActive(ctx); channelGroup.add(ctx.channel()); - log.info("[{}]{} channelActive 当前连接数管道数 {} / {}",protocolName, ctx.channel(), channelGroup.size(), maxConnections); + log.info("[{}]{} channelActive 当前连接数 {} / {}",protocolName, ctx.channel(), channelGroup.size(), maxConnections); } @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { super.channelInactive(ctx); channelGroup.remove(ctx.channel()); - log.info("[{}]{} channelInactive 当前连接数管道数 {} / {}",protocolName, ctx.channel(), channelGroup.size(), maxConnections); + log.info("[{}]{} channelInactive 当前连接数 {} / {}",protocolName, ctx.channel(), channelGroup.size(), maxConnections); } @Override From 5b7d42da9ebe814faae51b0a42f05d290b204303 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E4=B8=89=E4=B8=99?= Date: Wed, 16 Oct 2024 14:59:21 +0800 Subject: [PATCH 18/27] =?UTF-8?q?=E6=97=A5=E5=BF=97=E9=99=8D=E7=BA=A7=20lo?= =?UTF-8?q?g4j2=20=E6=9C=89=E9=98=BB=E5=A1=9E?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../jcpp/app/service/impl/DefaultDownlinkCallService.java | 2 +- .../jcpp/app/service/impl/DefaultPileProtocolService.java | 4 ++-- .../sanbing/jcpp/protocol/adapter/DownlinkController.java | 4 ++-- 3 files changed, 5 insertions(+), 5 deletions(-) diff --git a/jcpp-app/src/main/java/sanbing/jcpp/app/service/impl/DefaultDownlinkCallService.java b/jcpp-app/src/main/java/sanbing/jcpp/app/service/impl/DefaultDownlinkCallService.java index 90c1f76..3c55ada 100644 --- a/jcpp-app/src/main/java/sanbing/jcpp/app/service/impl/DefaultDownlinkCallService.java +++ b/jcpp-app/src/main/java/sanbing/jcpp/app/service/impl/DefaultDownlinkCallService.java @@ -49,7 +49,7 @@ public class DefaultDownlinkCallService implements DownlinkCallService { if (serviceInfoProvider.isMonolith()) { downlinkController.onDownlink(downlinkMessageBuilder.build()) - .setResultHandler(result -> log.info("下行消息发送完成")); + .setResultHandler(result -> log.debug("下行消息发送完成")); } else { try { diff --git a/jcpp-app/src/main/java/sanbing/jcpp/app/service/impl/DefaultPileProtocolService.java b/jcpp-app/src/main/java/sanbing/jcpp/app/service/impl/DefaultPileProtocolService.java index f8387d3..e5853ae 100644 --- a/jcpp-app/src/main/java/sanbing/jcpp/app/service/impl/DefaultPileProtocolService.java +++ b/jcpp-app/src/main/java/sanbing/jcpp/app/service/impl/DefaultPileProtocolService.java @@ -48,7 +48,7 @@ public class DefaultPileProtocolService implements PileProtocolService { @Override public void pileLogin(UplinkQueueMessage uplinkQueueMessage, Callback callback) { - log.info("接收到桩登录事件 {}", uplinkQueueMessage); + log.debug("接收到桩登录事件 {}", uplinkQueueMessage); LoginRequest loginRequest = uplinkQueueMessage.getLoginRequest(); @@ -56,7 +56,7 @@ public class DefaultPileProtocolService implements PileProtocolService { String pileCode = loginRequest.getPileCode(); - log.info("查询到充电桩信息 {}", pile); + log.debug("查询到充电桩信息 {}", pile); // 构造下行回复 DownlinkRestMessage.Builder downlinkMessageBuilder = createDownlinkMessageBuilder(uplinkQueueMessage, loginRequest.getPileCode()); diff --git a/jcpp-protocol-api/src/main/java/sanbing/jcpp/protocol/adapter/DownlinkController.java b/jcpp-protocol-api/src/main/java/sanbing/jcpp/protocol/adapter/DownlinkController.java index facf5ea..ce54453 100644 --- a/jcpp-protocol-api/src/main/java/sanbing/jcpp/protocol/adapter/DownlinkController.java +++ b/jcpp-protocol-api/src/main/java/sanbing/jcpp/protocol/adapter/DownlinkController.java @@ -38,7 +38,7 @@ public class DownlinkController { @PostMapping(value = "/onDownlink", consumes = "application/x-protobuf", produces = "application/x-protobuf") public DeferredResult> onDownlink(@RequestBody DownlinkRestMessage downlinkMsg) { - log.info("收到REST下行请求 {}", downlinkMsg); + log.debug("收到REST下行请求 {}", downlinkMsg); final DeferredResult> response = new DeferredResult<>(onDownlinkTimeout, ResponseEntity.status(HttpStatus.REQUEST_TIMEOUT).build()); @@ -55,7 +55,7 @@ public class DownlinkController { response.setResult(ResponseEntity.status(HttpStatus.OK).build()); } else { - log.warn("下发报文时Session未找到 sessionId: {}", protocolSessionId); + log.info("下发报文时Session未找到 sessionId: {}", protocolSessionId); response.setResult(ResponseEntity.status(HttpStatus.NOT_FOUND).body("Protocol Session not found for ID:" + protocolSessionId)); } From 6c3b5c46bfdb75f4f814554e08a04f5242d91375 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E4=B8=89=E4=B8=99?= Date: Wed, 16 Oct 2024 15:21:06 +0800 Subject: [PATCH 19/27] =?UTF-8?q?=E4=B8=8D=E4=BF=9D=E5=AD=98pileId?= =?UTF-8?q?=E7=BB=B4=E5=BA=A6=E7=9A=84=E4=BC=9A=E8=AF=9D?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../service/cache/session/PileSessionCacheKey.java | 13 ++----------- .../service/impl/DefaultPileProtocolService.java | 1 - 2 files changed, 2 insertions(+), 12 deletions(-) diff --git a/jcpp-app/src/main/java/sanbing/jcpp/app/service/cache/session/PileSessionCacheKey.java b/jcpp-app/src/main/java/sanbing/jcpp/app/service/cache/session/PileSessionCacheKey.java index e18f0e4..8d6f5a6 100644 --- a/jcpp-app/src/main/java/sanbing/jcpp/app/service/cache/session/PileSessionCacheKey.java +++ b/jcpp-app/src/main/java/sanbing/jcpp/app/service/cache/session/PileSessionCacheKey.java @@ -7,35 +7,26 @@ package sanbing.jcpp.app.service.cache.session; import lombok.Builder; import lombok.EqualsAndHashCode; import lombok.Getter; -import lombok.RequiredArgsConstructor; import java.io.Serializable; -import java.util.Optional; -import java.util.UUID; /** * @author baigod */ @Getter @EqualsAndHashCode -@RequiredArgsConstructor @Builder public class PileSessionCacheKey implements Serializable { - private final UUID pileId; private final String pileCode; - public PileSessionCacheKey(UUID pileId) { - this(pileId, null); - } - public PileSessionCacheKey(String pileCode) { - this(null, pileCode); + this.pileCode = pileCode; } @Override public String toString() { - return Optional.ofNullable(pileId).map(UUID::toString).orElse(pileCode); + return pileCode; } } \ No newline at end of file diff --git a/jcpp-app/src/main/java/sanbing/jcpp/app/service/impl/DefaultPileProtocolService.java b/jcpp-app/src/main/java/sanbing/jcpp/app/service/impl/DefaultPileProtocolService.java index e5853ae..10fc0dd 100644 --- a/jcpp-app/src/main/java/sanbing/jcpp/app/service/impl/DefaultPileProtocolService.java +++ b/jcpp-app/src/main/java/sanbing/jcpp/app/service/impl/DefaultPileProtocolService.java @@ -108,7 +108,6 @@ public class DefaultPileProtocolService implements PileProtocolService { pileSession.setRemoteAddress(remoteAddress); pileSession.setNodeId(nodeId); pileSession.setNodeWebapiIpPort(nodeWebapiIpPort); - pileSessionCache.put(new PileSessionCacheKey(pile.getId()), pileSession); pileSessionCache.put(new PileSessionCacheKey(pile.getPileCode()), pileSession); } From 7d8a072084531ac06f2f474cfa22d9e8aa71b16a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E4=B8=89=E4=B8=99?= Date: Wed, 16 Oct 2024 15:27:35 +0800 Subject: [PATCH 20/27] =?UTF-8?q?=E6=97=A5=E5=BF=97=E5=86=8D=E9=99=8D?= =?UTF-8?q?=E7=BA=A7?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../yunkuaichong/v150/cmd/YunKuaiChongV150LoginAckDLCmd.java | 2 +- .../yunkuaichong/v150/cmd/YunKuaiChongV150LoginULCmd.java | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/jcpp-protocol-yunkuaichong/src/main/java/sanbing/jcpp/protocol/yunkuaichong/v150/cmd/YunKuaiChongV150LoginAckDLCmd.java b/jcpp-protocol-yunkuaichong/src/main/java/sanbing/jcpp/protocol/yunkuaichong/v150/cmd/YunKuaiChongV150LoginAckDLCmd.java index d7d0b70..74874ac 100644 --- a/jcpp-protocol-yunkuaichong/src/main/java/sanbing/jcpp/protocol/yunkuaichong/v150/cmd/YunKuaiChongV150LoginAckDLCmd.java +++ b/jcpp-protocol-yunkuaichong/src/main/java/sanbing/jcpp/protocol/yunkuaichong/v150/cmd/YunKuaiChongV150LoginAckDLCmd.java @@ -42,7 +42,7 @@ public class YunKuaiChongV150LoginAckDLCmd extends YunKuaiChongDownlinkCmdExe { @Override public void execute(TcpSession tcpSession, YunKuaiChongDwonlinkMessage yunKuaiChongDwonlinkMessage, ProtocolContext ctx) { - log.info("{} 云快充1.5.0登录认证应答", tcpSession); + log.debug("{} 云快充1.5.0登录认证应答", tcpSession); if (!yunKuaiChongDwonlinkMessage.getMsg().hasLoginResponse()) { return; diff --git a/jcpp-protocol-yunkuaichong/src/main/java/sanbing/jcpp/protocol/yunkuaichong/v150/cmd/YunKuaiChongV150LoginULCmd.java b/jcpp-protocol-yunkuaichong/src/main/java/sanbing/jcpp/protocol/yunkuaichong/v150/cmd/YunKuaiChongV150LoginULCmd.java index 0971188..63b5739 100644 --- a/jcpp-protocol-yunkuaichong/src/main/java/sanbing/jcpp/protocol/yunkuaichong/v150/cmd/YunKuaiChongV150LoginULCmd.java +++ b/jcpp-protocol-yunkuaichong/src/main/java/sanbing/jcpp/protocol/yunkuaichong/v150/cmd/YunKuaiChongV150LoginULCmd.java @@ -29,7 +29,7 @@ public class YunKuaiChongV150LoginULCmd extends YunKuaiChongUplinkCmdExe { @Override public void execute(TcpSession tcpSession, YunKuaiChongUplinkMessage yunKuaiChongUplinkMessage, ProtocolContext ctx) { - log.info("{} 云快充1.5.0登录认证请求", tcpSession); + log.debug("{} 云快充1.5.0登录认证请求", tcpSession); ByteBuf byteBuf = Unpooled.copiedBuffer(yunKuaiChongUplinkMessage.getMsgBody()); ObjectNode additionalInfo = JacksonUtil.newObjectNode(); From 73bc580a0a281372caa76ab8a6f385ccad80694b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E4=B8=89=E4=B8=99?= Date: Wed, 16 Oct 2024 16:15:59 +0800 Subject: [PATCH 21/27] =?UTF-8?q?=E6=9C=AC=E5=9C=B0=E9=98=9F=E5=88=97?= =?UTF-8?q?=E5=AE=B9=E9=87=8F=E5=8F=AF=E9=85=8D=E7=BD=AE?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- jcpp-app-bootstrap/src/main/resources/app-service.yml | 3 ++- .../infrastructure/queue/memory/DefaultInMemoryStorage.java | 6 +++++- .../queue/provider/InMemoryAppQueueFactory.java | 2 +- .../src/main/resources/protocol-service.yml | 3 --- 4 files changed, 8 insertions(+), 6 deletions(-) diff --git a/jcpp-app-bootstrap/src/main/resources/app-service.yml b/jcpp-app-bootstrap/src/main/resources/app-service.yml index 7fbfb91..7662d41 100644 --- a/jcpp-app-bootstrap/src/main/resources/app-service.yml +++ b/jcpp-app-bootstrap/src/main/resources/app-service.yml @@ -63,7 +63,8 @@ queue: type: "${QUEUE_TYPE:memory}" partitions: hash_function_name: "${QUEUE_PARTITIONS_HASH_FUNCTION_NAME:murmur3_128}" # murmur3_32, murmur3_128 or sha256 - in_memory: + in-memory: + queue-capacity: "${QUEUE_IN_MEMORY_QUEUE_CAPACITY:100000}" stats: print-interval-ms: "${QUEUE_IN_MEMORY_STATS_PRINT_INTERVAL_MS:60000}" kafka: diff --git a/jcpp-infrastructure-queue/src/main/java/sanbing/jcpp/infrastructure/queue/memory/DefaultInMemoryStorage.java b/jcpp-infrastructure-queue/src/main/java/sanbing/jcpp/infrastructure/queue/memory/DefaultInMemoryStorage.java index 347c88a..fd06cff 100644 --- a/jcpp-infrastructure-queue/src/main/java/sanbing/jcpp/infrastructure/queue/memory/DefaultInMemoryStorage.java +++ b/jcpp-infrastructure-queue/src/main/java/sanbing/jcpp/infrastructure/queue/memory/DefaultInMemoryStorage.java @@ -5,6 +5,7 @@ package sanbing.jcpp.infrastructure.queue.memory; import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Component; import sanbing.jcpp.infrastructure.queue.QueueMsg; @@ -18,6 +19,9 @@ import java.util.concurrent.LinkedBlockingQueue; public final class DefaultInMemoryStorage implements InMemoryStorage { private final ConcurrentHashMap> storage = new ConcurrentHashMap<>(); + @Value("${queue.in-memory.queue-capacity:100000}") + private int queueCapacity; + @Override public void printStats() { storage.forEach((topic, queue) -> { @@ -39,7 +43,7 @@ public final class DefaultInMemoryStorage implements InMemoryStorage { @Override public boolean put(String topic, QueueMsg msg) { - return storage.computeIfAbsent(topic, t -> new LinkedBlockingQueue<>(10000)).add(msg); + return storage.computeIfAbsent(topic, t -> new LinkedBlockingQueue<>(queueCapacity)).add(msg); } @Override diff --git a/jcpp-infrastructure-queue/src/main/java/sanbing/jcpp/infrastructure/queue/provider/InMemoryAppQueueFactory.java b/jcpp-infrastructure-queue/src/main/java/sanbing/jcpp/infrastructure/queue/provider/InMemoryAppQueueFactory.java index 82cb1ad..21719f9 100644 --- a/jcpp-infrastructure-queue/src/main/java/sanbing/jcpp/infrastructure/queue/provider/InMemoryAppQueueFactory.java +++ b/jcpp-infrastructure-queue/src/main/java/sanbing/jcpp/infrastructure/queue/provider/InMemoryAppQueueFactory.java @@ -40,7 +40,7 @@ public class InMemoryAppQueueFactory implements AppQueueFactory { return new InMemoryQueueProducer<>(storage, topic); } - @Scheduled(fixedRateString = "${queue.in_memory.stats.print-interval-ms:60000}") + @Scheduled(fixedRateString = "${queue.in-memory.stats.print-interval-ms:60000}") private void printInMemoryStats() { storage.printStats(); } diff --git a/jcpp-protocol-bootstrap/src/main/resources/protocol-service.yml b/jcpp-protocol-bootstrap/src/main/resources/protocol-service.yml index f417977..7fd56f0 100644 --- a/jcpp-protocol-bootstrap/src/main/resources/protocol-service.yml +++ b/jcpp-protocol-bootstrap/src/main/resources/protocol-service.yml @@ -98,9 +98,6 @@ queue: type: "${QUEUE_TYPE:kafka}" partitions: hash_function_name: "${QUEUE_PARTITIONS_HASH_FUNCTION_NAME:murmur3_128}" # murmur3_32, murmur3_128 or sha256 - in_memory: - stats: - print-interval-ms: "${QUEUE_IN_MEMORY_STATS_PRINT_INTERVAL_MS:60000}" kafka: bootstrap-servers: "${KAFKA_SERVERS:kafka:9092}" ssl: From 2b143f10afc8e5ceab1323d0638251c41ad2a95b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E4=B8=89=E4=B8=99?= Date: Mon, 21 Oct 2024 16:44:06 +0800 Subject: [PATCH 22/27] =?UTF-8?q?=E6=9C=AC=E5=9C=B0=E9=98=9F=E5=88=97?= =?UTF-8?q?=E5=8F=AA=E8=83=BD=E7=9B=B4=E6=8E=A5=E8=B0=83=E7=94=A8?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../jcpp/app/service/impl/DefaultDownlinkCallService.java | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/jcpp-app/src/main/java/sanbing/jcpp/app/service/impl/DefaultDownlinkCallService.java b/jcpp-app/src/main/java/sanbing/jcpp/app/service/impl/DefaultDownlinkCallService.java index 3c55ada..a0a801d 100644 --- a/jcpp-app/src/main/java/sanbing/jcpp/app/service/impl/DefaultDownlinkCallService.java +++ b/jcpp-app/src/main/java/sanbing/jcpp/app/service/impl/DefaultDownlinkCallService.java @@ -6,6 +6,7 @@ package sanbing.jcpp.app.service.impl; import jakarta.annotation.Resource; import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Value; import org.springframework.http.HttpEntity; import org.springframework.http.HttpHeaders; import org.springframework.http.MediaType; @@ -44,9 +45,12 @@ public class DefaultDownlinkCallService implements DownlinkCallService { @Resource TransactionalCache pileSessionCache; + @Value("${cache.type}") + private String cacheType; + @Override public void sendDownlinkMessage(DownlinkRestMessage.Builder downlinkMessageBuilder, String pileCode) { - if (serviceInfoProvider.isMonolith()) { + if (serviceInfoProvider.isMonolith() && "caffeine".equalsIgnoreCase(cacheType)) { downlinkController.onDownlink(downlinkMessageBuilder.build()) .setResultHandler(result -> log.debug("下行消息发送完成")); From fb0834ce1d47561fe3966c21b15c76a5d3ceed9c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E4=B8=89=E4=B8=99?= Date: Tue, 22 Oct 2024 10:58:31 +0800 Subject: [PATCH 23/27] =?UTF-8?q?=E4=B8=8B=E5=8F=91=E5=AF=B9=E6=97=B6?= =?UTF-8?q?=E9=9A=8F=E6=9C=BA7~8=E5=B0=8F=E6=97=B6?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../yunkuaichong/v150/cmd/YunKuaiChongV150LoginAckDLCmd.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/jcpp-protocol-yunkuaichong/src/main/java/sanbing/jcpp/protocol/yunkuaichong/v150/cmd/YunKuaiChongV150LoginAckDLCmd.java b/jcpp-protocol-yunkuaichong/src/main/java/sanbing/jcpp/protocol/yunkuaichong/v150/cmd/YunKuaiChongV150LoginAckDLCmd.java index 74874ac..cb5b1cf 100644 --- a/jcpp-protocol-yunkuaichong/src/main/java/sanbing/jcpp/protocol/yunkuaichong/v150/cmd/YunKuaiChongV150LoginAckDLCmd.java +++ b/jcpp-protocol-yunkuaichong/src/main/java/sanbing/jcpp/protocol/yunkuaichong/v150/cmd/YunKuaiChongV150LoginAckDLCmd.java @@ -4,6 +4,7 @@ */ package sanbing.jcpp.protocol.yunkuaichong.v150.cmd; +import cn.hutool.core.util.RandomUtil; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import lombok.extern.slf4j.Slf4j; @@ -95,7 +96,7 @@ public class YunKuaiChongV150LoginAckDLCmd extends YunKuaiChongDownlinkCmdExe { log.info("{} 云快充1.5.0开始注册定时对时任务", tcpSession); return PROTOCOL_SESSION_SCHEDULED.scheduleAtFixedRate(() -> syncTime(tcpSession, pileCodeBytes, requestData), - 0, 8, TimeUnit.HOURS); + 0, RandomUtil.randomInt(420, 480), TimeUnit.MINUTES); } ); } From 45da17b220bbd524a5a95c7ddcb3f46a31f94d58 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E4=B8=89=E4=B8=99?= Date: Tue, 22 Oct 2024 11:16:12 +0800 Subject: [PATCH 24/27] =?UTF-8?q?=E8=BD=AC=E5=8F=91=E6=B6=88=E6=81=AF?= =?UTF-8?q?=E9=87=8F=E6=8C=87=E6=A0=87?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../java/sanbing/jcpp/protocol/forwarder/Forwarder.java | 7 ++++++- .../sanbing/jcpp/protocol/forwarder/KafkaForwarder.java | 5 +++++ 2 files changed, 11 insertions(+), 1 deletion(-) diff --git a/jcpp-protocol-api/src/main/java/sanbing/jcpp/protocol/forwarder/Forwarder.java b/jcpp-protocol-api/src/main/java/sanbing/jcpp/protocol/forwarder/Forwarder.java index 7cd2c22..9aa2e9b 100644 --- a/jcpp-protocol-api/src/main/java/sanbing/jcpp/protocol/forwarder/Forwarder.java +++ b/jcpp-protocol-api/src/main/java/sanbing/jcpp/protocol/forwarder/Forwarder.java @@ -51,7 +51,7 @@ public abstract class Forwarder { protected final boolean isMonolith; protected QueueProducer> producer; - public Forwarder(String protocolName, StatsFactory statsFactory, PartitionProvider partitionProvider, ServiceInfoProvider serviceInfoProvider) { + protected Forwarder(String protocolName, StatsFactory statsFactory, PartitionProvider partitionProvider, ServiceInfoProvider serviceInfoProvider) { this.protocolName = protocolName; this.partitionProvider = partitionProvider; this.serviceInfoProvider = serviceInfoProvider; @@ -66,6 +66,7 @@ public abstract class Forwarder { public abstract void destroy(); protected void jcppForward(String topic, String key, UplinkQueueMessage msg, BiConsumer consumer) { + forwarderMessagesStats.incrementTotal(); QueueMsgHeaders headers = new DefaultQueueMsgHeaders(); Tracer currentTracer = TracerContextUtil.getCurrentTracer(); @@ -80,11 +81,13 @@ public abstract class Forwarder { TracerContextUtil.newTracer(currentTracer.getTraceId(), currentTracer.getOrigin(), currentTracer.getTracerTs()); MDCUtils.recordTracer(); + log.trace("单体消息转发成功 key:{}", key); if (consumer != null) { consumer.accept(true, JacksonUtil.newObjectNode()); } + forwarderMessagesStats.incrementSuccessful(); } @Override @@ -92,6 +95,7 @@ public abstract class Forwarder { TracerContextUtil.newTracer(currentTracer.getTraceId(), currentTracer.getOrigin(), currentTracer.getTracerTs()); MDCUtils.recordTracer(); + log.warn("单体消息转发异常", t); if (consumer != null) { @@ -99,6 +103,7 @@ public abstract class Forwarder { objectNode.put(ERROR, t.getClass() + ": " + t.getMessage()); consumer.accept(true, objectNode); } + forwarderMessagesStats.incrementFailed(); } }); } diff --git a/jcpp-protocol-api/src/main/java/sanbing/jcpp/protocol/forwarder/KafkaForwarder.java b/jcpp-protocol-api/src/main/java/sanbing/jcpp/protocol/forwarder/KafkaForwarder.java index ae1ebf3..c81b929 100644 --- a/jcpp-protocol-api/src/main/java/sanbing/jcpp/protocol/forwarder/KafkaForwarder.java +++ b/jcpp-protocol-api/src/main/java/sanbing/jcpp/protocol/forwarder/KafkaForwarder.java @@ -150,6 +150,7 @@ public class KafkaForwarder extends Forwarder { } private void kafkaForward(String topic, String key, UplinkQueueMessage msg, BiConsumer consumer) throws InvalidProtocolBufferException { + forwarderMessagesStats.incrementTotal(); Headers headers = new RecordHeaders(); Tracer currentTracer = TracerContextUtil.getCurrentTracer(); @@ -177,6 +178,7 @@ public class KafkaForwarder extends Forwarder { private void logAndDoConsumer(BiConsumer consumer, RecordMetadata metadata, Exception e, Tracer currentTracer) { TracerContextUtil.newTracer(currentTracer.getTraceId(), currentTracer.getOrigin(), currentTracer.getTracerTs()); MDCUtils.recordTracer(); + log.debug("Kafka 消息转发完成, success:{}", e == null); if (consumer != null) { @@ -196,6 +198,9 @@ public class KafkaForwarder extends Forwarder { if (e != null) { objectNode.put(ERROR, e.getClass() + ": " + e.getMessage()); + forwarderMessagesStats.incrementFailed(); + } else { + forwarderMessagesStats.incrementSuccessful(); } consumer.accept(e == null, objectNode); From 6bb3d3738f4bfd82c388e2c2086ad1c7ad642c72 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E4=B8=89=E4=B8=99?= Date: Tue, 22 Oct 2024 14:38:56 +0800 Subject: [PATCH 25/27] =?UTF-8?q?=E6=8C=87=E6=A0=87=E4=BF=AE=E6=AD=A3?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../ProtocolUplinkConsumerService.java | 101 ++++++++++++------ 1 file changed, 70 insertions(+), 31 deletions(-) diff --git a/jcpp-app/src/main/java/sanbing/jcpp/app/service/queue/consumer/ProtocolUplinkConsumerService.java b/jcpp-app/src/main/java/sanbing/jcpp/app/service/queue/consumer/ProtocolUplinkConsumerService.java index c3c4014..20ff976 100644 --- a/jcpp-app/src/main/java/sanbing/jcpp/app/service/queue/consumer/ProtocolUplinkConsumerService.java +++ b/jcpp-app/src/main/java/sanbing/jcpp/app/service/queue/consumer/ProtocolUplinkConsumerService.java @@ -134,58 +134,100 @@ public class ProtocolUplinkConsumerService extends AbstractConsumerService imple PendingMsgHolder pendingMsgHolder = new PendingMsgHolder(); Future packSubmitFuture = consumersExecutor.submit(new TracerRunnable(() -> orderedMsgList.forEach(element -> { + UUID id = element.getUuid(); + ProtoQueueMsg msg = element.getMsg(); + tracer(msg); - log.trace("[{}] Creating main callback for message: {}", id, msg.getValue()); + + log.trace("[{}] Creating PackCallback for message: {}", id, msg.getValue()); + Callback callback = new PackCallback<>(id, ctx); + try { UplinkQueueMessage uplinkQueueMsg = msg.getValue(); - pendingMsgHolder.setUplinkQueueMessage(uplinkQueueMsg); - if (uplinkQueueMsg.hasLoginRequest()) { - pileProtocolService.pileLogin(uplinkQueueMsg, callback); - } else if (uplinkQueueMsg.hasHeartBeatRequest()) { - pileProtocolService.heartBeat(uplinkQueueMsg, callback); - } else if (uplinkQueueMsg.hasVerifyPricingRequest()) { - pileProtocolService.verifyPricing(uplinkQueueMsg, callback); - } else if (uplinkQueueMsg.hasQueryPricingRequest()) { - pileProtocolService.queryPricing(uplinkQueueMsg, callback); - } else if (uplinkQueueMsg.hasGunRunStatusProto()) { - pileProtocolService.postGunRunStatus(uplinkQueueMsg, callback); - } else if (uplinkQueueMsg.hasChargingProgressProto()) { - pileProtocolService.postChargingProgress(uplinkQueueMsg, callback); - } else if (uplinkQueueMsg.hasSetPricingResponse()) { - pileProtocolService.onSetPricingResponse(uplinkQueueMsg, callback); - } else if (uplinkQueueMsg.hasRemoteStartChargingResponse()) { - pileProtocolService.onRemoteStartChargingResponse(uplinkQueueMsg, callback); - } else if (uplinkQueueMsg.hasRemoteStopChargingResponse()) { - pileProtocolService.onRemoteStopChargingResponse(uplinkQueueMsg, callback); - } else if (uplinkQueueMsg.hasTransactionRecord()) { - pileProtocolService.onTransactionRecord(uplinkQueueMsg, callback); - } else { - callback.onSuccess(); - } if (statsEnabled) { stats.log(uplinkQueueMsg); } + + pendingMsgHolder.setUplinkQueueMessage(uplinkQueueMsg); + + if (uplinkQueueMsg.hasLoginRequest()) { + + pileProtocolService.pileLogin(uplinkQueueMsg, callback); + + } else if (uplinkQueueMsg.hasHeartBeatRequest()) { + + pileProtocolService.heartBeat(uplinkQueueMsg, callback); + + } else if (uplinkQueueMsg.hasVerifyPricingRequest()) { + + pileProtocolService.verifyPricing(uplinkQueueMsg, callback); + + } else if (uplinkQueueMsg.hasQueryPricingRequest()) { + + pileProtocolService.queryPricing(uplinkQueueMsg, callback); + + } else if (uplinkQueueMsg.hasGunRunStatusProto()) { + + pileProtocolService.postGunRunStatus(uplinkQueueMsg, callback); + + } else if (uplinkQueueMsg.hasChargingProgressProto()) { + + pileProtocolService.postChargingProgress(uplinkQueueMsg, callback); + + } else if (uplinkQueueMsg.hasSetPricingResponse()) { + + pileProtocolService.onSetPricingResponse(uplinkQueueMsg, callback); + + } else if (uplinkQueueMsg.hasRemoteStartChargingResponse()) { + + pileProtocolService.onRemoteStartChargingResponse(uplinkQueueMsg, callback); + + } else if (uplinkQueueMsg.hasRemoteStopChargingResponse()) { + + pileProtocolService.onRemoteStopChargingResponse(uplinkQueueMsg, callback); + + } else if (uplinkQueueMsg.hasTransactionRecord()) { + + pileProtocolService.onTransactionRecord(uplinkQueueMsg, callback); + + } else { + + callback.onSuccess(); + } + } catch (Throwable e) { + log.warn("[{}] Failed to process message: {}", id, msg, e); + callback.onFailure(e); } })) ); - if (!processingTimeoutLatch.await(packProcessingTimeout, TimeUnit.MILLISECONDS)) { + + if (!ctx.await(packProcessingTimeout, TimeUnit.MILLISECONDS)) { + if (!packSubmitFuture.isDone()) { + packSubmitFuture.cancel(true); + UplinkQueueMessage lastSubmitMsg = pendingMsgHolder.getUplinkQueueMessage(); + log.warn("Timeout to process message: {}", lastSubmitMsg); } + if (log.isDebugEnabled()) { - ctx.getAckMap().forEach((id, msg) -> log.debug("[{}] Timeout to process message: {}", id, msg.getValue())); + + ctx.getAckMap().forEach((id, msg) -> log.info("[{}] Timeout to process message: {}", id, msg.getValue())); + } + ctx.getFailedMap().forEach((id, msg) -> log.warn("[{}] Failed to process message: {}", id, msg.getValue())); } + consumer.commit(); } @@ -198,11 +240,8 @@ public class ProtocolUplinkConsumerService extends AbstractConsumerService imple origin = ByteUtil.bytesToString(tracerOrigin); } - long ts = System.currentTimeMillis(); byte[] tracerTs = msg.getHeaders().get(MSG_MD_PREFIX + MSG_MD_TS); - if (tracerTs != null) { - ts = ByteUtil.bytesToLong(tracerTs); - } + long ts = tracerTs != null ? ByteUtil.bytesToLong(tracerTs) : System.currentTimeMillis(); return TracerContextUtil.newTracer(ByteUtil.bytesToString(tracerId), origin, ts); }) From 09281ca3960032a77495c456aa9f3a9602ed24c3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E4=B8=89=E4=B8=99?= Date: Tue, 22 Oct 2024 15:04:37 +0800 Subject: [PATCH 26/27] =?UTF-8?q?=E6=A0=B9=E6=8D=AE=E5=8E=8B=E6=B5=8B?= =?UTF-8?q?=E7=BB=93=E6=9E=9C=E8=B0=83=E5=8F=82?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- docker/Dockerfile-App | 3 +++ docker/Dockerfile-Protocol | 2 ++ .../src/main/resources/app-service.yml | 6 ++--- .../src/main/resources/log4j2.xml | 6 +++++ .../ProtocolUplinkConsumerService.java | 25 +++---------------- .../queue/common/QueueConstants.java | 9 ++++++- .../infrastructure/util/mdc/MDCUtils.java | 1 - .../util/trace/TracerContextUtil.java | 1 - .../jcpp/protocol/forwarder/Forwarder.java | 11 +++----- .../protocol/forwarder/KafkaForwarder.java | 11 +++----- 10 files changed, 34 insertions(+), 41 deletions(-) diff --git a/docker/Dockerfile-App b/docker/Dockerfile-App index 144da60..f619b3d 100644 --- a/docker/Dockerfile-App +++ b/docker/Dockerfile-App @@ -35,6 +35,9 @@ RUN chmod a+x *.sh && mv start.sh /usr/bin EXPOSE 8080 8080 +ENV APP_LOG_LEVEL=INFO +ENV PROTOCOLS_LOG_LEVEL=INFO + CMD ["start.sh"] diff --git a/docker/Dockerfile-Protocol b/docker/Dockerfile-Protocol index 7aa342a..82aae02 100644 --- a/docker/Dockerfile-Protocol +++ b/docker/Dockerfile-Protocol @@ -35,6 +35,8 @@ RUN chmod a+x *.sh && mv start.sh /usr/bin EXPOSE 8081 8081 +ENV PROTOCOLS_LOG_LEVEL=INFO + CMD ["start.sh"] diff --git a/jcpp-app-bootstrap/src/main/resources/app-service.yml b/jcpp-app-bootstrap/src/main/resources/app-service.yml index 7662d41..11b7c38 100644 --- a/jcpp-app-bootstrap/src/main/resources/app-service.yml +++ b/jcpp-app-bootstrap/src/main/resources/app-service.yml @@ -27,7 +27,7 @@ spring: password: "${SPRING_DATASOURCE_PASSWORD:postgres}" hikari: leak-detection-threshold: "${SPRING_DATASOURCE_HIKARI_LEAK_DETECTION_THRESHOLD:0}" - maximum-pool-size: "${SPRING_DATASOURCE_MAXIMUM_POOL_SIZE:32}" + maximum-pool-size: "${SPRING_DATASOURCE_MAXIMUM_POOL_SIZE:64}" register-mbeans: "${SPRING_DATASOURCE_HIKARI_REGISTER_MBEANS:false}" mybatis-plus: @@ -66,7 +66,7 @@ queue: in-memory: queue-capacity: "${QUEUE_IN_MEMORY_QUEUE_CAPACITY:100000}" stats: - print-interval-ms: "${QUEUE_IN_MEMORY_STATS_PRINT_INTERVAL_MS:60000}" + print-interval-ms: "${QUEUE_IN_MEMORY_STATS_PRINT_INTERVAL_MS:10000}" kafka: bootstrap-servers: "${KAFKA_SERVERS:kafka:9092}" ssl: @@ -128,7 +128,7 @@ redis: standalone: host: "${REDIS_HOST:redis}" port: "${REDIS_PORT:6379}" - useDefaultClientConfig: "${REDIS_USE_DEFAULT_CLIENT_CONFIG:false}" + useDefaultClientConfig: "${REDIS_USE_DEFAULT_CLIENT_CONFIG:true}" clientName: "${REDIS_CLIENT_NAME:standalone}" commandTimeout: "${REDIS_CLIENT_COMMAND_TIMEOUT:30000}" shutdownTimeout: "${REDIS_CLIENT_SHUTDOWN_TIMEOUT:1000}" diff --git a/jcpp-app-bootstrap/src/main/resources/log4j2.xml b/jcpp-app-bootstrap/src/main/resources/log4j2.xml index f53a4fd..d5884d8 100644 --- a/jcpp-app-bootstrap/src/main/resources/log4j2.xml +++ b/jcpp-app-bootstrap/src/main/resources/log4j2.xml @@ -46,6 +46,12 @@ + + + + + diff --git a/jcpp-app/src/main/java/sanbing/jcpp/app/service/queue/consumer/ProtocolUplinkConsumerService.java b/jcpp-app/src/main/java/sanbing/jcpp/app/service/queue/consumer/ProtocolUplinkConsumerService.java index 20ff976..0316d58 100644 --- a/jcpp-app/src/main/java/sanbing/jcpp/app/service/queue/consumer/ProtocolUplinkConsumerService.java +++ b/jcpp-app/src/main/java/sanbing/jcpp/app/service/queue/consumer/ProtocolUplinkConsumerService.java @@ -35,16 +35,12 @@ import sanbing.jcpp.infrastructure.util.trace.TracerRunnable; import sanbing.jcpp.proto.gen.ProtocolProto.UplinkQueueMessage; import java.util.List; -import java.util.Optional; import java.util.Set; import java.util.UUID; import java.util.concurrent.*; import java.util.stream.Collectors; -import static sanbing.jcpp.infrastructure.queue.common.QueueConstants.MSG_MD_PREFIX; -import static sanbing.jcpp.infrastructure.queue.common.QueueConstants.MSG_MD_TS; -import static sanbing.jcpp.infrastructure.util.trace.TracerContextUtil.JCPP_TRACER_ID; -import static sanbing.jcpp.infrastructure.util.trace.TracerContextUtil.JCPP_TRACER_ORIGIN; +import static sanbing.jcpp.infrastructure.queue.common.QueueConstants.*; /** @@ -232,23 +228,10 @@ public class ProtocolUplinkConsumerService extends AbstractConsumerService imple } private void tracer(ProtoQueueMsg msg) { - if (Optional.ofNullable(msg.getHeaders().get(MSG_MD_PREFIX + JCPP_TRACER_ID)) - .map(tracerId -> { - String origin = null; - byte[] tracerOrigin = msg.getHeaders().get(MSG_MD_PREFIX + JCPP_TRACER_ORIGIN); - if (tracerOrigin != null) { - origin = ByteUtil.bytesToString(tracerOrigin); - } - byte[] tracerTs = msg.getHeaders().get(MSG_MD_PREFIX + MSG_MD_TS); - long ts = tracerTs != null ? ByteUtil.bytesToLong(tracerTs) : System.currentTimeMillis(); - - return TracerContextUtil.newTracer(ByteUtil.bytesToString(tracerId), origin, ts); - }) - .isEmpty()) { - - TracerContextUtil.newTracer(); - } + TracerContextUtil.newTracer(ByteUtil.bytesToString(msg.getHeaders().get(MSG_MD_TRACER_ID)), + ByteUtil.bytesToString(msg.getHeaders().get(MSG_MD_TRACER_ORIGIN)), + ByteUtil.bytesToLong(msg.getHeaders().get(MSG_MD_TRACER_TS))); MDCUtils.recordTracer(); } diff --git a/jcpp-infrastructure-queue/src/main/java/sanbing/jcpp/infrastructure/queue/common/QueueConstants.java b/jcpp-infrastructure-queue/src/main/java/sanbing/jcpp/infrastructure/queue/common/QueueConstants.java index 509bc52..6017c93 100644 --- a/jcpp-infrastructure-queue/src/main/java/sanbing/jcpp/infrastructure/queue/common/QueueConstants.java +++ b/jcpp-infrastructure-queue/src/main/java/sanbing/jcpp/infrastructure/queue/common/QueueConstants.java @@ -4,6 +4,8 @@ */ package sanbing.jcpp.infrastructure.queue.common; +import static sanbing.jcpp.infrastructure.util.trace.TracerContextUtil.*; + /** * @author baigod */ @@ -11,5 +13,10 @@ public final class QueueConstants { public static final String MSG_MD_PREFIX = "jcpp_"; - public static final String MSG_MD_TS = "ts"; + public static final String MSG_MD_TRACER_ID = MSG_MD_PREFIX + JCPP_TRACER_ID; + + public static final String MSG_MD_TRACER_ORIGIN = MSG_MD_PREFIX + JCPP_TRACER_ORIGIN; + + public static final String MSG_MD_TRACER_TS = MSG_MD_PREFIX + JCPP_TRACER_TS; + } \ No newline at end of file diff --git a/jcpp-infrastructure-util/src/main/java/sanbing/jcpp/infrastructure/util/mdc/MDCUtils.java b/jcpp-infrastructure-util/src/main/java/sanbing/jcpp/infrastructure/util/mdc/MDCUtils.java index 7f22912..2af32f0 100644 --- a/jcpp-infrastructure-util/src/main/java/sanbing/jcpp/infrastructure/util/mdc/MDCUtils.java +++ b/jcpp-infrastructure-util/src/main/java/sanbing/jcpp/infrastructure/util/mdc/MDCUtils.java @@ -34,7 +34,6 @@ public class MDCUtils { } return tracer.getTraceId(); - } public static void cleanTracer() { diff --git a/jcpp-infrastructure-util/src/main/java/sanbing/jcpp/infrastructure/util/trace/TracerContextUtil.java b/jcpp-infrastructure-util/src/main/java/sanbing/jcpp/infrastructure/util/trace/TracerContextUtil.java index 5921c7c..589fc13 100644 --- a/jcpp-infrastructure-util/src/main/java/sanbing/jcpp/infrastructure/util/trace/TracerContextUtil.java +++ b/jcpp-infrastructure-util/src/main/java/sanbing/jcpp/infrastructure/util/trace/TracerContextUtil.java @@ -40,7 +40,6 @@ public class TracerContextUtil { tracer = new Tracer(traceId, origin, ts); } - TRACE_ID_CONTAINER.set(tracer); return tracer; diff --git a/jcpp-protocol-api/src/main/java/sanbing/jcpp/protocol/forwarder/Forwarder.java b/jcpp-protocol-api/src/main/java/sanbing/jcpp/protocol/forwarder/Forwarder.java index 9aa2e9b..b4d7fe8 100644 --- a/jcpp-protocol-api/src/main/java/sanbing/jcpp/protocol/forwarder/Forwarder.java +++ b/jcpp-protocol-api/src/main/java/sanbing/jcpp/protocol/forwarder/Forwarder.java @@ -25,10 +25,7 @@ import sanbing.jcpp.proto.gen.ProtocolProto.UplinkQueueMessage; import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.BiConsumer; -import static sanbing.jcpp.infrastructure.queue.common.QueueConstants.MSG_MD_PREFIX; -import static sanbing.jcpp.infrastructure.queue.common.QueueConstants.MSG_MD_TS; -import static sanbing.jcpp.infrastructure.util.trace.TracerContextUtil.JCPP_TRACER_ID; -import static sanbing.jcpp.infrastructure.util.trace.TracerContextUtil.JCPP_TRACER_ORIGIN; +import static sanbing.jcpp.infrastructure.queue.common.QueueConstants.*; /** * @author baigod @@ -70,9 +67,9 @@ public abstract class Forwarder { QueueMsgHeaders headers = new DefaultQueueMsgHeaders(); Tracer currentTracer = TracerContextUtil.getCurrentTracer(); - headers.put(MSG_MD_PREFIX + JCPP_TRACER_ID, ByteUtil.stringToBytes(currentTracer.getTraceId())); - headers.put(MSG_MD_PREFIX + JCPP_TRACER_ORIGIN, ByteUtil.stringToBytes(currentTracer.getOrigin())); - headers.put(MSG_MD_PREFIX + MSG_MD_TS, ByteUtil.longToBytes(currentTracer.getTracerTs())); + headers.put(MSG_MD_TRACER_ID, ByteUtil.stringToBytes(currentTracer.getTraceId())); + headers.put(MSG_MD_TRACER_ORIGIN, ByteUtil.stringToBytes(currentTracer.getOrigin())); + headers.put(MSG_MD_TRACER_TS, ByteUtil.longToBytes(currentTracer.getTracerTs())); TopicPartitionInfo tpi = partitionProvider.resolve(ServiceType.APP, topic, key); producer.send(tpi, new ProtoQueueMsg<>(key, msg, headers), new QueueCallback() { diff --git a/jcpp-protocol-api/src/main/java/sanbing/jcpp/protocol/forwarder/KafkaForwarder.java b/jcpp-protocol-api/src/main/java/sanbing/jcpp/protocol/forwarder/KafkaForwarder.java index c81b929..56d97a4 100644 --- a/jcpp-protocol-api/src/main/java/sanbing/jcpp/protocol/forwarder/KafkaForwarder.java +++ b/jcpp-protocol-api/src/main/java/sanbing/jcpp/protocol/forwarder/KafkaForwarder.java @@ -36,10 +36,7 @@ import java.util.Properties; import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.BiConsumer; -import static sanbing.jcpp.infrastructure.queue.common.QueueConstants.MSG_MD_PREFIX; -import static sanbing.jcpp.infrastructure.queue.common.QueueConstants.MSG_MD_TS; -import static sanbing.jcpp.infrastructure.util.trace.TracerContextUtil.JCPP_TRACER_ID; -import static sanbing.jcpp.infrastructure.util.trace.TracerContextUtil.JCPP_TRACER_ORIGIN; +import static sanbing.jcpp.infrastructure.queue.common.QueueConstants.*; /** * @author baigod @@ -154,9 +151,9 @@ public class KafkaForwarder extends Forwarder { Headers headers = new RecordHeaders(); Tracer currentTracer = TracerContextUtil.getCurrentTracer(); - headers.add(new RecordHeader(MSG_MD_PREFIX + JCPP_TRACER_ID, ByteUtil.stringToBytes(currentTracer.getTraceId()))); - headers.add(new RecordHeader(MSG_MD_PREFIX + JCPP_TRACER_ORIGIN, ByteUtil.stringToBytes(currentTracer.getOrigin()))); - headers.add(new RecordHeader(MSG_MD_PREFIX + MSG_MD_TS, ByteUtil.longToBytes(currentTracer.getTracerTs()))); + headers.add(new RecordHeader(MSG_MD_TRACER_ID, ByteUtil.stringToBytes(currentTracer.getTraceId()))); + headers.add(new RecordHeader(MSG_MD_TRACER_ORIGIN, ByteUtil.stringToBytes(currentTracer.getOrigin()))); + headers.add(new RecordHeader(MSG_MD_TRACER_TS, ByteUtil.longToBytes(currentTracer.getTracerTs()))); if (kafkaCfg.getEncoder() == KafkaCfg.EncoderType.json) { From 8d125f4e19efdd9479de21e4e368955dae223d89 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E4=B8=89=E4=B8=99?= Date: Tue, 22 Oct 2024 16:18:50 +0800 Subject: [PATCH 27/27] =?UTF-8?q?=E6=A0=B9=E6=8D=AE=E5=8E=8B=E6=B5=8B?= =?UTF-8?q?=E7=BB=93=E6=9E=9C=E8=B0=83=E5=8F=82?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../jcpp/app/service/impl/DefaultDownlinkCallService.java | 2 +- .../infrastructure/queue/provider/KafkaAppQueueFactory.java | 1 - 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/jcpp-app/src/main/java/sanbing/jcpp/app/service/impl/DefaultDownlinkCallService.java b/jcpp-app/src/main/java/sanbing/jcpp/app/service/impl/DefaultDownlinkCallService.java index a0a801d..62a6b87 100644 --- a/jcpp-app/src/main/java/sanbing/jcpp/app/service/impl/DefaultDownlinkCallService.java +++ b/jcpp-app/src/main/java/sanbing/jcpp/app/service/impl/DefaultDownlinkCallService.java @@ -87,7 +87,7 @@ public class DefaultDownlinkCallService implements DownlinkCallService { try { ResponseEntity response = downlinkRestTemplate.postForEntity("http://" + nodeWebapiIpPort + "/api/onDownlink", entity, ResponseEntity.class); - log.info("下行消息发送成功 {}", response); + log.debug("下行消息发送成功 {}", response); } catch (RestClientException e) { log.error("下行消息发送失败 {}", downlinkRestMessage, e); throw new RuntimeException(e); diff --git a/jcpp-infrastructure-queue/src/main/java/sanbing/jcpp/infrastructure/queue/provider/KafkaAppQueueFactory.java b/jcpp-infrastructure-queue/src/main/java/sanbing/jcpp/infrastructure/queue/provider/KafkaAppQueueFactory.java index 155e15b..e884c93 100644 --- a/jcpp-infrastructure-queue/src/main/java/sanbing/jcpp/infrastructure/queue/provider/KafkaAppQueueFactory.java +++ b/jcpp-infrastructure-queue/src/main/java/sanbing/jcpp/infrastructure/queue/provider/KafkaAppQueueFactory.java @@ -41,7 +41,6 @@ public class KafkaAppQueueFactory implements AppQueueFactory { this.appAdmin = new KafkaAdmin(kafkaSettings, kafkaTopicConfigs.getAppConfigs()); } - @Override public QueueConsumer> createProtocolUplinkMsgConsumer() { KafkaConsumerTemplate.KafkaConsumerTemplateBuilder> consumerBuilder = KafkaConsumerTemplate.builder();