Merge remote-tracking branch 'refs/remotes/public/master' into feature/YunKuaiChongV160

# Conflicts:
#	jcpp-app-bootstrap/src/test/resources/app-service-test.properties
This commit is contained in:
三丙
2024-10-15 09:50:01 +08:00
10 changed files with 80 additions and 27 deletions

View File

@@ -0,0 +1,33 @@
#
# 抖音关注:程序员三丙
# 知识星球https://t.zsxq.com/j9b21
#
volumes:
postgresql_data: {}
networks:
sanbing-network:
driver: bridge
name: sanbing-network
ipam:
config:
- subnet: 10.10.0.0/24
services:
postgres:
image: registry.cn-hangzhou.aliyuncs.com/sanbing/postgresql:17
restart: always
networks:
- sanbing-network
ports:
- "5432:5432"
environment:
- 'POSTGRES_DB=jcpp'
- 'POSTGRES_PASSWORD=postgres'
- 'POSTGRESQL_MAX_CONNECTIONS=1000'
- 'POSTGRESQL_DEFAULT_TRANSACTION_ISOLATION=read committed'
- 'TZ=Asia/Shanghai'
volumes:
- postgresql_data:/bitnami/postgresql
- ./schema/schema-postgres.sql:/docker-entrypoint-initdb.d/init.sql

View File

@@ -22,7 +22,7 @@ spring:
name: "${SPRING_APPLICATION_NAME:java-charge-point-server}"
datasource:
driver-class-name: "${SPRING_DRIVER_CLASS_NAME:org.postgresql.Driver}"
url: "${SPRING_DATASOURCE_URL:jdbc:postgresql://10.102.12.102:30135/jcpp}"
url: "${SPRING_DATASOURCE_URL:jdbc:postgresql://postgres:5432/jcpp}"
username: "${SPRING_DATASOURCE_USERNAME:postgres}"
password: "${SPRING_DATASOURCE_PASSWORD:postgres}"
hikari:
@@ -196,7 +196,7 @@ service:
topic: "${PROTOCOLS_YUNKUAICHONGV150_FORWARD_KAFKA_TOPIC:protocol_uplink}"
jcpp-partition: "${PROTOCOLS_YUNKUAICHONGV150_FORWARD_KAFKA_JCPP_PARTITION:true}" # 是否利用JCPP的分片框架
# 以下配置只有在service.type为protocol时且jcpp-partition为false时才生效
bootstrap-servers: "${PROTOCOLS_YUNKUAICHONGV150_FORWARD_KAFKA_SERVERS:10.102.12.102:9092}"
bootstrap-servers: "${PROTOCOLS_YUNKUAICHONGV150_FORWARD_KAFKA_SERVERS:kafka:9092}"
acks: "${PROTOCOLS_YUNKUAICHONGV150_FORWARD_KAFKA_ACKS:1}"
# # 可选 protobuf推荐、json
encoder: "${PROTOCOLS_YUNKUAICHONGV150_FORWARD_KAFKA_ENCODER:protobuf}"

View File

@@ -12,6 +12,7 @@ import sanbing.jcpp.app.dal.config.ibatis.enums.GunOptStatusEnum;
import sanbing.jcpp.app.dal.config.ibatis.enums.GunRunStatusEnum;
import sanbing.jcpp.app.dal.config.ibatis.enums.OwnerTypeEnum;
import sanbing.jcpp.app.dal.entity.Gun;
import sanbing.jcpp.app.dal.entity.Pile;
import sanbing.jcpp.infrastructure.util.jackson.JacksonUtil;
import java.time.LocalDateTime;
@@ -37,9 +38,13 @@ class GunMapperIT extends AbstractTestBase {
UUID.fromString("3f3a61e9-de55-4177-9b4e-3a1d8c529890"),
UUID.fromString("cf1a8970-5aa9-4636-a76e-d6bcf98b4a07")
};
@Resource
GunMapper gunMapper;
@Resource
PileMapper pileMapper;
@Test
void curdTest() {
gunMapper.delete(Wrappers.lambdaQuery());
@@ -47,13 +52,16 @@ class GunMapperIT extends AbstractTestBase {
for (int i = 0; i < NORMAL_PILE_ID.length; i++) {
UUID pileId = NORMAL_PILE_ID[i];
UUID gunId = NORMAL_GUN_ID[i];
Pile pile = pileMapper.selectById(pileId);
Gun gun = Gun.builder()
.id(gunId)
.createdTime(LocalDateTime.now())
.additionalInfo(JacksonUtil.newObjectNode())
.gunNo("01")
.gunName("三丙的1号枪")
.gunCode("20231212000001-" + (i + 1))
.gunNo("02")
.gunName(pile.getPileName() + "的2号枪")
.gunCode(pile.getPileCode() + "-02")
.stationId(NORMAL_STATION_ID)
.pileId(pileId)
.ownerId(NORMAL_USER_ID)

View File

@@ -1,4 +1,3 @@
redis.connection.type=cluster
redis.cluster.nodes=10.102.12.101:30700,10.102.12.101:32027,10.102.12.101:30767,10.102.12.101:30250,10.102.12.101:30612,10.102.12.101:32303
spring.datasource.url=jdbc:postgresql://127.0.0.1:5432/jcpp
service.protocols.yunkuaichongV150.listener.tcp.bind-port=0
service.protocols.yunkuaichongV160.listener.tcp.bind-port=0

View File

@@ -185,6 +185,8 @@ public class DefaultPileProtocolService implements PileProtocolService {
public void postGunRunStatus(UplinkQueueMessage uplinkQueueMessage, Callback callback) {
log.info("接收到充电桩上报的电桩状态 {}", uplinkQueueMessage);
// TODO 处理相关业务逻辑
callback.onSuccess();
}
@@ -192,6 +194,8 @@ public class DefaultPileProtocolService implements PileProtocolService {
public void postChargingProgress(UplinkQueueMessage uplinkQueueMessage, Callback callback) {
log.info("接收到充电桩上报的充电进度 {}", uplinkQueueMessage);
// TODO 处理相关业务逻辑
callback.onSuccess();
}
@@ -199,6 +203,8 @@ public class DefaultPileProtocolService implements PileProtocolService {
public void onSetPricingResponse(UplinkQueueMessage uplinkQueueMessage, Callback callback) {
log.info("接收到充电桩上费率下发反馈 {}", uplinkQueueMessage);
// TODO 处理相关业务逻辑
callback.onSuccess();
}
@@ -206,6 +212,8 @@ public class DefaultPileProtocolService implements PileProtocolService {
public void onRemoteStartChargingResponse(UplinkQueueMessage uplinkQueueMessage, Callback callback) {
log.info("接收到充电桩启动结果反馈 {}", uplinkQueueMessage);
// TODO 处理相关业务逻辑
callback.onSuccess();
}
@@ -213,6 +221,8 @@ public class DefaultPileProtocolService implements PileProtocolService {
public void onRemoteStopChargingResponse(UplinkQueueMessage uplinkQueueMessage, Callback callback) {
log.info("接收到充电桩停止结果反馈 {}", uplinkQueueMessage);
// TODO 处理相关业务逻辑
callback.onSuccess();
}

View File

@@ -39,12 +39,12 @@ public class HashPartitionProvider implements PartitionProvider {
@Value("${queue.partitions.hash_function_name:murmur3_128}")
private String hashFunctionName;
private final ConcurrentMap<QueueKey, String> partitionTopicsMap = new ConcurrentHashMap<>();
private final ConcurrentMap<QueueKey, Integer> partitionSizesMap = new ConcurrentHashMap<>();
private final Map<QueueKey, String> partitionTopicsMap = new ConcurrentHashMap<>();
private final Map<QueueKey, Integer> partitionSizesMap = new ConcurrentHashMap<>();
private HashFunction hashFunction;
protected volatile ConcurrentMap<QueueKey, List<Integer>> myPartitions = new ConcurrentHashMap<>();
protected Map<QueueKey, List<Integer>> myPartitions = new ConcurrentHashMap<>();
@Resource
private ApplicationEventPublisher applicationEventPublisher;
@@ -118,7 +118,7 @@ public class HashPartitionProvider implements PartitionProvider {
}
});
final ConcurrentMap<QueueKey, List<Integer>> oldPartitions = myPartitions;
final Map<QueueKey, List<Integer>> oldPartitions = myPartitions;
myPartitions = newPartitions;
log.info("Current Server responsible partitions: {}", myPartitions);

View File

@@ -38,7 +38,7 @@ public class ShardingThreadPool {
private HashFunction hashFunction;
private final Map<Integer, ExecutorService> EXECUTOR_SERVICE_MAP = new ConcurrentHashMap<>(8);
private final Map<Integer, ExecutorService> executorServiceMap = new ConcurrentHashMap<>(8);
@PostConstruct
public void init() {
@@ -47,7 +47,7 @@ public class ShardingThreadPool {
@PreDestroy
public void destroy() {
for (ExecutorService executorService : EXECUTOR_SERVICE_MAP.values()) {
for (ExecutorService executorService : executorServiceMap.values()) {
executorService.shutdownNow();
log.info("Sharding Thread [{}] Shutdown completed.", executorService);
}
@@ -55,7 +55,7 @@ public class ShardingThreadPool {
@Scheduled(fixedDelayString = "${thread-pool.sharding.stats-print-interval-ms:10000}")
public void printStats() {
EXECUTOR_SERVICE_MAP.forEach((k, v) -> {
executorServiceMap.forEach((k, v) -> {
ThreadPoolExecutor threadPoolExecutor = (ThreadPoolExecutor) v;
@@ -78,8 +78,8 @@ public class ShardingThreadPool {
public void execute(UUID hashKey, TracerRunnable runnable) {
int partition = hash(hashFunction, hashKey);
EXECUTOR_SERVICE_MAP.computeIfAbsent(partition % parallelism,
p -> Executors.newFixedThreadPool(1, JCPPThreadFactory.forName("sharding-threads" + p)))
executorServiceMap.computeIfAbsent(Math.abs(partition % parallelism),
p -> Executors.newFixedThreadPool(1, JCPPThreadFactory.forName("sharding-threads-" + p)))
.execute(runnable);
}
}

View File

@@ -71,7 +71,7 @@ service:
topic: "${PROTOCOLS_YUNKUAICHONGV150_FORWARD_KAFKA_TOPIC:protocol_uplink}"
jcpp-partition: "${PROTOCOLS_YUNKUAICHONGV150_FORWARD_KAFKA_JCPP_PARTITION:true}" # 是否利用JCPP的分片框架
# 以下配置只有在service.type为protocol时且jcpp-partition为false时才生效
bootstrap-servers: "${PROTOCOLS_YUNKUAICHONGV150_FORWARD_KAFKA_SERVERS:10.102.12.102:9092}"
bootstrap-servers: "${PROTOCOLS_YUNKUAICHONGV150_FORWARD_KAFKA_SERVERS:kafka:9092}"
acks: "${PROTOCOLS_YUNKUAICHONGV150_FORWARD_KAFKA_ACKS:1}"
# # 可选 protobuf推荐、json
encoder: "${PROTOCOLS_YUNKUAICHONGV150_FORWARD_KAFKA_ENCODER:protobuf}"
@@ -121,7 +121,7 @@ service:
# 应用程序服务注册中心配置
zk:
enabled: "${ZOOKEEPER_ENABLED:true}"
enabled: "${ZOOKEEPER_ENABLED:false}"
url: "${ZOOKEEPER_URL:zookeeper:2181}"
retry-interval-ms: "${ZOOKEEPER_RETRY_INTERVAL_MS:3000}"
connection-timeout-ms: "${ZOOKEEPER_CONNECTION_TIMEOUT_MS:3000}"

View File

@@ -66,7 +66,6 @@ class DownlinkControllerIT extends AbstractProtocolTestBase {
binaryHandlerConfig.getLengthFieldOffset(), binaryHandlerConfig.getLengthFieldLength(),
binaryHandlerConfig.getLengthAdjustment(), binaryHandlerConfig.getInitialBytesToStrip());
group = new NioEventLoopGroup();
Bootstrap b = new Bootstrap();
b.group(group)
@@ -80,7 +79,6 @@ class DownlinkControllerIT extends AbstractProtocolTestBase {
.addLast(new SimpleChannelInboundHandler<>() {
@Override
protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
log.info("接收到下行报文:{}", msg);
}
});
@@ -105,14 +103,19 @@ class DownlinkControllerIT extends AbstractProtocolTestBase {
// 先发送一段登录
channel.writeAndFlush(Unpooled.wrappedBuffer(HexUtil.decodeHex("6822001900012023121200001001011047562e393572313300898604d11722d0348606024E87"))).sync();
// 停一会等注册完成 todo 也可以读下行消息判断是否登录成功
Thread.sleep(1000);
// 一直检查等待会话注册完成
UUID sessionId;
while (true) {
if (sessionRegistryProvider.getSessionCache().asMap().values().stream().findFirst().isPresent()) {
ProtocolSession protocolSession = sessionRegistryProvider.getSessionCache().asMap().values().stream().findFirst().get();
sessionId = protocolSession.getId();
break;
}
Thread.sleep(100);
}
UUID messageId = UUID.randomUUID();
ProtocolSession protocolSession = sessionRegistryProvider.getSessionCache().asMap().values().stream().findFirst().get();
UUID sessionId = protocolSession.getId();
UUID requestId = UUID.randomUUID();
// 创建 DownlinkRestMessage 实例
String pileCode = "20231212000010";
DownlinkRestMessage downlinkMsg = DownlinkRestMessage.newBuilder()

View File

@@ -98,7 +98,7 @@ public class YunKuaiChongProtocolMessageProcessor extends ProtocolMessageProcess
int seqNo = in.readUnsignedShortLE();
// 加密标志
int encrpyFlag = in.readUnsignedByte();
int encryptFlag = in.readUnsignedByte();
// 帧类型标志
int frameType = in.readUnsignedByte();
@@ -150,7 +150,7 @@ public class YunKuaiChongProtocolMessageProcessor extends ProtocolMessageProcess
message.setHead(startFlag);
message.setDataLength(dataLength);
message.setSequenceNumber(seqNo);
message.setEncryptionFlag(encrpyFlag);
message.setEncryptionFlag(encryptFlag);
message.setCmd(frameType);
message.setMsgBody(msgBody);
message.setCheckSum(checkSum);