@@ -1,234 +1,24 @@
package com.jsowell.netty.server.electricbicycles ;
import com.google.common.collect.Lists ;
import com.jsowell.common.core.domain.ykc.YKCFrameTypeCod e ;
import com.jsowell.common.enums.ykc.PileChannelEntity ;
import com.jsowell.common.util.BytesUtil ;
import com.jsowell.common.util.StringUtils ;
import com.jsowell.common.util.YKCUtils ;
import com.jsowell.netty.service.yunkuaichong.YKCBusinessService ;
import io.netty.buffer.ByteBuf ;
import com.alibaba.fastjson2.JSON ;
import com.jsowell.netty.service.electricbicycles.EBikeBusinessServic e ;
import com.jsowell.pile.domain.ebike.AbsEBikeMessage ;
import io.netty.channel.* ;
import io.netty.handler.timeout.IdleState ;
import io.netty.handler.timeout.IdleStateEvent ;
import io.netty.handler.timeout.ReadTimeoutException ;
import lombok.extern.slf4j.Slf4j ;
import org.apache.commons.collections4.CollectionUtils ;
import org.springframework.beans.factory.annotation.Autowired ;
import org.springframework.stereotype.Component ;
import java.net.InetSocketAddress ;
import java.util.List ;
import java.util.Objects ;
import java.util.concurrent.ConcurrentHashMap ;
/**
* netty服务端处理类
*/
@ChannelHandler.Sharable
@Slf4j
@Component
public class ElectricBicyclesServerHandler extends ChannelInboundHandlerAdapter {
public class ElectricBicyclesServerHandler extends Simple ChannelInboundHandler< AbsEBikeMessage > {
@Autowired
private YKC BusinessService ykc Service;
@Autowired
private EBike BusinessService eBike Service;
/**
* 管理一个全局map, 保存连接进服务端的通道数量
*/
private static final ConcurrentHashMap < ChannelId , ChannelHandlerContext > CHANNEL_MAP = new ConcurrentHashMap < > ( ) ;
@Override
protected void channelRead0 ( ChannelHandlerContext ctx , AbsEBikeMessage msg ) throws Exception {
log . info ( " 收到消息, channelId:{}, msg:{} " , ctx . channel ( ) . id ( ) . toString ( ) , JSON . toJSONString ( msg ) ) ;
}
private final List < String > notPrintFrameTypeList = Lists . newArrayList ( " 0x03 " ) ;
/**
* 有客户端连接服务器会触发此函数
* 连接被建立并且准备进行通信时被调用
*/
@Override
public void channelActive ( ChannelHandlerContext ctx ) {
InetSocketAddress insocket = ( InetSocketAddress ) ctx . channel ( ) . remoteAddress ( ) ;
String clientIp = insocket . getAddress ( ) . getHostAddress ( ) ;
int clientPort = insocket . getPort ( ) ;
//获取连接通道唯一标识
ChannelId channelId = ctx . channel ( ) . id ( ) ;
//如果map中不包含此连接, 就保存连接
if ( CHANNEL_MAP . containsKey ( channelId ) ) {
log . info ( " Handler:{}, 客户端【{}】是连接状态,连接通道数量: {} " , this . getClass ( ) . getSimpleName ( ) , channelId , CHANNEL_MAP . size ( ) ) ;
} else {
//保存连接
CHANNEL_MAP . put ( channelId , ctx ) ;
log . info ( " Handler:{}, 客户端【{}】, 连接netty服务器【IP:{}, PORT:{}】, 连接通道数量: {} " , this . getClass ( ) . getSimpleName ( ) , channelId , clientIp , clientPort , CHANNEL_MAP . size ( ) ) ;
}
}
/**
* 有客户端发消息会触发此函数
*/
@Override
public void channelRead ( ChannelHandlerContext ctx , Object message ) throws Exception {
// log.info("加载客户端报文=== channelId:" + ctx.channel().id() + ", msg:" + msg);
// 下面可以解析数据, 保存数据, 生成返回报文, 将需要返回报文写入write函数
byte [ ] msg = ( byte [ ] ) message ;
// 获取帧类型
byte [ ] frameTypeBytes = BytesUtil . copyBytes ( msg , 5 , 1 ) ;
String frameType = YKCUtils . frameType2Str ( frameTypeBytes ) ;
// 获取序列号域
int serialNumber = BytesUtil . bytesToIntLittle ( BytesUtil . copyBytes ( msg , 2 , 2 ) ) ;
// 获取channel
Channel channel = ctx . channel ( ) ;
// new
// String hexString = DatatypeConverter.printHexBinary(msg);
// 心跳包0x03日志太多, 造成日志文件过大, 改为不打印
if ( ! CollectionUtils . containsAny ( notPrintFrameTypeList , frameType ) ) {
// log.info("【<<<<<平台收到消息<<<<<】channel:{}, 帧类型:{}, 帧名称:{}, 序列号域:{}, 报文:{}, new报文:{}",
// channel.id(), frameType, YKCFrameTypeCode.getFrameTypeStr(frameType), serialNumber,
// BytesUtil.binary(msg, 16), hexString);
log . info ( " 【<<<<<平台收到消息<<<<<】channel:{}, 帧类型:{}, 帧名称:{}, 序列号域:{}, 报文:{} " ,
channel . id ( ) , frameType , YKCFrameTypeCode . getFrameTypeStr ( frameType ) , serialNumber ,
BytesUtil . binary ( msg , 16 ) ) ;
}
// 处理数据
byte [ ] response = ykcService . process ( msg , ctx ) ;
if ( Objects . nonNull ( response ) ) {
// 响应客户端
ByteBuf buffer = ctx . alloc ( ) . buffer ( ) . writeBytes ( response ) ;
this . channelWrite ( channel . id ( ) , buffer ) ;
if ( ! CollectionUtils . containsAny ( notPrintFrameTypeList , frameType ) ) {
// 应答帧类型
byte [ ] responseFrameTypeBytes = YKCFrameTypeCode . PlatformAnswersRelation . getResponseFrameTypeBytes ( frameTypeBytes ) ;
String responseFrameType = YKCUtils . frameType2Str ( responseFrameTypeBytes ) ;
log . info ( " 【>>>>>平台响应消息>>>>>】channel:{}, 响应帧类型:{}, 响应帧名称:{}, 原帧类型:{}, 原帧名称:{}, 序列号域:{}, response:{} " ,
channel . id ( ) , responseFrameType , YKCFrameTypeCode . getFrameTypeStr ( responseFrameType ) ,
frameType , YKCFrameTypeCode . getFrameTypeStr ( frameType ) , serialNumber ,
BytesUtil . binary ( response , 16 ) ) ;
}
}
}
/**
* 有客户端终止连接服务器会触发此函数
*/
@Override
public void channelInactive ( ChannelHandlerContext ctx ) {
InetSocketAddress insocket = ( InetSocketAddress ) ctx . channel ( ) . remoteAddress ( ) ;
String clientIp = insocket . getAddress ( ) . getHostAddress ( ) ;
ChannelId channelId = ctx . channel ( ) . id ( ) ;
//包含此客户端才去删除
if ( CHANNEL_MAP . containsKey ( channelId ) ) {
ykcService . exit ( channelId ) ;
//删除连接
CHANNEL_MAP . remove ( channelId ) ;
log . info ( " 客户端【{}】, 退出netty服务器【IP:{}, PORT:{}】, 连接通道数量: {} " , channelId , clientIp , insocket . getPort ( ) , CHANNEL_MAP . size ( ) ) ;
}
}
@Override
public void handlerAdded ( ChannelHandlerContext ctx ) throws Exception { // (2)
// Channel incoming = ctx.channel();
// log.info("handlerAdded: handler被添加到channel的pipeline connect:" + incoming.remoteAddress());
}
@Override
public void handlerRemoved ( ChannelHandlerContext ctx ) throws Exception { // (3)
// Channel incoming = ctx.channel();
// log.info("handlerRemoved: handler从channel的pipeline中移除 connect:" + incoming.remoteAddress());
// ChannelMapByEntity.removeChannel(incoming);
// ChannelMap.removeChannel(incoming);
}
@Override
public void channelReadComplete ( ChannelHandlerContext ctx ) throws Exception {
Channel channel = ctx . channel ( ) ;
// log.info("channel:【{}】读数据完成", channel.id());
super . channelReadComplete ( ctx ) ;
}
/**
* 服务端给客户端发送消息
*
* @param channelId 连接通道唯一id
* @param msg 需要发送的消息内容
*/
public void channelWrite ( ChannelId channelId , Object msg ) throws Exception {
ChannelHandlerContext ctx = CHANNEL_MAP . get ( channelId ) ;
if ( ctx = = null ) {
log . info ( " 通道【{}】不存在 " , channelId ) ;
return ;
}
if ( msg = = null | | msg = = " " ) {
log . info ( " 服务端响应空的消息 " ) ;
return ;
}
//将客户端的信息直接返回写入ctx
ctx . write ( msg ) ;
//刷新缓存区
ctx . flush ( ) ;
}
@Override
public void userEventTriggered ( ChannelHandlerContext ctx , Object evt ) throws Exception {
String socketString = ctx . channel ( ) . remoteAddress ( ) . toString ( ) ;
ChannelId channelId = ctx . channel ( ) . id ( ) ;
String pileSn = PileChannelEntity . getPileSnByChannelId ( channelId . asLongText ( ) ) ;
if ( evt instanceof IdleStateEvent ) { // 超时事件
IdleStateEvent event = ( IdleStateEvent ) evt ;
boolean flag = false ;
if ( event . state ( ) = = IdleState . READER_IDLE ) { // 读
flag = true ;
// log.error("Client-IP:【{}】, channelId:【{}】, pileSn:【{}】, READER_IDLE 读超时", socketString, channelId, pileSn);
} else if ( event . state ( ) = = IdleState . WRITER_IDLE ) { // 写
flag = true ;
// log.error("Client-IP:【{}】, channelId:【{}】, pileSn:【{}】, WRITER_IDLE 写超时", socketString, channelId, pileSn);
} else if ( event . state ( ) = = IdleState . ALL_IDLE ) { // 全部
flag = true ;
// log.error("Client-IP:【{}】, channelId:【{}】, pileSn:【{}】, ALL_IDLE 总超时", socketString, channelId, pileSn);
}
if ( flag ) {
ctx . channel ( ) . close ( ) ;
// close(channelId, pileSn);
}
}
}
/**
* 发生异常会触发此函数
*/
@Override
public void exceptionCaught ( ChannelHandlerContext ctx , Throwable cause ) throws Exception {
ChannelId channelId = ctx . channel ( ) . id ( ) ;
String channelIdShortText = channelId . asShortText ( ) ;
String pileSn = PileChannelEntity . getPileSnByChannelId ( channelIdShortText ) ;
log . error ( " 发生异常 channelId:{}, pileSn:{} " , channelIdShortText , pileSn , cause ) ;
cause . printStackTrace ( ) ;
// 如果桩连到平台, 在1分钟内没有发送数据过来, 会报ReadTimeoutException异常
if ( cause instanceof ReadTimeoutException ) {
if ( log . isTraceEnabled ( ) ) {
log . trace ( " Connection timeout 【{}】 " , ctx . channel ( ) . remoteAddress ( ) ) ;
}
log . error ( " 【{}】发生了错误, pileSn:【{}】此连接被关闭, 此时连通数量: {} " , channelId , pileSn , CHANNEL_MAP . size ( ) ) ;
ctx . channel ( ) . close ( ) ;
}
// close(channelId, pileSn);
}
// 公共方法 关闭连接
private void closeConnection ( String pileSn , ChannelHandlerContext ctx ) {
Channel channel = ctx . channel ( ) ;
ChannelId channelId = channel . id ( ) ;
log . error ( " close方法-发生异常, 关闭链接, channelId:{}, pileSn:{} " , channelId . asShortText ( ) , pileSn ) ;
if ( channel ! = null & & ! channel . isActive ( ) & & ! channel . isOpen ( ) & & ! channel . isWritable ( ) ) {
channel . close ( ) ;
// 删除连接
CHANNEL_MAP . remove ( channelId ) ;
}
// 删除桩编号和channel的关系
if ( StringUtils . isNotBlank ( pileSn ) ) {
PileChannelEntity . removeByPileSn ( pileSn ) ;
}
}
}