mirror of
https://gitee.com/san-bing/JChargePointProtocol
synced 2026-05-05 02:19:56 +08:00
* !44 comment * !39 添加下行日志打印 * !36 扩展计价领域模型 * !35 webui 初步成型 * !34 webui 初步成型
This commit is contained in:
@@ -25,7 +25,7 @@ import sanbing.jcpp.protocol.listener.tcp.TcpListener;
|
||||
import static org.springframework.boot.actuate.health.Status.UP;
|
||||
|
||||
/**
|
||||
* @author baigod
|
||||
* @author 九筒
|
||||
*/
|
||||
@Slf4j
|
||||
public abstract class ProtocolBootstrap implements HealthIndicator {
|
||||
@@ -65,7 +65,7 @@ public abstract class ProtocolBootstrap implements HealthIndicator {
|
||||
protocolContext.getPartitionProvider(),
|
||||
protocolContext.getServiceInfoProvider());
|
||||
} else {
|
||||
throw new IllegalArgumentException("Unknown Forwarder type: " + forwarderCfg.getType());
|
||||
throw new IllegalArgumentException("未知的转发器类型: " + forwarderCfg.getType());
|
||||
}
|
||||
|
||||
TcpCfg tcpCfg = protocolCfg.getListener().getTcp();
|
||||
|
||||
@@ -21,7 +21,7 @@ import sanbing.jcpp.protocol.provider.ProtocolSessionRegistryProvider;
|
||||
import sanbing.jcpp.protocol.provider.ProtocolsConfigProvider;
|
||||
|
||||
/**
|
||||
* @author baigod
|
||||
* @author 九筒
|
||||
*/
|
||||
@Component
|
||||
@Getter
|
||||
|
||||
@@ -17,7 +17,7 @@ import sanbing.jcpp.protocol.forwarder.Forwarder;
|
||||
import java.util.UUID;
|
||||
|
||||
/**
|
||||
* @author baigod
|
||||
* @author 九筒
|
||||
*/
|
||||
@Slf4j
|
||||
public abstract class ProtocolMessageProcessor {
|
||||
@@ -44,25 +44,35 @@ public abstract class ProtocolMessageProcessor {
|
||||
|
||||
uplinkMsgStats.incrementFailed();
|
||||
|
||||
log.error("{} 消息处理器处理报文异常", listenerToHandlerMsg.session(), e);
|
||||
log.error("{} 上行消息处理器处理报文异常", listenerToHandlerMsg.session(), e);
|
||||
}
|
||||
}));
|
||||
}
|
||||
|
||||
protected abstract void uplinkHandle(ListenerToHandlerMsg listenerToHandlerMsg);
|
||||
|
||||
/**
|
||||
* 下行消息处理入口
|
||||
* 负责统一的异常处理和日志记录
|
||||
*/
|
||||
public void downlinkHandle(SessionToHandlerMsg sessionToHandlerMsg, MessagesStats downlinkMsgStats) throws DownlinkException {
|
||||
try {
|
||||
|
||||
downlinkHandle(sessionToHandlerMsg);
|
||||
doDownlinkHandle(sessionToHandlerMsg);
|
||||
|
||||
} catch (Exception e) {
|
||||
|
||||
downlinkMsgStats.incrementFailed();
|
||||
|
||||
log.warn("下行消息处理失败,session: {}, 异常信息: {}", sessionToHandlerMsg.session(), e.getMessage(), e);
|
||||
|
||||
throw new DownlinkException(e.getMessage(), e);
|
||||
}
|
||||
}
|
||||
|
||||
protected abstract void downlinkHandle(SessionToHandlerMsg sessionToHandlerMsg);
|
||||
/**
|
||||
* 下行消息具体处理逻辑
|
||||
* 由各协议的具体实现类重写
|
||||
*/
|
||||
protected abstract void doDownlinkHandle(SessionToHandlerMsg sessionToHandlerMsg);
|
||||
}
|
||||
@@ -24,7 +24,7 @@ import sanbing.jcpp.protocol.provider.ProtocolSessionRegistryProvider;
|
||||
import java.util.UUID;
|
||||
|
||||
/**
|
||||
* @author baigod
|
||||
* @author 九筒
|
||||
*/
|
||||
@RestController
|
||||
@RequestMapping("/api")
|
||||
@@ -40,7 +40,7 @@ public class DownlinkController {
|
||||
|
||||
@PostMapping(value = "/onDownlink", consumes = "application/x-protobuf", produces = "application/x-protobuf")
|
||||
public DeferredResult<ResponseEntity<String>> onDownlink(@RequestBody DownlinkRequestMessage downlinkMsg) {
|
||||
log.debug("收到REST下行请求 {}", downlinkMsg);
|
||||
log.info("收到REST下行请求 {}", downlinkMsg);
|
||||
|
||||
final DeferredResult<ResponseEntity<String>> response = new DeferredResult<>(onDownlinkTimeout,
|
||||
ResponseEntity.status(HttpStatus.REQUEST_TIMEOUT).build());
|
||||
|
||||
@@ -15,6 +15,7 @@ import io.grpc.netty.shaded.io.netty.channel.nio.NioEventLoopGroup;
|
||||
import io.grpc.netty.shaded.io.netty.channel.socket.nio.NioServerSocketChannel;
|
||||
import io.grpc.stub.StreamObserver;
|
||||
import jakarta.annotation.PostConstruct;
|
||||
import jakarta.annotation.PreDestroy;
|
||||
import jakarta.annotation.Resource;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.beans.factory.annotation.Value;
|
||||
@@ -28,7 +29,6 @@ import sanbing.jcpp.proto.gen.ProtocolProto.*;
|
||||
import sanbing.jcpp.protocol.domain.ProtocolSession;
|
||||
import sanbing.jcpp.protocol.provider.ProtocolSessionRegistryProvider;
|
||||
|
||||
import javax.annotation.PreDestroy;
|
||||
import java.util.UUID;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.locks.ReentrantLock;
|
||||
@@ -37,7 +37,7 @@ import static sanbing.jcpp.infrastructure.proto.ProtoConverter.toTracerProto;
|
||||
import static sanbing.jcpp.infrastructure.util.config.ThreadPoolConfiguration.JCPP_COMMON_THREAD_POOL;
|
||||
|
||||
/**
|
||||
* @author baigod
|
||||
* @author 九筒
|
||||
*/
|
||||
@Service
|
||||
@Slf4j
|
||||
@@ -120,7 +120,7 @@ public class DownlinkGrpcService extends ProtocolInterfaceImplBase {
|
||||
TracerContextUtil.newTracer(tracerProto.getId(), tracerProto.getOrigin(), tracerProto.getTs());
|
||||
MDCUtils.recordTracer();
|
||||
|
||||
log.debug("通信层收到Grpc下行请求 {}", requestMsg);
|
||||
log.info("通信层收到Grpc下行请求 {}", requestMsg);
|
||||
|
||||
if (requestMsg.hasConnectRequestMsg()) {
|
||||
replyLock.lock();
|
||||
|
||||
@@ -12,7 +12,7 @@ import java.lang.annotation.*;
|
||||
* 通用协议命令注解
|
||||
* 所有协议的命令类都应该使用此注解
|
||||
*
|
||||
* @author sanbing
|
||||
* @author 九筒
|
||||
* @since 2024-12-16
|
||||
*/
|
||||
@Target(ElementType.TYPE)
|
||||
|
||||
@@ -10,7 +10,7 @@ import lombok.Getter;
|
||||
import lombok.Setter;
|
||||
|
||||
/**
|
||||
* @author baigod
|
||||
* @author 九筒
|
||||
*/
|
||||
@Getter
|
||||
@Setter
|
||||
|
||||
@@ -51,7 +51,7 @@ public class TcpHandlerCfg {
|
||||
case TEXT -> HANDLER_MAP.put(type, JacksonUtil.treeToValue(cfgJson, TextHandlerConfiguration.class));
|
||||
case JSON -> HANDLER_MAP.put(type, JacksonUtil.treeToValue(cfgJson, JsonHandlerConfiguration.class));
|
||||
case BINARY -> HANDLER_MAP.put(type, JacksonUtil.treeToValue(cfgJson, BinaryHandlerConfiguration.class));
|
||||
default -> throw new IllegalArgumentException("Unknown TCP handler type: " + type);
|
||||
default -> throw new IllegalArgumentException("未知的TCP处理器类型: " + type);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -7,7 +7,7 @@
|
||||
package sanbing.jcpp.protocol.cfg.enums;
|
||||
|
||||
/**
|
||||
* @author baigod
|
||||
* @author 九筒
|
||||
*/
|
||||
public enum TcpHandlerType {
|
||||
TEXT,
|
||||
|
||||
@@ -7,7 +7,7 @@
|
||||
package sanbing.jcpp.protocol.domain;
|
||||
|
||||
/**
|
||||
* @author baigod
|
||||
* @author 九筒
|
||||
*/
|
||||
public enum DownlinkCmdEnum {
|
||||
|
||||
|
||||
@@ -11,6 +11,7 @@ import com.github.benmanes.caffeine.cache.Caffeine;
|
||||
import lombok.Getter;
|
||||
import lombok.Setter;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import sanbing.jcpp.proto.gen.ProtocolProto;
|
||||
import sanbing.jcpp.proto.gen.ProtocolProto.DownlinkRequestMessage;
|
||||
import sanbing.jcpp.protocol.forwarder.Forwarder;
|
||||
|
||||
@@ -26,7 +27,7 @@ import java.util.concurrent.ScheduledFuture;
|
||||
import java.util.function.Function;
|
||||
|
||||
/**
|
||||
* @author baigod
|
||||
* @author 九筒
|
||||
*/
|
||||
@Getter
|
||||
@Slf4j
|
||||
@@ -65,15 +66,46 @@ public abstract class ProtocolSession implements Closeable {
|
||||
|
||||
@Override
|
||||
public void close() {
|
||||
close(SessionCloseReason.DESTRUCTION);
|
||||
close(ProtocolProto.SessionCloseReason.SESSION_CLOSE_DESTRUCTION);
|
||||
}
|
||||
|
||||
public void close(SessionCloseReason reason) {
|
||||
public void close(ProtocolProto.SessionCloseReason reason) {
|
||||
log.info("[{}] Protocol会话关闭,原因: {}", this, reason);
|
||||
|
||||
scheduledFutures.values().forEach(scheduledFuture -> scheduledFuture.cancel(true));
|
||||
scheduledFutures.clear();
|
||||
|
||||
// 转发会话关闭事件到后端
|
||||
if (forwarder != null && !pileCodeSet.isEmpty()) {
|
||||
|
||||
for (String pileCode : pileCodeSet) {
|
||||
ProtocolProto.SessionCloseEventProto sessionCloseEvent = ProtocolProto.SessionCloseEventProto.newBuilder()
|
||||
.setPileCode(pileCode)
|
||||
.setReason(reason)
|
||||
.setAdditionalInfo("Session closed: " + reason)
|
||||
.build();
|
||||
|
||||
ProtocolProto.UplinkQueueMessage uplinkQueueMessage = ProtocolProto.UplinkQueueMessage.newBuilder()
|
||||
.setMessageIdMSB(UUID.randomUUID().getMostSignificantBits())
|
||||
.setMessageIdLSB(UUID.randomUUID().getLeastSignificantBits())
|
||||
.setSessionIdMSB(id.getMostSignificantBits())
|
||||
.setSessionIdLSB(id.getLeastSignificantBits())
|
||||
.setMessageKey(pileCode + "_session_close")
|
||||
.setProtocolName(protocolName)
|
||||
.setSessionCloseEventProto(sessionCloseEvent)
|
||||
.build();
|
||||
|
||||
try {
|
||||
forwarder.sendMessage(uplinkQueueMessage);
|
||||
log.debug("[{}] 会话关闭事件已转发,桩编码: {}, 原因: {}", this, pileCode, reason);
|
||||
} catch (Exception e) {
|
||||
log.error("[{}] 转发会话关闭事件失败,桩编码: {}", this, pileCode, e);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
|
||||
@@ -1,27 +0,0 @@
|
||||
/**
|
||||
* 开源代码,仅供学习和交流研究使用,商用请联系三丙
|
||||
* 微信:mohan_88888
|
||||
* 抖音:程序员三丙
|
||||
* 付费课程知识星球:https://t.zsxq.com/aKtXo
|
||||
*/
|
||||
package sanbing.jcpp.protocol.domain;
|
||||
|
||||
/**
|
||||
* @author baigod
|
||||
*/
|
||||
public enum SessionCloseReason {
|
||||
/**
|
||||
* 自然销毁
|
||||
*/
|
||||
DESTRUCTION,
|
||||
|
||||
/**
|
||||
* 失活
|
||||
*/
|
||||
INACTIVE,
|
||||
|
||||
/**
|
||||
* 手动销毁
|
||||
*/
|
||||
MANUALLY
|
||||
}
|
||||
@@ -9,7 +9,7 @@ package sanbing.jcpp.protocol.domain;
|
||||
import sanbing.jcpp.proto.gen.ProtocolProto.DownlinkRequestMessage;
|
||||
|
||||
/**
|
||||
* @author baigod
|
||||
* @author 九筒
|
||||
*/
|
||||
public record SessionToHandlerMsg(DownlinkRequestMessage downlinkMsg, ProtocolSession session) {
|
||||
}
|
||||
@@ -0,0 +1,75 @@
|
||||
/**
|
||||
* 开源代码,仅供学习和交流研究使用,商用请联系三丙
|
||||
* 微信:mohan_88888
|
||||
* 抖音:程序员三丙
|
||||
* 付费课程知识星球:https://t.zsxq.com/aKtXo
|
||||
*/
|
||||
package sanbing.jcpp.protocol.enums;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
* 支持的协议定义类
|
||||
* 使用常量定义而非枚举,确保注解可以使用编译时常量
|
||||
*
|
||||
* @author 九筒
|
||||
* @since 2024-12-22
|
||||
*/
|
||||
public final class SupportedProtocols {
|
||||
|
||||
private SupportedProtocols() {
|
||||
// 工具类,禁止实例化
|
||||
}
|
||||
|
||||
// ==================== 协议常量定义 ====================
|
||||
|
||||
/** 云快充协议 v1.5.0 */
|
||||
public static final String YUNKUAICHONG_V150 = "yunkuaichongV150";
|
||||
|
||||
/** 云快充协议 v1.6.0 */
|
||||
public static final String YUNKUAICHONG_V160 = "yunkuaichongV160";
|
||||
|
||||
/** 云快充协议 v1.7.0 */
|
||||
public static final String YUNKUAICHONG_V170 = "yunkuaichongV170";
|
||||
|
||||
/** 绿能协议 v3.4.0 */
|
||||
public static final String LVNENG_V340 = "lvnengV340";
|
||||
|
||||
// ==================== 协议显示名称映射 ====================
|
||||
|
||||
private static final Map<String, String> PROTOCOL_DISPLAY_NAMES = new HashMap<>();
|
||||
|
||||
static {
|
||||
// 协议ID与显示名称的映射关系,便于代码走读时对照
|
||||
PROTOCOL_DISPLAY_NAMES.put(YUNKUAICHONG_V150, "云快充 V1.5.0");
|
||||
PROTOCOL_DISPLAY_NAMES.put(YUNKUAICHONG_V160, "云快充 V1.6.0");
|
||||
PROTOCOL_DISPLAY_NAMES.put(YUNKUAICHONG_V170, "云快充 V1.7.0");
|
||||
PROTOCOL_DISPLAY_NAMES.put(LVNENG_V340, "绿能 V3.4.0");
|
||||
}
|
||||
|
||||
// ==================== 工具方法 ====================
|
||||
|
||||
/**
|
||||
* 获取所有支持的协议
|
||||
* 直接从映射表中获取,无需反射
|
||||
*/
|
||||
public static List<ProtocolInfo> getAllProtocols() {
|
||||
List<ProtocolInfo> protocols = new ArrayList<>();
|
||||
|
||||
for (Map.Entry<String, String> entry : PROTOCOL_DISPLAY_NAMES.entrySet()) {
|
||||
protocols.add(new ProtocolInfo(entry.getKey(), entry.getValue()));
|
||||
}
|
||||
|
||||
return protocols;
|
||||
}
|
||||
|
||||
/**
|
||||
* 协议信息封装类
|
||||
*/
|
||||
public record ProtocolInfo(String protocolId, String displayName) {
|
||||
|
||||
}
|
||||
}
|
||||
@@ -13,7 +13,7 @@ import sanbing.jcpp.protocol.domain.ProtocolSession;
|
||||
* 通用协议下行命令执行器接口
|
||||
*
|
||||
* @param <T> 下行消息类型
|
||||
* @author sanbing
|
||||
* @author 九筒
|
||||
* @since 2024-12-16
|
||||
*/
|
||||
public interface ProtocolDownlinkCmdExe<T> {
|
||||
|
||||
@@ -13,7 +13,7 @@ import sanbing.jcpp.protocol.domain.ProtocolSession;
|
||||
* 通用协议上行命令执行器接口
|
||||
*
|
||||
* @param <T> 上行消息类型
|
||||
* @author sanbing
|
||||
* @author 九筒
|
||||
* @since 2024-12-16
|
||||
*/
|
||||
public interface ProtocolUplinkCmdExe<T> {
|
||||
|
||||
@@ -30,18 +30,18 @@ import java.util.function.BiConsumer;
|
||||
import static sanbing.jcpp.infrastructure.queue.common.QueueConstants.*;
|
||||
|
||||
/**
|
||||
* @author baigod
|
||||
* @author 九筒
|
||||
*/
|
||||
@Slf4j
|
||||
public abstract class Forwarder {
|
||||
protected static final String ERROR = "error";
|
||||
|
||||
AtomicBoolean healthy = new AtomicBoolean(true);
|
||||
final AtomicBoolean healthy = new AtomicBoolean(true);
|
||||
|
||||
@Getter
|
||||
private final String protocolName;
|
||||
|
||||
protected MessagesStats forwarderMessagesStats;
|
||||
protected final MessagesStats forwarderMessagesStats;
|
||||
|
||||
|
||||
protected final PartitionProvider partitionProvider;
|
||||
|
||||
@@ -41,11 +41,11 @@ import java.util.function.BiConsumer;
|
||||
import static sanbing.jcpp.infrastructure.queue.common.QueueConstants.*;
|
||||
|
||||
/**
|
||||
* @author baigod
|
||||
* @author 九筒
|
||||
*/
|
||||
@Slf4j
|
||||
public class KafkaForwarder extends Forwarder {
|
||||
AtomicBoolean healthy = new AtomicBoolean(true);
|
||||
final AtomicBoolean healthy = new AtomicBoolean(true);
|
||||
|
||||
private static final String OFFSET = "offset";
|
||||
private static final String PARTITION = "partition";
|
||||
|
||||
@@ -20,7 +20,7 @@ import sanbing.jcpp.protocol.cfg.MemoryCfg;
|
||||
import java.util.function.BiConsumer;
|
||||
|
||||
/**
|
||||
* @author baigod
|
||||
* @author 九筒
|
||||
*/
|
||||
@Slf4j
|
||||
public class MemoryForwarder extends Forwarder {
|
||||
|
||||
@@ -42,7 +42,7 @@ import static sanbing.jcpp.protocol.listener.tcp.configs.BinaryHandlerConfigurat
|
||||
import static sanbing.jcpp.protocol.listener.tcp.configs.TextHandlerConfiguration.SYSTEM_LINE_SEPARATOR;
|
||||
|
||||
/**
|
||||
* @author baigod
|
||||
* @author 九筒
|
||||
*/
|
||||
@Slf4j
|
||||
@RequiredArgsConstructor
|
||||
@@ -131,7 +131,7 @@ public abstract class ChannelHandlerInitializer<C extends Channel> extends Chann
|
||||
binaryHandlerConfig.getTail());
|
||||
socketChannel.pipeline().addLast("JCPPHeadTailFrameDecoder", framer);
|
||||
} else {
|
||||
throw new IllegalArgumentException("Unknown binary decoder");
|
||||
throw new IllegalArgumentException("未知的二进制解码器类型");
|
||||
}
|
||||
|
||||
socketChannel.pipeline()
|
||||
|
||||
@@ -16,7 +16,7 @@ import sanbing.jcpp.protocol.ProtocolMessageProcessor;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
/**
|
||||
* @author baigod
|
||||
* @author 九筒
|
||||
*/
|
||||
public abstract class Listener {
|
||||
|
||||
@@ -26,10 +26,10 @@ public abstract class Listener {
|
||||
@Getter
|
||||
private final ProtocolMessageProcessor protocolMessageProcessor;
|
||||
|
||||
protected AtomicInteger connectionsGauge = new AtomicInteger();
|
||||
protected MessagesStats uplinkMsgStats;
|
||||
protected MessagesStats downlinkMsgStats;
|
||||
protected Timer downlinkTimer;
|
||||
protected final AtomicInteger connectionsGauge = new AtomicInteger();
|
||||
protected final MessagesStats uplinkMsgStats;
|
||||
protected final MessagesStats downlinkMsgStats;
|
||||
protected final Timer downlinkTimer;
|
||||
|
||||
protected final ChannelHandlerParameter parameter;
|
||||
|
||||
|
||||
@@ -21,10 +21,10 @@ import sanbing.jcpp.infrastructure.util.exception.DownlinkException;
|
||||
import sanbing.jcpp.infrastructure.util.jackson.JacksonUtil;
|
||||
import sanbing.jcpp.infrastructure.util.trace.TracerContextUtil;
|
||||
import sanbing.jcpp.proto.gen.ProtocolProto.DownlinkRequestMessage;
|
||||
import sanbing.jcpp.proto.gen.ProtocolProto.SessionCloseReason;
|
||||
import sanbing.jcpp.protocol.ProtocolMessageProcessor;
|
||||
import sanbing.jcpp.protocol.domain.ListenerToHandlerMsg;
|
||||
import sanbing.jcpp.protocol.domain.ProtocolUplinkMsg;
|
||||
import sanbing.jcpp.protocol.domain.SessionCloseReason;
|
||||
import sanbing.jcpp.protocol.domain.SessionToHandlerMsg;
|
||||
import sanbing.jcpp.protocol.listener.ChannelHandlerParameter;
|
||||
|
||||
@@ -130,7 +130,7 @@ public class TcpChannelHandler<T> extends SimpleChannelInboundHandler<ProtocolUp
|
||||
|
||||
if (ctx.isRemoved()) {
|
||||
|
||||
tcpSession.close(SessionCloseReason.INACTIVE);
|
||||
tcpSession.close(SessionCloseReason.SESSION_CLOSE_ON_CHANNEL_INACTIVE);
|
||||
|
||||
log.warn("[{}]{}{} TCP会话已失效,因此删除会话", protocolName, ctx.channel(), tcpSession);
|
||||
|
||||
@@ -168,8 +168,8 @@ public class TcpChannelHandler<T> extends SimpleChannelInboundHandler<ProtocolUp
|
||||
|
||||
private void logDownlinkStart(Supplier<String> logTransform) {
|
||||
|
||||
if (log.isDebugEnabled()) {
|
||||
log.debug("[{}]{} 开始发送下行报文:{}", protocolName, tcpSession, logTransform.get());
|
||||
if (log.isInfoEnabled()) {
|
||||
log.info("[{}]{} 开始发送下行报文:{}", protocolName, tcpSession, logTransform.get());
|
||||
}
|
||||
}
|
||||
|
||||
@@ -226,8 +226,9 @@ public class TcpChannelHandler<T> extends SimpleChannelInboundHandler<ProtocolUp
|
||||
|
||||
super.channelInactive(ctx);
|
||||
|
||||
log.info("[{}]{}{} 通道不活跃", protocolName, ctx.channel(), tcpSession);
|
||||
// 处理连接断开逻辑
|
||||
tcpSession.close(SessionCloseReason.SESSION_CLOSE_ON_CHANNEL_INACTIVE);
|
||||
|
||||
log.info("[{}]{}{} 通道不活跃,连接已断开", protocolName, ctx.channel(), tcpSession);
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
@@ -27,7 +27,7 @@ import sanbing.jcpp.protocol.listener.ChannelHandlerInitializer;
|
||||
import sanbing.jcpp.protocol.listener.Listener;
|
||||
|
||||
/**
|
||||
* @author baigod
|
||||
* @author 九筒
|
||||
*/
|
||||
@Slf4j
|
||||
public class TcpListener extends Listener {
|
||||
|
||||
@@ -11,19 +11,21 @@ import io.netty.channel.ChannelHandlerContext;
|
||||
import lombok.EqualsAndHashCode;
|
||||
import lombok.Getter;
|
||||
import lombok.Setter;
|
||||
import sanbing.jcpp.proto.gen.ProtocolProto;
|
||||
import sanbing.jcpp.proto.gen.ProtocolProto.DownlinkRequestMessage;
|
||||
import sanbing.jcpp.proto.gen.ProtocolProto.SessionCloseReason;
|
||||
import sanbing.jcpp.protocol.domain.ProtocolSession;
|
||||
import sanbing.jcpp.protocol.domain.SessionCloseReason;
|
||||
import sanbing.jcpp.protocol.listener.tcp.enums.SequenceNumberLength;
|
||||
|
||||
import java.net.SocketAddress;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import java.util.function.Consumer;
|
||||
|
||||
|
||||
/**
|
||||
* 设备会话
|
||||
*
|
||||
* @author baigod
|
||||
* @author 九筒
|
||||
*/
|
||||
@EqualsAndHashCode(callSuper = true)
|
||||
@Getter
|
||||
@@ -81,7 +83,7 @@ public class TcpSession extends ProtocolSession {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close(SessionCloseReason reason) {
|
||||
public void close(ProtocolProto.SessionCloseReason reason) {
|
||||
super.close(reason);
|
||||
|
||||
ctx.flush();
|
||||
@@ -91,4 +93,6 @@ public class TcpSession extends ProtocolSession {
|
||||
public void writeAndFlush(ByteBuf byteBuf) {
|
||||
writeAndFlushConsumer.accept(byteBuf);
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
|
||||
@@ -23,7 +23,7 @@ import static sanbing.jcpp.protocol.listener.tcp.enums.ReadAct.CONTINUE;
|
||||
/**
|
||||
* 起始域结束域拆包
|
||||
*
|
||||
* @author baigod
|
||||
* @author 九筒
|
||||
*/
|
||||
@Slf4j
|
||||
public class JCPPHeadTailFrameDecoder extends ByteToMessageDecoder {
|
||||
|
||||
@@ -27,7 +27,7 @@ import static sanbing.jcpp.protocol.listener.tcp.enums.ReadAct.CONTINUE;
|
||||
/**
|
||||
* JCPP长度域拆包
|
||||
*
|
||||
* @author baigod
|
||||
* @author 九筒
|
||||
*/
|
||||
@Slf4j
|
||||
public class JCPPLengthFieldBasedFrameDecoder extends ByteToMessageDecoder {
|
||||
|
||||
@@ -9,7 +9,7 @@ package sanbing.jcpp.protocol.listener.tcp.enums;
|
||||
/**
|
||||
* 读取动作,辅助枚举
|
||||
*
|
||||
* @author baigod
|
||||
* @author 九筒
|
||||
*/
|
||||
public enum ReadAct {
|
||||
BREAK,
|
||||
|
||||
@@ -7,7 +7,7 @@
|
||||
package sanbing.jcpp.protocol.listener.tcp.enums;
|
||||
|
||||
/**
|
||||
* @author baigod
|
||||
* @author 九筒
|
||||
*/
|
||||
public enum SequenceNumberLength {
|
||||
|
||||
|
||||
@@ -16,7 +16,7 @@ import lombok.extern.slf4j.Slf4j;
|
||||
/**
|
||||
* 心跳检测
|
||||
*
|
||||
* @author baigod
|
||||
* @author 九筒
|
||||
*/
|
||||
@Slf4j
|
||||
@RequiredArgsConstructor
|
||||
|
||||
@@ -12,7 +12,7 @@ import sanbing.jcpp.infrastructure.util.mdc.MDCUtils;
|
||||
import sanbing.jcpp.infrastructure.util.trace.TracerContextUtil;
|
||||
|
||||
/**
|
||||
* @author baigod
|
||||
* @author 九筒
|
||||
*/
|
||||
public class TracerHandler extends ChannelInboundHandlerAdapter {
|
||||
@Override
|
||||
|
||||
@@ -10,10 +10,10 @@ import sanbing.jcpp.protocol.domain.DownlinkCmdEnum;
|
||||
|
||||
/**
|
||||
* 下行命令转换器接口
|
||||
*
|
||||
* <p>
|
||||
* 每个协议模块都应该实现此接口,提供从通用下行命令到协议特定命令的转换
|
||||
*
|
||||
* @author sanbing
|
||||
* @author 九筒
|
||||
* @since 2024-12-16
|
||||
*/
|
||||
public interface DownlinkCmdConverter {
|
||||
|
||||
@@ -11,7 +11,7 @@ import sanbing.jcpp.protocol.domain.ProtocolSession;
|
||||
import java.util.UUID;
|
||||
|
||||
/**
|
||||
* @author baigod
|
||||
* @author 九筒
|
||||
*/
|
||||
public interface ProtocolSessionRegistryProvider {
|
||||
|
||||
|
||||
@@ -9,7 +9,7 @@ package sanbing.jcpp.protocol.provider;
|
||||
import sanbing.jcpp.protocol.cfg.ProtocolCfg;
|
||||
|
||||
/**
|
||||
* @author baigod
|
||||
* @author 九筒
|
||||
*/
|
||||
public interface ProtocolsConfigProvider {
|
||||
|
||||
|
||||
@@ -16,8 +16,8 @@ import org.springframework.beans.factory.annotation.Value;
|
||||
import org.springframework.stereotype.Service;
|
||||
import sanbing.jcpp.infrastructure.util.async.JCPPThreadFactory;
|
||||
import sanbing.jcpp.infrastructure.util.config.ThreadPoolConfiguration;
|
||||
import sanbing.jcpp.proto.gen.ProtocolProto.SessionCloseReason;
|
||||
import sanbing.jcpp.protocol.domain.ProtocolSession;
|
||||
import sanbing.jcpp.protocol.domain.SessionCloseReason;
|
||||
import sanbing.jcpp.protocol.provider.ProtocolSessionRegistryProvider;
|
||||
|
||||
import java.time.LocalDateTime;
|
||||
@@ -28,7 +28,7 @@ import java.util.concurrent.TimeUnit;
|
||||
|
||||
|
||||
/**
|
||||
* @author baigod
|
||||
* @author 九筒
|
||||
*/
|
||||
@Service
|
||||
@Slf4j
|
||||
@@ -51,7 +51,7 @@ public class DefaultProtocolSessionRegistryProvider implements ProtocolSessionRe
|
||||
public void init() {
|
||||
scheduledExecutorService.scheduleAtFixedRate(() -> sessionCache.asMap().forEach((id, session) -> {
|
||||
if (session.getLastActivityTime().isBefore(LocalDateTime.now().minusSeconds(defaultInactivityTimeoutInSec))) {
|
||||
session.close(SessionCloseReason.INACTIVE);
|
||||
session.close(SessionCloseReason.SESSION_CLOSE_DESTRUCTION);
|
||||
unregister(session.getId());
|
||||
}
|
||||
}), defaultStateCheckIntervalInSec, defaultStateCheckIntervalInSec, TimeUnit.SECONDS);
|
||||
|
||||
@@ -17,9 +17,9 @@ import sanbing.jcpp.protocol.provider.ProtocolsConfigProvider;
|
||||
|
||||
import java.util.Map;
|
||||
|
||||
@Slf4j
|
||||
@Setter
|
||||
@Service
|
||||
@Slf4j
|
||||
@ConfigurationProperties("service")
|
||||
public class DefaultProtocolsConfigProvider implements ProtocolsConfigProvider {
|
||||
|
||||
|
||||
@@ -18,11 +18,11 @@ import java.util.function.Predicate;
|
||||
|
||||
/**
|
||||
* 通用命令路由器
|
||||
*
|
||||
* <p>
|
||||
* 提供基于协议名+命令字的路由功能,支持多版本协议
|
||||
*
|
||||
* @param <T> 命令执行器类型
|
||||
* @author sanbing
|
||||
* @author 九筒
|
||||
* @since 2025-08-25
|
||||
*/
|
||||
@Slf4j
|
||||
@@ -79,7 +79,7 @@ public class ProtocolCommandRouter<T> {
|
||||
} catch (InstantiationException | IllegalAccessException |
|
||||
InvocationTargetException | NoSuchMethodException e) {
|
||||
log.error("无法实例化命令执行器 {}: {}", executorClass.getName(), e.getMessage());
|
||||
throw new RuntimeException("Failed to instantiate command executor: " + executorClass.getName(), e);
|
||||
throw new RuntimeException("实例化命令执行器失败: " + executorClass.getName(), e);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user