新增 netty整合mqtt协议,与车位相机通讯并保存通讯信息

This commit is contained in:
Lemon
2023-12-20 16:17:34 +08:00
parent 7015cb1234
commit 5fbce62752
16 changed files with 972 additions and 23 deletions

View File

@@ -4,18 +4,19 @@ import com.alibaba.fastjson2.JSON;
import com.alibaba.fastjson2.JSONObject; import com.alibaba.fastjson2.JSONObject;
import com.jsowell.common.annotation.Anonymous; import com.jsowell.common.annotation.Anonymous;
import com.jsowell.common.core.controller.BaseController; import com.jsowell.common.core.controller.BaseController;
import com.jsowell.netty.server.mqtt.BootNettyMqttChannelInboundHandler;
import com.jsowell.netty.service.camera.CameraBusinessService;
import com.jsowell.pile.dto.camera.CameraHeartBeatDTO; import com.jsowell.pile.dto.camera.CameraHeartBeatDTO;
import com.jsowell.pile.dto.camera.CameraIdentifyResultsDTO; import com.jsowell.pile.dto.camera.SendMsg2TopicDTO;
import com.jsowell.thirdparty.camera.common.CameraCommonResult; import com.jsowell.thirdparty.camera.common.CameraCommonResult;
import com.jsowell.thirdparty.camera.service.CameraService; import com.jsowell.thirdparty.camera.service.CameraService;
import io.netty.channel.ChannelFuture;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping; import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody; import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController; import org.springframework.web.bind.annotation.RestController;
import java.util.List;
/** /**
* 充电相机 controller * 充电相机 controller
* *
@@ -29,6 +30,12 @@ public class CameraController extends BaseController {
@Autowired @Autowired
private CameraService cameraService; private CameraService cameraService;
@Autowired
private CameraBusinessService cameraBusinessService;
@Autowired
private BootNettyMqttChannelInboundHandler handler;
/** /**
* 心跳 * 心跳
* @param dto * @param dto
@@ -36,6 +43,7 @@ public class CameraController extends BaseController {
@PostMapping("/v1/receiveHeartBeat") @PostMapping("/v1/receiveHeartBeat")
public CameraCommonResult receiveHeartBeat(@RequestBody CameraHeartBeatDTO dto) { public CameraCommonResult receiveHeartBeat(@RequestBody CameraHeartBeatDTO dto) {
logger.info("接收相机系统心跳包 params:{}", JSON.toJSONString(dto)); logger.info("接收相机系统心跳包 params:{}", JSON.toJSONString(dto));
cameraService.saveHeartBeat2Redis(dto);
CameraCommonResult result = new CameraCommonResult(); CameraCommonResult result = new CameraCommonResult();
logger.info("接收相机系统心跳包 result:{}", JSON.toJSONString(result.successResponse())); logger.info("接收相机系统心跳包 result:{}", JSON.toJSONString(result.successResponse()));
return result.successResponse(); return result.successResponse();
@@ -53,4 +61,20 @@ public class CameraController extends BaseController {
cameraService.receiveIdentifyResults(jsonObject); cameraService.receiveIdentifyResults(jsonObject);
return result.successResponse(); return result.successResponse();
} }
@PostMapping("/sendMsg2Topic")
public void sendMsg2Topic(@RequestBody SendMsg2TopicDTO dto) {
try {
cameraBusinessService.sendGroundLockCommand(dto.getSn(), dto.getMsgType(), dto.getMsgPrefix(), dto.getTopic());
} catch (Exception e) {
logger.error("发送消息 error, ", e);
}
}
// @PostMapping("/sendMsg2Topic")
// public void sendMsg2Topic(@RequestBody SendMsg2TopicDTO dto) throws InterruptedException {
// ChannelFuture channelFuture = handler.sendMsg(dto.getChannelId(), dto.getTopic(), dto.getRequestMsg().toString());
// System.out.println("123");
// }
} }

View File

@@ -47,6 +47,7 @@ import com.jsowell.common.util.id.SnowflakeIdWorker;
import com.jsowell.common.util.ip.AddressUtils; import com.jsowell.common.util.ip.AddressUtils;
import com.jsowell.netty.handler.HeartbeatRequestHandler; import com.jsowell.netty.handler.HeartbeatRequestHandler;
import com.jsowell.netty.handler.TransactionRecordsRequestHandler; import com.jsowell.netty.handler.TransactionRecordsRequestHandler;
import com.jsowell.netty.service.camera.impl.CameraBusinessServiceImpl;
import com.jsowell.netty.service.yunkuaichong.YKCBusinessService; import com.jsowell.netty.service.yunkuaichong.YKCBusinessService;
import com.jsowell.pile.domain.*; import com.jsowell.pile.domain.*;
import com.jsowell.pile.domain.ykcCommond.IssueQRCodeCommand; import com.jsowell.pile.domain.ykcCommond.IssueQRCodeCommand;
@@ -85,6 +86,7 @@ import com.jsowell.wxpay.common.WeChatPayParameter;
import com.jsowell.wxpay.dto.AppletTemplateMessageSendDTO; import com.jsowell.wxpay.dto.AppletTemplateMessageSendDTO;
import com.jsowell.wxpay.response.WechatPayRefundRequest; import com.jsowell.wxpay.response.WechatPayRefundRequest;
import com.jsowell.wxpay.service.WxAppletRemoteService; import com.jsowell.wxpay.service.WxAppletRemoteService;
import io.netty.channel.ChannelFuture;
import org.apache.commons.collections4.CollectionUtils; import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.RandomStringUtils; import org.apache.commons.lang3.RandomStringUtils;
import org.junit.Test; import org.junit.Test;
@@ -220,6 +222,9 @@ public class SpringBootTestController {
@Autowired @Autowired
private OrderPileOccupyService orderPileOccupyService; private OrderPileOccupyService orderPileOccupyService;
@Autowired
private CameraBusinessServiceImpl cameraBusinessServiceImpl;
@Autowired @Autowired
private LTYTService ltytService; private LTYTService ltytService;
@@ -248,6 +253,23 @@ public class SpringBootTestController {
System.out.println(JSON.toJSONString(memberWalletVOS)); System.out.println(JSON.toJSONString(memberWalletVOS));
} }
@Test
public void testMqttSendMsg() throws InterruptedException {
String channelId = "94dd42b6";
String topic = "/GroundlockStatus";
JSONObject jsonObject = new JSONObject();
jsonObject.put("sign","F4213AD90EBC72C678E03450E4E091EE");
jsonObject.put("sn","e27f089d-5fadf6c6");
jsonObject.put("timestamp","2021-07-01 12:00:01");
jsonObject.put("msg_id","GS2021070112000101");
jsonObject.put("msg_type","GroundlockStatus");
jsonObject.put("msg_data",null);
// ChannelFuture future = cameraBusinessServiceImpl.sendMsg(channelId, topic, jsonObject.toJSONString());
// System.out.println(future.toString());
}
@Test @Test
public void queryPaymentByOrderNoTest() { public void queryPaymentByOrderNoTest() {
String orderNo = "C44903356969"; String orderNo = "C44903356969";

View File

@@ -199,6 +199,16 @@ public class CacheConstants {
*/ */
public static final String CAMERA_IMAGE_BY_PLATE_NUMBER = "CAMERA_IMAGE_BY_PLATE_NUMBER_"; public static final String CAMERA_IMAGE_BY_PLATE_NUMBER = "CAMERA_IMAGE_BY_PLATE_NUMBER_";
/**
* 相机心跳数据
*/
public static final String CAMERA_HEARTBEAT = "CAMERA_HEARTBEAT_";
/**
* 相机设备 sn 和 channelId 进行绑定
*/
public static final String MQTT_CONNECT_CHANNEL = "MQTT_CONNECT_CHANNEL_";
/** /**
* 桩硬件故障 * 桩硬件故障
*/ */

View File

@@ -1,5 +1,10 @@
package com.jsowell.common.util.bean; package com.jsowell.common.util.bean;
import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.stereotype.Component;
import java.lang.reflect.Method; import java.lang.reflect.Method;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
@@ -11,7 +16,11 @@ import java.util.regex.Pattern;
* *
* @author jsowell * @author jsowell
*/ */
public class BeanUtils extends org.springframework.beans.BeanUtils { @Component
public class BeanUtils extends org.springframework.beans.BeanUtils implements ApplicationContextAware {
protected static ApplicationContext applicationContext;
/** /**
* Bean方法名中属性名开始的下标 * Bean方法名中属性名开始的下标
*/ */
@@ -101,4 +110,18 @@ public class BeanUtils extends org.springframework.beans.BeanUtils {
public static boolean isMethodPropEquals(String m1, String m2) { public static boolean isMethodPropEquals(String m1, String m2) {
return m1.substring(BEAN_METHOD_PROP_INDEX).equals(m2.substring(BEAN_METHOD_PROP_INDEX)); return m1.substring(BEAN_METHOD_PROP_INDEX).equals(m2.substring(BEAN_METHOD_PROP_INDEX));
} }
@Override
public void setApplicationContext(ApplicationContext app) throws BeansException {
if (applicationContext == null) {
applicationContext = app;
}
}
/**
* 通过类的class从容器中手动获取对象
*/
public static <T> T getBean(Class<T> clazz) {
return applicationContext.getBean(clazz);
}
} }

View File

@@ -25,6 +25,11 @@
<groupId>com.jsowell</groupId> <groupId>com.jsowell</groupId>
<artifactId>jsowell-thirdparty</artifactId> <artifactId>jsowell-thirdparty</artifactId>
</dependency> </dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-codec-mqtt</artifactId>
<version>4.1.79.Final</version>
</dependency>
</dependencies> </dependencies>

View File

@@ -0,0 +1,24 @@
package com.jsowell.netty.domain;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
/**
* TODO
*
* @author Lemon
* @Date 2023/12/19 9:39:48
*/
@Data
@NoArgsConstructor
@AllArgsConstructor
@Builder
public class MqttConnectPayload {
private String clientIdentifier;
private String willTopic;
private String willMessage;
private String userName;
private String password;
}

View File

@@ -0,0 +1,97 @@
package com.jsowell.netty.domain;
import io.netty.handler.codec.mqtt.MqttQoS;
/**
* TODO
*
* @author Lemon
* @Date 2023/12/19 8:27:30
*/
public class MqttRequest {
private boolean mutable = true;
private byte[] payload;
private MqttQoS qos = MqttQoS.AT_MOST_ONCE;
private boolean retained = false;
private boolean dup = false;
private int messageId;
public MqttRequest() {
this.setPayload(new byte[0]);
}
public MqttRequest(byte[] payload) {
this.setPayload(payload);
}
public byte[] getPayload() {
return this.payload;
}
public void clearPayload() {
this.checkMutable();
this.payload = new byte[0];
}
public void setPayload(byte[] payload) {
this.checkMutable();
if (payload == null) {
throw new NullPointerException();
} else {
this.payload = payload;
}
}
public boolean isRetained() {
return this.retained;
}
public void setRetained(boolean retained) {
this.checkMutable();
this.retained = retained;
}
public MqttQoS getQos() {
return qos;
}
public void setQos(MqttQoS qos) {
this.qos = qos;
}
public boolean isMutable() {
return mutable;
}
public void setMutable(boolean mutable) {
this.mutable = mutable;
}
protected void checkMutable() throws IllegalStateException {
if (!this.mutable) {
throw new IllegalStateException();
}
}
public boolean isDup() {
return dup;
}
public void setDup(boolean dup) {
this.dup = dup;
}
public int getMessageId() {
return messageId;
}
public void setMessageId(int messageId) {
this.messageId = messageId;
}
@Override
public String toString() {
return new String(this.payload);
}
}

View File

@@ -1,10 +1,274 @@
package com.jsowell.netty.server.mqtt; package com.jsowell.netty.server.mqtt;
import com.alibaba.fastjson2.JSONObject;
import com.jsowell.common.util.StringUtils;
import com.jsowell.common.util.bean.BeanUtils;
import com.jsowell.netty.domain.MqttRequest;
import com.jsowell.netty.service.camera.CameraBusinessService;
import com.jsowell.netty.service.camera.impl.CameraBusinessServiceImpl;
import com.jsowell.thirdparty.camera.service.CameraService;
import io.netty.buffer.Unpooled;
import io.netty.channel.*;
import io.netty.handler.codec.mqtt.*;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.concurrent.ConcurrentHashMap;
import lombok.extern.slf4j.Slf4j;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
/** /**
* TODO * MQTT服务端I/O数据读写处理类
* *
* @author Lemon * @author Lemon
* @Date 2023/12/14 11:33:03 * @Date 2023/12/14 11:33:03
*/ */
public class BootNettyMqttChannelInboundHandler {
@ChannelHandler.Sharable
@Slf4j
@Component
public class BootNettyMqttChannelInboundHandler extends ChannelInboundHandlerAdapter {
/**
* Key: channelId
* Value: ctx
*/
private static final ConcurrentHashMap<String, ChannelHandlerContext> CHANNEL_MAP = new ConcurrentHashMap<>();
/**
* Key: channelId
* Value: 客户端id
*/
private static final ConcurrentHashMap<String, String> CLIENT_MAP = new ConcurrentHashMap<>();
/**
* 从客户端收到新的数据时,这个方法会在收到消息时被调用
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception, IOException {
if (null != msg) {
MqttMessage mqttMessage = (MqttMessage) msg;
log.info("MqttServer收到消息:{}", mqttMessage.toString());
MqttFixedHeader mqttFixedHeader = mqttMessage.fixedHeader();
Channel channel = ctx.channel();
if (mqttFixedHeader.messageType().equals(MqttMessageType.CONNECT)) {
// 在一个网络连接上客户端只能发送一次CONNECT报文。服务端必须将客户端发送的第二个CONNECT报文当作协议违规处理并断开客户端的连接
// to do 建议connect消息单独处理用来对客户端进行认证管理等 这里先直接返回一个CONNACK消息
BootNettyMqttMsgBack.connack(channel, mqttMessage);
// 如果map中不包含此连接就保存连接
String channelId = channel.id().asShortText();
System.out.println(channelId);
if (CHANNEL_MAP.get(channelId) != null) {
log.info("客户端【{}】是连接状态,连接通道数量: {}", channelId, CHANNEL_MAP.size());
} else {
//保存连接
CHANNEL_MAP.put(channelId, ctx);
// System.out.println(channel.remoteAddress()); // /192.168.2.100:39396
log.info("客户端【{}】, 连接Mqtt服务器, 连接通道数量: {}", channelId, CHANNEL_MAP.size());
CameraBusinessService cameraBusinessService = BeanUtils.getBean(CameraBusinessService.class);
cameraBusinessService.processConnectMsg(channel);
}
// 保存客户端id
String clientIdentifier = parsePayLoadInfo(mqttMessage.payload().toString());
if (StringUtils.isNotBlank(clientIdentifier)) {
CLIENT_MAP.put(channelId, clientIdentifier);
}
}
switch (mqttFixedHeader.messageType()) {
case PUBLISH: // 客户端发布消息
// PUBACK报文是对QoS 1等级的PUBLISH报文的响应
System.out.println("客户端发布消息");
BootNettyMqttMsgBack.puback(channel, mqttMessage);
break;
case PUBREL: // 发布释放
// PUBREL报文是对PUBREC报文的响应
// to do
BootNettyMqttMsgBack.pubcomp(channel, mqttMessage);
break;
case SUBSCRIBE: // 客户端订阅主题
// 客户端向服务端发送SUBSCRIBE报文用于创建一个或多个订阅每个订阅注册客户端关心的一个或多个主题。
// 为了将应用消息转发给与那些订阅匹配的主题服务端发送PUBLISH报文给客户端。
// SUBSCRIBE报文也为每个订阅指定了最大的QoS等级服务端根据这个发送应用消息给客户端
// to do
BootNettyMqttMsgBack.suback(channel, mqttMessage);
break;
case UNSUBSCRIBE: // 客户端取消订阅
// 客户端发送UNSUBSCRIBE报文给服务端用于取消订阅主题
// to do
BootNettyMqttMsgBack.unsuback(channel, mqttMessage);
break;
case PINGREQ: // 客户端发起心跳
// 客户端发送PINGREQ报文给服务端的
// 在没有任何其它控制报文从客户端发给服务的时,告知服务端客户端还活着
// 请求服务端发送 响应确认它还活着,使用网络以确认网络连接没有断开
BootNettyMqttMsgBack.pingresp(channel, mqttMessage);
break;
case DISCONNECT: // 客户端主动断开连接
// DISCONNECT报文是客户端发给服务端的最后一个控制报文 服务端必须验证所有的保留位都被设置为0
// to do
break;
default:
break;
}
}
}
/**
* 从客户端收到新的数据、读取完成时调用
*/
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws IOException {
}
/**
* 客户端与服务端第一次建立连接时执行 在channelActive方法之前执行
*/
@Override
public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
super.channelRegistered(ctx);
}
/**
* 客户端与服务端 断连时执行 channelInactive方法之后执行
*/
@Override
public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
super.channelUnregistered(ctx);
}
/**
* 当出现 Throwable 对象才会被调用,即当 Netty 由于 IO 错误或者处理器在处理事件时抛出的异常时
*/
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
super.exceptionCaught(ctx, cause);
ctx.close();
}
/**
* 客户端与服务端第一次建立连接时执行
*/
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
super.channelActive(ctx);
}
/**
* 客户端与服务端 断连时执行
*/
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception, IOException {
// 删除连接
removeConnect(ctx);
super.channelInactive(ctx);
}
/**
* 服务端 当读超时时 会调用这个方法
*/
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception, IOException {
// 删除连接
removeConnect(ctx);
super.userEventTriggered(ctx, evt);
}
@Override
public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
super.channelWritabilityChanged(ctx);
}
/**
* 删除连接
* @param ctx
*/
public void removeConnect(ChannelHandlerContext ctx) {
InetSocketAddress insocket = (InetSocketAddress) ctx.channel().remoteAddress();
String clientIp = insocket.getAddress().getHostAddress();
String channelId = ctx.channel().id().asShortText();
//包含此客户端才去删除
if (CHANNEL_MAP.get(channelId) != null) {
//删除连接
CHANNEL_MAP.remove(channelId);
CLIENT_MAP.remove(channelId);
log.info("客户端【{}】, 退出Mqtt服务器[IP:{}--->PORT:{}], 连接通道数量: {}", channelId, clientIp, insocket.getPort(), CHANNEL_MAP.size());
}
}
/**
* 解析 payload 字段,并将 clientIdentifier 客户端id返回
* @param payloadText
* @return 客户端id
*/
private String parsePayLoadInfo(String payloadText) {
if (StringUtils.isBlank(payloadText)) {
return null;
}
JSONObject payload = new JSONObject();
// 使用逗号和等号进行分割
String[] parts = payloadText.split("[,\\[\\]]");
for (String part : parts) {
String[] keyValue = part.split("=");
if (keyValue.length == 2) {
String key = keyValue[0].trim();
String value = keyValue[1].trim();
payload.put(key, value);
}
}
return payload.getString("clientIdentifier");
}
/**
* 通过 channelId 获取 Channel
* @param channelId
* @return
*/
public Channel getChannel(String channelId) {
if (channelId != null) {
ChannelHandlerContext ctx = CHANNEL_MAP.get(channelId);
return ctx.channel();
} else {
return null;
}
}
/**
* 服务器端向某Topic发送消息
* @param channelId 发送的通道id
* @param topic 主题Topic
* @param requestMsg 要发送的Json对象
* @return
* @throws InterruptedException
*/
public ChannelFuture sendMsg(String channelId, String topic, String requestMsg) throws InterruptedException {
MqttRequest request = new MqttRequest((requestMsg.toString().getBytes()));
MqttPublishMessage pubMessage = (MqttPublishMessage) MqttMessageFactory.newMessage(
new MqttFixedHeader(MqttMessageType.PUBLISH,
request.isDup(),
request.getQos(),
request.isRetained(),
0),
new MqttPublishVariableHeader(topic, 0),
Unpooled.buffer().writeBytes(request.getPayload()));
Channel channel = getChannel(channelId);
// 超过高水位,则采取同步模式
if (channel.isWritable()) {
return channel.writeAndFlush(pubMessage);
}
return channel.writeAndFlush(pubMessage).sync();
}
} }

View File

@@ -1,10 +1,188 @@
package com.jsowell.netty.server.mqtt; package com.jsowell.netty.server.mqtt;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
import com.jsowell.common.core.redis.RedisCache;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.netty.channel.Channel;
import io.netty.handler.codec.mqtt.MqttConnAckMessage;
import io.netty.handler.codec.mqtt.MqttConnAckVariableHeader;
import io.netty.handler.codec.mqtt.MqttConnectMessage;
import io.netty.handler.codec.mqtt.MqttConnectReturnCode;
import io.netty.handler.codec.mqtt.MqttConnectVariableHeader;
import io.netty.handler.codec.mqtt.MqttFixedHeader;
import io.netty.handler.codec.mqtt.MqttMessage;
import io.netty.handler.codec.mqtt.MqttMessageIdVariableHeader;
import io.netty.handler.codec.mqtt.MqttMessageType;
import io.netty.handler.codec.mqtt.MqttPubAckMessage;
import io.netty.handler.codec.mqtt.MqttPublishMessage;
import io.netty.handler.codec.mqtt.MqttQoS;
import io.netty.handler.codec.mqtt.MqttSubAckMessage;
import io.netty.handler.codec.mqtt.MqttSubAckPayload;
import io.netty.handler.codec.mqtt.MqttSubscribeMessage;
import io.netty.handler.codec.mqtt.MqttUnsubAckMessage;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
/** /**
* TODO * TODO
* *
* @author Lemon * @author Lemon
* @Date 2023/12/14 11:36:30 * @Date 2023/12/14 11:36:30
*/ */
@Component
public class BootNettyMqttMsgBack { public class BootNettyMqttMsgBack {
private static final Logger log = LoggerFactory.getLogger(BootNettyMqttMsgBack.class);
/**
* 确认连接请求
*
* @param channel
* @param mqttMessage
*/
public static void connack(Channel channel, MqttMessage mqttMessage) {
log.info("服务器端收到确认连接请求:{}", mqttMessage.toString());
MqttConnectMessage mqttConnectMessage = (MqttConnectMessage) mqttMessage;
MqttFixedHeader mqttFixedHeaderInfo = mqttConnectMessage.fixedHeader();
MqttConnectVariableHeader mqttConnectVariableHeaderInfo = mqttConnectMessage.variableHeader();
// 构建返回报文, 可变报头
MqttConnAckVariableHeader mqttConnAckVariableHeaderBack = new MqttConnAckVariableHeader(MqttConnectReturnCode.CONNECTION_ACCEPTED, mqttConnectVariableHeaderInfo.isCleanSession());
// 构建返回报文, 固定报头
MqttFixedHeader mqttFixedHeaderBack = new MqttFixedHeader(MqttMessageType.CONNACK, mqttFixedHeaderInfo.isDup(), MqttQoS.AT_MOST_ONCE, mqttFixedHeaderInfo.isRetain(), 0x02);
// 构建CONNACK消息体
MqttConnAckMessage connAck = new MqttConnAckMessage(mqttFixedHeaderBack, mqttConnAckVariableHeaderBack);
log.info("服务器端回复确认连接请求:{}", connAck.toString());
channel.writeAndFlush(connAck);
}
/**
* 根据qos发布确认
*
* @param channel
* @param mqttMessage
*/
public static void puback(Channel channel, MqttMessage mqttMessage) {
MqttPublishMessage mqttPublishMessage = (MqttPublishMessage) mqttMessage;
MqttFixedHeader mqttFixedHeaderInfo = mqttPublishMessage.fixedHeader();
MqttQoS qos = (MqttQoS) mqttFixedHeaderInfo.qosLevel();
byte[] headBytes = new byte[mqttPublishMessage.payload().readableBytes()];
mqttPublishMessage.payload().readBytes(headBytes);
String data = new String(headBytes);
System.out.println("publish data--" + data);
switch (qos) {
case AT_MOST_ONCE: // 至多一次
break;
case AT_LEAST_ONCE: // 至少一次
// 构建返回报文, 可变报头
MqttMessageIdVariableHeader mqttMessageIdVariableHeaderBack = MqttMessageIdVariableHeader.from(mqttPublishMessage.variableHeader().packetId());
// 构建返回报文, 固定报头
MqttFixedHeader mqttFixedHeaderBack = new MqttFixedHeader(MqttMessageType.PUBACK, mqttFixedHeaderInfo.isDup(), MqttQoS.AT_MOST_ONCE, mqttFixedHeaderInfo.isRetain(), 0x02);
// 构建PUBACK消息体
MqttPubAckMessage pubAck = new MqttPubAckMessage(mqttFixedHeaderBack, mqttMessageIdVariableHeaderBack);
log.info("back--" + pubAck.toString());
channel.writeAndFlush(pubAck);
break;
case EXACTLY_ONCE: // 刚好一次
// 构建返回报文, 固定报头
MqttFixedHeader mqttFixedHeaderBack2 = new MqttFixedHeader(MqttMessageType.PUBREC, false, MqttQoS.AT_LEAST_ONCE, false, 0x02);
// 构建返回报文, 可变报头
MqttMessageIdVariableHeader mqttMessageIdVariableHeaderBack2 = MqttMessageIdVariableHeader.from(mqttPublishMessage.variableHeader().packetId());
MqttMessage mqttMessageBack = new MqttMessage(mqttFixedHeaderBack2, mqttMessageIdVariableHeaderBack2);
log.info("back--" + mqttMessageBack.toString());
channel.writeAndFlush(mqttMessageBack);
break;
default:
break;
}
}
/**
* 发布完成 qos2
*
* @param channel
* @param mqttMessage
*/
public static void pubcomp(Channel channel, MqttMessage mqttMessage) {
MqttMessageIdVariableHeader messageIdVariableHeader = (MqttMessageIdVariableHeader) mqttMessage.variableHeader();
// 构建返回报文, 固定报头
MqttFixedHeader mqttFixedHeaderBack = new MqttFixedHeader(MqttMessageType.PUBCOMP, false, MqttQoS.AT_MOST_ONCE, false, 0x02);
// 构建返回报文, 可变报头
MqttMessageIdVariableHeader mqttMessageIdVariableHeaderBack = MqttMessageIdVariableHeader.from(messageIdVariableHeader.messageId());
MqttMessage mqttMessageBack = new MqttMessage(mqttFixedHeaderBack, mqttMessageIdVariableHeaderBack);
log.info("back--" + mqttMessageBack.toString());
channel.writeAndFlush(mqttMessageBack);
}
/**
* 订阅确认
*
* @param channel
* @param mqttMessage
*/
public static void suback(Channel channel, MqttMessage mqttMessage) {
log.info("接收到订阅确认信息:{}", mqttMessage.toString());
MqttSubscribeMessage mqttSubscribeMessage = (MqttSubscribeMessage) mqttMessage;
MqttMessageIdVariableHeader messageIdVariableHeader = mqttSubscribeMessage.variableHeader();
// 构建返回报文, 可变报头
MqttMessageIdVariableHeader variableHeaderBack = MqttMessageIdVariableHeader.from(messageIdVariableHeader.messageId());
Set<String> topics = mqttSubscribeMessage.payload().topicSubscriptions().stream().map(mqttTopicSubscription -> mqttTopicSubscription.topicName()).collect(Collectors.toSet());
//log.info(topics.toString());
List<Integer> grantedQoSLevels = new ArrayList<>(topics.size());
for (int i = 0; i < topics.size(); i++) {
grantedQoSLevels.add(mqttSubscribeMessage.payload().topicSubscriptions().get(i).qualityOfService().value());
}
// 构建返回报文 有效负载
MqttSubAckPayload payloadBack = new MqttSubAckPayload(grantedQoSLevels);
// 构建返回报文 固定报头
MqttFixedHeader mqttFixedHeaderBack = new MqttFixedHeader(MqttMessageType.SUBACK, false, MqttQoS.AT_MOST_ONCE, false, 2 + topics.size());
// 构建返回报文 订阅确认
MqttSubAckMessage subAck = new MqttSubAckMessage(mqttFixedHeaderBack, variableHeaderBack, payloadBack);
log.info("回复订阅确认信息:{}", subAck.toString());
channel.writeAndFlush(subAck);
}
/**
* 取消订阅确认
*
* @param channel
* @param mqttMessage
*/
public static void unsuback(Channel channel, MqttMessage mqttMessage) {
MqttMessageIdVariableHeader messageIdVariableHeader = (MqttMessageIdVariableHeader) mqttMessage.variableHeader();
// 构建返回报文 可变报头
MqttMessageIdVariableHeader variableHeaderBack = MqttMessageIdVariableHeader.from(messageIdVariableHeader.messageId());
// 构建返回报文 固定报头
MqttFixedHeader mqttFixedHeaderBack = new MqttFixedHeader(MqttMessageType.UNSUBACK, false, MqttQoS.AT_MOST_ONCE, false, 2);
// 构建返回报文 取消订阅确认
MqttUnsubAckMessage unSubAck = new MqttUnsubAckMessage(mqttFixedHeaderBack, variableHeaderBack);
log.info("back--" + unSubAck.toString());
channel.writeAndFlush(unSubAck);
}
/**
* 心跳响应
*
* @param channel
* @param mqttMessage
*/
public static void pingresp(Channel channel, MqttMessage mqttMessage) {
// 心跳响应报文 11010000 00000000 固定报文
MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.PINGRESP, false, MqttQoS.AT_MOST_ONCE, false, 0);
MqttMessage mqttMessageBack = new MqttMessage(fixedHeader);
log.info("MqttServer回复心跳消息{}", mqttMessageBack.toString());
channel.writeAndFlush(mqttMessageBack);
}
} }

View File

@@ -0,0 +1,78 @@
package com.jsowell.netty.server.mqtt;
import com.jsowell.common.constant.Constants;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.mqtt.MqttDecoder;
import io.netty.handler.codec.mqtt.MqttEncoder;
import io.netty.handler.timeout.IdleStateHandler;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.CommandLineRunner;
import org.springframework.core.annotation.Order;
import org.springframework.stereotype.Component;
import java.net.InetSocketAddress;
@Slf4j
@Component
@Order(value = 2)
public class MqttSever implements CommandLineRunner {
@Override
public void run(String... args) throws Exception {
InetSocketAddress address = new InetSocketAddress(Constants.SOCKET_IP, 1883);
this.start(address);
}
public void start(InetSocketAddress address) {
// log.info("========NettyServer.start order 1");
//配置服务端的NIO线程组
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap mqttBootstrap = new ServerBootstrap();
mqttBootstrap.group(bossGroup, workerGroup);
mqttBootstrap.channel(NioServerSocketChannel.class);
mqttBootstrap.option(ChannelOption.SO_REUSEADDR, true)
.option(ChannelOption.SO_BACKLOG, 1024)
.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
.option(ChannelOption.SO_RCVBUF, 10485760);
mqttBootstrap.childOption(ChannelOption.TCP_NODELAY, true)
.childOption(ChannelOption.SO_KEEPALIVE, true)
.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
mqttBootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
protected void initChannel(SocketChannel ch) {
ChannelPipeline channelPipeline = ch.pipeline();
// 设置读写空闲超时时间
channelPipeline.addLast(new IdleStateHandler(600, 600, 1200));
channelPipeline.addLast("encoder", MqttEncoder.INSTANCE);
channelPipeline.addLast("decoder", new MqttDecoder());
channelPipeline.addLast(new BootNettyMqttChannelInboundHandler());
}
});
ChannelFuture future = mqttBootstrap.bind(address.getPort()).sync();
if(future.isSuccess()){
log.info("MqttServer启动成功, 开始监听端口:{}", address.getPort());
future.channel().closeFuture().sync();
} else {
log.error("MqttServer启动失败", future.cause());
}
//关闭channel和块直到它被关闭
future.channel().closeFuture().sync();
} catch (Exception e) {
log.error("MqttServer.start error", e);
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}

View File

@@ -2,9 +2,7 @@ package com.jsowell.netty.server.yunkuaichong;
import com.jsowell.common.constant.Constants; import com.jsowell.common.constant.Constants;
import io.netty.bootstrap.ServerBootstrap; import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture; import io.netty.channel.*;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.logging.LogLevel; import io.netty.handler.logging.LogLevel;
@@ -19,11 +17,11 @@ import java.net.InetSocketAddress;
@Slf4j @Slf4j
@Component @Component
@Order(value = 1)
public class NettyServer implements CommandLineRunner { public class NettyServer implements CommandLineRunner {
@Resource @Resource
private NettyServerChannelInitializer nettyServerChannelInitializer; private NettyServerChannelInitializer nettyServerChannelInitializer;
@Order(value = 1)
@Override @Override
public void run(String... args) throws Exception { public void run(String... args) throws Exception {
InetSocketAddress address = new InetSocketAddress(Constants.SOCKET_IP, Constants.SOCKET_PORT); InetSocketAddress address = new InetSocketAddress(Constants.SOCKET_IP, Constants.SOCKET_PORT);
@@ -66,8 +64,41 @@ public class NettyServer implements CommandLineRunner {
} else { } else {
log.error("NettyServer启动失败", future.cause()); log.error("NettyServer启动失败", future.cause());
} }
//
// ServerBootstrap mqttBootstrap = new ServerBootstrap();
// mqttBootstrap.group(bossGroup, workerGroup);
// mqttBootstrap.channel(NioServerSocketChannel.class);
//
// mqttBootstrap.option(ChannelOption.SO_REUSEADDR, true)
// .option(ChannelOption.SO_BACKLOG, 1024)
// .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
// .option(ChannelOption.SO_RCVBUF, 10485760);
//
// mqttBootstrap.childOption(ChannelOption.TCP_NODELAY, true)
// .childOption(ChannelOption.SO_KEEPALIVE, true)
// .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
//
// mqttBootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
// protected void initChannel(SocketChannel ch) {
// ChannelPipeline channelPipeline = ch.pipeline();
// // 设置读写空闲超时时间
// channelPipeline.addLast(new IdleStateHandler(600, 600, 1200));
// channelPipeline.addLast("encoder", MqttEncoder.INSTANCE);
// channelPipeline.addLast("decoder", new MqttDecoder());
// channelPipeline.addLast(new BootNettyMqttChannelInboundHandler());
// }
// });
// ChannelFuture future2 = mqttBootstrap.bind(address2.getPort()).sync();
// if(future2.isSuccess()){
// log.info("MqttServer启动成功, 开始监听端口:{}", address2.getPort());
// future2.channel().closeFuture().sync();
// } else {
// log.error("MqttServer启动失败", future2.cause());
// }
//关闭channel和块直到它被关闭 //关闭channel和块直到它被关闭
future.channel().closeFuture().sync(); future.channel().closeFuture().sync();
// future2.channel().closeFuture().sync();
} catch (Exception e) { } catch (Exception e) {
log.error("NettyServer.start error", e); log.error("NettyServer.start error", e);
bossGroup.shutdownGracefully(); bossGroup.shutdownGracefully();

View File

@@ -0,0 +1,20 @@
package com.jsowell.netty.service.camera;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import java.net.UnknownHostException;
/**
* TODO
*
* @author Lemon
* @Date 2023/12/20 15:15:26
*/
public interface CameraBusinessService {
public void sendGroundLockCommand(String sn, String msgType, String msgPrefix, String topic) throws InterruptedException;
public void processConnectMsg(Channel channel) throws UnknownHostException;
}

View File

@@ -0,0 +1,109 @@
package com.jsowell.netty.service.camera.impl;
import com.alibaba.fastjson2.JSONObject;
import com.jsowell.common.constant.CacheConstants;
import com.jsowell.common.core.redis.RedisCache;
import com.jsowell.common.util.DateUtils;
import com.jsowell.common.util.sign.MD5Util;
import com.jsowell.netty.domain.MqttRequest;
import com.jsowell.netty.server.mqtt.BootNettyMqttChannelInboundHandler;
import com.jsowell.netty.service.camera.CameraBusinessService;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.handler.codec.mqtt.*;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.Locale;
/**
* 车位相机业务Service
*
* @author Lemon
* @Date 2023/12/19 16:07:45
*/
@Service
public class CameraBusinessServiceImpl implements CameraBusinessService {
@Autowired
private RedisCache redisCache;
@Autowired
private BootNettyMqttChannelInboundHandler handler;
public void sendGroundLockCommand(String sn, String msgType, String msgPrefix, String topic) throws InterruptedException {
JSONObject jsonObject = spliceStr(sn, msgType, msgPrefix);
// 通过sn查找出对应的channelId
String mqttConnectRedisKey = CacheConstants.MQTT_CONNECT_CHANNEL + sn;
Object cacheObject = redisCache.getCacheObject(mqttConnectRedisKey);
if (cacheObject == null) {
return;
}
String channelId = (String) cacheObject;
// 发送消息
handler.sendMsg(channelId, topic, jsonObject.toJSONString());
}
public void processConnectMsg(Channel channel) throws UnknownHostException {
// 解析 channel 中的 ip 地址
String remoteAddress = channel.remoteAddress().toString();
// System.out.println(ip);
String[] parts = remoteAddress.substring(1).split(":");
String ipAddress = parts[0]; // IP 地址部分
String port = parts[1]; // 端口号部分
// 使用 InetAddress 解析IP地址部分
InetAddress inetAddress = InetAddress.getByName(ipAddress);
// 输出标准的IP地址格式和端口号
String standardIPAddress = inetAddress.getHostAddress();
// System.out.println("端口号:" + port);
// 获取相机心跳包的缓存信息,将 sn 和 channelId 进行绑定
String redisKey = CacheConstants.CAMERA_HEARTBEAT + standardIPAddress;
Object cacheObject = redisCache.getCacheObject(redisKey);
if (cacheObject != null) {
String sn = (String) cacheObject;
// 将 sn 和 channelId 绑定关系存入缓存
String mqttConnectRedisKey = CacheConstants.MQTT_CONNECT_CHANNEL + sn;
redisCache.setCacheObject(mqttConnectRedisKey, channel.id().asShortText());
}
}
/**
* 根据规则拼装字符串
* @param sn 设备 sn
* @param msgType 消息类型
* @param msgPrefix 消息前缀
* @return 拼装好的json对象
*/
private JSONObject spliceStr(String sn, String msgType, String msgPrefix) {
StringBuilder sb = new StringBuilder();
String msgId = msgPrefix + DateUtils.dateTimeNow(DateUtils.YYYYMMDDHHMMSS) + "01";
String timeStamp = DateUtils.dateTimeNow(DateUtils.YYYY_MM_DD_HH_MM_SS);
sb.append("sn=").append(sn)
.append("&timestamp=").append(timeStamp)
.append("&msg_id=").append(msgId)
.append("&msg_type=").append(msgType);
// 进行 32 位 MD5 计算
String sign = MD5Util.MD5Encode(sb.toString()).toUpperCase(Locale.ROOT);
JSONObject jsonObject = new JSONObject();
jsonObject.put("sign", sign);
jsonObject.put("sn", sn);
jsonObject.put("timestamp", timeStamp);
jsonObject.put("msg_id", msgId);
jsonObject.put("msg_type", msgType);
return jsonObject;
}
public static void main(String[] args) {
System.out.println(System.currentTimeMillis());
}
}

View File

@@ -0,0 +1,32 @@
package com.jsowell.pile.dto.camera;
import com.alibaba.fastjson2.JSON;
import lombok.Data;
/**
* TODO
*
* @author Lemon
* @Date 2023/12/19 15:37:35
*/
@Data
public class SendMsg2TopicDTO {
private String channelId;
private String topic;
private RequestMsg requestMsg;
private String sn;
private String msgType;
private String msgPrefix;
@Data
public static class RequestMsg{
private String sign;
private String channelId;
private String sn;
private String timestamp;
private String msg_id;
private String msg_type;
private String msg_data;
}
}

View File

@@ -6,13 +6,18 @@ import com.aliyun.oss.OSS;
import com.aliyun.oss.OSSClientBuilder; import com.aliyun.oss.OSSClientBuilder;
import com.aliyun.oss.model.ObjectMetadata; import com.aliyun.oss.model.ObjectMetadata;
import com.jsowell.common.config.AliyunOssConfig; import com.jsowell.common.config.AliyunOssConfig;
import com.jsowell.common.constant.CacheConstants;
import com.jsowell.common.core.redis.RedisCache; import com.jsowell.common.core.redis.RedisCache;
import com.jsowell.common.util.StringUtils; import com.jsowell.common.util.StringUtils;
import com.jsowell.common.util.file.AliyunOssUploadUtils; import com.jsowell.common.util.file.AliyunOssUploadUtils;
import com.jsowell.common.util.file.ImageUtils; import com.jsowell.common.util.file.ImageUtils;
import com.jsowell.pile.domain.PileCameraInfo; import com.jsowell.pile.domain.PileCameraInfo;
import com.jsowell.pile.dto.GenerateOccupyOrderDTO;
import com.jsowell.pile.dto.camera.CameraHeartBeatDTO;
import com.jsowell.pile.dto.camera.CameraIdentifyResultsDTO; import com.jsowell.pile.dto.camera.CameraIdentifyResultsDTO;
import com.jsowell.pile.service.IPileCameraInfoService; import com.jsowell.pile.service.IPileCameraInfoService;
import com.jsowell.pile.service.OrderPileOccupyService;
import io.netty.channel.Channel;
import org.apache.commons.collections4.CollectionUtils; import org.apache.commons.collections4.CollectionUtils;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@@ -23,6 +28,8 @@ import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream; import java.io.ByteArrayOutputStream;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.time.LocalDateTime; import java.time.LocalDateTime;
import java.time.ZoneOffset; import java.time.ZoneOffset;
import java.util.Base64; import java.util.Base64;
@@ -46,26 +53,36 @@ public class CameraService {
@Autowired @Autowired
private IPileCameraInfoService pileCameraInfoService; private IPileCameraInfoService pileCameraInfoService;
@Autowired
private OrderPileOccupyService orderPileOccupyService;
public void receiveIdentifyResults(JSONObject jsonObject) { public void receiveIdentifyResults(JSONObject jsonObject) {
// 区分入场和出场 // 区分入场和出场
// Integer parking_state = jsonObject.getJSONObject("parking").getInteger("parking_state"); Integer parking_state = jsonObject.getJSONObject("parking").getInteger("parking_state");
// if (parking_state == 1) { if (parking_state == 1) {
// // 入场 // 入场
// String parkingState = "ENTRY"; String parkingState = "ENTRY";
// vehicleEntry(jsonObject, parkingState); vehicleEntry(jsonObject, parkingState);
// } }
// if (parking_state == 2) { if (parking_state == 2) {
// // 在场 // 在场
// } }
// if (parking_state == 4) { if (parking_state == 4) {
// // 出场 // 出场
// } }
saveInfo2DataBase(jsonObject); // saveInfo2DataBase(jsonObject);
} }
public void saveHeartBeat2Redis(CameraHeartBeatDTO dto) {
// 将基本信息存入缓存
String redisKey = CacheConstants.CAMERA_HEARTBEAT + dto.getIp();
redisCache.setCacheObject(redisKey, dto.getSn());
}
/** /**
* 车辆入场 * 车辆入场
* @param jsonObject * @param jsonObject
@@ -76,6 +93,13 @@ public class CameraService {
// 将信息存数据库 // 将信息存数据库
saveInfo2DataBase(jsonObject); saveInfo2DataBase(jsonObject);
// TODO 生成占桩订单
// GenerateOccupyOrderDTO dto = new GenerateOccupyOrderDTO();
// dto.setMemberId();
// dto.setPileSn();
// dto.setConnectorCode();
// orderPileOccupyService.generateOccupyPileOrder()
} }
private boolean saveInfo2DataBase(JSONObject jsonObject) { private boolean saveInfo2DataBase(JSONObject jsonObject) {

View File

@@ -35,6 +35,8 @@
<guava.version>20.0</guava.version> <guava.version>20.0</guava.version>
<spring.boot.test.version>2.5.14</spring.boot.test.version> <spring.boot.test.version>2.5.14</spring.boot.test.version>
<netty-all.version>4.1.75.Final</netty-all.version> <netty-all.version>4.1.75.Final</netty-all.version>
<mqttv3.version>1.2.5</mqttv3.version>
<mqtt-codec.version>1.2.5</mqtt-codec.version>
<huifu.version>1.2.10</huifu.version> <huifu.version>1.2.10</huifu.version>
</properties> </properties>
@@ -227,6 +229,12 @@
<version>${netty-all.version}</version> <version>${netty-all.version}</version>
</dependency> </dependency>
<dependency>
<groupId>org.eclipse.paho</groupId>
<artifactId>org.eclipse.paho.client.mqttv3</artifactId>
<version>${mqttv3.version}</version>
</dependency>
<dependency> <dependency>
<groupId>com.tencentcloudapi</groupId> <groupId>com.tencentcloudapi</groupId>
<artifactId>tencentcloud-sdk-java</artifactId> <artifactId>tencentcloud-sdk-java</artifactId>