From 54fcb66f5ebdf4dc815656eb0362972879b533cd Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E4=B8=89=E4=B8=99?= Date: Mon, 14 Oct 2024 10:08:03 +0800 Subject: [PATCH 1/7] code cleanup by sonar --- .../queue/discovery/HashPartitionProvider.java | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/jcpp-infrastructure-queue/src/main/java/sanbing/jcpp/infrastructure/queue/discovery/HashPartitionProvider.java b/jcpp-infrastructure-queue/src/main/java/sanbing/jcpp/infrastructure/queue/discovery/HashPartitionProvider.java index 87db05b..98001bf 100644 --- a/jcpp-infrastructure-queue/src/main/java/sanbing/jcpp/infrastructure/queue/discovery/HashPartitionProvider.java +++ b/jcpp-infrastructure-queue/src/main/java/sanbing/jcpp/infrastructure/queue/discovery/HashPartitionProvider.java @@ -39,12 +39,12 @@ public class HashPartitionProvider implements PartitionProvider { @Value("${queue.partitions.hash_function_name:murmur3_128}") private String hashFunctionName; - private final ConcurrentMap partitionTopicsMap = new ConcurrentHashMap<>(); - private final ConcurrentMap partitionSizesMap = new ConcurrentHashMap<>(); + private final Map partitionTopicsMap = new ConcurrentHashMap<>(); + private final Map partitionSizesMap = new ConcurrentHashMap<>(); private HashFunction hashFunction; - protected volatile ConcurrentMap> myPartitions = new ConcurrentHashMap<>(); + protected Map> myPartitions = new ConcurrentHashMap<>(); @Resource private ApplicationEventPublisher applicationEventPublisher; @@ -118,7 +118,7 @@ public class HashPartitionProvider implements PartitionProvider { } }); - final ConcurrentMap> oldPartitions = myPartitions; + final Map> oldPartitions = myPartitions; myPartitions = newPartitions; log.info("Current Server responsible partitions: {}", myPartitions); From f414cc7e131f70caad4bd168dc286ac15ec3e1d4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E4=B8=89=E4=B8=99?= Date: Mon, 14 Oct 2024 15:54:55 +0800 Subject: [PATCH 2/7] =?UTF-8?q?=E6=96=B0=E5=A2=9Epostgresql=2017=20?= =?UTF-8?q?=E7=9A=84docker=20compose?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- docker/docker-compose.postgres.yml | 33 +++++++++++++++++++ .../src/main/resources/app-service.yml | 4 +-- .../resources/app-service-test.properties | 3 +- .../src/main/resources/protocol-service.yml | 2 +- 4 files changed, 37 insertions(+), 5 deletions(-) create mode 100644 docker/docker-compose.postgres.yml diff --git a/docker/docker-compose.postgres.yml b/docker/docker-compose.postgres.yml new file mode 100644 index 0000000..be5ac05 --- /dev/null +++ b/docker/docker-compose.postgres.yml @@ -0,0 +1,33 @@ +# +# 抖音关注:程序员三丙 +# 知识星球:https://t.zsxq.com/j9b21 +# + +volumes: + postgresql_data: {} + +networks: + sanbing-network: + driver: bridge + name: sanbing-network + ipam: + config: + - subnet: 10.10.0.0/24 + +services: + postgres: + image: registry.cn-hangzhou.aliyuncs.com/sanbing/postgresql:17 + restart: always + networks: + - sanbing-network + ports: + - "5432:5432" + environment: + - 'POSTGRES_DB=jcpp' + - 'POSTGRES_PASSWORD=postgres' + - 'POSTGRESQL_MAX_CONNECTIONS=1000' + - 'POSTGRESQL_DEFAULT_TRANSACTION_ISOLATION=read committed' + - 'TZ=Asia/Shanghai' + volumes: + - postgresql_data:/bitnami/postgresql + - ./schema/schema-postgres.sql:/docker-entrypoint-initdb.d/init.sql \ No newline at end of file diff --git a/jcpp-app-bootstrap/src/main/resources/app-service.yml b/jcpp-app-bootstrap/src/main/resources/app-service.yml index f09509a..c5f1cc4 100644 --- a/jcpp-app-bootstrap/src/main/resources/app-service.yml +++ b/jcpp-app-bootstrap/src/main/resources/app-service.yml @@ -22,7 +22,7 @@ spring: name: "${SPRING_APPLICATION_NAME:java-charge-point-server}" datasource: driver-class-name: "${SPRING_DRIVER_CLASS_NAME:org.postgresql.Driver}" - url: "${SPRING_DATASOURCE_URL:jdbc:postgresql://10.102.12.102:30135/jcpp}" + url: "${SPRING_DATASOURCE_URL:jdbc:postgresql://postgres:5432/jcpp}" username: "${SPRING_DATASOURCE_USERNAME:postgres}" password: "${SPRING_DATASOURCE_PASSWORD:postgres}" hikari: @@ -196,7 +196,7 @@ service: topic: "${PROTOCOLS_YUNKUAICHONGV150_FORWARD_KAFKA_TOPIC:protocol_uplink}" jcpp-partition: "${PROTOCOLS_YUNKUAICHONGV150_FORWARD_KAFKA_JCPP_PARTITION:true}" # 是否利用JCPP的分片框架 # 以下配置只有在service.type为protocol时且jcpp-partition为false时才生效 - bootstrap-servers: "${PROTOCOLS_YUNKUAICHONGV150_FORWARD_KAFKA_SERVERS:10.102.12.102:9092}" + bootstrap-servers: "${PROTOCOLS_YUNKUAICHONGV150_FORWARD_KAFKA_SERVERS:kafka:9092}" acks: "${PROTOCOLS_YUNKUAICHONGV150_FORWARD_KAFKA_ACKS:1}" # # 可选 protobuf(推荐)、json encoder: "${PROTOCOLS_YUNKUAICHONGV150_FORWARD_KAFKA_ENCODER:protobuf}" diff --git a/jcpp-app-bootstrap/src/test/resources/app-service-test.properties b/jcpp-app-bootstrap/src/test/resources/app-service-test.properties index b397d7b..13313a4 100644 --- a/jcpp-app-bootstrap/src/test/resources/app-service-test.properties +++ b/jcpp-app-bootstrap/src/test/resources/app-service-test.properties @@ -1,3 +1,2 @@ -redis.connection.type=cluster -redis.cluster.nodes=10.102.12.101:30700,10.102.12.101:32027,10.102.12.101:30767,10.102.12.101:30250,10.102.12.101:30612,10.102.12.101:32303 +spring.datasource.url=jdbc:postgresql://127.0.0.1:5432/jcpp service.protocols.yunkuaichongV150.listener.tcp.bind-port=0 \ No newline at end of file diff --git a/jcpp-protocol-bootstrap/src/main/resources/protocol-service.yml b/jcpp-protocol-bootstrap/src/main/resources/protocol-service.yml index 284a6bb..596fa0b 100644 --- a/jcpp-protocol-bootstrap/src/main/resources/protocol-service.yml +++ b/jcpp-protocol-bootstrap/src/main/resources/protocol-service.yml @@ -71,7 +71,7 @@ service: topic: "${PROTOCOLS_YUNKUAICHONGV150_FORWARD_KAFKA_TOPIC:protocol_uplink}" jcpp-partition: "${PROTOCOLS_YUNKUAICHONGV150_FORWARD_KAFKA_JCPP_PARTITION:true}" # 是否利用JCPP的分片框架 # 以下配置只有在service.type为protocol时且jcpp-partition为false时才生效 - bootstrap-servers: "${PROTOCOLS_YUNKUAICHONGV150_FORWARD_KAFKA_SERVERS:10.102.12.102:9092}" + bootstrap-servers: "${PROTOCOLS_YUNKUAICHONGV150_FORWARD_KAFKA_SERVERS:kafka:9092}" acks: "${PROTOCOLS_YUNKUAICHONGV150_FORWARD_KAFKA_ACKS:1}" # # 可选 protobuf(推荐)、json encoder: "${PROTOCOLS_YUNKUAICHONGV150_FORWARD_KAFKA_ENCODER:protobuf}" From f6bc74c83d47ea30edd9838a563da72760a02ce9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E4=B8=89=E4=B8=99?= Date: Mon, 14 Oct 2024 16:30:25 +0800 Subject: [PATCH 3/7] =?UTF-8?q?=E8=B0=83=E6=95=B4=E4=B8=8B=E8=A1=8C?= =?UTF-8?q?=E9=9B=86=E6=88=90=E6=B5=8B=E8=AF=95?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../jcpp/app/dal/mapper/GunMapperIT.java | 14 +++++++++++--- .../protocol/adapter/DownlinkControllerIT.java | 17 ++++++++++------- 2 files changed, 21 insertions(+), 10 deletions(-) diff --git a/jcpp-app-bootstrap/src/test/java/sanbing/jcpp/app/dal/mapper/GunMapperIT.java b/jcpp-app-bootstrap/src/test/java/sanbing/jcpp/app/dal/mapper/GunMapperIT.java index 5eb5aac..af650ff 100644 --- a/jcpp-app-bootstrap/src/test/java/sanbing/jcpp/app/dal/mapper/GunMapperIT.java +++ b/jcpp-app-bootstrap/src/test/java/sanbing/jcpp/app/dal/mapper/GunMapperIT.java @@ -12,6 +12,7 @@ import sanbing.jcpp.app.dal.config.ibatis.enums.GunOptStatusEnum; import sanbing.jcpp.app.dal.config.ibatis.enums.GunRunStatusEnum; import sanbing.jcpp.app.dal.config.ibatis.enums.OwnerTypeEnum; import sanbing.jcpp.app.dal.entity.Gun; +import sanbing.jcpp.app.dal.entity.Pile; import sanbing.jcpp.infrastructure.util.jackson.JacksonUtil; import java.time.LocalDateTime; @@ -37,9 +38,13 @@ class GunMapperIT extends AbstractTestBase { UUID.fromString("3f3a61e9-de55-4177-9b4e-3a1d8c529890"), UUID.fromString("cf1a8970-5aa9-4636-a76e-d6bcf98b4a07") }; + @Resource GunMapper gunMapper; + @Resource + PileMapper pileMapper; + @Test void curdTest() { gunMapper.delete(Wrappers.lambdaQuery()); @@ -47,13 +52,16 @@ class GunMapperIT extends AbstractTestBase { for (int i = 0; i < NORMAL_PILE_ID.length; i++) { UUID pileId = NORMAL_PILE_ID[i]; UUID gunId = NORMAL_GUN_ID[i]; + + Pile pile = pileMapper.selectById(pileId); + Gun gun = Gun.builder() .id(gunId) .createdTime(LocalDateTime.now()) .additionalInfo(JacksonUtil.newObjectNode()) - .gunNo("01") - .gunName("三丙的1号枪") - .gunCode("20231212000001-" + (i + 1)) + .gunNo("02") + .gunName(pile.getPileName() + "的2号枪") + .gunCode(pile.getPileCode() + "-02") .stationId(NORMAL_STATION_ID) .pileId(pileId) .ownerId(NORMAL_USER_ID) diff --git a/jcpp-protocol-bootstrap/src/test/java/sanbing/jcpp/protocol/adapter/DownlinkControllerIT.java b/jcpp-protocol-bootstrap/src/test/java/sanbing/jcpp/protocol/adapter/DownlinkControllerIT.java index f02982c..8346abc 100644 --- a/jcpp-protocol-bootstrap/src/test/java/sanbing/jcpp/protocol/adapter/DownlinkControllerIT.java +++ b/jcpp-protocol-bootstrap/src/test/java/sanbing/jcpp/protocol/adapter/DownlinkControllerIT.java @@ -66,7 +66,6 @@ class DownlinkControllerIT extends AbstractProtocolTestBase { binaryHandlerConfig.getLengthFieldOffset(), binaryHandlerConfig.getLengthFieldLength(), binaryHandlerConfig.getLengthAdjustment(), binaryHandlerConfig.getInitialBytesToStrip()); - group = new NioEventLoopGroup(); Bootstrap b = new Bootstrap(); b.group(group) @@ -80,7 +79,6 @@ class DownlinkControllerIT extends AbstractProtocolTestBase { .addLast(new SimpleChannelInboundHandler<>() { @Override protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception { - log.info("接收到下行报文:{}", msg); } }); @@ -105,14 +103,19 @@ class DownlinkControllerIT extends AbstractProtocolTestBase { // 先发送一段登录 channel.writeAndFlush(Unpooled.wrappedBuffer(HexUtil.decodeHex("6822001900012023121200001001011047562e393572313300898604d11722d0348606024E87"))).sync(); - // 停一会等注册完成 todo 也可以读下行消息判断是否登录成功 - Thread.sleep(1000); + // 一直检查等待会话注册完成 + UUID sessionId; + while (true) { + if (sessionRegistryProvider.getSessionCache().asMap().values().stream().findFirst().isPresent()) { + ProtocolSession protocolSession = sessionRegistryProvider.getSessionCache().asMap().values().stream().findFirst().get(); + sessionId = protocolSession.getId(); + break; + } + Thread.sleep(100); + } UUID messageId = UUID.randomUUID(); - ProtocolSession protocolSession = sessionRegistryProvider.getSessionCache().asMap().values().stream().findFirst().get(); - UUID sessionId = protocolSession.getId(); UUID requestId = UUID.randomUUID(); - // 创建 DownlinkRestMessage 实例 String pileCode = "20231212000010"; DownlinkRestMessage downlinkMsg = DownlinkRestMessage.newBuilder() From 93d8ec9b6e1b28ccf718ff57b635744403f39738 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E4=B8=89=E4=B8=99?= Date: Mon, 14 Oct 2024 17:14:28 +0800 Subject: [PATCH 4/7] =?UTF-8?q?=E5=89=8D=E7=BD=AE=E6=9C=8D=E5=8A=A1?= =?UTF-8?q?=E9=BB=98=E8=AE=A4=E5=85=B3=E9=97=ADzk?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- jcpp-protocol-bootstrap/src/main/resources/protocol-service.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/jcpp-protocol-bootstrap/src/main/resources/protocol-service.yml b/jcpp-protocol-bootstrap/src/main/resources/protocol-service.yml index 596fa0b..2b57f32 100644 --- a/jcpp-protocol-bootstrap/src/main/resources/protocol-service.yml +++ b/jcpp-protocol-bootstrap/src/main/resources/protocol-service.yml @@ -84,7 +84,7 @@ service: # 应用程序服务注册中心配置 zk: - enabled: "${ZOOKEEPER_ENABLED:true}" + enabled: "${ZOOKEEPER_ENABLED:false}" url: "${ZOOKEEPER_URL:zookeeper:2181}" retry-interval-ms: "${ZOOKEEPER_RETRY_INTERVAL_MS:3000}" connection-timeout-ms: "${ZOOKEEPER_CONNECTION_TIMEOUT_MS:3000}" From 86614a5a929578a25bca819e1de707a7509c757a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E4=B8=89=E4=B8=99?= Date: Mon, 14 Oct 2024 17:30:31 +0800 Subject: [PATCH 5/7] =?UTF-8?q?=E7=BA=BF=E7=A8=8B=E6=B1=A0=E5=91=BD?= =?UTF-8?q?=E5=90=8D=E9=94=99=E8=AF=AF?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../jcpp/infrastructure/util/config/ShardingThreadPool.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/jcpp-infrastructure-util/src/main/java/sanbing/jcpp/infrastructure/util/config/ShardingThreadPool.java b/jcpp-infrastructure-util/src/main/java/sanbing/jcpp/infrastructure/util/config/ShardingThreadPool.java index 8b5119c..29b6084 100644 --- a/jcpp-infrastructure-util/src/main/java/sanbing/jcpp/infrastructure/util/config/ShardingThreadPool.java +++ b/jcpp-infrastructure-util/src/main/java/sanbing/jcpp/infrastructure/util/config/ShardingThreadPool.java @@ -79,7 +79,7 @@ public class ShardingThreadPool { int partition = hash(hashFunction, hashKey); EXECUTOR_SERVICE_MAP.computeIfAbsent(partition % parallelism, - p -> Executors.newFixedThreadPool(1, JCPPThreadFactory.forName("sharding-threads" + p))) + p -> Executors.newFixedThreadPool(1, JCPPThreadFactory.forName("sharding-threads-%d" + p))) .execute(runnable); } } \ No newline at end of file From 103f782ddf6f4e8f7b2526d54679e40d2e9c2efc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E4=B8=89=E4=B8=99?= Date: Mon, 14 Oct 2024 17:46:22 +0800 Subject: [PATCH 6/7] =?UTF-8?q?=E7=BA=BF=E7=A8=8B=E6=B1=A0=E5=91=BD?= =?UTF-8?q?=E5=90=8D=E9=94=99=E8=AF=AF?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../infrastructure/util/config/ShardingThreadPool.java | 10 +++++----- .../v150/YunKuaiChongV15ProtocolMessageProcessor.java | 4 ++-- 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/jcpp-infrastructure-util/src/main/java/sanbing/jcpp/infrastructure/util/config/ShardingThreadPool.java b/jcpp-infrastructure-util/src/main/java/sanbing/jcpp/infrastructure/util/config/ShardingThreadPool.java index 29b6084..c334e7e 100644 --- a/jcpp-infrastructure-util/src/main/java/sanbing/jcpp/infrastructure/util/config/ShardingThreadPool.java +++ b/jcpp-infrastructure-util/src/main/java/sanbing/jcpp/infrastructure/util/config/ShardingThreadPool.java @@ -38,7 +38,7 @@ public class ShardingThreadPool { private HashFunction hashFunction; - private final Map EXECUTOR_SERVICE_MAP = new ConcurrentHashMap<>(8); + private final Map executorServiceMap = new ConcurrentHashMap<>(8); @PostConstruct public void init() { @@ -47,7 +47,7 @@ public class ShardingThreadPool { @PreDestroy public void destroy() { - for (ExecutorService executorService : EXECUTOR_SERVICE_MAP.values()) { + for (ExecutorService executorService : executorServiceMap.values()) { executorService.shutdownNow(); log.info("Sharding Thread [{}] Shutdown completed.", executorService); } @@ -55,7 +55,7 @@ public class ShardingThreadPool { @Scheduled(fixedDelayString = "${thread-pool.sharding.stats-print-interval-ms:10000}") public void printStats() { - EXECUTOR_SERVICE_MAP.forEach((k, v) -> { + executorServiceMap.forEach((k, v) -> { ThreadPoolExecutor threadPoolExecutor = (ThreadPoolExecutor) v; @@ -78,8 +78,8 @@ public class ShardingThreadPool { public void execute(UUID hashKey, TracerRunnable runnable) { int partition = hash(hashFunction, hashKey); - EXECUTOR_SERVICE_MAP.computeIfAbsent(partition % parallelism, - p -> Executors.newFixedThreadPool(1, JCPPThreadFactory.forName("sharding-threads-%d" + p))) + executorServiceMap.computeIfAbsent(Math.abs(partition % parallelism), + p -> Executors.newFixedThreadPool(1, JCPPThreadFactory.forName("sharding-threads-" + p))) .execute(runnable); } } \ No newline at end of file diff --git a/jcpp-protocol-yunkuaichong/src/main/java/sanbing/jcpp/protocol/yunkuaichong/v150/YunKuaiChongV15ProtocolMessageProcessor.java b/jcpp-protocol-yunkuaichong/src/main/java/sanbing/jcpp/protocol/yunkuaichong/v150/YunKuaiChongV15ProtocolMessageProcessor.java index 66f4b8b..c73ad66 100644 --- a/jcpp-protocol-yunkuaichong/src/main/java/sanbing/jcpp/protocol/yunkuaichong/v150/YunKuaiChongV15ProtocolMessageProcessor.java +++ b/jcpp-protocol-yunkuaichong/src/main/java/sanbing/jcpp/protocol/yunkuaichong/v150/YunKuaiChongV15ProtocolMessageProcessor.java @@ -102,7 +102,7 @@ public class YunKuaiChongV15ProtocolMessageProcessor extends ProtocolMessageProc int seqNo = in.readUnsignedShortLE(); // 加密标志 - int encrpyFlag = in.readUnsignedByte(); + int encryptFlag = in.readUnsignedByte(); // 帧类型标志 int frameType = in.readUnsignedByte(); @@ -154,7 +154,7 @@ public class YunKuaiChongV15ProtocolMessageProcessor extends ProtocolMessageProc message.setHead(startFlag); message.setDataLength(dataLength); message.setSequenceNumber(seqNo); - message.setEncryptionFlag(encrpyFlag); + message.setEncryptionFlag(encryptFlag); message.setCmd(frameType); message.setMsgBody(msgBody); message.setCheckSum(checkSum); From e75ac9436508c3161747e1fd8f47fcfdeb7178ac Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E4=B8=89=E4=B8=99?= Date: Mon, 14 Oct 2024 17:53:42 +0800 Subject: [PATCH 7/7] =?UTF-8?q?=E5=A2=9E=E5=8A=A0todo?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../app/service/impl/DefaultPileProtocolService.java | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/jcpp-app/src/main/java/sanbing/jcpp/app/service/impl/DefaultPileProtocolService.java b/jcpp-app/src/main/java/sanbing/jcpp/app/service/impl/DefaultPileProtocolService.java index 87d114c..f8387d3 100644 --- a/jcpp-app/src/main/java/sanbing/jcpp/app/service/impl/DefaultPileProtocolService.java +++ b/jcpp-app/src/main/java/sanbing/jcpp/app/service/impl/DefaultPileProtocolService.java @@ -185,6 +185,8 @@ public class DefaultPileProtocolService implements PileProtocolService { public void postGunRunStatus(UplinkQueueMessage uplinkQueueMessage, Callback callback) { log.info("接收到充电桩上报的电桩状态 {}", uplinkQueueMessage); + // TODO 处理相关业务逻辑 + callback.onSuccess(); } @@ -192,6 +194,8 @@ public class DefaultPileProtocolService implements PileProtocolService { public void postChargingProgress(UplinkQueueMessage uplinkQueueMessage, Callback callback) { log.info("接收到充电桩上报的充电进度 {}", uplinkQueueMessage); + // TODO 处理相关业务逻辑 + callback.onSuccess(); } @@ -199,6 +203,8 @@ public class DefaultPileProtocolService implements PileProtocolService { public void onSetPricingResponse(UplinkQueueMessage uplinkQueueMessage, Callback callback) { log.info("接收到充电桩上费率下发反馈 {}", uplinkQueueMessage); + // TODO 处理相关业务逻辑 + callback.onSuccess(); } @@ -206,6 +212,8 @@ public class DefaultPileProtocolService implements PileProtocolService { public void onRemoteStartChargingResponse(UplinkQueueMessage uplinkQueueMessage, Callback callback) { log.info("接收到充电桩启动结果反馈 {}", uplinkQueueMessage); + // TODO 处理相关业务逻辑 + callback.onSuccess(); } @@ -213,6 +221,8 @@ public class DefaultPileProtocolService implements PileProtocolService { public void onRemoteStopChargingResponse(UplinkQueueMessage uplinkQueueMessage, Callback callback) { log.info("接收到充电桩停止结果反馈 {}", uplinkQueueMessage); + // TODO 处理相关业务逻辑 + callback.onSuccess(); }