diff --git a/docker/docker-compose.postgres.yml b/docker/docker-compose.postgres.yml new file mode 100644 index 0000000..be5ac05 --- /dev/null +++ b/docker/docker-compose.postgres.yml @@ -0,0 +1,33 @@ +# +# 抖音关注:程序员三丙 +# 知识星球:https://t.zsxq.com/j9b21 +# + +volumes: + postgresql_data: {} + +networks: + sanbing-network: + driver: bridge + name: sanbing-network + ipam: + config: + - subnet: 10.10.0.0/24 + +services: + postgres: + image: registry.cn-hangzhou.aliyuncs.com/sanbing/postgresql:17 + restart: always + networks: + - sanbing-network + ports: + - "5432:5432" + environment: + - 'POSTGRES_DB=jcpp' + - 'POSTGRES_PASSWORD=postgres' + - 'POSTGRESQL_MAX_CONNECTIONS=1000' + - 'POSTGRESQL_DEFAULT_TRANSACTION_ISOLATION=read committed' + - 'TZ=Asia/Shanghai' + volumes: + - postgresql_data:/bitnami/postgresql + - ./schema/schema-postgres.sql:/docker-entrypoint-initdb.d/init.sql \ No newline at end of file diff --git a/jcpp-app-bootstrap/src/main/resources/app-service.yml b/jcpp-app-bootstrap/src/main/resources/app-service.yml index 7eb821b..3d41a68 100644 --- a/jcpp-app-bootstrap/src/main/resources/app-service.yml +++ b/jcpp-app-bootstrap/src/main/resources/app-service.yml @@ -22,7 +22,7 @@ spring: name: "${SPRING_APPLICATION_NAME:java-charge-point-server}" datasource: driver-class-name: "${SPRING_DRIVER_CLASS_NAME:org.postgresql.Driver}" - url: "${SPRING_DATASOURCE_URL:jdbc:postgresql://10.102.12.102:30135/jcpp}" + url: "${SPRING_DATASOURCE_URL:jdbc:postgresql://postgres:5432/jcpp}" username: "${SPRING_DATASOURCE_USERNAME:postgres}" password: "${SPRING_DATASOURCE_PASSWORD:postgres}" hikari: @@ -196,7 +196,7 @@ service: topic: "${PROTOCOLS_YUNKUAICHONGV150_FORWARD_KAFKA_TOPIC:protocol_uplink}" jcpp-partition: "${PROTOCOLS_YUNKUAICHONGV150_FORWARD_KAFKA_JCPP_PARTITION:true}" # 是否利用JCPP的分片框架 # 以下配置只有在service.type为protocol时且jcpp-partition为false时才生效 - bootstrap-servers: "${PROTOCOLS_YUNKUAICHONGV150_FORWARD_KAFKA_SERVERS:10.102.12.102:9092}" + bootstrap-servers: "${PROTOCOLS_YUNKUAICHONGV150_FORWARD_KAFKA_SERVERS:kafka:9092}" acks: "${PROTOCOLS_YUNKUAICHONGV150_FORWARD_KAFKA_ACKS:1}" # # 可选 protobuf(推荐)、json encoder: "${PROTOCOLS_YUNKUAICHONGV150_FORWARD_KAFKA_ENCODER:protobuf}" diff --git a/jcpp-app-bootstrap/src/test/java/sanbing/jcpp/app/dal/mapper/GunMapperIT.java b/jcpp-app-bootstrap/src/test/java/sanbing/jcpp/app/dal/mapper/GunMapperIT.java index 5eb5aac..af650ff 100644 --- a/jcpp-app-bootstrap/src/test/java/sanbing/jcpp/app/dal/mapper/GunMapperIT.java +++ b/jcpp-app-bootstrap/src/test/java/sanbing/jcpp/app/dal/mapper/GunMapperIT.java @@ -12,6 +12,7 @@ import sanbing.jcpp.app.dal.config.ibatis.enums.GunOptStatusEnum; import sanbing.jcpp.app.dal.config.ibatis.enums.GunRunStatusEnum; import sanbing.jcpp.app.dal.config.ibatis.enums.OwnerTypeEnum; import sanbing.jcpp.app.dal.entity.Gun; +import sanbing.jcpp.app.dal.entity.Pile; import sanbing.jcpp.infrastructure.util.jackson.JacksonUtil; import java.time.LocalDateTime; @@ -37,9 +38,13 @@ class GunMapperIT extends AbstractTestBase { UUID.fromString("3f3a61e9-de55-4177-9b4e-3a1d8c529890"), UUID.fromString("cf1a8970-5aa9-4636-a76e-d6bcf98b4a07") }; + @Resource GunMapper gunMapper; + @Resource + PileMapper pileMapper; + @Test void curdTest() { gunMapper.delete(Wrappers.lambdaQuery()); @@ -47,13 +52,16 @@ class GunMapperIT extends AbstractTestBase { for (int i = 0; i < NORMAL_PILE_ID.length; i++) { UUID pileId = NORMAL_PILE_ID[i]; UUID gunId = NORMAL_GUN_ID[i]; + + Pile pile = pileMapper.selectById(pileId); + Gun gun = Gun.builder() .id(gunId) .createdTime(LocalDateTime.now()) .additionalInfo(JacksonUtil.newObjectNode()) - .gunNo("01") - .gunName("三丙的1号枪") - .gunCode("20231212000001-" + (i + 1)) + .gunNo("02") + .gunName(pile.getPileName() + "的2号枪") + .gunCode(pile.getPileCode() + "-02") .stationId(NORMAL_STATION_ID) .pileId(pileId) .ownerId(NORMAL_USER_ID) diff --git a/jcpp-app-bootstrap/src/test/resources/app-service-test.properties b/jcpp-app-bootstrap/src/test/resources/app-service-test.properties index 75de971..0259723 100644 --- a/jcpp-app-bootstrap/src/test/resources/app-service-test.properties +++ b/jcpp-app-bootstrap/src/test/resources/app-service-test.properties @@ -1,4 +1,3 @@ -redis.connection.type=cluster -redis.cluster.nodes=10.102.12.101:30700,10.102.12.101:32027,10.102.12.101:30767,10.102.12.101:30250,10.102.12.101:30612,10.102.12.101:32303 +spring.datasource.url=jdbc:postgresql://127.0.0.1:5432/jcpp service.protocols.yunkuaichongV150.listener.tcp.bind-port=0 service.protocols.yunkuaichongV160.listener.tcp.bind-port=0 \ No newline at end of file diff --git a/jcpp-app/src/main/java/sanbing/jcpp/app/service/impl/DefaultPileProtocolService.java b/jcpp-app/src/main/java/sanbing/jcpp/app/service/impl/DefaultPileProtocolService.java index 87d114c..f8387d3 100644 --- a/jcpp-app/src/main/java/sanbing/jcpp/app/service/impl/DefaultPileProtocolService.java +++ b/jcpp-app/src/main/java/sanbing/jcpp/app/service/impl/DefaultPileProtocolService.java @@ -185,6 +185,8 @@ public class DefaultPileProtocolService implements PileProtocolService { public void postGunRunStatus(UplinkQueueMessage uplinkQueueMessage, Callback callback) { log.info("接收到充电桩上报的电桩状态 {}", uplinkQueueMessage); + // TODO 处理相关业务逻辑 + callback.onSuccess(); } @@ -192,6 +194,8 @@ public class DefaultPileProtocolService implements PileProtocolService { public void postChargingProgress(UplinkQueueMessage uplinkQueueMessage, Callback callback) { log.info("接收到充电桩上报的充电进度 {}", uplinkQueueMessage); + // TODO 处理相关业务逻辑 + callback.onSuccess(); } @@ -199,6 +203,8 @@ public class DefaultPileProtocolService implements PileProtocolService { public void onSetPricingResponse(UplinkQueueMessage uplinkQueueMessage, Callback callback) { log.info("接收到充电桩上费率下发反馈 {}", uplinkQueueMessage); + // TODO 处理相关业务逻辑 + callback.onSuccess(); } @@ -206,6 +212,8 @@ public class DefaultPileProtocolService implements PileProtocolService { public void onRemoteStartChargingResponse(UplinkQueueMessage uplinkQueueMessage, Callback callback) { log.info("接收到充电桩启动结果反馈 {}", uplinkQueueMessage); + // TODO 处理相关业务逻辑 + callback.onSuccess(); } @@ -213,6 +221,8 @@ public class DefaultPileProtocolService implements PileProtocolService { public void onRemoteStopChargingResponse(UplinkQueueMessage uplinkQueueMessage, Callback callback) { log.info("接收到充电桩停止结果反馈 {}", uplinkQueueMessage); + // TODO 处理相关业务逻辑 + callback.onSuccess(); } diff --git a/jcpp-infrastructure-queue/src/main/java/sanbing/jcpp/infrastructure/queue/discovery/HashPartitionProvider.java b/jcpp-infrastructure-queue/src/main/java/sanbing/jcpp/infrastructure/queue/discovery/HashPartitionProvider.java index 87db05b..98001bf 100644 --- a/jcpp-infrastructure-queue/src/main/java/sanbing/jcpp/infrastructure/queue/discovery/HashPartitionProvider.java +++ b/jcpp-infrastructure-queue/src/main/java/sanbing/jcpp/infrastructure/queue/discovery/HashPartitionProvider.java @@ -39,12 +39,12 @@ public class HashPartitionProvider implements PartitionProvider { @Value("${queue.partitions.hash_function_name:murmur3_128}") private String hashFunctionName; - private final ConcurrentMap partitionTopicsMap = new ConcurrentHashMap<>(); - private final ConcurrentMap partitionSizesMap = new ConcurrentHashMap<>(); + private final Map partitionTopicsMap = new ConcurrentHashMap<>(); + private final Map partitionSizesMap = new ConcurrentHashMap<>(); private HashFunction hashFunction; - protected volatile ConcurrentMap> myPartitions = new ConcurrentHashMap<>(); + protected Map> myPartitions = new ConcurrentHashMap<>(); @Resource private ApplicationEventPublisher applicationEventPublisher; @@ -118,7 +118,7 @@ public class HashPartitionProvider implements PartitionProvider { } }); - final ConcurrentMap> oldPartitions = myPartitions; + final Map> oldPartitions = myPartitions; myPartitions = newPartitions; log.info("Current Server responsible partitions: {}", myPartitions); diff --git a/jcpp-infrastructure-util/src/main/java/sanbing/jcpp/infrastructure/util/config/ShardingThreadPool.java b/jcpp-infrastructure-util/src/main/java/sanbing/jcpp/infrastructure/util/config/ShardingThreadPool.java index 8b5119c..c334e7e 100644 --- a/jcpp-infrastructure-util/src/main/java/sanbing/jcpp/infrastructure/util/config/ShardingThreadPool.java +++ b/jcpp-infrastructure-util/src/main/java/sanbing/jcpp/infrastructure/util/config/ShardingThreadPool.java @@ -38,7 +38,7 @@ public class ShardingThreadPool { private HashFunction hashFunction; - private final Map EXECUTOR_SERVICE_MAP = new ConcurrentHashMap<>(8); + private final Map executorServiceMap = new ConcurrentHashMap<>(8); @PostConstruct public void init() { @@ -47,7 +47,7 @@ public class ShardingThreadPool { @PreDestroy public void destroy() { - for (ExecutorService executorService : EXECUTOR_SERVICE_MAP.values()) { + for (ExecutorService executorService : executorServiceMap.values()) { executorService.shutdownNow(); log.info("Sharding Thread [{}] Shutdown completed.", executorService); } @@ -55,7 +55,7 @@ public class ShardingThreadPool { @Scheduled(fixedDelayString = "${thread-pool.sharding.stats-print-interval-ms:10000}") public void printStats() { - EXECUTOR_SERVICE_MAP.forEach((k, v) -> { + executorServiceMap.forEach((k, v) -> { ThreadPoolExecutor threadPoolExecutor = (ThreadPoolExecutor) v; @@ -78,8 +78,8 @@ public class ShardingThreadPool { public void execute(UUID hashKey, TracerRunnable runnable) { int partition = hash(hashFunction, hashKey); - EXECUTOR_SERVICE_MAP.computeIfAbsent(partition % parallelism, - p -> Executors.newFixedThreadPool(1, JCPPThreadFactory.forName("sharding-threads" + p))) + executorServiceMap.computeIfAbsent(Math.abs(partition % parallelism), + p -> Executors.newFixedThreadPool(1, JCPPThreadFactory.forName("sharding-threads-" + p))) .execute(runnable); } } \ No newline at end of file diff --git a/jcpp-protocol-bootstrap/src/main/resources/protocol-service.yml b/jcpp-protocol-bootstrap/src/main/resources/protocol-service.yml index ea92de8..fd6422f 100644 --- a/jcpp-protocol-bootstrap/src/main/resources/protocol-service.yml +++ b/jcpp-protocol-bootstrap/src/main/resources/protocol-service.yml @@ -71,7 +71,7 @@ service: topic: "${PROTOCOLS_YUNKUAICHONGV150_FORWARD_KAFKA_TOPIC:protocol_uplink}" jcpp-partition: "${PROTOCOLS_YUNKUAICHONGV150_FORWARD_KAFKA_JCPP_PARTITION:true}" # 是否利用JCPP的分片框架 # 以下配置只有在service.type为protocol时且jcpp-partition为false时才生效 - bootstrap-servers: "${PROTOCOLS_YUNKUAICHONGV150_FORWARD_KAFKA_SERVERS:10.102.12.102:9092}" + bootstrap-servers: "${PROTOCOLS_YUNKUAICHONGV150_FORWARD_KAFKA_SERVERS:kafka:9092}" acks: "${PROTOCOLS_YUNKUAICHONGV150_FORWARD_KAFKA_ACKS:1}" # # 可选 protobuf(推荐)、json encoder: "${PROTOCOLS_YUNKUAICHONGV150_FORWARD_KAFKA_ENCODER:protobuf}" @@ -121,7 +121,7 @@ service: # 应用程序服务注册中心配置 zk: - enabled: "${ZOOKEEPER_ENABLED:true}" + enabled: "${ZOOKEEPER_ENABLED:false}" url: "${ZOOKEEPER_URL:zookeeper:2181}" retry-interval-ms: "${ZOOKEEPER_RETRY_INTERVAL_MS:3000}" connection-timeout-ms: "${ZOOKEEPER_CONNECTION_TIMEOUT_MS:3000}" diff --git a/jcpp-protocol-bootstrap/src/test/java/sanbing/jcpp/protocol/adapter/DownlinkControllerIT.java b/jcpp-protocol-bootstrap/src/test/java/sanbing/jcpp/protocol/adapter/DownlinkControllerIT.java index f02982c..8346abc 100644 --- a/jcpp-protocol-bootstrap/src/test/java/sanbing/jcpp/protocol/adapter/DownlinkControllerIT.java +++ b/jcpp-protocol-bootstrap/src/test/java/sanbing/jcpp/protocol/adapter/DownlinkControllerIT.java @@ -66,7 +66,6 @@ class DownlinkControllerIT extends AbstractProtocolTestBase { binaryHandlerConfig.getLengthFieldOffset(), binaryHandlerConfig.getLengthFieldLength(), binaryHandlerConfig.getLengthAdjustment(), binaryHandlerConfig.getInitialBytesToStrip()); - group = new NioEventLoopGroup(); Bootstrap b = new Bootstrap(); b.group(group) @@ -80,7 +79,6 @@ class DownlinkControllerIT extends AbstractProtocolTestBase { .addLast(new SimpleChannelInboundHandler<>() { @Override protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception { - log.info("接收到下行报文:{}", msg); } }); @@ -105,14 +103,19 @@ class DownlinkControllerIT extends AbstractProtocolTestBase { // 先发送一段登录 channel.writeAndFlush(Unpooled.wrappedBuffer(HexUtil.decodeHex("6822001900012023121200001001011047562e393572313300898604d11722d0348606024E87"))).sync(); - // 停一会等注册完成 todo 也可以读下行消息判断是否登录成功 - Thread.sleep(1000); + // 一直检查等待会话注册完成 + UUID sessionId; + while (true) { + if (sessionRegistryProvider.getSessionCache().asMap().values().stream().findFirst().isPresent()) { + ProtocolSession protocolSession = sessionRegistryProvider.getSessionCache().asMap().values().stream().findFirst().get(); + sessionId = protocolSession.getId(); + break; + } + Thread.sleep(100); + } UUID messageId = UUID.randomUUID(); - ProtocolSession protocolSession = sessionRegistryProvider.getSessionCache().asMap().values().stream().findFirst().get(); - UUID sessionId = protocolSession.getId(); UUID requestId = UUID.randomUUID(); - // 创建 DownlinkRestMessage 实例 String pileCode = "20231212000010"; DownlinkRestMessage downlinkMsg = DownlinkRestMessage.newBuilder() diff --git a/jcpp-protocol-yunkuaichong/src/main/java/sanbing/jcpp/protocol/yunkuaichong/YunKuaiChongProtocolMessageProcessor.java b/jcpp-protocol-yunkuaichong/src/main/java/sanbing/jcpp/protocol/yunkuaichong/YunKuaiChongProtocolMessageProcessor.java index e932c99..87f28ba 100644 --- a/jcpp-protocol-yunkuaichong/src/main/java/sanbing/jcpp/protocol/yunkuaichong/YunKuaiChongProtocolMessageProcessor.java +++ b/jcpp-protocol-yunkuaichong/src/main/java/sanbing/jcpp/protocol/yunkuaichong/YunKuaiChongProtocolMessageProcessor.java @@ -98,7 +98,7 @@ public class YunKuaiChongProtocolMessageProcessor extends ProtocolMessageProcess int seqNo = in.readUnsignedShortLE(); // 加密标志 - int encrpyFlag = in.readUnsignedByte(); + int encryptFlag = in.readUnsignedByte(); // 帧类型标志 int frameType = in.readUnsignedByte(); @@ -150,7 +150,7 @@ public class YunKuaiChongProtocolMessageProcessor extends ProtocolMessageProcess message.setHead(startFlag); message.setDataLength(dataLength); message.setSequenceNumber(seqNo); - message.setEncryptionFlag(encrpyFlag); + message.setEncryptionFlag(encryptFlag); message.setCmd(frameType); message.setMsgBody(msgBody); message.setCheckSum(checkSum);