diff --git a/jsowell-admin/src/main/java/com/jsowell/service/CameraService.java b/jsowell-admin/src/main/java/com/jsowell/service/CameraService.java index 000f23bb2..a7eef8850 100644 --- a/jsowell-admin/src/main/java/com/jsowell/service/CameraService.java +++ b/jsowell-admin/src/main/java/com/jsowell/service/CameraService.java @@ -73,16 +73,16 @@ public class CameraService { public void receiveIdentifyResults(JSONObject jsonObject) throws InterruptedException { // 区分入场和出场 Integer parking_state = jsonObject.getJSONObject("parking").getInteger("parking_state"); - if (parking_state == 1) { + if (parking_state == Constants.one) { // 入场 String parkingState = "ENTRY"; String result = vehicleEntry(jsonObject, parkingState); logger.info("车辆入场处理 result:{}", result); } - if (parking_state == 2) { + if (parking_state == Constants.two) { // 在场 } - if (parking_state == 4) { + if (parking_state == Constants.four) { // 出场 vehicleLeave(jsonObject); } @@ -346,13 +346,13 @@ public class CameraService { // 判断降锁是否成功 // 将此降锁指令存入缓存, 在 nettyServer 收到新消息时判断是否是此消息的回复 - String redisKey = "plate_number_occupy_order:" + msgId; + String redisKey = CacheConstants.SEND_DROP_LOCK_COMMOND + msgId; redisCache.setCacheObject(redisKey, command, 5, TimeUnit.MINUTES); // 延时 3 秒钟, 再查询缓存中是否有此降锁信息的回复 Thread.sleep(3); - String responseRedisKey = "plate_number_occupy_order_response:" + msgId; + String responseRedisKey = CacheConstants.GROUND_LOCK_DEVICE_REPORT + msgId; Object cacheObject = redisCache.getCacheObject(responseRedisKey); return cacheObject; } diff --git a/jsowell-common/src/main/java/com/jsowell/common/constant/CacheConstants.java b/jsowell-common/src/main/java/com/jsowell/common/constant/CacheConstants.java index 37b3e4877..72185eb5e 100644 --- a/jsowell-common/src/main/java/com/jsowell/common/constant/CacheConstants.java +++ b/jsowell-common/src/main/java/com/jsowell/common/constant/CacheConstants.java @@ -212,6 +212,16 @@ public class CacheConstants { */ public static final String CAMERA_HEARTBEAT = "CAMERA_HEARTBEAT_IP:"; + /** + * 发送降锁指令 + */ + public static final String SEND_DROP_LOCK_COMMOND = "CONTROLLED_LOCK_DROP_LOCK_COMMAND:"; + + /** + * 相机设备回复 + */ + public static final String GROUND_LOCK_DEVICE_REPORT = "GROUND_LOCK_DEVICE_REPORT:"; + /** * 相机设备 sn 和 channelId 进行绑定 */ diff --git a/jsowell-netty/src/main/java/com/jsowell/netty/server/mqtt/BootNettyMqttChannelInboundHandler.java b/jsowell-netty/src/main/java/com/jsowell/netty/server/mqtt/BootNettyMqttChannelInboundHandler.java index 669dab9f6..212b06e20 100644 --- a/jsowell-netty/src/main/java/com/jsowell/netty/server/mqtt/BootNettyMqttChannelInboundHandler.java +++ b/jsowell-netty/src/main/java/com/jsowell/netty/server/mqtt/BootNettyMqttChannelInboundHandler.java @@ -46,6 +46,7 @@ public class BootNettyMqttChannelInboundHandler extends ChannelInboundHandlerAda @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception, IOException { if (null != msg) { + BootNettyMqttMsgBack msgBack = BeanUtils.getBean(BootNettyMqttMsgBack.class); MqttMessage mqttMessage = (MqttMessage) msg; log.info("MqttServer收到消息:{}", mqttMessage.toString()); MqttFixedHeader mqttFixedHeader = mqttMessage.fixedHeader(); @@ -54,7 +55,7 @@ public class BootNettyMqttChannelInboundHandler extends ChannelInboundHandlerAda if (MqttMessageType.CONNECT.equals(mqttFixedHeader.messageType())) { // 在一个网络连接上,客户端只能发送一次CONNECT报文。服务端必须将客户端发送的第二个CONNECT报文当作协议违规处理并断开客户端的连接 // to do 建议connect消息单独处理,用来对客户端进行认证管理等 这里先直接返回一个CONNACK消息 - BootNettyMqttMsgBack.connack(channel, mqttMessage); + msgBack.connack(channel, mqttMessage); // 如果map中不包含此连接,就保存连接 String channelId = channel.id().asShortText(); System.out.println(channelId); @@ -79,30 +80,30 @@ public class BootNettyMqttChannelInboundHandler extends ChannelInboundHandlerAda case PUBLISH: // 客户端发布消息 // PUBACK报文是对QoS 1等级的PUBLISH报文的响应 System.out.println("客户端发布消息"); - BootNettyMqttMsgBack.puback(channel, mqttMessage); + msgBack.puback(channel, mqttMessage); break; case PUBREL: // 发布释放 // PUBREL报文是对PUBREC报文的响应 // to do - BootNettyMqttMsgBack.pubcomp(channel, mqttMessage); + msgBack.pubcomp(channel, mqttMessage); break; case SUBSCRIBE: // 客户端订阅主题 // 客户端向服务端发送SUBSCRIBE报文用于创建一个或多个订阅,每个订阅注册客户端关心的一个或多个主题。 // 为了将应用消息转发给与那些订阅匹配的主题,服务端发送PUBLISH报文给客户端。 // SUBSCRIBE报文也(为每个订阅)指定了最大的QoS等级,服务端根据这个发送应用消息给客户端 // to do - BootNettyMqttMsgBack.suback(channel, mqttMessage); + msgBack.suback(channel, mqttMessage); break; case UNSUBSCRIBE: // 客户端取消订阅 // 客户端发送UNSUBSCRIBE报文给服务端,用于取消订阅主题 // to do - BootNettyMqttMsgBack.unsuback(channel, mqttMessage); + msgBack.unsuback(channel, mqttMessage); break; case PINGREQ: // 客户端发起心跳 // 客户端发送PINGREQ报文给服务端的 // 在没有任何其它控制报文从客户端发给服务的时,告知服务端客户端还活着 // 请求服务端发送 响应确认它还活着,使用网络以确认网络连接没有断开 - BootNettyMqttMsgBack.pingresp(channel, mqttMessage); + msgBack.pingresp(channel, mqttMessage); break; case DISCONNECT: // 客户端主动断开连接 // DISCONNECT报文是客户端发给服务端的最后一个控制报文, 服务端必须验证所有的保留位都被设置为0 diff --git a/jsowell-netty/src/main/java/com/jsowell/netty/server/mqtt/BootNettyMqttMsgBack.java b/jsowell-netty/src/main/java/com/jsowell/netty/server/mqtt/BootNettyMqttMsgBack.java index 2a9d0e028..e1166f441 100644 --- a/jsowell-netty/src/main/java/com/jsowell/netty/server/mqtt/BootNettyMqttMsgBack.java +++ b/jsowell-netty/src/main/java/com/jsowell/netty/server/mqtt/BootNettyMqttMsgBack.java @@ -5,8 +5,11 @@ import java.net.UnknownHostException; import java.util.ArrayList; import java.util.List; import java.util.Set; +import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; +import com.alibaba.fastjson2.JSONObject; +import com.jsowell.common.constant.CacheConstants; import com.jsowell.common.core.redis.RedisCache; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -41,13 +44,16 @@ public class BootNettyMqttMsgBack { private static final Logger log = LoggerFactory.getLogger(BootNettyMqttMsgBack.class); + @Autowired + private RedisCache redisCache; + /** * 确认连接请求 * * @param channel * @param mqttMessage */ - public static void connack(Channel channel, MqttMessage mqttMessage) { + public void connack(Channel channel, MqttMessage mqttMessage) { log.info("服务器端收到确认连接请求:{}", mqttMessage.toString()); MqttConnectMessage mqttConnectMessage = (MqttConnectMessage) mqttMessage; MqttFixedHeader mqttFixedHeaderInfo = mqttConnectMessage.fixedHeader(); @@ -71,7 +77,7 @@ public class BootNettyMqttMsgBack { * @param channel * @param mqttMessage */ - public static void puback(Channel channel, MqttMessage mqttMessage) { + public void puback(Channel channel, MqttMessage mqttMessage) { MqttPublishMessage mqttPublishMessage = (MqttPublishMessage) mqttMessage; MqttFixedHeader mqttFixedHeaderInfo = mqttPublishMessage.fixedHeader(); MqttQoS qos = (MqttQoS) mqttFixedHeaderInfo.qosLevel(); @@ -80,6 +86,9 @@ public class BootNettyMqttMsgBack { String data = new String(headBytes); System.out.println("publish data--" + data); + // 处理发布报文消息 + processMsg(data); + switch (qos) { case AT_MOST_ONCE: // 至多一次 break; @@ -107,13 +116,35 @@ public class BootNettyMqttMsgBack { } } + /** + * 处理发布报文消息 + * @param msgData + * @return + */ + private void processMsg(String msgData) { + // 将信息转化成 Json 字符串 + JSONObject jsonObject = JSONObject.parseObject(msgData); + // 由于相机在连接成功时会发送一条设备信息数据,此数据中无 msg_id 字段, 因此需要加 try catch + String msgId = ""; + try { + msgId = jsonObject.getString("msg_id"); + }catch (Exception e) { + // 此时为设备信息报文,打印日志后跳过 + log.info("收到相机报文解析错误 ,源报文;{}", msgData); + return; + } + // 将信息存入缓存 + String redisKey = CacheConstants.GROUND_LOCK_DEVICE_REPORT + msgId; + redisCache.setCacheObject(redisKey, msgData, 5, TimeUnit.MINUTES); + } + /** * 发布完成 qos2 * * @param channel * @param mqttMessage */ - public static void pubcomp(Channel channel, MqttMessage mqttMessage) { + public void pubcomp(Channel channel, MqttMessage mqttMessage) { MqttMessageIdVariableHeader messageIdVariableHeader = (MqttMessageIdVariableHeader) mqttMessage.variableHeader(); // 构建返回报文, 固定报头 MqttFixedHeader mqttFixedHeaderBack = new MqttFixedHeader(MqttMessageType.PUBCOMP, false, MqttQoS.AT_MOST_ONCE, false, 0x02); @@ -130,7 +161,7 @@ public class BootNettyMqttMsgBack { * @param channel * @param mqttMessage */ - public static void suback(Channel channel, MqttMessage mqttMessage) { + public void suback(Channel channel, MqttMessage mqttMessage) { log.info("接收到订阅确认信息:{}", mqttMessage.toString()); MqttSubscribeMessage mqttSubscribeMessage = (MqttSubscribeMessage) mqttMessage; MqttMessageIdVariableHeader messageIdVariableHeader = mqttSubscribeMessage.variableHeader(); @@ -158,7 +189,7 @@ public class BootNettyMqttMsgBack { * @param channel * @param mqttMessage */ - public static void unsuback(Channel channel, MqttMessage mqttMessage) { + public void unsuback(Channel channel, MqttMessage mqttMessage) { MqttMessageIdVariableHeader messageIdVariableHeader = (MqttMessageIdVariableHeader) mqttMessage.variableHeader(); // 构建返回报文 可变报头 MqttMessageIdVariableHeader variableHeaderBack = MqttMessageIdVariableHeader.from(messageIdVariableHeader.messageId()); @@ -176,7 +207,7 @@ public class BootNettyMqttMsgBack { * @param channel * @param mqttMessage */ - public static void pingresp(Channel channel, MqttMessage mqttMessage) { + public void pingresp(Channel channel, MqttMessage mqttMessage) { // 心跳响应报文 11010000 00000000 固定报文 MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.PINGRESP, false, MqttQoS.AT_MOST_ONCE, false, 0); MqttMessage mqttMessageBack = new MqttMessage(fixedHeader);