Merge remote-tracking branch 'refs/remotes/public/develop' into feature/YunKuaiChongV160

# Conflicts:
#	jcpp-protocol-yunkuaichong/src/main/java/sanbing/jcpp/protocol/yunkuaichong/YunKuaiChongProtocolMessageProcessor.java
This commit is contained in:
三丙
2024-10-22 16:25:12 +08:00
31 changed files with 232 additions and 135 deletions

View File

@@ -38,7 +38,7 @@ public class DownlinkController {
@PostMapping(value = "/onDownlink", consumes = "application/x-protobuf", produces = "application/x-protobuf")
public DeferredResult<ResponseEntity<String>> onDownlink(@RequestBody DownlinkRestMessage downlinkMsg) {
log.info("收到REST下行请求 {}", downlinkMsg);
log.debug("收到REST下行请求 {}", downlinkMsg);
final DeferredResult<ResponseEntity<String>> response = new DeferredResult<>(onDownlinkTimeout,
ResponseEntity.status(HttpStatus.REQUEST_TIMEOUT).build());
@@ -55,7 +55,7 @@ public class DownlinkController {
response.setResult(ResponseEntity.status(HttpStatus.OK).build());
} else {
log.warn("下发报文时Session未找到 sessionId: {}", protocolSessionId);
log.info("下发报文时Session未找到 sessionId: {}", protocolSessionId);
response.setResult(ResponseEntity.status(HttpStatus.NOT_FOUND).body("Protocol Session not found for ID:" + protocolSessionId));
}

View File

@@ -13,8 +13,8 @@ public record ProtocolUplinkMsg<T>(SocketAddress address, UUID id, T data, int s
@Override
public String toString() {
if (data instanceof byte[]) {
return ByteBufUtil.hexDump((byte[]) data);
if (data instanceof byte[] bytes) {
return ByteBufUtil.hexDump(bytes);
} else {
return data.toString();
}

View File

@@ -25,10 +25,7 @@ import sanbing.jcpp.proto.gen.ProtocolProto.UplinkQueueMessage;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiConsumer;
import static sanbing.jcpp.infrastructure.queue.common.QueueConstants.MSG_MD_PREFIX;
import static sanbing.jcpp.infrastructure.queue.common.QueueConstants.MSG_MD_TS;
import static sanbing.jcpp.infrastructure.util.trace.TracerContextUtil.JCPP_TRACER_ID;
import static sanbing.jcpp.infrastructure.util.trace.TracerContextUtil.JCPP_TRACER_ORIGIN;
import static sanbing.jcpp.infrastructure.queue.common.QueueConstants.*;
/**
* @author baigod
@@ -51,7 +48,7 @@ public abstract class Forwarder {
protected final boolean isMonolith;
protected QueueProducer<ProtoQueueMsg<UplinkQueueMessage>> producer;
public Forwarder(String protocolName, StatsFactory statsFactory, PartitionProvider partitionProvider, ServiceInfoProvider serviceInfoProvider) {
protected Forwarder(String protocolName, StatsFactory statsFactory, PartitionProvider partitionProvider, ServiceInfoProvider serviceInfoProvider) {
this.protocolName = protocolName;
this.partitionProvider = partitionProvider;
this.serviceInfoProvider = serviceInfoProvider;
@@ -66,12 +63,13 @@ public abstract class Forwarder {
public abstract void destroy();
protected void jcppForward(String topic, String key, UplinkQueueMessage msg, BiConsumer<Boolean, ObjectNode> consumer) {
forwarderMessagesStats.incrementTotal();
QueueMsgHeaders headers = new DefaultQueueMsgHeaders();
Tracer currentTracer = TracerContextUtil.getCurrentTracer();
headers.put(MSG_MD_PREFIX + JCPP_TRACER_ID, ByteUtil.stringToBytes(currentTracer.getTraceId()));
headers.put(MSG_MD_PREFIX + JCPP_TRACER_ORIGIN, ByteUtil.stringToBytes(currentTracer.getOrigin()));
headers.put(MSG_MD_PREFIX + MSG_MD_TS, ByteUtil.longToBytes(currentTracer.getTracerTs()));
headers.put(MSG_MD_TRACER_ID, ByteUtil.stringToBytes(currentTracer.getTraceId()));
headers.put(MSG_MD_TRACER_ORIGIN, ByteUtil.stringToBytes(currentTracer.getOrigin()));
headers.put(MSG_MD_TRACER_TS, ByteUtil.longToBytes(currentTracer.getTracerTs()));
TopicPartitionInfo tpi = partitionProvider.resolve(ServiceType.APP, topic, key);
producer.send(tpi, new ProtoQueueMsg<>(key, msg, headers), new QueueCallback() {
@@ -80,11 +78,13 @@ public abstract class Forwarder {
TracerContextUtil.newTracer(currentTracer.getTraceId(), currentTracer.getOrigin(), currentTracer.getTracerTs());
MDCUtils.recordTracer();
log.trace("单体消息转发成功 key:{}", key);
if (consumer != null) {
consumer.accept(true, JacksonUtil.newObjectNode());
}
forwarderMessagesStats.incrementSuccessful();
}
@Override
@@ -92,6 +92,7 @@ public abstract class Forwarder {
TracerContextUtil.newTracer(currentTracer.getTraceId(), currentTracer.getOrigin(), currentTracer.getTracerTs());
MDCUtils.recordTracer();
log.warn("单体消息转发异常", t);
if (consumer != null) {
@@ -99,6 +100,7 @@ public abstract class Forwarder {
objectNode.put(ERROR, t.getClass() + ": " + t.getMessage());
consumer.accept(true, objectNode);
}
forwarderMessagesStats.incrementFailed();
}
});
}

View File

@@ -36,10 +36,7 @@ import java.util.Properties;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiConsumer;
import static sanbing.jcpp.infrastructure.queue.common.QueueConstants.MSG_MD_PREFIX;
import static sanbing.jcpp.infrastructure.queue.common.QueueConstants.MSG_MD_TS;
import static sanbing.jcpp.infrastructure.util.trace.TracerContextUtil.JCPP_TRACER_ID;
import static sanbing.jcpp.infrastructure.util.trace.TracerContextUtil.JCPP_TRACER_ORIGIN;
import static sanbing.jcpp.infrastructure.queue.common.QueueConstants.*;
/**
* @author baigod
@@ -150,12 +147,13 @@ public class KafkaForwarder extends Forwarder {
}
private void kafkaForward(String topic, String key, UplinkQueueMessage msg, BiConsumer<Boolean, ObjectNode> consumer) throws InvalidProtocolBufferException {
forwarderMessagesStats.incrementTotal();
Headers headers = new RecordHeaders();
Tracer currentTracer = TracerContextUtil.getCurrentTracer();
headers.add(new RecordHeader(MSG_MD_PREFIX + JCPP_TRACER_ID, ByteUtil.stringToBytes(currentTracer.getTraceId())));
headers.add(new RecordHeader(MSG_MD_PREFIX + JCPP_TRACER_ORIGIN, ByteUtil.stringToBytes(currentTracer.getOrigin())));
headers.add(new RecordHeader(MSG_MD_PREFIX + MSG_MD_TS, ByteUtil.longToBytes(currentTracer.getTracerTs())));
headers.add(new RecordHeader(MSG_MD_TRACER_ID, ByteUtil.stringToBytes(currentTracer.getTraceId())));
headers.add(new RecordHeader(MSG_MD_TRACER_ORIGIN, ByteUtil.stringToBytes(currentTracer.getOrigin())));
headers.add(new RecordHeader(MSG_MD_TRACER_TS, ByteUtil.longToBytes(currentTracer.getTracerTs())));
if (kafkaCfg.getEncoder() == KafkaCfg.EncoderType.json) {
@@ -177,6 +175,7 @@ public class KafkaForwarder extends Forwarder {
private void logAndDoConsumer(BiConsumer<Boolean, ObjectNode> consumer, RecordMetadata metadata, Exception e, Tracer currentTracer) {
TracerContextUtil.newTracer(currentTracer.getTraceId(), currentTracer.getOrigin(), currentTracer.getTracerTs());
MDCUtils.recordTracer();
log.debug("Kafka 消息转发完成, success:{}", e == null);
if (consumer != null) {
@@ -196,6 +195,9 @@ public class KafkaForwarder extends Forwarder {
if (e != null) {
objectNode.put(ERROR, e.getClass() + ": " + e.getMessage());
forwarderMessagesStats.incrementFailed();
} else {
forwarderMessagesStats.incrementSuccessful();
}
consumer.accept(e == null, objectNode);

View File

@@ -41,8 +41,8 @@ public class TcpListener extends Listener {
}
private void tcpServerBootstrap(TcpCfg tcpCfg) throws InterruptedException {
bossGroup = new NioEventLoopGroup(tcpCfg.getBossGroupThreadCount(), JCPPThreadFactory.forName("tcp-boss"));
workerGroup = new NioEventLoopGroup(tcpCfg.getWorkerGroupThreadCount(), JCPPThreadFactory.forName("tcp-worker"));
bossGroup = new NioEventLoopGroup(tcpCfg.getBossGroupThreadCount(), JCPPThreadFactory.forName("tcp-boss-%d"));
workerGroup = new NioEventLoopGroup(tcpCfg.getWorkerGroupThreadCount(), JCPPThreadFactory.forName("tcp-worker-%d"));
ChannelHandlerInitializer<SocketChannel> channelHandler = ChannelHandlerInitializer.createTcpChannelHandler(tcpCfg, parameter);

View File

@@ -42,14 +42,14 @@ public class ConnectionLimitHandler extends ChannelInboundHandlerAdapter {
public void channelActive(ChannelHandlerContext ctx) throws Exception {
super.channelActive(ctx);
channelGroup.add(ctx.channel());
log.info("[{}]{} channelActive 当前连接数管道数 {} / {}",protocolName, ctx.channel(), channelGroup.size(), maxConnections);
log.info("[{}]{} channelActive 当前连接数 {} / {}",protocolName, ctx.channel(), channelGroup.size(), maxConnections);
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
super.channelInactive(ctx);
channelGroup.remove(ctx.channel());
log.info("[{}]{} channelInactive 当前连接数管道数 {} / {}",protocolName, ctx.channel(), channelGroup.size(), maxConnections);
log.info("[{}]{} channelInactive 当前连接数 {} / {}",protocolName, ctx.channel(), channelGroup.size(), maxConnections);
}
@Override