update mqtt 服务器

This commit is contained in:
Lemon
2023-12-21 10:37:21 +08:00
parent 5fbce62752
commit 36c2832f63
6 changed files with 43 additions and 46 deletions

View File

@@ -12,9 +12,22 @@ import java.net.UnknownHostException;
* @Date 2023/12/20 15:15:26
*/
public interface CameraBusinessService {
public void sendGroundLockCommand(String sn, String msgType, String msgPrefix, String topic) throws InterruptedException;
/**
* 发送具体指令到某主题
* @param sn 设备 sn
* @param msgType 消息类型
* @param msgPrefix 消息前缀
* @param topic 主题
* @param msgData 消息内容
* @throws InterruptedException
*/
public void sendGroundLockCommand(String sn, String msgType, String msgPrefix, String topic, String msgData) throws InterruptedException;
/**
* 解析channel中的ip地址 并将 sn 和 channelId 进行绑定,存入缓存
* @param channel
* @throws UnknownHostException
*/
public void processConnectMsg(Channel channel) throws UnknownHostException;
}

View File

@@ -4,14 +4,11 @@ import com.alibaba.fastjson2.JSONObject;
import com.jsowell.common.constant.CacheConstants;
import com.jsowell.common.core.redis.RedisCache;
import com.jsowell.common.util.DateUtils;
import com.jsowell.common.util.StringUtils;
import com.jsowell.common.util.sign.MD5Util;
import com.jsowell.netty.domain.MqttRequest;
import com.jsowell.netty.server.mqtt.BootNettyMqttChannelInboundHandler;
import com.jsowell.netty.service.camera.CameraBusinessService;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.handler.codec.mqtt.*;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
@@ -34,20 +31,37 @@ public class CameraBusinessServiceImpl implements CameraBusinessService {
@Autowired
private BootNettyMqttChannelInboundHandler handler;
public void sendGroundLockCommand(String sn, String msgType, String msgPrefix, String topic) throws InterruptedException {
/**
* 发送具体指令到某主题
* @param sn 设备 sn
* @param msgType 消息类型
* @param msgPrefix 消息前缀
* @param topic 主题
* @param msgData 消息内容
* @throws InterruptedException
*/
public void sendGroundLockCommand(String sn, String msgType, String msgPrefix, String topic, String msgData) throws InterruptedException {
JSONObject jsonObject = spliceStr(sn, msgType, msgPrefix);
// 通过sn查找出对应的channelId
String mqttConnectRedisKey = CacheConstants.MQTT_CONNECT_CHANNEL + sn;
String mqttConnectRedisKey = CacheConstants.MQTT_CONNECT_SN + sn;
Object cacheObject = redisCache.getCacheObject(mqttConnectRedisKey);
if (cacheObject == null) {
return;
}
String channelId = (String) cacheObject;
if (StringUtils.isNotBlank(msgData)) {
jsonObject.put("msg_data", msgData);
}
// 发送消息
handler.sendMsg(channelId, topic, jsonObject.toJSONString());
}
/**
* 解析channel中的ip地址 并将 sn 和 channelId 进行绑定,存入缓存
* @param channel
* @throws UnknownHostException
*/
public void processConnectMsg(Channel channel) throws UnknownHostException {
// 解析 channel 中的 ip 地址
String remoteAddress = channel.remoteAddress().toString();
@@ -68,7 +82,7 @@ public class CameraBusinessServiceImpl implements CameraBusinessService {
if (cacheObject != null) {
String sn = (String) cacheObject;
// 将 sn 和 channelId 绑定关系存入缓存
String mqttConnectRedisKey = CacheConstants.MQTT_CONNECT_CHANNEL + sn;
String mqttConnectRedisKey = CacheConstants.MQTT_CONNECT_SN + sn;
redisCache.setCacheObject(mqttConnectRedisKey, channel.id().asShortText());
}
}