From 5fbce62752a209bae2c06ac93464188c3a4bc01e Mon Sep 17 00:00:00 2001 From: Lemon Date: Wed, 20 Dec 2023 16:17:34 +0800 Subject: [PATCH] =?UTF-8?q?=E6=96=B0=E5=A2=9E=20netty=E6=95=B4=E5=90=88mqt?= =?UTF-8?q?t=E5=8D=8F=E8=AE=AE=EF=BC=8C=E4=B8=8E=E8=BD=A6=E4=BD=8D?= =?UTF-8?q?=E7=9B=B8=E6=9C=BA=E9=80=9A=E8=AE=AF=E5=B9=B6=E4=BF=9D=E5=AD=98?= =?UTF-8?q?=E9=80=9A=E8=AE=AF=E4=BF=A1=E6=81=AF?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../thirdparty/camera/CameraController.java | 30 +- .../test/java/SpringBootTestController.java | 22 ++ .../common/constant/CacheConstants.java | 10 + .../jsowell/common/util/bean/BeanUtils.java | 25 +- jsowell-netty/pom.xml | 5 + .../netty/domain/MqttConnectPayload.java | 24 ++ .../com/jsowell/netty/domain/MqttRequest.java | 97 +++++++ .../BootNettyMqttChannelInboundHandler.java | 268 +++++++++++++++++- .../server/mqtt/BootNettyMqttMsgBack.java | 178 ++++++++++++ .../jsowell/netty/server/mqtt/MqttSever.java | 78 +++++ .../server/yunkuaichong/NettyServer.java | 39 ++- .../service/camera/CameraBusinessService.java | 20 ++ .../impl/CameraBusinessServiceImpl.java | 109 +++++++ .../pile/dto/camera/SendMsg2TopicDTO.java | 32 +++ .../camera/service/CameraService.java | 50 +++- pom.xml | 8 + 16 files changed, 972 insertions(+), 23 deletions(-) create mode 100644 jsowell-netty/src/main/java/com/jsowell/netty/domain/MqttConnectPayload.java create mode 100644 jsowell-netty/src/main/java/com/jsowell/netty/domain/MqttRequest.java create mode 100644 jsowell-netty/src/main/java/com/jsowell/netty/server/mqtt/MqttSever.java create mode 100644 jsowell-netty/src/main/java/com/jsowell/netty/service/camera/CameraBusinessService.java create mode 100644 jsowell-netty/src/main/java/com/jsowell/netty/service/camera/impl/CameraBusinessServiceImpl.java create mode 100644 jsowell-pile/src/main/java/com/jsowell/pile/dto/camera/SendMsg2TopicDTO.java diff --git a/jsowell-admin/src/main/java/com/jsowell/web/controller/thirdparty/camera/CameraController.java b/jsowell-admin/src/main/java/com/jsowell/web/controller/thirdparty/camera/CameraController.java index 58dd9198d..5524207c6 100644 --- a/jsowell-admin/src/main/java/com/jsowell/web/controller/thirdparty/camera/CameraController.java +++ b/jsowell-admin/src/main/java/com/jsowell/web/controller/thirdparty/camera/CameraController.java @@ -4,18 +4,19 @@ import com.alibaba.fastjson2.JSON; import com.alibaba.fastjson2.JSONObject; import com.jsowell.common.annotation.Anonymous; import com.jsowell.common.core.controller.BaseController; +import com.jsowell.netty.server.mqtt.BootNettyMqttChannelInboundHandler; +import com.jsowell.netty.service.camera.CameraBusinessService; import com.jsowell.pile.dto.camera.CameraHeartBeatDTO; -import com.jsowell.pile.dto.camera.CameraIdentifyResultsDTO; +import com.jsowell.pile.dto.camera.SendMsg2TopicDTO; import com.jsowell.thirdparty.camera.common.CameraCommonResult; import com.jsowell.thirdparty.camera.service.CameraService; +import io.netty.channel.ChannelFuture; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.PostMapping; import org.springframework.web.bind.annotation.RequestBody; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; -import java.util.List; - /** * 充电相机 controller * @@ -29,6 +30,12 @@ public class CameraController extends BaseController { @Autowired private CameraService cameraService; + @Autowired + private CameraBusinessService cameraBusinessService; + + @Autowired + private BootNettyMqttChannelInboundHandler handler; + /** * 心跳 * @param dto @@ -36,6 +43,7 @@ public class CameraController extends BaseController { @PostMapping("/v1/receiveHeartBeat") public CameraCommonResult receiveHeartBeat(@RequestBody CameraHeartBeatDTO dto) { logger.info("接收相机系统心跳包 params:{}", JSON.toJSONString(dto)); + cameraService.saveHeartBeat2Redis(dto); CameraCommonResult result = new CameraCommonResult(); logger.info("接收相机系统心跳包 result:{}", JSON.toJSONString(result.successResponse())); return result.successResponse(); @@ -53,4 +61,20 @@ public class CameraController extends BaseController { cameraService.receiveIdentifyResults(jsonObject); return result.successResponse(); } + + @PostMapping("/sendMsg2Topic") + public void sendMsg2Topic(@RequestBody SendMsg2TopicDTO dto) { + try { + cameraBusinessService.sendGroundLockCommand(dto.getSn(), dto.getMsgType(), dto.getMsgPrefix(), dto.getTopic()); + } catch (Exception e) { + logger.error("发送消息 error, ", e); + } + } + + + // @PostMapping("/sendMsg2Topic") + // public void sendMsg2Topic(@RequestBody SendMsg2TopicDTO dto) throws InterruptedException { + // ChannelFuture channelFuture = handler.sendMsg(dto.getChannelId(), dto.getTopic(), dto.getRequestMsg().toString()); + // System.out.println("123"); + // } } diff --git a/jsowell-admin/src/test/java/SpringBootTestController.java b/jsowell-admin/src/test/java/SpringBootTestController.java index fec4527c7..ee4efa65a 100644 --- a/jsowell-admin/src/test/java/SpringBootTestController.java +++ b/jsowell-admin/src/test/java/SpringBootTestController.java @@ -47,6 +47,7 @@ import com.jsowell.common.util.id.SnowflakeIdWorker; import com.jsowell.common.util.ip.AddressUtils; import com.jsowell.netty.handler.HeartbeatRequestHandler; import com.jsowell.netty.handler.TransactionRecordsRequestHandler; +import com.jsowell.netty.service.camera.impl.CameraBusinessServiceImpl; import com.jsowell.netty.service.yunkuaichong.YKCBusinessService; import com.jsowell.pile.domain.*; import com.jsowell.pile.domain.ykcCommond.IssueQRCodeCommand; @@ -85,6 +86,7 @@ import com.jsowell.wxpay.common.WeChatPayParameter; import com.jsowell.wxpay.dto.AppletTemplateMessageSendDTO; import com.jsowell.wxpay.response.WechatPayRefundRequest; import com.jsowell.wxpay.service.WxAppletRemoteService; +import io.netty.channel.ChannelFuture; import org.apache.commons.collections4.CollectionUtils; import org.apache.commons.lang3.RandomStringUtils; import org.junit.Test; @@ -220,6 +222,9 @@ public class SpringBootTestController { @Autowired private OrderPileOccupyService orderPileOccupyService; + @Autowired + private CameraBusinessServiceImpl cameraBusinessServiceImpl; + @Autowired private LTYTService ltytService; @@ -248,6 +253,23 @@ public class SpringBootTestController { System.out.println(JSON.toJSONString(memberWalletVOS)); } + @Test + public void testMqttSendMsg() throws InterruptedException { + String channelId = "94dd42b6"; + String topic = "/GroundlockStatus"; + JSONObject jsonObject = new JSONObject(); + jsonObject.put("sign","F4213AD90EBC72C678E03450E4E091EE"); + jsonObject.put("sn","e27f089d-5fadf6c6"); + jsonObject.put("timestamp","2021-07-01 12:00:01"); + jsonObject.put("msg_id","GS2021070112000101"); + jsonObject.put("msg_type","GroundlockStatus"); + jsonObject.put("msg_data",null); + + // ChannelFuture future = cameraBusinessServiceImpl.sendMsg(channelId, topic, jsonObject.toJSONString()); + // System.out.println(future.toString()); + } + + @Test public void queryPaymentByOrderNoTest() { String orderNo = "C44903356969"; diff --git a/jsowell-common/src/main/java/com/jsowell/common/constant/CacheConstants.java b/jsowell-common/src/main/java/com/jsowell/common/constant/CacheConstants.java index 401fa75f5..6862d66e1 100644 --- a/jsowell-common/src/main/java/com/jsowell/common/constant/CacheConstants.java +++ b/jsowell-common/src/main/java/com/jsowell/common/constant/CacheConstants.java @@ -199,6 +199,16 @@ public class CacheConstants { */ public static final String CAMERA_IMAGE_BY_PLATE_NUMBER = "CAMERA_IMAGE_BY_PLATE_NUMBER_"; + /** + * 相机心跳数据 + */ + public static final String CAMERA_HEARTBEAT = "CAMERA_HEARTBEAT_"; + + /** + * 相机设备 sn 和 channelId 进行绑定 + */ + public static final String MQTT_CONNECT_CHANNEL = "MQTT_CONNECT_CHANNEL_"; + /** * 桩硬件故障 */ diff --git a/jsowell-common/src/main/java/com/jsowell/common/util/bean/BeanUtils.java b/jsowell-common/src/main/java/com/jsowell/common/util/bean/BeanUtils.java index 8a45b2f7c..affff88f6 100644 --- a/jsowell-common/src/main/java/com/jsowell/common/util/bean/BeanUtils.java +++ b/jsowell-common/src/main/java/com/jsowell/common/util/bean/BeanUtils.java @@ -1,5 +1,10 @@ package com.jsowell.common.util.bean; +import org.springframework.beans.BeansException; +import org.springframework.context.ApplicationContext; +import org.springframework.context.ApplicationContextAware; +import org.springframework.stereotype.Component; + import java.lang.reflect.Method; import java.util.ArrayList; import java.util.List; @@ -11,7 +16,11 @@ import java.util.regex.Pattern; * * @author jsowell */ -public class BeanUtils extends org.springframework.beans.BeanUtils { +@Component +public class BeanUtils extends org.springframework.beans.BeanUtils implements ApplicationContextAware { + + protected static ApplicationContext applicationContext; + /** * Bean方法名中属性名开始的下标 */ @@ -101,4 +110,18 @@ public class BeanUtils extends org.springframework.beans.BeanUtils { public static boolean isMethodPropEquals(String m1, String m2) { return m1.substring(BEAN_METHOD_PROP_INDEX).equals(m2.substring(BEAN_METHOD_PROP_INDEX)); } + + @Override + public void setApplicationContext(ApplicationContext app) throws BeansException { + if (applicationContext == null) { + applicationContext = app; + } + } + + /** + * 通过类的class从容器中手动获取对象 + */ + public static T getBean(Class clazz) { + return applicationContext.getBean(clazz); + } } diff --git a/jsowell-netty/pom.xml b/jsowell-netty/pom.xml index 30096c99e..90f2d1d68 100644 --- a/jsowell-netty/pom.xml +++ b/jsowell-netty/pom.xml @@ -25,6 +25,11 @@ com.jsowell jsowell-thirdparty + + io.netty + netty-codec-mqtt + 4.1.79.Final + diff --git a/jsowell-netty/src/main/java/com/jsowell/netty/domain/MqttConnectPayload.java b/jsowell-netty/src/main/java/com/jsowell/netty/domain/MqttConnectPayload.java new file mode 100644 index 000000000..e66c334f5 --- /dev/null +++ b/jsowell-netty/src/main/java/com/jsowell/netty/domain/MqttConnectPayload.java @@ -0,0 +1,24 @@ +package com.jsowell.netty.domain; + +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; + +/** + * TODO + * + * @author Lemon + * @Date 2023/12/19 9:39:48 + */ +@Data +@NoArgsConstructor +@AllArgsConstructor +@Builder +public class MqttConnectPayload { + private String clientIdentifier; + private String willTopic; + private String willMessage; + private String userName; + private String password; +} diff --git a/jsowell-netty/src/main/java/com/jsowell/netty/domain/MqttRequest.java b/jsowell-netty/src/main/java/com/jsowell/netty/domain/MqttRequest.java new file mode 100644 index 000000000..fdbb2006f --- /dev/null +++ b/jsowell-netty/src/main/java/com/jsowell/netty/domain/MqttRequest.java @@ -0,0 +1,97 @@ +package com.jsowell.netty.domain; + +import io.netty.handler.codec.mqtt.MqttQoS; + +/** + * TODO + * + * @author Lemon + * @Date 2023/12/19 8:27:30 + */ +public class MqttRequest { + private boolean mutable = true; + private byte[] payload; + private MqttQoS qos = MqttQoS.AT_MOST_ONCE; + private boolean retained = false; + private boolean dup = false; + private int messageId; + + public MqttRequest() { + this.setPayload(new byte[0]); + } + + public MqttRequest(byte[] payload) { + this.setPayload(payload); + } + + public byte[] getPayload() { + return this.payload; + } + + public void clearPayload() { + this.checkMutable(); + this.payload = new byte[0]; + } + + public void setPayload(byte[] payload) { + this.checkMutable(); + if (payload == null) { + throw new NullPointerException(); + } else { + this.payload = payload; + } + } + + public boolean isRetained() { + return this.retained; + } + + public void setRetained(boolean retained) { + this.checkMutable(); + this.retained = retained; + } + + public MqttQoS getQos() { + return qos; + } + + public void setQos(MqttQoS qos) { + this.qos = qos; + } + + public boolean isMutable() { + return mutable; + } + + public void setMutable(boolean mutable) { + this.mutable = mutable; + } + + protected void checkMutable() throws IllegalStateException { + if (!this.mutable) { + throw new IllegalStateException(); + } + } + + public boolean isDup() { + return dup; + } + + public void setDup(boolean dup) { + this.dup = dup; + } + + public int getMessageId() { + return messageId; + } + + public void setMessageId(int messageId) { + this.messageId = messageId; + } + + @Override + public String toString() { + return new String(this.payload); + } + +} diff --git a/jsowell-netty/src/main/java/com/jsowell/netty/server/mqtt/BootNettyMqttChannelInboundHandler.java b/jsowell-netty/src/main/java/com/jsowell/netty/server/mqtt/BootNettyMqttChannelInboundHandler.java index 6c56d75be..606e397a5 100644 --- a/jsowell-netty/src/main/java/com/jsowell/netty/server/mqtt/BootNettyMqttChannelInboundHandler.java +++ b/jsowell-netty/src/main/java/com/jsowell/netty/server/mqtt/BootNettyMqttChannelInboundHandler.java @@ -1,10 +1,274 @@ package com.jsowell.netty.server.mqtt; + +import com.alibaba.fastjson2.JSONObject; +import com.jsowell.common.util.StringUtils; +import com.jsowell.common.util.bean.BeanUtils; +import com.jsowell.netty.domain.MqttRequest; +import com.jsowell.netty.service.camera.CameraBusinessService; +import com.jsowell.netty.service.camera.impl.CameraBusinessServiceImpl; +import com.jsowell.thirdparty.camera.service.CameraService; +import io.netty.buffer.Unpooled; +import io.netty.channel.*; +import io.netty.handler.codec.mqtt.*; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.util.concurrent.ConcurrentHashMap; + +import lombok.extern.slf4j.Slf4j; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; + /** - * TODO + * MQTT服务端I/O数据读写处理类 * * @author Lemon * @Date 2023/12/14 11:33:03 */ -public class BootNettyMqttChannelInboundHandler { + +@ChannelHandler.Sharable +@Slf4j +@Component +public class BootNettyMqttChannelInboundHandler extends ChannelInboundHandlerAdapter { + + /** + * Key: channelId + * Value: ctx + */ + private static final ConcurrentHashMap CHANNEL_MAP = new ConcurrentHashMap<>(); + + /** + * Key: channelId + * Value: 客户端id + */ + private static final ConcurrentHashMap CLIENT_MAP = new ConcurrentHashMap<>(); + + /** + * 从客户端收到新的数据时,这个方法会在收到消息时被调用 + */ + @Override + public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception, IOException { + if (null != msg) { + MqttMessage mqttMessage = (MqttMessage) msg; + log.info("MqttServer收到消息:{}", mqttMessage.toString()); + MqttFixedHeader mqttFixedHeader = mqttMessage.fixedHeader(); + Channel channel = ctx.channel(); + + if (mqttFixedHeader.messageType().equals(MqttMessageType.CONNECT)) { + // 在一个网络连接上,客户端只能发送一次CONNECT报文。服务端必须将客户端发送的第二个CONNECT报文当作协议违规处理并断开客户端的连接 + // to do 建议connect消息单独处理,用来对客户端进行认证管理等 这里先直接返回一个CONNACK消息 + BootNettyMqttMsgBack.connack(channel, mqttMessage); + // 如果map中不包含此连接,就保存连接 + String channelId = channel.id().asShortText(); + System.out.println(channelId); + if (CHANNEL_MAP.get(channelId) != null) { + log.info("客户端【{}】是连接状态,连接通道数量: {}", channelId, CHANNEL_MAP.size()); + } else { + //保存连接 + CHANNEL_MAP.put(channelId, ctx); + // System.out.println(channel.remoteAddress()); // /192.168.2.100:39396 + log.info("客户端【{}】, 连接Mqtt服务器, 连接通道数量: {}", channelId, CHANNEL_MAP.size()); + CameraBusinessService cameraBusinessService = BeanUtils.getBean(CameraBusinessService.class); + cameraBusinessService.processConnectMsg(channel); + } + // 保存客户端id + String clientIdentifier = parsePayLoadInfo(mqttMessage.payload().toString()); + if (StringUtils.isNotBlank(clientIdentifier)) { + CLIENT_MAP.put(channelId, clientIdentifier); + } + } + + switch (mqttFixedHeader.messageType()) { + case PUBLISH: // 客户端发布消息 + // PUBACK报文是对QoS 1等级的PUBLISH报文的响应 + System.out.println("客户端发布消息"); + BootNettyMqttMsgBack.puback(channel, mqttMessage); + break; + case PUBREL: // 发布释放 + // PUBREL报文是对PUBREC报文的响应 + // to do + BootNettyMqttMsgBack.pubcomp(channel, mqttMessage); + break; + case SUBSCRIBE: // 客户端订阅主题 + // 客户端向服务端发送SUBSCRIBE报文用于创建一个或多个订阅,每个订阅注册客户端关心的一个或多个主题。 + // 为了将应用消息转发给与那些订阅匹配的主题,服务端发送PUBLISH报文给客户端。 + // SUBSCRIBE报文也(为每个订阅)指定了最大的QoS等级,服务端根据这个发送应用消息给客户端 + // to do + BootNettyMqttMsgBack.suback(channel, mqttMessage); + break; + case UNSUBSCRIBE: // 客户端取消订阅 + // 客户端发送UNSUBSCRIBE报文给服务端,用于取消订阅主题 + // to do + BootNettyMqttMsgBack.unsuback(channel, mqttMessage); + break; + case PINGREQ: // 客户端发起心跳 + // 客户端发送PINGREQ报文给服务端的 + // 在没有任何其它控制报文从客户端发给服务的时,告知服务端客户端还活着 + // 请求服务端发送 响应确认它还活着,使用网络以确认网络连接没有断开 + BootNettyMqttMsgBack.pingresp(channel, mqttMessage); + break; + case DISCONNECT: // 客户端主动断开连接 + // DISCONNECT报文是客户端发给服务端的最后一个控制报文, 服务端必须验证所有的保留位都被设置为0 + // to do + break; + default: + break; + } + } + } + + /** + * 从客户端收到新的数据、读取完成时调用 + */ + @Override + public void channelReadComplete(ChannelHandlerContext ctx) throws IOException { + } + + /** + * 客户端与服务端第一次建立连接时执行 在channelActive方法之前执行 + */ + @Override + public void channelRegistered(ChannelHandlerContext ctx) throws Exception { + super.channelRegistered(ctx); + } + + /** + * 客户端与服务端 断连时执行 channelInactive方法之后执行 + */ + @Override + public void channelUnregistered(ChannelHandlerContext ctx) throws Exception { + super.channelUnregistered(ctx); + } + + /** + * 当出现 Throwable 对象才会被调用,即当 Netty 由于 IO 错误或者处理器在处理事件时抛出的异常时 + */ + @Override + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { + super.exceptionCaught(ctx, cause); + ctx.close(); + } + + /** + * 客户端与服务端第一次建立连接时执行 + */ + @Override + public void channelActive(ChannelHandlerContext ctx) throws Exception { + super.channelActive(ctx); + } + + /** + * 客户端与服务端 断连时执行 + */ + @Override + public void channelInactive(ChannelHandlerContext ctx) throws Exception, IOException { + // 删除连接 + removeConnect(ctx); + super.channelInactive(ctx); + } + + /** + * 服务端 当读超时时 会调用这个方法 + */ + @Override + public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception, IOException { + // 删除连接 + removeConnect(ctx); + super.userEventTriggered(ctx, evt); + } + + + @Override + public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception { + super.channelWritabilityChanged(ctx); + } + + + + /** + * 删除连接 + * @param ctx + */ + public void removeConnect(ChannelHandlerContext ctx) { + InetSocketAddress insocket = (InetSocketAddress) ctx.channel().remoteAddress(); + String clientIp = insocket.getAddress().getHostAddress(); + String channelId = ctx.channel().id().asShortText(); + //包含此客户端才去删除 + if (CHANNEL_MAP.get(channelId) != null) { + //删除连接 + CHANNEL_MAP.remove(channelId); + CLIENT_MAP.remove(channelId); + log.info("客户端【{}】, 退出Mqtt服务器[IP:{}--->PORT:{}], 连接通道数量: {}", channelId, clientIp, insocket.getPort(), CHANNEL_MAP.size()); + } + } + + /** + * 解析 payload 字段,并将 clientIdentifier (客户端id)返回 + * @param payloadText + * @return 客户端id + */ + private String parsePayLoadInfo(String payloadText) { + if (StringUtils.isBlank(payloadText)) { + return null; + } + JSONObject payload = new JSONObject(); + // 使用逗号和等号进行分割 + String[] parts = payloadText.split("[,\\[\\]]"); + for (String part : parts) { + String[] keyValue = part.split("="); + + if (keyValue.length == 2) { + String key = keyValue[0].trim(); + String value = keyValue[1].trim(); + + payload.put(key, value); + } + } + return payload.getString("clientIdentifier"); + } + + /** + * 通过 channelId 获取 Channel + * @param channelId + * @return + */ + public Channel getChannel(String channelId) { + if (channelId != null) { + ChannelHandlerContext ctx = CHANNEL_MAP.get(channelId); + return ctx.channel(); + } else { + return null; + } + } + + /** + * 服务器端向某Topic发送消息 + * @param channelId 发送的通道id + * @param topic 主题Topic + * @param requestMsg 要发送的Json对象 + * @return + * @throws InterruptedException + */ + public ChannelFuture sendMsg(String channelId, String topic, String requestMsg) throws InterruptedException { + MqttRequest request = new MqttRequest((requestMsg.toString().getBytes())); + MqttPublishMessage pubMessage = (MqttPublishMessage) MqttMessageFactory.newMessage( + new MqttFixedHeader(MqttMessageType.PUBLISH, + request.isDup(), + request.getQos(), + request.isRetained(), + 0), + new MqttPublishVariableHeader(topic, 0), + Unpooled.buffer().writeBytes(request.getPayload())); + + Channel channel = getChannel(channelId); + // 超过高水位,则采取同步模式 + if (channel.isWritable()) { + return channel.writeAndFlush(pubMessage); + } + return channel.writeAndFlush(pubMessage).sync(); + } + } diff --git a/jsowell-netty/src/main/java/com/jsowell/netty/server/mqtt/BootNettyMqttMsgBack.java b/jsowell-netty/src/main/java/com/jsowell/netty/server/mqtt/BootNettyMqttMsgBack.java index d8967a2bd..2a9d0e028 100644 --- a/jsowell-netty/src/main/java/com/jsowell/netty/server/mqtt/BootNettyMqttMsgBack.java +++ b/jsowell-netty/src/main/java/com/jsowell/netty/server/mqtt/BootNettyMqttMsgBack.java @@ -1,10 +1,188 @@ package com.jsowell.netty.server.mqtt; +import java.net.InetAddress; +import java.net.UnknownHostException; +import java.util.ArrayList; +import java.util.List; +import java.util.Set; +import java.util.stream.Collectors; + +import com.jsowell.common.core.redis.RedisCache; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import io.netty.channel.Channel; +import io.netty.handler.codec.mqtt.MqttConnAckMessage; +import io.netty.handler.codec.mqtt.MqttConnAckVariableHeader; +import io.netty.handler.codec.mqtt.MqttConnectMessage; +import io.netty.handler.codec.mqtt.MqttConnectReturnCode; +import io.netty.handler.codec.mqtt.MqttConnectVariableHeader; +import io.netty.handler.codec.mqtt.MqttFixedHeader; +import io.netty.handler.codec.mqtt.MqttMessage; +import io.netty.handler.codec.mqtt.MqttMessageIdVariableHeader; +import io.netty.handler.codec.mqtt.MqttMessageType; +import io.netty.handler.codec.mqtt.MqttPubAckMessage; +import io.netty.handler.codec.mqtt.MqttPublishMessage; +import io.netty.handler.codec.mqtt.MqttQoS; +import io.netty.handler.codec.mqtt.MqttSubAckMessage; +import io.netty.handler.codec.mqtt.MqttSubAckPayload; +import io.netty.handler.codec.mqtt.MqttSubscribeMessage; +import io.netty.handler.codec.mqtt.MqttUnsubAckMessage; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; + /** * TODO * * @author Lemon * @Date 2023/12/14 11:36:30 */ +@Component public class BootNettyMqttMsgBack { + + private static final Logger log = LoggerFactory.getLogger(BootNettyMqttMsgBack.class); + + /** + * 确认连接请求 + * + * @param channel + * @param mqttMessage + */ + public static void connack(Channel channel, MqttMessage mqttMessage) { + log.info("服务器端收到确认连接请求:{}", mqttMessage.toString()); + MqttConnectMessage mqttConnectMessage = (MqttConnectMessage) mqttMessage; + MqttFixedHeader mqttFixedHeaderInfo = mqttConnectMessage.fixedHeader(); + MqttConnectVariableHeader mqttConnectVariableHeaderInfo = mqttConnectMessage.variableHeader(); + + // 构建返回报文, 可变报头 + MqttConnAckVariableHeader mqttConnAckVariableHeaderBack = new MqttConnAckVariableHeader(MqttConnectReturnCode.CONNECTION_ACCEPTED, mqttConnectVariableHeaderInfo.isCleanSession()); + // 构建返回报文, 固定报头 + MqttFixedHeader mqttFixedHeaderBack = new MqttFixedHeader(MqttMessageType.CONNACK, mqttFixedHeaderInfo.isDup(), MqttQoS.AT_MOST_ONCE, mqttFixedHeaderInfo.isRetain(), 0x02); + // 构建CONNACK消息体 + MqttConnAckMessage connAck = new MqttConnAckMessage(mqttFixedHeaderBack, mqttConnAckVariableHeaderBack); + log.info("服务器端回复确认连接请求:{}", connAck.toString()); + channel.writeAndFlush(connAck); + + } + + + /** + * 根据qos发布确认 + * + * @param channel + * @param mqttMessage + */ + public static void puback(Channel channel, MqttMessage mqttMessage) { + MqttPublishMessage mqttPublishMessage = (MqttPublishMessage) mqttMessage; + MqttFixedHeader mqttFixedHeaderInfo = mqttPublishMessage.fixedHeader(); + MqttQoS qos = (MqttQoS) mqttFixedHeaderInfo.qosLevel(); + byte[] headBytes = new byte[mqttPublishMessage.payload().readableBytes()]; + mqttPublishMessage.payload().readBytes(headBytes); + String data = new String(headBytes); + System.out.println("publish data--" + data); + + switch (qos) { + case AT_MOST_ONCE: // 至多一次 + break; + case AT_LEAST_ONCE: // 至少一次 + // 构建返回报文, 可变报头 + MqttMessageIdVariableHeader mqttMessageIdVariableHeaderBack = MqttMessageIdVariableHeader.from(mqttPublishMessage.variableHeader().packetId()); + // 构建返回报文, 固定报头 + MqttFixedHeader mqttFixedHeaderBack = new MqttFixedHeader(MqttMessageType.PUBACK, mqttFixedHeaderInfo.isDup(), MqttQoS.AT_MOST_ONCE, mqttFixedHeaderInfo.isRetain(), 0x02); + // 构建PUBACK消息体 + MqttPubAckMessage pubAck = new MqttPubAckMessage(mqttFixedHeaderBack, mqttMessageIdVariableHeaderBack); + log.info("back--" + pubAck.toString()); + channel.writeAndFlush(pubAck); + break; + case EXACTLY_ONCE: // 刚好一次 + // 构建返回报文, 固定报头 + MqttFixedHeader mqttFixedHeaderBack2 = new MqttFixedHeader(MqttMessageType.PUBREC, false, MqttQoS.AT_LEAST_ONCE, false, 0x02); + // 构建返回报文, 可变报头 + MqttMessageIdVariableHeader mqttMessageIdVariableHeaderBack2 = MqttMessageIdVariableHeader.from(mqttPublishMessage.variableHeader().packetId()); + MqttMessage mqttMessageBack = new MqttMessage(mqttFixedHeaderBack2, mqttMessageIdVariableHeaderBack2); + log.info("back--" + mqttMessageBack.toString()); + channel.writeAndFlush(mqttMessageBack); + break; + default: + break; + } + } + + /** + * 发布完成 qos2 + * + * @param channel + * @param mqttMessage + */ + public static void pubcomp(Channel channel, MqttMessage mqttMessage) { + MqttMessageIdVariableHeader messageIdVariableHeader = (MqttMessageIdVariableHeader) mqttMessage.variableHeader(); + // 构建返回报文, 固定报头 + MqttFixedHeader mqttFixedHeaderBack = new MqttFixedHeader(MqttMessageType.PUBCOMP, false, MqttQoS.AT_MOST_ONCE, false, 0x02); + // 构建返回报文, 可变报头 + MqttMessageIdVariableHeader mqttMessageIdVariableHeaderBack = MqttMessageIdVariableHeader.from(messageIdVariableHeader.messageId()); + MqttMessage mqttMessageBack = new MqttMessage(mqttFixedHeaderBack, mqttMessageIdVariableHeaderBack); + log.info("back--" + mqttMessageBack.toString()); + channel.writeAndFlush(mqttMessageBack); + } + + /** + * 订阅确认 + * + * @param channel + * @param mqttMessage + */ + public static void suback(Channel channel, MqttMessage mqttMessage) { + log.info("接收到订阅确认信息:{}", mqttMessage.toString()); + MqttSubscribeMessage mqttSubscribeMessage = (MqttSubscribeMessage) mqttMessage; + MqttMessageIdVariableHeader messageIdVariableHeader = mqttSubscribeMessage.variableHeader(); + // 构建返回报文, 可变报头 + MqttMessageIdVariableHeader variableHeaderBack = MqttMessageIdVariableHeader.from(messageIdVariableHeader.messageId()); + Set topics = mqttSubscribeMessage.payload().topicSubscriptions().stream().map(mqttTopicSubscription -> mqttTopicSubscription.topicName()).collect(Collectors.toSet()); + //log.info(topics.toString()); + List grantedQoSLevels = new ArrayList<>(topics.size()); + for (int i = 0; i < topics.size(); i++) { + grantedQoSLevels.add(mqttSubscribeMessage.payload().topicSubscriptions().get(i).qualityOfService().value()); + } + // 构建返回报文 有效负载 + MqttSubAckPayload payloadBack = new MqttSubAckPayload(grantedQoSLevels); + // 构建返回报文 固定报头 + MqttFixedHeader mqttFixedHeaderBack = new MqttFixedHeader(MqttMessageType.SUBACK, false, MqttQoS.AT_MOST_ONCE, false, 2 + topics.size()); + // 构建返回报文 订阅确认 + MqttSubAckMessage subAck = new MqttSubAckMessage(mqttFixedHeaderBack, variableHeaderBack, payloadBack); + log.info("回复订阅确认信息:{}", subAck.toString()); + channel.writeAndFlush(subAck); + } + + /** + * 取消订阅确认 + * + * @param channel + * @param mqttMessage + */ + public static void unsuback(Channel channel, MqttMessage mqttMessage) { + MqttMessageIdVariableHeader messageIdVariableHeader = (MqttMessageIdVariableHeader) mqttMessage.variableHeader(); + // 构建返回报文 可变报头 + MqttMessageIdVariableHeader variableHeaderBack = MqttMessageIdVariableHeader.from(messageIdVariableHeader.messageId()); + // 构建返回报文 固定报头 + MqttFixedHeader mqttFixedHeaderBack = new MqttFixedHeader(MqttMessageType.UNSUBACK, false, MqttQoS.AT_MOST_ONCE, false, 2); + // 构建返回报文 取消订阅确认 + MqttUnsubAckMessage unSubAck = new MqttUnsubAckMessage(mqttFixedHeaderBack, variableHeaderBack); + log.info("back--" + unSubAck.toString()); + channel.writeAndFlush(unSubAck); + } + + /** + * 心跳响应 + * + * @param channel + * @param mqttMessage + */ + public static void pingresp(Channel channel, MqttMessage mqttMessage) { + // 心跳响应报文 11010000 00000000 固定报文 + MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.PINGRESP, false, MqttQoS.AT_MOST_ONCE, false, 0); + MqttMessage mqttMessageBack = new MqttMessage(fixedHeader); + log.info("MqttServer回复心跳消息:{}", mqttMessageBack.toString()); + channel.writeAndFlush(mqttMessageBack); + } + + } diff --git a/jsowell-netty/src/main/java/com/jsowell/netty/server/mqtt/MqttSever.java b/jsowell-netty/src/main/java/com/jsowell/netty/server/mqtt/MqttSever.java new file mode 100644 index 000000000..e379a5cec --- /dev/null +++ b/jsowell-netty/src/main/java/com/jsowell/netty/server/mqtt/MqttSever.java @@ -0,0 +1,78 @@ +package com.jsowell.netty.server.mqtt; + +import com.jsowell.common.constant.Constants; + +import io.netty.bootstrap.ServerBootstrap; +import io.netty.buffer.PooledByteBufAllocator; +import io.netty.channel.*; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.socket.SocketChannel; +import io.netty.channel.socket.nio.NioServerSocketChannel; +import io.netty.handler.codec.mqtt.MqttDecoder; +import io.netty.handler.codec.mqtt.MqttEncoder; +import io.netty.handler.timeout.IdleStateHandler; +import lombok.extern.slf4j.Slf4j; +import org.springframework.boot.CommandLineRunner; +import org.springframework.core.annotation.Order; +import org.springframework.stereotype.Component; + +import java.net.InetSocketAddress; + +@Slf4j +@Component +@Order(value = 2) +public class MqttSever implements CommandLineRunner { + + @Override + public void run(String... args) throws Exception { + InetSocketAddress address = new InetSocketAddress(Constants.SOCKET_IP, 1883); + this.start(address); + } + + public void start(InetSocketAddress address) { + // log.info("========NettyServer.start order 1"); + //配置服务端的NIO线程组 + EventLoopGroup bossGroup = new NioEventLoopGroup(1); + EventLoopGroup workerGroup = new NioEventLoopGroup(); + + try { + ServerBootstrap mqttBootstrap = new ServerBootstrap(); + mqttBootstrap.group(bossGroup, workerGroup); + mqttBootstrap.channel(NioServerSocketChannel.class); + + mqttBootstrap.option(ChannelOption.SO_REUSEADDR, true) + .option(ChannelOption.SO_BACKLOG, 1024) + .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT) + .option(ChannelOption.SO_RCVBUF, 10485760); + + mqttBootstrap.childOption(ChannelOption.TCP_NODELAY, true) + .childOption(ChannelOption.SO_KEEPALIVE, true) + .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT); + + mqttBootstrap.childHandler(new ChannelInitializer() { + protected void initChannel(SocketChannel ch) { + ChannelPipeline channelPipeline = ch.pipeline(); + // 设置读写空闲超时时间 + channelPipeline.addLast(new IdleStateHandler(600, 600, 1200)); + channelPipeline.addLast("encoder", MqttEncoder.INSTANCE); + channelPipeline.addLast("decoder", new MqttDecoder()); + channelPipeline.addLast(new BootNettyMqttChannelInboundHandler()); + } + }); + ChannelFuture future = mqttBootstrap.bind(address.getPort()).sync(); + if(future.isSuccess()){ + log.info("MqttServer启动成功, 开始监听端口:{}", address.getPort()); + future.channel().closeFuture().sync(); + } else { + log.error("MqttServer启动失败", future.cause()); + } + + //关闭channel和块,直到它被关闭 + future.channel().closeFuture().sync(); + } catch (Exception e) { + log.error("MqttServer.start error", e); + bossGroup.shutdownGracefully(); + workerGroup.shutdownGracefully(); + } + } +} diff --git a/jsowell-netty/src/main/java/com/jsowell/netty/server/yunkuaichong/NettyServer.java b/jsowell-netty/src/main/java/com/jsowell/netty/server/yunkuaichong/NettyServer.java index d248f1def..bcd7c972f 100644 --- a/jsowell-netty/src/main/java/com/jsowell/netty/server/yunkuaichong/NettyServer.java +++ b/jsowell-netty/src/main/java/com/jsowell/netty/server/yunkuaichong/NettyServer.java @@ -2,9 +2,7 @@ package com.jsowell.netty.server.yunkuaichong; import com.jsowell.common.constant.Constants; import io.netty.bootstrap.ServerBootstrap; -import io.netty.channel.ChannelFuture; -import io.netty.channel.ChannelOption; -import io.netty.channel.EventLoopGroup; +import io.netty.channel.*; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.handler.logging.LogLevel; @@ -19,11 +17,11 @@ import java.net.InetSocketAddress; @Slf4j @Component +@Order(value = 1) public class NettyServer implements CommandLineRunner { @Resource private NettyServerChannelInitializer nettyServerChannelInitializer; - @Order(value = 1) @Override public void run(String... args) throws Exception { InetSocketAddress address = new InetSocketAddress(Constants.SOCKET_IP, Constants.SOCKET_PORT); @@ -66,8 +64,41 @@ public class NettyServer implements CommandLineRunner { } else { log.error("NettyServer启动失败", future.cause()); } + // + // ServerBootstrap mqttBootstrap = new ServerBootstrap(); + // mqttBootstrap.group(bossGroup, workerGroup); + // mqttBootstrap.channel(NioServerSocketChannel.class); + // + // mqttBootstrap.option(ChannelOption.SO_REUSEADDR, true) + // .option(ChannelOption.SO_BACKLOG, 1024) + // .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT) + // .option(ChannelOption.SO_RCVBUF, 10485760); + // + // mqttBootstrap.childOption(ChannelOption.TCP_NODELAY, true) + // .childOption(ChannelOption.SO_KEEPALIVE, true) + // .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT); + // + // mqttBootstrap.childHandler(new ChannelInitializer() { + // protected void initChannel(SocketChannel ch) { + // ChannelPipeline channelPipeline = ch.pipeline(); + // // 设置读写空闲超时时间 + // channelPipeline.addLast(new IdleStateHandler(600, 600, 1200)); + // channelPipeline.addLast("encoder", MqttEncoder.INSTANCE); + // channelPipeline.addLast("decoder", new MqttDecoder()); + // channelPipeline.addLast(new BootNettyMqttChannelInboundHandler()); + // } + // }); + // ChannelFuture future2 = mqttBootstrap.bind(address2.getPort()).sync(); + // if(future2.isSuccess()){ + // log.info("MqttServer启动成功, 开始监听端口:{}", address2.getPort()); + // future2.channel().closeFuture().sync(); + // } else { + // log.error("MqttServer启动失败", future2.cause()); + // } + //关闭channel和块,直到它被关闭 future.channel().closeFuture().sync(); + // future2.channel().closeFuture().sync(); } catch (Exception e) { log.error("NettyServer.start error", e); bossGroup.shutdownGracefully(); diff --git a/jsowell-netty/src/main/java/com/jsowell/netty/service/camera/CameraBusinessService.java b/jsowell-netty/src/main/java/com/jsowell/netty/service/camera/CameraBusinessService.java new file mode 100644 index 000000000..da838efed --- /dev/null +++ b/jsowell-netty/src/main/java/com/jsowell/netty/service/camera/CameraBusinessService.java @@ -0,0 +1,20 @@ +package com.jsowell.netty.service.camera; + +import io.netty.channel.Channel; +import io.netty.channel.ChannelFuture; + +import java.net.UnknownHostException; + +/** + * TODO + * + * @author Lemon + * @Date 2023/12/20 15:15:26 + */ +public interface CameraBusinessService { + public void sendGroundLockCommand(String sn, String msgType, String msgPrefix, String topic) throws InterruptedException; + + + public void processConnectMsg(Channel channel) throws UnknownHostException; + +} diff --git a/jsowell-netty/src/main/java/com/jsowell/netty/service/camera/impl/CameraBusinessServiceImpl.java b/jsowell-netty/src/main/java/com/jsowell/netty/service/camera/impl/CameraBusinessServiceImpl.java new file mode 100644 index 000000000..90a2184b2 --- /dev/null +++ b/jsowell-netty/src/main/java/com/jsowell/netty/service/camera/impl/CameraBusinessServiceImpl.java @@ -0,0 +1,109 @@ +package com.jsowell.netty.service.camera.impl; + +import com.alibaba.fastjson2.JSONObject; +import com.jsowell.common.constant.CacheConstants; +import com.jsowell.common.core.redis.RedisCache; +import com.jsowell.common.util.DateUtils; +import com.jsowell.common.util.sign.MD5Util; +import com.jsowell.netty.domain.MqttRequest; +import com.jsowell.netty.server.mqtt.BootNettyMqttChannelInboundHandler; +import com.jsowell.netty.service.camera.CameraBusinessService; +import io.netty.buffer.Unpooled; +import io.netty.channel.Channel; +import io.netty.channel.ChannelFuture; +import io.netty.handler.codec.mqtt.*; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Service; + +import java.net.InetAddress; +import java.net.UnknownHostException; +import java.util.Locale; + +/** + * 车位相机业务Service + * + * @author Lemon + * @Date 2023/12/19 16:07:45 + */ +@Service +public class CameraBusinessServiceImpl implements CameraBusinessService { + + @Autowired + private RedisCache redisCache; + + @Autowired + private BootNettyMqttChannelInboundHandler handler; + + public void sendGroundLockCommand(String sn, String msgType, String msgPrefix, String topic) throws InterruptedException { + JSONObject jsonObject = spliceStr(sn, msgType, msgPrefix); + // 通过sn查找出对应的channelId + String mqttConnectRedisKey = CacheConstants.MQTT_CONNECT_CHANNEL + sn; + Object cacheObject = redisCache.getCacheObject(mqttConnectRedisKey); + if (cacheObject == null) { + return; + } + String channelId = (String) cacheObject; + + // 发送消息 + handler.sendMsg(channelId, topic, jsonObject.toJSONString()); + } + + public void processConnectMsg(Channel channel) throws UnknownHostException { + // 解析 channel 中的 ip 地址 + String remoteAddress = channel.remoteAddress().toString(); + // System.out.println(ip); + String[] parts = remoteAddress.substring(1).split(":"); + String ipAddress = parts[0]; // IP 地址部分 + String port = parts[1]; // 端口号部分 + // 使用 InetAddress 解析IP地址部分 + InetAddress inetAddress = InetAddress.getByName(ipAddress); + + // 输出标准的IP地址格式和端口号 + String standardIPAddress = inetAddress.getHostAddress(); + // System.out.println("端口号:" + port); + + // 获取相机心跳包的缓存信息,将 sn 和 channelId 进行绑定 + String redisKey = CacheConstants.CAMERA_HEARTBEAT + standardIPAddress; + Object cacheObject = redisCache.getCacheObject(redisKey); + if (cacheObject != null) { + String sn = (String) cacheObject; + // 将 sn 和 channelId 绑定关系存入缓存 + String mqttConnectRedisKey = CacheConstants.MQTT_CONNECT_CHANNEL + sn; + redisCache.setCacheObject(mqttConnectRedisKey, channel.id().asShortText()); + } + } + + /** + * 根据规则拼装字符串 + * @param sn 设备 sn + * @param msgType 消息类型 + * @param msgPrefix 消息前缀 + * @return 拼装好的json对象 + */ + private JSONObject spliceStr(String sn, String msgType, String msgPrefix) { + StringBuilder sb = new StringBuilder(); + String msgId = msgPrefix + DateUtils.dateTimeNow(DateUtils.YYYYMMDDHHMMSS) + "01"; + String timeStamp = DateUtils.dateTimeNow(DateUtils.YYYY_MM_DD_HH_MM_SS); + sb.append("sn=").append(sn) + .append("×tamp=").append(timeStamp) + .append("&msg_id=").append(msgId) + .append("&msg_type=").append(msgType); + // 进行 32 位 MD5 计算 + String sign = MD5Util.MD5Encode(sb.toString()).toUpperCase(Locale.ROOT); + JSONObject jsonObject = new JSONObject(); + jsonObject.put("sign", sign); + jsonObject.put("sn", sn); + jsonObject.put("timestamp", timeStamp); + jsonObject.put("msg_id", msgId); + jsonObject.put("msg_type", msgType); + + return jsonObject; + } + + public static void main(String[] args) { + System.out.println(System.currentTimeMillis()); + } + + + +} diff --git a/jsowell-pile/src/main/java/com/jsowell/pile/dto/camera/SendMsg2TopicDTO.java b/jsowell-pile/src/main/java/com/jsowell/pile/dto/camera/SendMsg2TopicDTO.java new file mode 100644 index 000000000..9bd8a26da --- /dev/null +++ b/jsowell-pile/src/main/java/com/jsowell/pile/dto/camera/SendMsg2TopicDTO.java @@ -0,0 +1,32 @@ +package com.jsowell.pile.dto.camera; + +import com.alibaba.fastjson2.JSON; +import lombok.Data; + +/** + * TODO + * + * @author Lemon + * @Date 2023/12/19 15:37:35 + */ +@Data +public class SendMsg2TopicDTO { + private String channelId; + private String topic; + private RequestMsg requestMsg; + + private String sn; + private String msgType; + private String msgPrefix; + + @Data + public static class RequestMsg{ + private String sign; + private String channelId; + private String sn; + private String timestamp; + private String msg_id; + private String msg_type; + private String msg_data; + } +} diff --git a/jsowell-thirdparty/src/main/java/com/jsowell/thirdparty/camera/service/CameraService.java b/jsowell-thirdparty/src/main/java/com/jsowell/thirdparty/camera/service/CameraService.java index 910d500f7..1888dbd3d 100644 --- a/jsowell-thirdparty/src/main/java/com/jsowell/thirdparty/camera/service/CameraService.java +++ b/jsowell-thirdparty/src/main/java/com/jsowell/thirdparty/camera/service/CameraService.java @@ -6,13 +6,18 @@ import com.aliyun.oss.OSS; import com.aliyun.oss.OSSClientBuilder; import com.aliyun.oss.model.ObjectMetadata; import com.jsowell.common.config.AliyunOssConfig; +import com.jsowell.common.constant.CacheConstants; import com.jsowell.common.core.redis.RedisCache; import com.jsowell.common.util.StringUtils; import com.jsowell.common.util.file.AliyunOssUploadUtils; import com.jsowell.common.util.file.ImageUtils; import com.jsowell.pile.domain.PileCameraInfo; +import com.jsowell.pile.dto.GenerateOccupyOrderDTO; +import com.jsowell.pile.dto.camera.CameraHeartBeatDTO; import com.jsowell.pile.dto.camera.CameraIdentifyResultsDTO; import com.jsowell.pile.service.IPileCameraInfoService; +import com.jsowell.pile.service.OrderPileOccupyService; +import io.netty.channel.Channel; import org.apache.commons.collections4.CollectionUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -23,6 +28,8 @@ import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.InputStream; +import java.net.InetAddress; +import java.net.UnknownHostException; import java.time.LocalDateTime; import java.time.ZoneOffset; import java.util.Base64; @@ -46,26 +53,36 @@ public class CameraService { @Autowired private IPileCameraInfoService pileCameraInfoService; + @Autowired + private OrderPileOccupyService orderPileOccupyService; + public void receiveIdentifyResults(JSONObject jsonObject) { // 区分入场和出场 - // Integer parking_state = jsonObject.getJSONObject("parking").getInteger("parking_state"); - // if (parking_state == 1) { - // // 入场 - // String parkingState = "ENTRY"; - // vehicleEntry(jsonObject, parkingState); - // } - // if (parking_state == 2) { - // // 在场 - // } - // if (parking_state == 4) { - // // 出场 - // } + Integer parking_state = jsonObject.getJSONObject("parking").getInteger("parking_state"); + if (parking_state == 1) { + // 入场 + String parkingState = "ENTRY"; + vehicleEntry(jsonObject, parkingState); + } + if (parking_state == 2) { + // 在场 + } + if (parking_state == 4) { + // 出场 + } - saveInfo2DataBase(jsonObject); + // saveInfo2DataBase(jsonObject); } + + public void saveHeartBeat2Redis(CameraHeartBeatDTO dto) { + // 将基本信息存入缓存 + String redisKey = CacheConstants.CAMERA_HEARTBEAT + dto.getIp(); + redisCache.setCacheObject(redisKey, dto.getSn()); + } + /** * 车辆入场 * @param jsonObject @@ -76,6 +93,13 @@ public class CameraService { // 将信息存数据库 saveInfo2DataBase(jsonObject); + + // TODO 生成占桩订单 + // GenerateOccupyOrderDTO dto = new GenerateOccupyOrderDTO(); + // dto.setMemberId(); + // dto.setPileSn(); + // dto.setConnectorCode(); + // orderPileOccupyService.generateOccupyPileOrder() } private boolean saveInfo2DataBase(JSONObject jsonObject) { diff --git a/pom.xml b/pom.xml index 20c2f52c1..2f9327ade 100644 --- a/pom.xml +++ b/pom.xml @@ -35,6 +35,8 @@ 20.0 2.5.14 4.1.75.Final + 1.2.5 + 1.2.5 1.2.10 @@ -227,6 +229,12 @@ ${netty-all.version} + + org.eclipse.paho + org.eclipse.paho.client.mqttv3 + ${mqttv3.version} + + com.tencentcloudapi tencentcloud-sdk-java