新增 相机主动发送订阅消息处理方法

This commit is contained in:
Lemon
2024-02-28 15:28:53 +08:00
parent a7f31a23d2
commit b60642d466
4 changed files with 59 additions and 17 deletions

View File

@@ -46,6 +46,7 @@ public class BootNettyMqttChannelInboundHandler extends ChannelInboundHandlerAda
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception, IOException {
if (null != msg) {
BootNettyMqttMsgBack msgBack = BeanUtils.getBean(BootNettyMqttMsgBack.class);
MqttMessage mqttMessage = (MqttMessage) msg;
log.info("MqttServer收到消息:{}", mqttMessage.toString());
MqttFixedHeader mqttFixedHeader = mqttMessage.fixedHeader();
@@ -54,7 +55,7 @@ public class BootNettyMqttChannelInboundHandler extends ChannelInboundHandlerAda
if (MqttMessageType.CONNECT.equals(mqttFixedHeader.messageType())) {
// 在一个网络连接上客户端只能发送一次CONNECT报文。服务端必须将客户端发送的第二个CONNECT报文当作协议违规处理并断开客户端的连接
// to do 建议connect消息单独处理用来对客户端进行认证管理等 这里先直接返回一个CONNACK消息
BootNettyMqttMsgBack.connack(channel, mqttMessage);
msgBack.connack(channel, mqttMessage);
// 如果map中不包含此连接就保存连接
String channelId = channel.id().asShortText();
System.out.println(channelId);
@@ -79,30 +80,30 @@ public class BootNettyMqttChannelInboundHandler extends ChannelInboundHandlerAda
case PUBLISH: // 客户端发布消息
// PUBACK报文是对QoS 1等级的PUBLISH报文的响应
System.out.println("客户端发布消息");
BootNettyMqttMsgBack.puback(channel, mqttMessage);
msgBack.puback(channel, mqttMessage);
break;
case PUBREL: // 发布释放
// PUBREL报文是对PUBREC报文的响应
// to do
BootNettyMqttMsgBack.pubcomp(channel, mqttMessage);
msgBack.pubcomp(channel, mqttMessage);
break;
case SUBSCRIBE: // 客户端订阅主题
// 客户端向服务端发送SUBSCRIBE报文用于创建一个或多个订阅每个订阅注册客户端关心的一个或多个主题。
// 为了将应用消息转发给与那些订阅匹配的主题服务端发送PUBLISH报文给客户端。
// SUBSCRIBE报文也为每个订阅指定了最大的QoS等级服务端根据这个发送应用消息给客户端
// to do
BootNettyMqttMsgBack.suback(channel, mqttMessage);
msgBack.suback(channel, mqttMessage);
break;
case UNSUBSCRIBE: // 客户端取消订阅
// 客户端发送UNSUBSCRIBE报文给服务端用于取消订阅主题
// to do
BootNettyMqttMsgBack.unsuback(channel, mqttMessage);
msgBack.unsuback(channel, mqttMessage);
break;
case PINGREQ: // 客户端发起心跳
// 客户端发送PINGREQ报文给服务端的
// 在没有任何其它控制报文从客户端发给服务的时,告知服务端客户端还活着
// 请求服务端发送 响应确认它还活着,使用网络以确认网络连接没有断开
BootNettyMqttMsgBack.pingresp(channel, mqttMessage);
msgBack.pingresp(channel, mqttMessage);
break;
case DISCONNECT: // 客户端主动断开连接
// DISCONNECT报文是客户端发给服务端的最后一个控制报文 服务端必须验证所有的保留位都被设置为0

View File

@@ -5,8 +5,11 @@ import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import com.alibaba.fastjson2.JSONObject;
import com.jsowell.common.constant.CacheConstants;
import com.jsowell.common.core.redis.RedisCache;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -41,13 +44,16 @@ public class BootNettyMqttMsgBack {
private static final Logger log = LoggerFactory.getLogger(BootNettyMqttMsgBack.class);
@Autowired
private RedisCache redisCache;
/**
* 确认连接请求
*
* @param channel
* @param mqttMessage
*/
public static void connack(Channel channel, MqttMessage mqttMessage) {
public void connack(Channel channel, MqttMessage mqttMessage) {
log.info("服务器端收到确认连接请求:{}", mqttMessage.toString());
MqttConnectMessage mqttConnectMessage = (MqttConnectMessage) mqttMessage;
MqttFixedHeader mqttFixedHeaderInfo = mqttConnectMessage.fixedHeader();
@@ -71,7 +77,7 @@ public class BootNettyMqttMsgBack {
* @param channel
* @param mqttMessage
*/
public static void puback(Channel channel, MqttMessage mqttMessage) {
public void puback(Channel channel, MqttMessage mqttMessage) {
MqttPublishMessage mqttPublishMessage = (MqttPublishMessage) mqttMessage;
MqttFixedHeader mqttFixedHeaderInfo = mqttPublishMessage.fixedHeader();
MqttQoS qos = (MqttQoS) mqttFixedHeaderInfo.qosLevel();
@@ -80,6 +86,9 @@ public class BootNettyMqttMsgBack {
String data = new String(headBytes);
System.out.println("publish data--" + data);
// 处理发布报文消息
processMsg(data);
switch (qos) {
case AT_MOST_ONCE: // 至多一次
break;
@@ -107,13 +116,35 @@ public class BootNettyMqttMsgBack {
}
}
/**
* 处理发布报文消息
* @param msgData
* @return
*/
private void processMsg(String msgData) {
// 将信息转化成 Json 字符串
JSONObject jsonObject = JSONObject.parseObject(msgData);
// 由于相机在连接成功时会发送一条设备信息数据,此数据中无 msg_id 字段, 因此需要加 try catch
String msgId = "";
try {
msgId = jsonObject.getString("msg_id");
}catch (Exception e) {
// 此时为设备信息报文,打印日志后跳过
log.info("收到相机报文解析错误 ,源报文;{}", msgData);
return;
}
// 将信息存入缓存
String redisKey = CacheConstants.GROUND_LOCK_DEVICE_REPORT + msgId;
redisCache.setCacheObject(redisKey, msgData, 5, TimeUnit.MINUTES);
}
/**
* 发布完成 qos2
*
* @param channel
* @param mqttMessage
*/
public static void pubcomp(Channel channel, MqttMessage mqttMessage) {
public void pubcomp(Channel channel, MqttMessage mqttMessage) {
MqttMessageIdVariableHeader messageIdVariableHeader = (MqttMessageIdVariableHeader) mqttMessage.variableHeader();
// 构建返回报文, 固定报头
MqttFixedHeader mqttFixedHeaderBack = new MqttFixedHeader(MqttMessageType.PUBCOMP, false, MqttQoS.AT_MOST_ONCE, false, 0x02);
@@ -130,7 +161,7 @@ public class BootNettyMqttMsgBack {
* @param channel
* @param mqttMessage
*/
public static void suback(Channel channel, MqttMessage mqttMessage) {
public void suback(Channel channel, MqttMessage mqttMessage) {
log.info("接收到订阅确认信息:{}", mqttMessage.toString());
MqttSubscribeMessage mqttSubscribeMessage = (MqttSubscribeMessage) mqttMessage;
MqttMessageIdVariableHeader messageIdVariableHeader = mqttSubscribeMessage.variableHeader();
@@ -158,7 +189,7 @@ public class BootNettyMqttMsgBack {
* @param channel
* @param mqttMessage
*/
public static void unsuback(Channel channel, MqttMessage mqttMessage) {
public void unsuback(Channel channel, MqttMessage mqttMessage) {
MqttMessageIdVariableHeader messageIdVariableHeader = (MqttMessageIdVariableHeader) mqttMessage.variableHeader();
// 构建返回报文 可变报头
MqttMessageIdVariableHeader variableHeaderBack = MqttMessageIdVariableHeader.from(messageIdVariableHeader.messageId());
@@ -176,7 +207,7 @@ public class BootNettyMqttMsgBack {
* @param channel
* @param mqttMessage
*/
public static void pingresp(Channel channel, MqttMessage mqttMessage) {
public void pingresp(Channel channel, MqttMessage mqttMessage) {
// 心跳响应报文 11010000 00000000 固定报文
MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.PINGRESP, false, MqttQoS.AT_MOST_ONCE, false, 0);
MqttMessage mqttMessageBack = new MqttMessage(fixedHeader);