update 电单车协议

This commit is contained in:
Guoqs
2024-08-27 16:30:05 +08:00
parent 1e176bcc61
commit 57ade1380d
3 changed files with 87 additions and 7 deletions

View File

@@ -2,17 +2,18 @@ package com.jsowell.netty.server.electricbicycles;
import com.alibaba.fastjson2.JSON;
import com.google.common.collect.Lists;
import com.jsowell.common.enums.ykc.PileChannelEntity;
import com.jsowell.common.util.BytesUtil;
import com.jsowell.common.util.ChannelManagerUtil;
import com.jsowell.netty.service.electricbicycles.EBikeBusinessService;
import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.*;
import io.netty.handler.timeout.ReadTimeoutException;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.net.InetSocketAddress;
import java.util.List;
import java.util.Map;
import java.util.Objects;
@@ -23,7 +24,7 @@ import java.util.concurrent.TimeUnit;
@ChannelHandler.Sharable
@Slf4j
@Component
public class ElectricBicyclesServerHandler extends SimpleChannelInboundHandler<Object> {
public class ElectricBicyclesServerHandler extends ChannelInboundHandlerAdapter {
private final Map<String, ResponseFuture> responseFutureMap = new ConcurrentHashMap<>();
@@ -32,8 +33,33 @@ public class ElectricBicyclesServerHandler extends SimpleChannelInboundHandler<O
@Autowired
private EBikeBusinessService eBikeService;
/**
* 有客户端连接服务器会触发此函数
* 连接被建立并且准备进行通信时被调用
*/
@Override
protected void channelRead0(ChannelHandlerContext ctx, Object message) throws Exception {
public void channelActive(ChannelHandlerContext ctx) {
InetSocketAddress insocket = (InetSocketAddress) ctx.channel().remoteAddress();
String clientIp = insocket.getAddress().getHostAddress();
int clientPort = insocket.getPort();
//获取连接通道唯一标识
ChannelId channelId = ctx.channel().id();
//如果map中不包含此连接就保存连接
ChannelManagerUtil.addChannel(channelId.asShortText(), ctx);
// if (CHANNEL_MAP.containsKey(channelId)) {
// log.info("客户端【{}】是连接状态,连接通道数量: {}", channelId, CHANNEL_MAP.size());
// } else {
// //保存连接
// CHANNEL_MAP.put(channelId, ctx);
// log.info("客户端【{}】, 连接netty服务器【IP:{}, PORT:{}】, 连接通道数量: {}", channelId, clientIp, clientPort, CHANNEL_MAP.size());
// }
}
/**
* 有客户端发消息会触发此函数
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object message) throws Exception {
Channel channel = ctx.channel();
log.info("收到消息, channelId:{}, msg:{}", channel.id().toString(), JSON.toJSONString(message));
byte[] msg = (byte[]) message;
@@ -58,6 +84,57 @@ public class ElectricBicyclesServerHandler extends SimpleChannelInboundHandler<O
}
}
/**
* 有客户端终止连接服务器会触发此函数
*/
@Override
public void channelInactive(ChannelHandlerContext ctx) {
InetSocketAddress insocket = (InetSocketAddress) ctx.channel().remoteAddress();
String clientIp = insocket.getAddress().getHostAddress();
ChannelId channelId = ctx.channel().id();
//包含此客户端才去删除
ChannelManagerUtil.removeChannel(channelId.asShortText());
// if (CHANNEL_MAP.containsKey(channelId)) {
// ykcService.exit(channelId);
// //删除连接
// CHANNEL_MAP.remove(channelId);
// log.info("客户端【{}】, 退出netty服务器【IP:{}, PORT:{}】, 连接通道数量: {}", channelId, clientIp, insocket.getPort(), CHANNEL_MAP.size());
// }
}
/**
* 读取成功
* @param ctx
* @throws Exception
*/
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
// Channel channel = ctx.channel();
// log.info("channel:【{}】读数据完成", channel.id());
super.channelReadComplete(ctx);
}
/**
* 发生异常会触发此函数
*/
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ChannelId channelId = ctx.channel().id();
String channelIdShortText = channelId.asShortText();
String pileSn = PileChannelEntity.getPileSnByChannelId(channelIdShortText);
log.error("发生异常 channelId:{}, pileSn:{}", channelIdShortText, pileSn, cause);
cause.printStackTrace();
// 如果桩连到平台在1分钟内没有发送数据过来会报ReadTimeoutException异常
if (cause instanceof ReadTimeoutException) {
if (log.isTraceEnabled()) {
log.trace("Connection timeout 【{}】", ctx.channel().remoteAddress());
}
log.error("【{}】发生了错误, pileSn:【{}】此连接被关闭, 此时连通数量: {}", channelId, pileSn, ChannelManagerUtil.getChannelCount());
ctx.channel().close();
}
// close(channelId, pileSn);
}
public String sendCommandAndWaitForResponse(Channel channel, String command, long timeout) throws InterruptedException {
String requestId = generateRequestId();
ResponseFuture future = new ResponseFuture();