diff --git a/jsowell-admin/pom.xml b/jsowell-admin/pom.xml index 8cf2bb63c..052d409ea 100644 --- a/jsowell-admin/pom.xml +++ b/jsowell-admin/pom.xml @@ -107,6 +107,12 @@ jsowell-thirdparty + + org.projectlombok + lombok + test + + diff --git a/jsowell-admin/src/main/java/com/jsowell/api/uniapp/customer/PersonPileController.java b/jsowell-admin/src/main/java/com/jsowell/api/uniapp/customer/PersonPileController.java index ed35d10e2..e69cc2c18 100644 --- a/jsowell-admin/src/main/java/com/jsowell/api/uniapp/customer/PersonPileController.java +++ b/jsowell-admin/src/main/java/com/jsowell/api/uniapp/customer/PersonPileController.java @@ -153,16 +153,17 @@ public class PersonPileController extends BaseController { @GetMapping("/getPersonalPileList") public RestApiResponse> getPersonalPileList(HttpServletRequest request) { RestApiResponse> response = null; + String memberId = null; try { - String memberId = getMemberIdByAuthorization(request); - logger.info("通过memberId查个人桩列表 params: {}", memberId); + memberId = getMemberIdByAuthorization(request); + // logger.info("通过memberId查个人桩列表 params: {}", memberId); List list = pileBasicInfoService.getPileInfoByMemberId(memberId); response = new RestApiResponse<>(list); } catch (Exception e) { logger.error("通过memberId查个人桩列表异常", e); response = new RestApiResponse<>(ReturnCodeEnum.CODE_GET_PERSONAL_PILE_BY_MEMBER_ID_ERROR); } - logger.info("通过memberId查个人桩列表 result:{}", response); + logger.info("通过memberId查个人桩列表, params: {}, result:{}", memberId, response); return response; } @@ -179,8 +180,8 @@ public class PersonPileController extends BaseController { public RestApiResponse> getConnectorRealTimeInfo(HttpServletRequest request, @RequestBody QueryPersonPileDTO dto) { RestApiResponse> response = null; try { - String memberId = getMemberIdByAuthorization(request); - dto.setMemberId(memberId); + // String memberId = getMemberIdByAuthorization(request); + // dto.setMemberId(memberId); PersonPileRealTimeVO connectorRealTimeInfo = pileService.getConnectorRealTimeInfo(dto); response = new RestApiResponse<>(connectorRealTimeInfo); } catch (BusinessException e) { diff --git a/jsowell-admin/src/main/java/com/jsowell/service/OrderService.java b/jsowell-admin/src/main/java/com/jsowell/service/OrderService.java index a7a2f25ff..2813edd97 100644 --- a/jsowell-admin/src/main/java/com/jsowell/service/OrderService.java +++ b/jsowell-admin/src/main/java/com/jsowell/service/OrderService.java @@ -663,15 +663,17 @@ public class OrderService { } } - try { - // 因为原来的数据在redis中是永久保存,所以这里做下查询详情的时候,发现已经是完成的订单,redis数据存到表中 - if (StringUtils.equals(orderBasicInfo.getOrderStatus(), OrderStatusEnum.ORDER_COMPLETE.getValue())) { - // 如果是已完成的订单,把redis中的实时数据存到表中 - orderBasicInfoService.realTimeMonitorDataRedis2DB(orderBasicInfo.getTransactionCode(), orderBasicInfo.getOrderCode()); + // 因为原来的数据在redis中是永久保存,所以这里做下查询详情的时候,发现已经是完成的订单,redis数据存到表中 + CompletableFuture.runAsync(() -> { + try { + if (StringUtils.equals(orderBasicInfo.getOrderStatus(), OrderStatusEnum.ORDER_COMPLETE.getValue())) { + // 如果是已完成的订单,把redis中的实时数据存到表中 + orderBasicInfoService.realTimeMonitorDataRedis2DB(orderBasicInfo.getTransactionCode(), orderBasicInfo.getOrderCode()); + } + } catch (Exception e) { + log.error("后管查询订单详情时把redis中的实时数据存到表发生异常", e); } - } catch (Exception e) { - log.error("后管查询订单详情时把redis中的实时数据存到表发生异常", e); - } + }); return vo; } diff --git a/jsowell-admin/src/test/java/SpringBootTestController.java b/jsowell-admin/src/test/java/SpringBootTestController.java index f2865f3d0..888a4c9f8 100644 --- a/jsowell-admin/src/test/java/SpringBootTestController.java +++ b/jsowell-admin/src/test/java/SpringBootTestController.java @@ -106,6 +106,8 @@ import java.nio.charset.StandardCharsets; import java.text.ParseException; import java.text.SimpleDateFormat; import java.util.*; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; @ActiveProfiles("dev") @@ -332,12 +334,12 @@ public class SpringBootTestController { @Test public void createBalancePaymentRequestTest() { - String outMemberId = "ACM40782726"; - String inMemberId = "0"; - String transAmt = "124.72"; - String title = "提取余额到自己账户"; - String desc = "2024年6月14日14点15分,售后需求:客户需要重新添加结算账户,原账户余额放弃提取"; - String wechatAppId = wechatAppId1; + String outMemberId = "ACM42875164"; // 出账memberId + String inMemberId = "0"; // 入账memberId + String transAmt = "798.20"; // 金额 + String title = "提取余额到自己账户"; // 标题 + String desc = "2024年7月31日08点55分,售后需求:客户重新添加结算账户, 原账户余额无法提取, 由现下打款给客户"; // 描述 + String wechatAppId = wechatAppId1; // 万车充id adapayService.createBalancePaymentRequest(outMemberId, inMemberId, transAmt, title, desc, wechatAppId); } diff --git a/jsowell-admin/src/test/java/Test.java b/jsowell-admin/src/test/java/Test.java deleted file mode 100644 index ed1a484f1..000000000 --- a/jsowell-admin/src/test/java/Test.java +++ /dev/null @@ -1,85 +0,0 @@ -import com.google.common.collect.Lists; - -import java.util.Comparator; -import java.util.List; -import java.util.stream.Collectors; - -/** - * @program: com.cjml.service.partitem - * @description: - * @author: dc - * @create: 2023-09-09 13:46 - **/ -public class Test { - - public static void main(String[] args) { - List testList = Lists.newArrayList(); - testList.add(new Category("100-50-10", 100, 50, 10)); - testList.add(new Category("110-30-8", 110, 30, 8)); - testList.add(new Category("0-10-12", 0, 10, 12)); - - testList = testList.stream().sorted( - Comparator.comparing(Category::getCategoryGroupSort, Comparator.reverseOrder()) - .thenComparing(Category::getCategorySort, Comparator.reverseOrder()) - .thenComparing(Category::getPartSort) - ) - .collect(Collectors.toList()); - - System.out.println(testList.get(0).getCategoryName()); - System.out.println(testList.get(1).getCategoryName()); - System.out.println(testList.get(2).getCategoryName()); - } - - static class Category { - - private String categoryName; - - private int categoryGroupSort; - - private int categorySort; - - private int partSort; - - public Category() { - } - - public Category(String categoryName, int categoryGroupSort, int categorySort, int partSort) { - this.categoryName = categoryName; - this.categoryGroupSort = categoryGroupSort; - this.categorySort = categorySort; - this.partSort = partSort; - } - - public String getCategoryName() { - return categoryName; - } - - public void setCategoryName(String categoryName) { - this.categoryName = categoryName; - } - - public int getCategoryGroupSort() { - return categoryGroupSort; - } - - public void setCategoryGroupSort(int categoryGroupSort) { - this.categoryGroupSort = categoryGroupSort; - } - - public int getCategorySort() { - return categorySort; - } - - public void setCategorySort(int categorySort) { - this.categorySort = categorySort; - } - - public int getPartSort() { - return partSort; - } - - public void setPartSort(int partSort) { - this.partSort = partSort; - } - } -} diff --git a/jsowell-admin/src/test/java/rpc/ClientChannelInitializer.java b/jsowell-admin/src/test/java/rpc/ClientChannelInitializer.java new file mode 100644 index 000000000..d086c5383 --- /dev/null +++ b/jsowell-admin/src/test/java/rpc/ClientChannelInitializer.java @@ -0,0 +1,19 @@ +package rpc; + +import io.netty.channel.ChannelInitializer; +import io.netty.channel.ChannelPipeline; +import io.netty.channel.socket.SocketChannel; + +public class ClientChannelInitializer extends ChannelInitializer { + + @Override + protected void initChannel(SocketChannel socketChannel) throws Exception { + ChannelPipeline pipeline = socketChannel.pipeline(); + + pipeline.addLast(new MessageEncode()); + pipeline.addLast(new MessageDecode()); + + pipeline.addLast(new RpcResponseHandler()); + } + +} \ No newline at end of file diff --git a/jsowell-admin/src/test/java/rpc/Message.java b/jsowell-admin/src/test/java/rpc/Message.java new file mode 100644 index 000000000..3323b79ea --- /dev/null +++ b/jsowell-admin/src/test/java/rpc/Message.java @@ -0,0 +1,11 @@ +package rpc; + +import lombok.Data; + +@Data +public abstract class Message { + + protected Byte messageType; + + +} \ No newline at end of file diff --git a/jsowell-admin/src/test/java/rpc/MessageConstant.java b/jsowell-admin/src/test/java/rpc/MessageConstant.java new file mode 100644 index 000000000..f3f78453b --- /dev/null +++ b/jsowell-admin/src/test/java/rpc/MessageConstant.java @@ -0,0 +1,22 @@ +package rpc; + +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +public class MessageConstant { + + public final static Byte rpcRequest = 1; + public final static Byte rpcResponse = 2; + + public static Map> messageTypeMap = new ConcurrentHashMap<>(); + + static { + messageTypeMap.put(rpcRequest, RpcRequest.class); + messageTypeMap.put(rpcResponse, RpcResponse.class); + } + + public static Class extends Message> getMessageClass(Byte messageType){ + return messageTypeMap.get(messageType); + } + +} \ No newline at end of file diff --git a/jsowell-admin/src/test/java/rpc/MessageDecode.java b/jsowell-admin/src/test/java/rpc/MessageDecode.java new file mode 100644 index 000000000..083f43166 --- /dev/null +++ b/jsowell-admin/src/test/java/rpc/MessageDecode.java @@ -0,0 +1,45 @@ +package rpc; + +import com.jsowell.common.util.bean.SerializationUtil; +import io.netty.buffer.ByteBuf; +import io.netty.channel.ChannelHandlerContext; +import io.netty.handler.codec.ByteToMessageDecoder; + +import java.util.List; + +public class MessageDecode extends ByteToMessageDecoder { + + + @Override + protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List list) throws Exception { + + // 由于数据包的前4个字节用于记录总数据大小,如果数据不够4个字节,不进行读 + if(byteBuf.readableBytes() < 4) { + return; + } + + // 标记开始读的位置 + byteBuf.markReaderIndex(); + + // 前四个字节记录了数据大小 + int dataSize = byteBuf.readInt(); + + // 查看剩余可读字节是否足够,如果不是,重置读取位置,等待下一次解析 + if(byteBuf.readableBytes() < dataSize) { + byteBuf.resetReaderIndex(); + return; + } + + // 读取消息类型 + byte messageType = byteBuf.readByte(); + // 读取数据, 数组大小需要剔除1个字节的消息类型 + byte[] data = new byte[dataSize -1]; + + byteBuf.readBytes(data); + + Message message = SerializationUtil.deserialize(MessageConstant.getMessageClass(messageType), data); + + list.add(message); + } + +} \ No newline at end of file diff --git a/jsowell-admin/src/test/java/rpc/MessageEncode.java b/jsowell-admin/src/test/java/rpc/MessageEncode.java new file mode 100644 index 000000000..338a74ac3 --- /dev/null +++ b/jsowell-admin/src/test/java/rpc/MessageEncode.java @@ -0,0 +1,23 @@ +package rpc; + +import com.jsowell.common.util.bean.SerializationUtil; +import io.netty.buffer.ByteBuf; +import io.netty.channel.ChannelHandlerContext; +import io.netty.handler.codec.MessageToByteEncoder; + +public class MessageEncode extends MessageToByteEncoder { + + @Override + protected void encode(ChannelHandlerContext channelHandlerContext, Message message, ByteBuf byteBuf) throws Exception { + // 将对象进行序列化 + byte[] data = SerializationUtil.serialize(message); + + // 写数据长度,前4个字节用于记录数据总长度(对象 + 类型(1个字节)) + byteBuf.writeInt(data.length + 1); + // 写记录消息类型,用于反序列选择类的类型 + byteBuf.writeByte(message.getMessageType()); + // 写对象 + byteBuf.writeBytes(data); + } + +} \ No newline at end of file diff --git a/jsowell-admin/src/test/java/rpc/RpcClient.java b/jsowell-admin/src/test/java/rpc/RpcClient.java new file mode 100644 index 000000000..92b615f73 --- /dev/null +++ b/jsowell-admin/src/test/java/rpc/RpcClient.java @@ -0,0 +1,52 @@ +package rpc; + +import io.netty.bootstrap.Bootstrap; +import io.netty.channel.*; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.socket.nio.NioSocketChannel; + +public class RpcClient { + + public Channel connect(String host, Integer port) { + EventLoopGroup worker = new NioEventLoopGroup(); + Channel channel = null; + + try { + Bootstrap bootstrap = new Bootstrap(); + + bootstrap.group(worker) + .channel(NioSocketChannel.class) + .option(ChannelOption.AUTO_READ, true) + .handler(new ClientChannelInitializer()); + + ChannelFuture channelFuture = bootstrap.connect(host, port).sync(); + + System.out.println("客户端启动"); + + channel = channelFuture.channel(); + + // 添加关闭监听器 + channel.closeFuture().addListener(new ChannelFutureListener() { + @Override + public void operationComplete(ChannelFuture channelFuture) throws Exception { + System.out.println("关闭客户端"); + worker.shutdownGracefully(); + } + }); + + } catch (Exception e) { + e.printStackTrace(); + + if(channel == null || !channel.isActive()) { + worker.shutdownGracefully(); + } else { + channel.close(); + } + + } + + return channel; + } + + +} \ No newline at end of file diff --git a/jsowell-admin/src/test/java/rpc/RpcRequest.java b/jsowell-admin/src/test/java/rpc/RpcRequest.java new file mode 100644 index 000000000..f55e85b71 --- /dev/null +++ b/jsowell-admin/src/test/java/rpc/RpcRequest.java @@ -0,0 +1,21 @@ +package rpc; + +import lombok.Data; +import lombok.ToString; + +import java.util.UUID; + +@Data +@ToString +public class RpcRequest extends Message{ + + private String id; + + private String param; + + public RpcRequest() { + this.id = UUID.randomUUID().toString(); + super.messageType = MessageConstant.rpcRequest; + } + +} \ No newline at end of file diff --git a/jsowell-admin/src/test/java/rpc/RpcRequestHandler.java b/jsowell-admin/src/test/java/rpc/RpcRequestHandler.java new file mode 100644 index 000000000..74b432bca --- /dev/null +++ b/jsowell-admin/src/test/java/rpc/RpcRequestHandler.java @@ -0,0 +1,36 @@ +package rpc; + +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.DefaultEventLoopGroup; +import io.netty.channel.EventLoopGroup; +import io.netty.channel.SimpleChannelInboundHandler; + +public class RpcRequestHandler extends SimpleChannelInboundHandler { + + private final static EventLoopGroup worker = new DefaultEventLoopGroup(Runtime.getRuntime().availableProcessors() + 1); + + + @Override + protected void channelRead0(ChannelHandlerContext ctx, RpcRequest msg) throws Exception { + + // 为避免占用网络io,此处异步进行处理 + worker.submit(() -> { + System.out.println("[RpcRequestHandler] "+ Thread.currentThread().getName() +" 处理请求,msg: " + msg); + + // 模拟处理耗时 + try { + Thread.sleep(3000); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + + RpcResponse rpcResponse = new RpcResponse(); + rpcResponse.setId(msg.getId()); + rpcResponse.setResult("处理" + msg.getParam()); + + ctx.writeAndFlush(rpcResponse); + }); + + } + +} \ No newline at end of file diff --git a/jsowell-admin/src/test/java/rpc/RpcResponse.java b/jsowell-admin/src/test/java/rpc/RpcResponse.java new file mode 100644 index 000000000..43e5a7ad8 --- /dev/null +++ b/jsowell-admin/src/test/java/rpc/RpcResponse.java @@ -0,0 +1,18 @@ +package rpc; + +import lombok.Data; +import lombok.ToString; + +@Data +@ToString +public class RpcResponse extends Message{ + + private String id; + + private String result; + + public RpcResponse() { + super.messageType = MessageConstant.rpcResponse; + } + +} \ No newline at end of file diff --git a/jsowell-admin/src/test/java/rpc/RpcResponseHandler.java b/jsowell-admin/src/test/java/rpc/RpcResponseHandler.java new file mode 100644 index 000000000..085c5bf31 --- /dev/null +++ b/jsowell-admin/src/test/java/rpc/RpcResponseHandler.java @@ -0,0 +1,23 @@ +package rpc; + +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.SimpleChannelInboundHandler; + +public class RpcResponseHandler extends SimpleChannelInboundHandler { + + @Override + protected void channelRead0(ChannelHandlerContext ctx, RpcResponse msg) throws Exception { + // 根据请求id,在集合中找到与外部线程通信的SyncPromise对象 + SyncPromise syncPromise = RpcUtil.getSyncPromiseMap().get(msg.getId()); + + if(syncPromise != null) { + // 设置响应结果 + syncPromise.setRpcResponse(msg); + + // 唤醒外部线程 + syncPromise.wake(); + } + + } + +} \ No newline at end of file diff --git a/jsowell-admin/src/test/java/rpc/RpcServer.java b/jsowell-admin/src/test/java/rpc/RpcServer.java new file mode 100644 index 000000000..a1b55da49 --- /dev/null +++ b/jsowell-admin/src/test/java/rpc/RpcServer.java @@ -0,0 +1,55 @@ +package rpc; + +import io.netty.bootstrap.ServerBootstrap; +import io.netty.channel.*; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.socket.nio.NioServerSocketChannel; + +public class RpcServer { + + public void bind(Integer port) { + + EventLoopGroup parent = new NioEventLoopGroup(); + EventLoopGroup child = new NioEventLoopGroup(); + Channel channel = null; + + try{ + ServerBootstrap serverBootstrap = new ServerBootstrap(); + + serverBootstrap.group(parent, child) + .channel(NioServerSocketChannel.class) + .option(ChannelOption.SO_BACKLOG, 1024) + .childHandler(new ServerChannelInitializer()); + + ChannelFuture channelFuture = serverBootstrap.bind(port).sync(); + + System.out.println("server启动"); + + // 非阻塞等待关闭 + channelFuture.channel().closeFuture().addListener(new ChannelFutureListener() { + @Override + public void operationComplete(ChannelFuture channelFuture) throws Exception { + System.out.println("server关闭"); + parent.shutdownGracefully(); + child.shutdownGracefully(); + } + }); + + channel = channelFuture.channel(); + + } catch (Exception e) { + + e.printStackTrace(); + + if(channel == null || !channel.isActive()) { + System.out.println("server关闭"); + parent.shutdownGracefully(); + child.shutdownGracefully(); + } else { + channel.close(); + } + } + } + + +} \ No newline at end of file diff --git a/jsowell-admin/src/test/java/rpc/RpcUtil.java b/jsowell-admin/src/test/java/rpc/RpcUtil.java new file mode 100644 index 000000000..5d8c4bf89 --- /dev/null +++ b/jsowell-admin/src/test/java/rpc/RpcUtil.java @@ -0,0 +1,63 @@ +package rpc; + +import io.netty.channel.Channel; + +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +public class RpcUtil { + + private final static Map syncPromiseMap = new ConcurrentHashMap<>(); + + private final static Channel channel; + + static{ + channel = new RpcClient().connect("127.0.0.1", 8888); + } + + public static RpcResponse send(RpcRequest rpcRequest, long timeout, TimeUnit unit) throws Exception{ + + if(channel == null) { + throw new NullPointerException("channel"); + } + + if(rpcRequest == null) { + throw new NullPointerException("rpcRequest"); + } + + if(timeout <= 0) { + throw new IllegalArgumentException("timeout must greater than 0"); + } + + // 创造一个容器,用于存放当前线程与rpcClient中的线程交互 + SyncPromise syncPromise = new SyncPromise(); + syncPromiseMap.put(rpcRequest.getId(), syncPromise); + + // 发送消息,此处如果发送玩消息并且在get之前返回了结果,下一行的get将不会进入阻塞,也可以顺利拿到结果 + channel.writeAndFlush(rpcRequest); + + // 等待获取结果 + RpcResponse rpcResponse = syncPromise.get(timeout, unit); + + if(rpcResponse == null) { + if(syncPromise.isTimeout()) { + throw new TimeoutException("等待响应结果超时"); + } else{ + throw new Exception("其他异常"); + } + } + + // 移除容器 + syncPromiseMap.remove(rpcRequest.getId()); + + return rpcResponse; + } + + public static Map getSyncPromiseMap(){ + return syncPromiseMap; + } + + +} \ No newline at end of file diff --git a/jsowell-admin/src/test/java/rpc/ServerChannelInitializer.java b/jsowell-admin/src/test/java/rpc/ServerChannelInitializer.java new file mode 100644 index 000000000..283d00b5d --- /dev/null +++ b/jsowell-admin/src/test/java/rpc/ServerChannelInitializer.java @@ -0,0 +1,19 @@ +package rpc; + +import io.netty.channel.ChannelInitializer; +import io.netty.channel.ChannelPipeline; +import io.netty.channel.socket.SocketChannel; + +public class ServerChannelInitializer extends ChannelInitializer { + + @Override + protected void initChannel(SocketChannel socketChannel) throws Exception { + ChannelPipeline pipeline = socketChannel.pipeline(); + + pipeline.addLast(new MessageEncode()); + pipeline.addLast(new MessageDecode()); + + pipeline.addLast(new RpcRequestHandler()); + } + +} \ No newline at end of file diff --git a/jsowell-admin/src/test/java/rpc/SyncPromise.java b/jsowell-admin/src/test/java/rpc/SyncPromise.java new file mode 100644 index 000000000..4bcdb13fa --- /dev/null +++ b/jsowell-admin/src/test/java/rpc/SyncPromise.java @@ -0,0 +1,49 @@ +package rpc; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +public class SyncPromise { + + // 用于接收结果 + private RpcResponse rpcResponse; + + private final CountDownLatch countDownLatch = new CountDownLatch(1); + + // 用于判断是否超时 + private boolean isTimeout = false; + + /** + * 同步等待返回结果 + */ + public RpcResponse get(long timeout, TimeUnit unit) throws InterruptedException { + // 等待阻塞,超时时间内countDownLatch减到0,将提前唤醒,以此作为是否超时判断 + boolean earlyWakeUp = countDownLatch.await(timeout, unit); + + if(earlyWakeUp) { + // 超时时间内countDownLatch减到0,提前唤醒,说明已有结果 + return rpcResponse; + } else { + // 超时时间内countDownLatch没有减到0,自动唤醒,说明超时时间内没有等到结果 + isTimeout = true; + return null; + } + } + + public void wake() { + countDownLatch.countDown(); + } + + public RpcResponse getRpcResponse() { + return rpcResponse; + } + + public void setRpcResponse(RpcResponse rpcResponse) { + this.rpcResponse = rpcResponse; + } + + public boolean isTimeout() { + return isTimeout; + } + +} \ No newline at end of file diff --git a/jsowell-admin/src/test/java/rpc/TestRpcClient.java b/jsowell-admin/src/test/java/rpc/TestRpcClient.java new file mode 100644 index 000000000..aeba06cbb --- /dev/null +++ b/jsowell-admin/src/test/java/rpc/TestRpcClient.java @@ -0,0 +1,48 @@ +package rpc; + +import java.util.concurrent.TimeUnit; + +public class TestRpcClient { + public static void main(String[] args) throws Exception{ + //Channel channel = new RpcClient().connect("127.0.0.1", 8888); + + Thread thread1 = new Thread(new Runnable() { + @Override + public void run() { + RpcRequest rpcRequest = new RpcRequest(); + rpcRequest.setParam("参数1"); + + try { + System.out.println("thread1发送请求"); + RpcResponse rpcResponse = RpcUtil.send(rpcRequest, 5, TimeUnit.SECONDS); + System.out.println("thread1处理结果:" + rpcResponse); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + }); + + Thread thread2 = new Thread(new Runnable() { + @Override + public void run() { + RpcRequest rpcRequest2 = new RpcRequest(); + rpcRequest2.setParam("参数2"); + + try { + System.out.println("thread2发送请求"); + RpcResponse rpcResponse = RpcUtil.send(rpcRequest2, 2, TimeUnit.SECONDS); + System.out.println("thread2处理结果:" + rpcResponse); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + }); + + // 休眠一下,等待客户端与服务端进行连接 + Thread.sleep(1000); + + thread1.start(); + thread2.start(); + + } +} diff --git a/jsowell-admin/src/test/java/rpc/TestRpcServer.java b/jsowell-admin/src/test/java/rpc/TestRpcServer.java new file mode 100644 index 000000000..c15d9e211 --- /dev/null +++ b/jsowell-admin/src/test/java/rpc/TestRpcServer.java @@ -0,0 +1,7 @@ +package rpc; + +public class TestRpcServer { + public static void main(String[] args) { + new RpcServer().bind(8888); + } +} diff --git a/jsowell-common/pom.xml b/jsowell-common/pom.xml index 00aa07bd2..4be94b3de 100644 --- a/jsowell-common/pom.xml +++ b/jsowell-common/pom.xml @@ -213,6 +213,15 @@ 1.77 + + io.protostuff + protostuff-core + + + + io.protostuff + protostuff-runtime + diff --git a/jsowell-common/src/main/java/com/jsowell/common/core/domain/ykc/YKCFrameTypeCode.java b/jsowell-common/src/main/java/com/jsowell/common/core/domain/ykc/YKCFrameTypeCode.java index 81349b3fc..b9e2e1920 100644 --- a/jsowell-common/src/main/java/com/jsowell/common/core/domain/ykc/YKCFrameTypeCode.java +++ b/jsowell-common/src/main/java/com/jsowell/common/core/domain/ykc/YKCFrameTypeCode.java @@ -1,13 +1,17 @@ package com.jsowell.common.core.domain.ykc; +import com.google.common.collect.Lists; import com.jsowell.common.util.BytesUtil; import com.jsowell.common.util.StringUtils; import com.jsowell.common.util.YKCUtils; +import java.util.List; + /** * 云快充 帧类型码 * FrameTypeCode - * frame + * 单数为 桩 -> 平台 + * 双数为 平台 -> 桩 */ public enum YKCFrameTypeCode { @@ -39,12 +43,11 @@ public enum YKCFrameTypeCode { REQUEST_START_CHARGING_CODE(0x31, "充电桩主动申请启动充电"), CONFIRM_START_CHARGING_CODE(0x32, "运营平台确认启动充电"), - REMOTE_START_CHARGING_ANSWER_CODE(0x33, "远程启动充电命令回复"), - REMOTE_CONTROL_START_CODE(0x34, "运营平台远程控制启机(启动充电)"), - - REMOTE_STOP_CHARGING_ANSWER_CODE(0x35, "远程停机命令回复"), - REMOTE_STOP_CHARGING_CODE(0x36, "运营平台远程停机(停止充电)"), + REMOTE_CONTROL_START_CHARGING_ANSWER_CODE(0x33, "远程启动充电命令回复"), + REMOTE_CONTROL_START_CHARGING_CODE(0x34, "运营平台远程控制启机(启动充电)"), + REMOTE_CONTROL_STOP_CHARGING_ANSWER_CODE(0x35, "远程停机命令回复"), + REMOTE_CONTROL_STOP_CHARGING_CODE(0x36, "运营平台远程停机(停止充电)"), TRANSACTION_RECORDS_CODE(0x3B, "交易记录"), TRANSACTION_RECORDS_OLD_VERSION_CODE(0x39, "交易记录V1.3"), @@ -71,15 +74,16 @@ public enum YKCFrameTypeCode { BILLING_TEMPLATE_SETTING_CODE(0x58, "计费模型设置"), BILLING_TEMPLATE_SETTING_ANSWER_CODE(0x57, "计费模型设置应答"), - RESERVATION_CHARGING_CODE(0x60, "预约充电设置"), - RESERVATION_CHARGING_ANSWER_CODE(0x59, "预约充电设置响应"), // RESERVATION + RESERVATION_CHARGING_SETUP_CODE(0x60, "预约充电设置"), + RESERVATION_CHARGING_SETUP_ANSWER_CODE(0x59, "预约充电设置响应"), // RESERVATION RESERVATION_CHARGING_STARTUP_RESULT_ANSWER_CODE(0x64, "预约充电启动结果上传响应"), // 平台响应 RESERVATION_CHARGING_STARTUP_RESULT_CODE(0x65, "预约充电启动结果上传"), // 桩 -> 平台 GROUND_LOCK_DATA_UPLOAD_CODE(0x61, "地锁数据上送"), - REMOTE_CONTROL_GROUND_LOCK_LIFTING_CODE(0x62, "遥控地锁升降"), - CHARGING_PILE_RESPOND_GROUND_LOCK_LIFTING_CODE(0X63, "充电桩响应地锁升降数据"), + + REMOTE_CONTROL_GROUND_LOCK_CODE(0x62, "遥控地锁升降"), + REMOTE_CONTROL_GROUND_LOCK_ANSWER_CODE(0X63, "充电桩响应地锁升降数据"), REMOTE_RESTART_CODE(0x92, "远程重启"), REMOTE_RESTART_ANSWER_CODE(0x91, "远程重启应答"), @@ -125,11 +129,6 @@ public enum YKCFrameTypeCode { return BytesUtil.intToBytesLittle(code, 1); } - public static void main(String[] args) { - byte[] bytes = BytesUtil.intToBytesLittle(9999, 1); - System.out.println(YKCUtils.frameType2Str(bytes)); - } - public static YKCFrameTypeCode fromCode(byte code) { for (YKCFrameTypeCode item : YKCFrameTypeCode.values()) { if (item.getCode() == code) { @@ -150,31 +149,32 @@ public enum YKCFrameTypeCode { } /** - * 请求应答 帧类型关系 + * 桩请求 - 平台应答 帧类型关系 + * PlatformAnswersRelation */ - public enum ResponseRelation { + public enum PlatformAnswersRelation { // 登录 LOGIN(LOGIN_CODE.getCode(), LOGIN_ANSWER_CODE.getCode()), + // 心跳 HEART_BEAT(HEART_BEAT_CODE.getCode(), HEART_BEAT_ANSWER_CODE.getCode()), + // 计费模板验证 BILLING_TEMPLATE_VALIDATE(BILLING_TEMPLATE_VALIDATE_CODE.getCode(), BILLING_TEMPLATE_VALIDATE_ANSWER_CODE.getCode()), + // 计费模板请求 BILLING_TEMPLATE(BILLING_TEMPLATE_CODE.getCode(), BILLING_TEMPLATE_ANSWER_CODE.getCode()), - // 请求开始充电 + + // 充电桩请求开始充电 START_CHARGING(REQUEST_START_CHARGING_CODE.getCode(), CONFIRM_START_CHARGING_CODE.getCode()), - // 远程请求充电 - REMOTE_START_CHARGING(REMOTE_CONTROL_START_CODE.getCode(), REMOTE_START_CHARGING_ANSWER_CODE.getCode()), - // 远程停止充电 - REMOTE_STOP_CHARGING(REMOTE_STOP_CHARGING_CODE.getCode(), REMOTE_STOP_CHARGING_ANSWER_CODE.getCode()), + // 交易记录 TRANSACTION_RECORDS(TRANSACTION_RECORDS_CODE.getCode(), TRANSACTION_RECORDS_CONFIRM_CODE.getCode()), - // 远程账户更新 - REMOTE_ACCOUNT_BALANCE_UPDATE(REMOTE_ACCOUNT_BALANCE_UPDATE_CODE.getCode(), REMOTE_ACCOUNT_BALANCE_UPDATE_ANSWER_CODE.getCode()), + TRANSACTION_RECORDS_V13(TRANSACTION_RECORDS_OLD_VERSION_CODE.getCode(), TRANSACTION_RECORDS_CONFIRM_CODE.getCode()), + // 预约充电启动结果 RESERVATION_CHARGING_STARTUP_RESULT(RESERVATION_CHARGING_STARTUP_RESULT_CODE.getCode(), RESERVATION_CHARGING_STARTUP_RESULT_ANSWER_CODE.getCode()), - ; // 请求帧类型 private int requestFrameType; @@ -198,16 +198,16 @@ public enum YKCFrameTypeCode { this.responseFrameType = responseFrameType; } - ResponseRelation(int requestFrameType, int responseFrameType) { + PlatformAnswersRelation(int requestFrameType, int responseFrameType) { this.requestFrameType = requestFrameType; this.responseFrameType = responseFrameType; } // 根据请求帧类型 获取应答帧类型 int类型 public static int getResponseFrameTypeByRequestFrameType(int requestFrameType) { - for (ResponseRelation responseRelation : ResponseRelation.values()) { - if (responseRelation.getRequestFrameType() == requestFrameType) { - return responseRelation.getResponseFrameType(); + for (PlatformAnswersRelation relation : PlatformAnswersRelation.values()) { + if (relation.getRequestFrameType() == requestFrameType) { + return relation.getResponseFrameType(); } } return 0; @@ -221,4 +221,122 @@ public enum YKCFrameTypeCode { } + /** + * 平台请求 - 桩应答 帧类型关系 + * PileAnswersRelation + */ + public enum PileAnswersRelation { + // 远程请求充电 + REMOTE_START_CHARGING(REMOTE_CONTROL_START_CHARGING_CODE.getCode(), REMOTE_CONTROL_START_CHARGING_ANSWER_CODE.getCode()), + + // 远程停止充电 + REMOTE_STOP_CHARGING(REMOTE_CONTROL_STOP_CHARGING_CODE.getCode(), REMOTE_CONTROL_STOP_CHARGING_ANSWER_CODE.getCode()), + + // 账户余额更新 + ACCOUNT_BALANCE_UPDATE(REMOTE_ACCOUNT_BALANCE_UPDATE_CODE.getCode(), REMOTE_ACCOUNT_BALANCE_UPDATE_ANSWER_CODE.getCode()), + + // 离线卡数据同步 + OFFLINE_CARD_DATA_SYNCHRONIZATION(OFFLINE_CARD_DATA_SYNCHRONIZATION_CODE.getCode(), OFFLINE_CARD_DATA_SYNCHRONIZATION_ANSWER_CODE.getCode()), + + // 离线卡数据清除 + OFFLINE_CARD_DATA_CLEANING(OFFLINE_CARD_DATA_CLEANING_CODE.getCode(), OFFLINE_CARD_DATA_CLEANING_ANSWER_CODE.getCode()), + + // 离线卡数据查询 + OFFLINE_CARD_DATA_QUERY(OFFLINE_CARD_DATA_QUERY_CODE.getCode(), OFFLINE_CARD_DATA_QUERY_ANSWER_CODE.getCode()), + + // 充电桩工作参数设置 + CHARGING_PILE_WORKING_PARAMETER_SETTING(CHARGING_PILE_WORKING_PARAMETER_SETTING_CODE.getCode(), CHARGING_PILE_WORKING_PARAMETER_SETTING_ANSWER_CODE.getCode()), + + // 对时设置 + TIME_CHECK_SETTING(TIME_CHECK_SETTING_CODE.getCode(), TIME_CHECK_SETTING_ANSWER_CODE.getCode()), + + // 计费模型设置 + BILLING_TEMPLATE_SETTING(BILLING_TEMPLATE_SETTING_CODE.getCode(), BILLING_TEMPLATE_SETTING_ANSWER_CODE.getCode()), + + // 预约充电设置 + RESERVATION_CHARGING_SETUP(RESERVATION_CHARGING_SETUP_CODE.getCode(), RESERVATION_CHARGING_SETUP_ANSWER_CODE.getCode()), + + // 预约充电启动结果上传 + RESERVATION_CHARGING_STARTUP_RESULT(RESERVATION_CHARGING_STARTUP_RESULT_CODE.getCode(), RESERVATION_CHARGING_STARTUP_RESULT_ANSWER_CODE.getCode()), + + // 遥控地锁 + CONTROL_GROUND_LOCK(REMOTE_CONTROL_GROUND_LOCK_CODE.getCode(), REMOTE_CONTROL_GROUND_LOCK_ANSWER_CODE.getCode()), + + // 远程重启 + REMOTE_RESTART(REMOTE_RESTART_CODE.getCode(), REMOTE_RESTART_ANSWER_CODE.getCode()), + + // 远程更新 + REMOTE_UPDATE(REMOTE_UPDATE_CODE.getCode(), REMOTE_UPDATE_ANSWER_CODE.getCode()), + + // 下发二维码 + REMOTE_ISSUE_QRCODE(REMOTE_ISSUE_QRCODE_CODE.getCode(), REMOTE_ISSUE_QRCODE_ANSWER_CODE.getCode()), + + // 查询工作参数 + QUERY_PILE_WORK_PARAMS(QUERY_PILE_WORK_PARAMS_CODE.getCode(), QUERY_PILE_WORK_PARAMS_ANSWER_CODE.getCode()), + + // 设置工作参数 + SETTING_PILE_WORK_PARAMS(SETTING_PILE_WORK_PARAMS_CODE.getCode(), SETTING_PILE_WORK_PARAMS_ANSWER_CODE.getCode()), + + // 远程账户余额更新 + REMOTE_ACCOUNT_BALANCE_UPDATE(REMOTE_ACCOUNT_BALANCE_UPDATE_CODE.getCode(), REMOTE_ACCOUNT_BALANCE_UPDATE_ANSWER_CODE.getCode()), + + ; + // 请求帧类型 + private int requestFrameType; + + // 响应帧类型 + private int responseFrameType; + + public int getRequestFrameType() { + return requestFrameType; + } + + public void setRequestFrameType(int requestFrameType) { + this.requestFrameType = requestFrameType; + } + + public int getResponseFrameType() { + return responseFrameType; + } + + public void setResponseFrameType(int responseFrameType) { + this.responseFrameType = responseFrameType; + } + + public byte[] getRequestFrameBytes() { + return BytesUtil.intToBytesLittle(requestFrameType, 1); + } + + PileAnswersRelation(int requestFrameType, int responseFrameType) { + this.requestFrameType = requestFrameType; + this.responseFrameType = responseFrameType; + } + + // 根据请求帧类型 获取应答帧类型 int类型 + public static int getResponseFrameTypeByRequestFrameType(int requestFrameType) { + for (PileAnswersRelation relation : PileAnswersRelation.values()) { + if (relation.getRequestFrameType() == requestFrameType) { + return relation.getResponseFrameType(); + } + } + return 0; + } + + // 根据请求帧类型 获取应答帧类型 byte[]类型 + public static byte[] getResponseFrameTypeBytes(byte[] requestFrameType) { + int frameType = BytesUtil.bytesToInt(requestFrameType); + return BytesUtil.intToBytes(getResponseFrameTypeByRequestFrameType(frameType), 1); + } + + // 需要获取应答的帧类型 + public static List getRequestFrameTypeList() { + List resultList = Lists.newArrayList(); + for (PileAnswersRelation relation : PileAnswersRelation.values()) { + resultList.add(YKCUtils.frameType2Str(relation.getRequestFrameBytes())); + } + return resultList; + } + + } + } diff --git a/jsowell-common/src/main/java/com/jsowell/common/util/YKCUtils.java b/jsowell-common/src/main/java/com/jsowell/common/util/YKCUtils.java index 76d6f303d..e0da8e82b 100644 --- a/jsowell-common/src/main/java/com/jsowell/common/util/YKCUtils.java +++ b/jsowell-common/src/main/java/com/jsowell/common/util/YKCUtils.java @@ -20,6 +20,7 @@ public class YKCUtils { * @return */ public static boolean checkMsg(byte[] msg) { + // log.info("checkMsg:{}", BytesUtil.binary(msg, 16)); // 起始标志 byte[] head = BytesUtil.copyBytes(msg, 0, 1); // 数据长度 @@ -44,7 +45,7 @@ public class YKCUtils { byte[] data = Bytes.concat(serialNumber, encryptFlag, frameType, msgBody); // 校验长度 if (data.length != BytesUtil.bytesToIntLittle(length)) { - log.error("数据长度不正确"); + log.error("数据长度不正确, 数据长度:{}, 实际长度:{}", BytesUtil.bytesToIntLittle(length), data.length); return false; } // CRC校验 source target @@ -66,6 +67,11 @@ public class YKCUtils { return false; } + public static void main(String[] args) { + byte[] length = new byte[]{0x22}; + System.out.println(BytesUtil.bytesToIntLittle(length)); + } + /** * 转换电压电流以及起始soc * 精确到小数点后一位;待机置零 @@ -127,46 +133,6 @@ public class YKCUtils { return bytes; } - public static void main(String[] args) { - String frameTypeStr = "0x01"; - byte[] bytes = frameTypeStr2Bytes(frameTypeStr); - System.out.println("转为byte数组:" + Arrays.toString(bytes)); - String frameType2Str = frameType2Str(bytes); - System.out.println("转为Str:" + frameType2Str); - - // String hexString = "681E0000003388000000000027012302081602434533880000000000270101008361"; - // byte[] byteArray = new byte[hexString.length() / 2]; - // for (int i = 0; i < byteArray.length; i++) { - // int index = i * 2; - // int j = Integer.parseInt(hexString.substring(index, index + 2), 16); - // byteArray[i] = (byte) j; - // } - // System.out.println(byteArray); - // String binary = BytesUtil.binary(byteArray, 16); - // String aaa = DatatypeConverter.printHexBinary(byteArray); - // System.out.println(binary); - // System.out.println(aaa); - - // String targetCRC = "0abb"; - // String substring = StringUtils.substring(targetCRC, 0, 2); - // String substring1 = StringUtils.substring(targetCRC, 2, 4); - // String crc = substring1 + substring; - // - // String hexString = "4f"; - // byte[] bytes = new byte[]{0x4f}; - // - // String s = transitionTemperature(bytes); - // System.out.println(s); - // - // byte[] bytess = new byte[]{(byte) 0x80, (byte) 0x1A, 0x06, 0x00}; - // String s1 = convertDecimalPoint(bytess, 5); - // System.out.println(s1); - // - // String amount = "1000"; - // byte[] priceByte = getPriceByte(amount, 2); - // System.out.println(BytesUtil.bin2HexStr(priceByte)); - } - /** * 转换温度 * BIN 码 1 整形,偏移量-50;待机置零 diff --git a/jsowell-common/src/main/java/com/jsowell/common/util/bean/SerializationUtil.java b/jsowell-common/src/main/java/com/jsowell/common/util/bean/SerializationUtil.java new file mode 100644 index 000000000..3906245d8 --- /dev/null +++ b/jsowell-common/src/main/java/com/jsowell/common/util/bean/SerializationUtil.java @@ -0,0 +1,53 @@ +package com.jsowell.common.util.bean; + +import io.protostuff.LinkedBuffer; +import io.protostuff.ProtostuffIOUtil; +import io.protostuff.Schema; +import io.protostuff.runtime.RuntimeSchema; + +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +public class SerializationUtil { + + private final static Map, Schema>> schemaCache = new ConcurrentHashMap<>(); + + /** + * 序列化 + */ + public static byte[] serialize(T object){ + LinkedBuffer buffer = LinkedBuffer.allocate(LinkedBuffer.DEFAULT_BUFFER_SIZE); + + try { + Class cls = (Class) object.getClass(); + Schema schema = getSchema(cls); + + return ProtostuffIOUtil.toByteArray(object, schema, buffer); + } catch (Exception e) { + throw e; + } finally { + buffer.clear(); + } + } + + /** + * 反序列化 + */ + public static T deserialize(Class cls, byte[] data) { + Schema schema = getSchema(cls); + T message = schema.newMessage(); + ProtostuffIOUtil.mergeFrom(data, message, schema); + return message; + } + + public static Schema getSchema(Class cls) { + Schema schema = (Schema) schemaCache.get(cls); + + if(schema == null) { + schema = RuntimeSchema.getSchema(cls); + schemaCache.put(cls, schema); + } + return schema; + } + +} \ No newline at end of file diff --git a/jsowell-netty/src/main/java/com/jsowell/netty/decoder/ChargingPileDecoder.java b/jsowell-netty/src/main/java/com/jsowell/netty/decoder/ChargingPileDecoder.java new file mode 100644 index 000000000..f46b481c0 --- /dev/null +++ b/jsowell-netty/src/main/java/com/jsowell/netty/decoder/ChargingPileDecoder.java @@ -0,0 +1,88 @@ +package com.jsowell.netty.decoder; + +import com.jsowell.netty.domain.ChargingPileMessage; +import io.netty.buffer.ByteBuf; +import io.netty.channel.ChannelHandlerContext; +import io.netty.handler.codec.ByteToMessageDecoder; +import lombok.extern.slf4j.Slf4j; + +import java.util.List; + +@Slf4j +public class ChargingPileDecoder extends ByteToMessageDecoder { + + private static final byte[] FRAME_HEADER = {'D', 'N', 'Y'}; // 包头为"DNY" + private static final int MIN_FRAME_LENGTH = 14; // 最小帧长度:包头(3)+长度(2)+物理ID(4)+消息ID(2)+命令(1)+校验(2) + + @Override + protected void decode(ChannelHandlerContext ctx, ByteBuf in, List out) throws Exception { + while (in.readableBytes() >= MIN_FRAME_LENGTH) { + in.markReaderIndex(); + + // 检查包头 + byte[] header = new byte[3]; + in.readBytes(header); + if (!isValidHeader(header)) { + in.resetReaderIndex(); + return; + } + + // 读取长度 + short length = in.readShort(); + if (in.readableBytes() < length - 5) { // 5 = 包头(3) + 长度(2) + in.resetReaderIndex(); + return; + } + + // 读取物理ID + int physicalId = in.readInt(); + log.info("physicalId:{}", physicalId); + + // 读取消息ID + short messageId = in.readShort(); + log.info("messageId:{}", messageId); + + // 读取命令 + byte command = in.readByte(); + log.info("command:{}", command); + + // 读取数据 + int dataLength = length - 13; // 13 = 包头(3) + 长度(2) + 物理ID(4) + 消息ID(2) + 命令(1) + 校验(2) + byte[] data = new byte[dataLength]; + in.readBytes(data); + log.info("data:{}", data); + + // 读取校验和 + short checksum = in.readShort(); + log.info("checksum:{}", checksum); + + // 验证校验和 + short calculatedChecksum = calculateChecksum(in, length); + log.info("calculatedChecksum:{}", calculatedChecksum); + if (checksum != calculatedChecksum) { + log.info("校验和错误,丢弃此帧"); + continue; + } + + // 创建消息对象并添加到输出列表 + ChargingPileMessage message = new ChargingPileMessage(physicalId, messageId, command, data); + log.info("ChargingPileMessage:{}", message.toString()); + out.add(message); + } + } + + private boolean isValidHeader(byte[] header) { + log.info("isValidHeader header:{}", header); + return header[0] == FRAME_HEADER[0] && header[1] == FRAME_HEADER[1] && header[2] == FRAME_HEADER[2]; + } + + private short calculateChecksum(ByteBuf buf, int length) { + log.info("calculateChecksum ByteBuf:{}, length:{}", buf.toString(), length); + short sum = 0; + for (int i = 0; i < length - 2; i++) { + sum += buf.getByte(buf.readerIndex() - length + i) & 0xFF; + } + return sum; + } +} + diff --git a/jsowell-netty/src/main/java/com/jsowell/netty/decoder/ProtocolDnyDecoder.java b/jsowell-netty/src/main/java/com/jsowell/netty/decoder/ProtocolDnyDecoder.java new file mode 100644 index 000000000..0c627aab7 --- /dev/null +++ b/jsowell-netty/src/main/java/com/jsowell/netty/decoder/ProtocolDnyDecoder.java @@ -0,0 +1,117 @@ +package com.jsowell.netty.decoder; + + +import com.jsowell.netty.domain.ProtocolDnyMessage; +import io.netty.buffer.ByteBuf; +import io.netty.channel.ChannelHandlerContext; +import io.netty.handler.codec.ByteToMessageDecoder; +import lombok.extern.slf4j.Slf4j; + +import java.nio.ByteOrder; +import java.util.List; + +@Slf4j +public class ProtocolDnyDecoder extends ByteToMessageDecoder { + private static final int HEADER_LENGTH = 3; // 包头长度 + private static final int LENGTH_FIELD_LENGTH = 2; // 长度字段长度 + private static final int PHYSICAL_ID_LENGTH = 4; // 物理ID长度 + private static final int MESSAGE_ID_LENGTH = 2; // 消息ID长度 + private static final int COMMAND_LENGTH = 1; // 命令长度 + private static final int CHECKSUM_LENGTH = 2; // 校验字段长度 + + private static final int MIN_FRAME_LENGTH = HEADER_LENGTH + LENGTH_FIELD_LENGTH + PHYSICAL_ID_LENGTH + MESSAGE_ID_LENGTH + COMMAND_LENGTH + CHECKSUM_LENGTH; + + @Override + protected void decode(ChannelHandlerContext ctx, ByteBuf in, List out) throws Exception { + // 检查是否有足够的数据来读取最小帧长度 + if (in.readableBytes() < MIN_FRAME_LENGTH) { + return; + } + + // 标记当前读位置 + in.markReaderIndex(); + + // 检查包头 + byte[] header = new byte[HEADER_LENGTH]; + in.readBytes(header); + if (header[0] != 'D' || header[1] != 'N' || header[2] != 'Y') { + in.resetReaderIndex(); + throw new IllegalStateException("Invalid start bytes: " + new String(header)); + } + + // 读取长度字段 + in.order(ByteOrder.LITTLE_ENDIAN); // 确保使用小端模式 + int length = in.readUnsignedShort(); + if (in.readableBytes() < length) { + in.resetReaderIndex(); + return; + } + + // 读取物理ID + byte[] physicalId = new byte[PHYSICAL_ID_LENGTH]; + in.readBytes(physicalId); + + // 读取消息ID + int messageId = in.readUnsignedShort(); + + // 读取命令 + byte command = in.readByte(); + + // 读取数据 + int dataLength = length - (PHYSICAL_ID_LENGTH + MESSAGE_ID_LENGTH + COMMAND_LENGTH + CHECKSUM_LENGTH); + byte[] data = new byte[dataLength]; + in.readBytes(data); + + // 读取校验字段 + int checksum = in.readUnsignedShort(); + + // 计算校验值并验证 + if (!verifyChecksum(header, length, physicalId, messageId, command, data, checksum)) { + in.resetReaderIndex(); + throw new IllegalStateException("Invalid checksum"); + } + + // 创建协议消息对象 + ProtocolDnyMessage message = new ProtocolDnyMessage(header, length, physicalId, messageId, command, data, checksum); + + // 将解码后的消息添加到输出列表中 + out.add(message); + } + + // 校验帧校验域的函数 + private boolean verifyChecksum(byte[] header, int length, byte[] physicalId, int messageId, byte command, byte[] data, int checksum) { + // 这里需要实现校验逻辑,假设我们有一个累加和校验函数sumCheck + int calculatedChecksum = sumCheck(header, length, physicalId, messageId, command, data); + return calculatedChecksum == checksum; + } + + // 累加和校验函数,计算从包头到数据的累加和 + private int sumCheck(byte[] header, int length, byte[] physicalId, int messageId, byte command, byte[] data) { + int sum = 0; + + for (byte b : header) { + sum += b & 0xFF; + } + + sum += length & 0xFF; + sum += (length >> 8) & 0xFF; + + for (byte b : physicalId) { + sum += b & 0xFF; + } + + sum += messageId & 0xFF; + sum += (messageId >> 8) & 0xFF; + + sum += command & 0xFF; + + for (byte b : data) { + sum += b & 0xFF; + } + + return sum & 0xFFFF; // 返回16位结果 + } +} + + + diff --git a/jsowell-netty/src/main/java/com/jsowell/netty/decoder/StartAndLengthFieldFrameDecoder.java b/jsowell-netty/src/main/java/com/jsowell/netty/decoder/StartAndLengthFieldFrameDecoder.java index b32b9b6ff..69b486f94 100644 --- a/jsowell-netty/src/main/java/com/jsowell/netty/decoder/StartAndLengthFieldFrameDecoder.java +++ b/jsowell-netty/src/main/java/com/jsowell/netty/decoder/StartAndLengthFieldFrameDecoder.java @@ -5,77 +5,127 @@ import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.ByteToMessageDecoder; import lombok.extern.slf4j.Slf4j; +import java.nio.charset.StandardCharsets; import java.util.List; @Slf4j public class StartAndLengthFieldFrameDecoder extends ByteToMessageDecoder { - // 起始标志 - private int HEAD_DATA; + private static final int HEADER_LENGTH_DNY = 3; // "DNY" 包头的长度 + private static final int HEADER_LENGTH_68 = 1; // 68 包头的长度 - public StartAndLengthFieldFrameDecoder(int HEAD_DATA) { - this.HEAD_DATA = HEAD_DATA; - } + // 构造函数,初始化起始标志 + public StartAndLengthFieldFrameDecoder() {} - /** - * - * 协议开始的标准head_data,int类型,占据1个字节. - * 表示数据的长度contentLength,int类型,占据1个字节. - * - */ - public final int BASE_LENGTH = 1 + 1; - - @Override protected void decode(ChannelHandlerContext ctx, ByteBuf buffer, List out) throws Exception { - // 可读长度必须大于基本长度 - if (buffer.readableBytes() <= BASE_LENGTH) { - log.warn("可读字节数:{}小于基础长度:{}", buffer.readableBytes(), BASE_LENGTH); - return; - } - // 记录包头开始的index int beginReader; + // 循环查找包头 while (true) { + if (buffer.readableBytes() < Math.min(HEADER_LENGTH_DNY, HEADER_LENGTH_68)) { + return; // 数据长度不足,等待更多数据 + } + // 获取包头开始的index beginReader = buffer.readerIndex(); - // log.info("包头开始的index:{}", beginReader); - // 标记包头开始的index buffer.markReaderIndex(); - // 读到了协议的开始标志,结束while循环 - if (buffer.getUnsignedByte(beginReader) == HEAD_DATA) { - // log.info("读到了协议的开始标志,结束while循环 byte:{}, HEAD_DATA:{}", buffer.getUnsignedByte(beginReader), HEAD_DATA); - break; + + // 判断是否为DNY包头或68包头 + if (isStartOfDnyHeader(buffer, beginReader) || isStartOf68Header(buffer, beginReader)) { + break; // 读到了协议的开始标志,结束while循环 } // 未读到包头,略过一个字节 - // 每次略过,一个字节,去读取,包头信息的开始标记 buffer.resetReaderIndex(); buffer.readByte(); + } - // 当略过,一个字节之后, - // 数据包的长度,又变得不满足 - // 此时,应该结束。等待后面的数据到达 - if (buffer.readableBytes() < BASE_LENGTH) { - log.debug("数据包的长度不满足 readableBytes:{}, BASE_LENGTH:{}", buffer.readableBytes(), BASE_LENGTH); + // 检查包头是否是 "DNY" + if (buffer.readableBytes() >= HEADER_LENGTH_DNY) { + byte[] headerBytes = new byte[HEADER_LENGTH_DNY]; + buffer.getBytes(beginReader, headerBytes, 0, HEADER_LENGTH_DNY); + String header = new String(headerBytes, StandardCharsets.UTF_8); + + if ("DNY".equals(header)) { + // 处理 DNY 协议 + decodeDnyMessage(buffer, out, beginReader); return; } } - // 消息的长度 - int length = buffer.getUnsignedByte(beginReader + 1); - // 判断请求数据包数据是否到齐 - if (buffer.readableBytes() < length + 4) { - // log.info("请求数据包数据没有到齐,还原读指针 readableBytes:{}, 消息的长度:{}", buffer.readableBytes(), length); - // 还原读指针 + // 检查包头是否是 68 协议 + if (buffer.readableBytes() >= HEADER_LENGTH_68) { + if (buffer.getUnsignedByte(beginReader) == 0x68) { + // 处理 68 协议 + decode68Message(buffer, out, beginReader); + return; + } + } + + // 未知协议,还原读指针 + buffer.resetReaderIndex(); + } + + // 判断是否为DNY包头 + private boolean isStartOfDnyHeader(ByteBuf buffer, int beginReader) { + if (buffer.readableBytes() >= HEADER_LENGTH_DNY) { + byte[] headerBytes = new byte[HEADER_LENGTH_DNY]; + buffer.getBytes(beginReader, headerBytes, 0, HEADER_LENGTH_DNY); + String header = new String(headerBytes, StandardCharsets.UTF_8); + return "DNY".equals(header); + } + return false; + } + + // 判断是否为68包头 + private boolean isStartOf68Header(ByteBuf buffer, int beginReader) { + if (buffer.readableBytes() >= HEADER_LENGTH_68) { + return buffer.getUnsignedByte(beginReader) == 0x68; + } + return false; + } + + // 处理68协议消息 + private void decode68Message(ByteBuf buffer, List out, int beginReader) { + // 检查剩余数据是否足够 + if (buffer.readableBytes() < HEADER_LENGTH_68 + 1 + 2) { buffer.readerIndex(beginReader); return; } - // 读取data数据 - byte[] data = new byte[length + 4]; - buffer.readBytes(data); - ByteBuf frame = buffer.retainedSlice(beginReader, length + 4); - buffer.readerIndex(beginReader + length + 4); + // 获取消息长度 + int length = buffer.getUnsignedByte(beginReader + HEADER_LENGTH_68); + // 检查剩余数据是否足够 + if (buffer.readableBytes() < HEADER_LENGTH_68 + 1 + length + 2) { + buffer.readerIndex(beginReader); + return; + } + + // 读取 data 数据 最后+2是帧校验域长度 + ByteBuf frame = buffer.retainedSlice(beginReader, HEADER_LENGTH_68 + 1 + length + 2); + buffer.readerIndex(beginReader + HEADER_LENGTH_68 + 1 + length + 2); + out.add(frame); + } + + // 处理DNY协议消息 + private void decodeDnyMessage(ByteBuf buffer, List out, int beginReader) { + // 检查剩余数据是否足够 + if (buffer.readableBytes() < HEADER_LENGTH_DNY + 1) { + buffer.readerIndex(beginReader); + return; + } + + // 获取消息长度 + int length = buffer.getUnsignedByte(beginReader + HEADER_LENGTH_DNY); + // 检查剩余数据是否足够 + if (buffer.readableBytes() < HEADER_LENGTH_DNY + 1 + length) { + buffer.readerIndex(beginReader); + return; + } + + // 读取 data 数据 + ByteBuf frame = buffer.retainedSlice(beginReader, HEADER_LENGTH_DNY + 1 + length); + buffer.readerIndex(beginReader + HEADER_LENGTH_DNY + 1 + length); out.add(frame); } diff --git a/jsowell-netty/src/main/java/com/jsowell/netty/decoder/StartAndLengthFieldFrameDecoder2.java b/jsowell-netty/src/main/java/com/jsowell/netty/decoder/StartAndLengthFieldFrameDecoder2.java new file mode 100644 index 000000000..a6b0d5977 --- /dev/null +++ b/jsowell-netty/src/main/java/com/jsowell/netty/decoder/StartAndLengthFieldFrameDecoder2.java @@ -0,0 +1,117 @@ +package com.jsowell.netty.decoder; + +import com.jsowell.netty.domain.DnyMessage; +import com.jsowell.netty.domain.Message68; +import io.netty.buffer.ByteBuf; +import io.netty.channel.ChannelHandlerContext; +import io.netty.handler.codec.ByteToMessageDecoder; +import lombok.extern.slf4j.Slf4j; + +import java.util.List; + +@Slf4j +public class StartAndLengthFieldFrameDecoder2 extends ByteToMessageDecoder { + private static final int HEADER_LENGTH_DNY = 3; // "DNY" 包头的长度 + private static final int HEADER_LENGTH_68 = 1; // 68 包头的长度 + private static final int MAX_FRAME_LENGTH = 1024; // 最大帧长度,可以根据实际需求调整 + private static final byte[] DNY_HEADER = {'D', 'N', 'Y'}; + + @Override + protected void decode(ChannelHandlerContext ctx, ByteBuf buffer, List out) throws Exception { + while (buffer.readableBytes() > 0) { + int headerIndex = findHeaderIndex(buffer); + if (headerIndex == -1) { + // 没有找到有效的包头,丢弃所有数据 + buffer.skipBytes(buffer.readableBytes()); + return; + } + + buffer.readerIndex(headerIndex); + + if (isDnyHeader(buffer)) { + if (decodeDnyMessage(buffer, out)) { + return; + } + } else if (is68Header(buffer)) { + if (decode68Message(buffer, out)) { + return; + } + } else { + // 未知协议,跳过一个字节 + buffer.skipBytes(1); + } + } + } + + private int findHeaderIndex(ByteBuf buffer) { + int dnyIndex = buffer.indexOf(buffer.readerIndex(), buffer.writerIndex(), DNY_HEADER[0]); + int index68 = buffer.indexOf(buffer.readerIndex(), buffer.writerIndex(), (byte) 0x68); + + if (dnyIndex == -1 && index68 == -1) { + return -1; + } else if (dnyIndex == -1) { + return index68; + } else if (index68 == -1) { + return dnyIndex; + } else { + return Math.min(dnyIndex, index68); + } + } + + private boolean isDnyHeader(ByteBuf buffer) { + return buffer.readableBytes() >= HEADER_LENGTH_DNY && + buffer.getByte(buffer.readerIndex()) == 'D' && + buffer.getByte(buffer.readerIndex() + 1) == 'N' && + buffer.getByte(buffer.readerIndex() + 2) == 'Y'; + } + + private boolean is68Header(ByteBuf buffer) { + return buffer.readableBytes() >= HEADER_LENGTH_68 && + buffer.getByte(buffer.readerIndex()) == 0x68; + } + + private boolean decodeDnyMessage(ByteBuf buffer, List out) { + if (buffer.readableBytes() < HEADER_LENGTH_DNY + 2) { + return false; // 数据不足,等待更多数据 + } + + int lengthFieldIndex = buffer.readerIndex() + HEADER_LENGTH_DNY; + int length = buffer.getUnsignedShort(lengthFieldIndex); + + if (length > MAX_FRAME_LENGTH) { + log.warn("DNY frame length exceeds maximum allowed: {}", length); + buffer.skipBytes(HEADER_LENGTH_DNY + 2); + return false; + } + + if (buffer.readableBytes() < HEADER_LENGTH_DNY + 2 + length) { + return false; // 数据不足,等待更多数据 + } + + ByteBuf frame = buffer.readRetainedSlice(HEADER_LENGTH_DNY + 2 + length); + out.add(new DnyMessage(frame)); + return true; + } + + private boolean decode68Message(ByteBuf buffer, List out) { + if (buffer.readableBytes() < HEADER_LENGTH_68 + 1) { + return false; // 数据不足,等待更多数据 + } + + int length = buffer.getUnsignedByte(buffer.readerIndex() + HEADER_LENGTH_68); + + if (length > MAX_FRAME_LENGTH) { + log.warn("68 frame length exceeds maximum allowed: {}", length); + buffer.skipBytes(HEADER_LENGTH_68 + 1); + return false; + } + + if (buffer.readableBytes() < HEADER_LENGTH_68 + 1 + length) { + return false; // 数据不足,等待更多数据 + } + + ByteBuf frame = buffer.readRetainedSlice(HEADER_LENGTH_68 + 1 + length); + out.add(new Message68(frame)); + return true; + } +} diff --git a/jsowell-netty/src/main/java/com/jsowell/netty/domain/ChargingPileMessage.java b/jsowell-netty/src/main/java/com/jsowell/netty/domain/ChargingPileMessage.java new file mode 100644 index 000000000..0a3f88a28 --- /dev/null +++ b/jsowell-netty/src/main/java/com/jsowell/netty/domain/ChargingPileMessage.java @@ -0,0 +1,35 @@ +package com.jsowell.netty.domain; + +import org.apache.commons.lang3.builder.ToStringBuilder; +import org.apache.commons.lang3.builder.ToStringStyle; + +public class ChargingPileMessage { + private int physicalId; + private short messageId; + private byte command; + private byte[] data; + + public ChargingPileMessage(int physicalId, short messageId, byte command, byte[] data) { + this.physicalId = physicalId; + this.messageId = messageId; + this.command = command; + this.data = data; + } + + // Getters + public int getPhysicalId() { return physicalId; } + public short getMessageId() { return messageId; } + public byte getCommand() { return command; } + public byte[] getData() { return data; } + + + @Override + public String toString() { + return new ToStringBuilder(this, ToStringStyle.JSON_STYLE) + .append("physicalId", physicalId) + .append("messageId", messageId) + .append("command", command) + .append("data", data) + .toString(); + } +} \ No newline at end of file diff --git a/jsowell-netty/src/main/java/com/jsowell/netty/domain/DnyMessage.java b/jsowell-netty/src/main/java/com/jsowell/netty/domain/DnyMessage.java new file mode 100644 index 000000000..8a030e58f --- /dev/null +++ b/jsowell-netty/src/main/java/com/jsowell/netty/domain/DnyMessage.java @@ -0,0 +1,15 @@ +package com.jsowell.netty.domain; + +import io.netty.buffer.ByteBuf; + +public class DnyMessage { + private final ByteBuf content; + + public DnyMessage(ByteBuf content) { + this.content = content; + } + + public ByteBuf getContent() { + return content; + } +} \ No newline at end of file diff --git a/jsowell-netty/src/main/java/com/jsowell/netty/domain/Message68.java b/jsowell-netty/src/main/java/com/jsowell/netty/domain/Message68.java new file mode 100644 index 000000000..927b8f3e1 --- /dev/null +++ b/jsowell-netty/src/main/java/com/jsowell/netty/domain/Message68.java @@ -0,0 +1,15 @@ +package com.jsowell.netty.domain; + +import io.netty.buffer.ByteBuf; + +public class Message68 { + private final ByteBuf content; + + public Message68(ByteBuf content) { + this.content = content; + } + + public ByteBuf getContent() { + return content; + } +} \ No newline at end of file diff --git a/jsowell-netty/src/main/java/com/jsowell/netty/domain/ProtocolDnyMessage.java b/jsowell-netty/src/main/java/com/jsowell/netty/domain/ProtocolDnyMessage.java new file mode 100644 index 000000000..76ee060df --- /dev/null +++ b/jsowell-netty/src/main/java/com/jsowell/netty/domain/ProtocolDnyMessage.java @@ -0,0 +1,49 @@ +package com.jsowell.netty.domain; + +public class ProtocolDnyMessage { + private final byte[] header; + private final int length; + private final byte[] physicalId; + private final int messageId; + private final byte command; + private final byte[] data; + private final int checksum; + + public ProtocolDnyMessage(byte[] header, int length, byte[] physicalId, int messageId, byte command, byte[] data, int checksum) { + this.header = header; + this.length = length; + this.physicalId = physicalId; + this.messageId = messageId; + this.command = command; + this.data = data; + this.checksum = checksum; + } + + public byte[] getHeader() { + return header; + } + + public int getLength() { + return length; + } + + public byte[] getPhysicalId() { + return physicalId; + } + + public int getMessageId() { + return messageId; + } + + public byte getCommand() { + return command; + } + + public byte[] getData() { + return data; + } + + public int getChecksum() { + return checksum; + } +} \ No newline at end of file diff --git a/jsowell-netty/src/main/java/com/jsowell/netty/handler/electricbicycles/AbstractHandler.java b/jsowell-netty/src/main/java/com/jsowell/netty/handler/electricbicycles/AbstractHandler.java index 732915f63..3f98d5e8b 100644 --- a/jsowell-netty/src/main/java/com/jsowell/netty/handler/electricbicycles/AbstractHandler.java +++ b/jsowell-netty/src/main/java/com/jsowell/netty/handler/electricbicycles/AbstractHandler.java @@ -53,7 +53,7 @@ public abstract class AbstractHandler implements InitializingBean { // 请求帧类型 byte[] requestFrameType = ykcDataProtocol.getFrameType(); // 应答帧类型 - byte[] responseFrameType = YKCFrameTypeCode.ResponseRelation.getResponseFrameTypeBytes(requestFrameType); + byte[] responseFrameType = YKCFrameTypeCode.PlatformAnswersRelation.getResponseFrameTypeBytes(requestFrameType); // 数据域 值为“序列号域+加密标志+帧类型标志+消息体”字节数之和 byte[] dataFields = Bytes.concat(serialNumber, encryptFlag, responseFrameType, messageBody); diff --git a/jsowell-netty/src/main/java/com/jsowell/netty/handler/yunkuaichong/AbstractHandler.java b/jsowell-netty/src/main/java/com/jsowell/netty/handler/yunkuaichong/AbstractHandler.java index dad0f9121..98d2ba71d 100644 --- a/jsowell-netty/src/main/java/com/jsowell/netty/handler/yunkuaichong/AbstractHandler.java +++ b/jsowell-netty/src/main/java/com/jsowell/netty/handler/yunkuaichong/AbstractHandler.java @@ -46,19 +46,25 @@ public abstract class AbstractHandler implements InitializingBean { protected byte[] getResult(YKCDataProtocol ykcDataProtocol, byte[] messageBody) { // 起始标志 byte[] head = ykcDataProtocol.getHead(); + // 序列号域 byte[] serialNumber = ykcDataProtocol.getSerialNumber(); + // 加密标志 byte[] encryptFlag = ykcDataProtocol.getEncryptFlag(); + // 请求帧类型 byte[] requestFrameType = ykcDataProtocol.getFrameType(); + // 应答帧类型 - byte[] responseFrameType = YKCFrameTypeCode.ResponseRelation.getResponseFrameTypeBytes(requestFrameType); + byte[] responseFrameType = YKCFrameTypeCode.PlatformAnswersRelation.getResponseFrameTypeBytes(requestFrameType); // 数据域 值为“序列号域+加密标志+帧类型标志+消息体”字节数之和 byte[] dataFields = Bytes.concat(serialNumber, encryptFlag, responseFrameType, messageBody); + // 计算crc: 从序列号域到数据域的 CRC 校验 int crc16 = CRC16Util.calcCrc16(dataFields); + return Bytes.concat(head, BytesUtil.intToBytes(dataFields.length, 1), dataFields, BytesUtil.intToBytes(crc16)); } diff --git a/jsowell-netty/src/main/java/com/jsowell/netty/handler/yunkuaichong/RemoteControlGroundLockHandler.java b/jsowell-netty/src/main/java/com/jsowell/netty/handler/yunkuaichong/RemoteControlGroundLockHandler.java index 96bce9448..73a99c555 100644 --- a/jsowell-netty/src/main/java/com/jsowell/netty/handler/yunkuaichong/RemoteControlGroundLockHandler.java +++ b/jsowell-netty/src/main/java/com/jsowell/netty/handler/yunkuaichong/RemoteControlGroundLockHandler.java @@ -19,7 +19,7 @@ import org.springframework.stereotype.Component; @Slf4j @Component public class RemoteControlGroundLockHandler extends AbstractHandler{ - private final String type = YKCUtils.frameType2Str(YKCFrameTypeCode.REMOTE_CONTROL_GROUND_LOCK_LIFTING_CODE.getBytes()); + private final String type = YKCUtils.frameType2Str(YKCFrameTypeCode.REMOTE_CONTROL_GROUND_LOCK_CODE.getBytes()); @Override public void afterPropertiesSet() throws Exception { diff --git a/jsowell-netty/src/main/java/com/jsowell/netty/handler/yunkuaichong/RemoteControlGroundLockResponseHandler.java b/jsowell-netty/src/main/java/com/jsowell/netty/handler/yunkuaichong/RemoteControlGroundLockResponseHandler.java index d0ac9e25c..fb84d37af 100644 --- a/jsowell-netty/src/main/java/com/jsowell/netty/handler/yunkuaichong/RemoteControlGroundLockResponseHandler.java +++ b/jsowell-netty/src/main/java/com/jsowell/netty/handler/yunkuaichong/RemoteControlGroundLockResponseHandler.java @@ -26,7 +26,7 @@ public class RemoteControlGroundLockResponseHandler extends AbstractHandler{ @Autowired private OrderPileOccupyService orderPileOccupyService; - private final String type = YKCUtils.frameType2Str(YKCFrameTypeCode.CHARGING_PILE_RESPOND_GROUND_LOCK_LIFTING_CODE.getBytes()); + private final String type = YKCUtils.frameType2Str(YKCFrameTypeCode.REMOTE_CONTROL_GROUND_LOCK_ANSWER_CODE.getBytes()); @Override public void afterPropertiesSet() throws Exception { diff --git a/jsowell-netty/src/main/java/com/jsowell/netty/handler/yunkuaichong/RemoteStartChargingRequestHandler.java b/jsowell-netty/src/main/java/com/jsowell/netty/handler/yunkuaichong/RemoteStartChargingRequestHandler.java index 0b0ffdf8d..518323d67 100644 --- a/jsowell-netty/src/main/java/com/jsowell/netty/handler/yunkuaichong/RemoteStartChargingRequestHandler.java +++ b/jsowell-netty/src/main/java/com/jsowell/netty/handler/yunkuaichong/RemoteStartChargingRequestHandler.java @@ -27,7 +27,7 @@ import java.util.concurrent.CompletableFuture; @Slf4j @Component public class RemoteStartChargingRequestHandler extends AbstractHandler{ - private final String type = YKCUtils.frameType2Str(YKCFrameTypeCode.REMOTE_START_CHARGING_ANSWER_CODE.getBytes()); + private final String type = YKCUtils.frameType2Str(YKCFrameTypeCode.REMOTE_CONTROL_START_CHARGING_ANSWER_CODE.getBytes()); @Autowired private OrderBasicInfoService orderBasicInfoService; @@ -98,7 +98,7 @@ public class RemoteStartChargingRequestHandler extends AbstractHandler{ orderBasicInfoService.chargingPileStartedSuccessfully(transactionCode); } // orderBasicInfoService.updateOrderBasicInfo(orderInfo); - log.info("远程启动充电命令回复-交易流水号:{}, 桩编码:{}, 枪号:{}, 启动结果:{}, 失败原因:{}", transactionCode, pileSn, connectorCode, startResult, failedReasonMsg); + log.info("远程启动充电命令回复-交易流水号:{}, 桩编码:{}, 枪号:{}, 启动结果(00-失败, 01-成功):{}, 失败原因:{}", transactionCode, pileSn, connectorCode, startResult, failedReasonMsg); // 异步推送第三方平台 CompletableFuture.runAsync(() -> { diff --git a/jsowell-netty/src/main/java/com/jsowell/netty/handler/yunkuaichong/RemoteStopChargingRequestHandler.java b/jsowell-netty/src/main/java/com/jsowell/netty/handler/yunkuaichong/RemoteStopChargingRequestHandler.java index fdb8aebca..f17e573dd 100644 --- a/jsowell-netty/src/main/java/com/jsowell/netty/handler/yunkuaichong/RemoteStopChargingRequestHandler.java +++ b/jsowell-netty/src/main/java/com/jsowell/netty/handler/yunkuaichong/RemoteStopChargingRequestHandler.java @@ -26,7 +26,7 @@ import java.util.Date; @Slf4j @Component public class RemoteStopChargingRequestHandler extends AbstractHandler{ - private final String type = YKCUtils.frameType2Str(YKCFrameTypeCode.REMOTE_STOP_CHARGING_ANSWER_CODE.getBytes()); + private final String type = YKCUtils.frameType2Str(YKCFrameTypeCode.REMOTE_CONTROL_STOP_CHARGING_ANSWER_CODE.getBytes()); @Autowired private OrderBasicInfoService orderBasicInfoService; diff --git a/jsowell-netty/src/main/java/com/jsowell/netty/handler/yunkuaichong/ReservationChargingHandler.java b/jsowell-netty/src/main/java/com/jsowell/netty/handler/yunkuaichong/ReservationChargingHandler.java index ea06e0a96..1967cd0a9 100644 --- a/jsowell-netty/src/main/java/com/jsowell/netty/handler/yunkuaichong/ReservationChargingHandler.java +++ b/jsowell-netty/src/main/java/com/jsowell/netty/handler/yunkuaichong/ReservationChargingHandler.java @@ -14,7 +14,7 @@ import org.springframework.stereotype.Component; @Component public class ReservationChargingHandler extends AbstractHandler{ - private final String type = YKCUtils.frameType2Str(YKCFrameTypeCode.RESERVATION_CHARGING_CODE.getBytes()); + private final String type = YKCUtils.frameType2Str(YKCFrameTypeCode.RESERVATION_CHARGING_SETUP_CODE.getBytes()); @Override public void afterPropertiesSet() throws Exception { diff --git a/jsowell-netty/src/main/java/com/jsowell/netty/handler/yunkuaichong/ReservationChargingResponseHandler.java b/jsowell-netty/src/main/java/com/jsowell/netty/handler/yunkuaichong/ReservationChargingResponseHandler.java index 97ef357c2..4af5d49e0 100644 --- a/jsowell-netty/src/main/java/com/jsowell/netty/handler/yunkuaichong/ReservationChargingResponseHandler.java +++ b/jsowell-netty/src/main/java/com/jsowell/netty/handler/yunkuaichong/ReservationChargingResponseHandler.java @@ -16,7 +16,7 @@ import org.springframework.stereotype.Component; @Component public class ReservationChargingResponseHandler extends AbstractHandler{ - private final String type = YKCUtils.frameType2Str(YKCFrameTypeCode.RESERVATION_CHARGING_ANSWER_CODE.getBytes()); + private final String type = YKCUtils.frameType2Str(YKCFrameTypeCode.RESERVATION_CHARGING_SETUP_ANSWER_CODE.getBytes()); @Override public void afterPropertiesSet() throws Exception { diff --git a/jsowell-netty/src/main/java/com/jsowell/netty/handler/yunkuaichong/ReservationChargingStartupResultHandler.java b/jsowell-netty/src/main/java/com/jsowell/netty/handler/yunkuaichong/ReservationChargingStartupResultHandler.java index ba3cd3944..041b1b9c4 100644 --- a/jsowell-netty/src/main/java/com/jsowell/netty/handler/yunkuaichong/ReservationChargingStartupResultHandler.java +++ b/jsowell-netty/src/main/java/com/jsowell/netty/handler/yunkuaichong/ReservationChargingStartupResultHandler.java @@ -16,7 +16,7 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; /** - * 预约充电启动结果上送 + * 0x65预约充电启动结果上送 */ @Slf4j @Component @@ -66,7 +66,7 @@ public class ReservationChargingStartupResultHandler extends AbstractHandler{ byte[] vinCodeByteArr = BytesUtil.copyBytes(msgBody, startIndex, length); String vinCode = BytesUtil.bcd2Str(vinCodeByteArr); - // 启动结果 + // 启动结果 0x00失败 0x01成功 startIndex += length; length = 1; byte[] startupResultByteArr = BytesUtil.copyBytes(msgBody, startIndex, length); diff --git a/jsowell-netty/src/main/java/com/jsowell/netty/server/NettyServerManager.java b/jsowell-netty/src/main/java/com/jsowell/netty/server/NettyServerManager.java new file mode 100644 index 000000000..6b9c8eeda --- /dev/null +++ b/jsowell-netty/src/main/java/com/jsowell/netty/server/NettyServerManager.java @@ -0,0 +1,152 @@ +package com.jsowell.netty.server; + +import com.jsowell.common.constant.Constants; +import com.jsowell.netty.server.electricbicycles.ElectricBicyclesServerChannelInitializer; +import com.jsowell.netty.server.mqtt.BootNettyMqttChannelInboundHandler; +import com.jsowell.netty.server.yunkuaichong.NettyServerChannelInitializer; +import io.netty.bootstrap.ServerBootstrap; +import io.netty.buffer.PooledByteBufAllocator; +import io.netty.channel.*; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.socket.SocketChannel; +import io.netty.channel.socket.nio.NioServerSocketChannel; +import io.netty.handler.codec.mqtt.MqttDecoder; +import io.netty.handler.codec.mqtt.MqttEncoder; +import io.netty.handler.logging.LogLevel; +import io.netty.handler.logging.LoggingHandler; +import io.netty.handler.timeout.IdleStateHandler; +import lombok.extern.slf4j.Slf4j; +import org.springframework.boot.CommandLineRunner; +import org.springframework.stereotype.Component; + +import javax.annotation.Resource; +import java.net.InetSocketAddress; + +@Slf4j +@Component +public class NettyServerManager implements CommandLineRunner { + + @Resource + private NettyServerChannelInitializer nettyServerChannelInitializer; + + @Resource + private ElectricBicyclesServerChannelInitializer electricBicyclesServerChannelInitializer; + + @Override + public void run(String... args) throws Exception { + startNettyServer(Constants.SOCKET_IP, 9011); + startElectricBikeNettyServer(Constants.SOCKET_IP, 9012); + // startMqttSever(Constants.SOCKET_IP, 1883); + } + + public void startNettyServer(String host, int port) { + new Thread(() -> { + EventLoopGroup bossGroup = new NioEventLoopGroup(1); + EventLoopGroup workerGroup = new NioEventLoopGroup(); + + try { + ServerBootstrap bootstrap = new ServerBootstrap() + .group(bossGroup, workerGroup) + .channel(NioServerSocketChannel.class) + .handler(new LoggingHandler(LogLevel.DEBUG)) + .option(ChannelOption.SO_BACKLOG, 128) + .option(ChannelOption.SO_REUSEADDR, true) + .childOption(ChannelOption.SO_KEEPALIVE, true) + .childOption(ChannelOption.SO_REUSEADDR, true) + .childHandler(nettyServerChannelInitializer) + .localAddress(new InetSocketAddress(host, port)); + + ChannelFuture future = bootstrap.bind(port).sync(); + if (future.isSuccess()) { + log.info("NettyServer启动成功, 开始监听端口:{}", port); + } else { + log.error("NettyServer启动失败", future.cause()); + } + + future.channel().closeFuture().sync(); + } catch (Exception e) { + log.error("NettyServer.start error", e); + bossGroup.shutdownGracefully(); + workerGroup.shutdownGracefully(); + } + }).start(); + } + + public void startElectricBikeNettyServer(String host, int port) { + new Thread(() -> { + EventLoopGroup bossGroup = new NioEventLoopGroup(1); + EventLoopGroup workerGroup = new NioEventLoopGroup(); + + try { + ServerBootstrap bootstrap = new ServerBootstrap() + .group(bossGroup, workerGroup) + .channel(NioServerSocketChannel.class) + .handler(new LoggingHandler(LogLevel.DEBUG)) + .option(ChannelOption.SO_BACKLOG, 128) + .option(ChannelOption.SO_REUSEADDR, true) + .childOption(ChannelOption.SO_KEEPALIVE, true) + .childOption(ChannelOption.SO_REUSEADDR, true) + .childHandler(electricBicyclesServerChannelInitializer) + .localAddress(new InetSocketAddress(host, port)); + + ChannelFuture future = bootstrap.bind(port).sync(); + if (future.isSuccess()) { + log.info("ElectricBikeNettyServer启动成功, 开始监听端口:{}", port); + } else { + log.error("ElectricBikeNettyServer启动失败", future.cause()); + } + + future.channel().closeFuture().sync(); + } catch (Exception e) { + log.error("ElectricBikeNettyServer.start error", e); + bossGroup.shutdownGracefully(); + workerGroup.shutdownGracefully(); + } + }).start(); + } + + public void startMqttSever(String host, int port) { + new Thread(() -> { + EventLoopGroup bossGroup = new NioEventLoopGroup(1); + EventLoopGroup workerGroup = new NioEventLoopGroup(); + + try { + ServerBootstrap mqttBootstrap = new ServerBootstrap() + .group(bossGroup, workerGroup) + .channel(NioServerSocketChannel.class) + .handler(new LoggingHandler(LogLevel.DEBUG)) + .option(ChannelOption.SO_BACKLOG, 128) + .option(ChannelOption.SO_REUSEADDR, true) + .childOption(ChannelOption.SO_KEEPALIVE, true) + .childOption(ChannelOption.SO_REUSEADDR, true) + .childOption(ChannelOption.TCP_NODELAY, true) + .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT) + .localAddress(new InetSocketAddress(host, port)); + + mqttBootstrap.childHandler(new ChannelInitializer() { + protected void initChannel(SocketChannel ch) { + ChannelPipeline channelPipeline = ch.pipeline(); + // 设置读写空闲超时时间 + channelPipeline.addLast(new IdleStateHandler(600, 600, 1200)); + channelPipeline.addLast("encoder", MqttEncoder.INSTANCE); + channelPipeline.addLast("decoder", new MqttDecoder()); + channelPipeline.addLast(new BootNettyMqttChannelInboundHandler()); + } + }); + + ChannelFuture future = mqttBootstrap.bind(port).sync(); + if (future.isSuccess()) { + log.info("MqttServer启动成功, 开始监听端口:{}", port); + } else { + log.error("MqttServer启动失败", future.cause()); + } + + future.channel().closeFuture().sync(); + } catch (Exception e) { + log.error("MqttServer.start error", e); + bossGroup.shutdownGracefully(); + workerGroup.shutdownGracefully(); + } + }).start(); + } +} diff --git a/jsowell-netty/src/main/java/com/jsowell/netty/server/electricbicycles/ChargingPileHandler.java b/jsowell-netty/src/main/java/com/jsowell/netty/server/electricbicycles/ChargingPileHandler.java new file mode 100644 index 000000000..7339de6fa --- /dev/null +++ b/jsowell-netty/src/main/java/com/jsowell/netty/server/electricbicycles/ChargingPileHandler.java @@ -0,0 +1,185 @@ +package com.jsowell.netty.server.electricbicycles; + +import com.jsowell.netty.domain.ChargingPileMessage; +import io.netty.channel.ChannelHandler; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelId; +import io.netty.channel.ChannelInboundHandlerAdapter; +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Component; + +import java.net.InetSocketAddress; +import java.time.Instant; +import java.nio.ByteBuffer; +import java.nio.ByteOrder; +import java.util.concurrent.ConcurrentHashMap; + +@ChannelHandler.Sharable +@Slf4j +@Component +public class ChargingPileHandler extends ChannelInboundHandlerAdapter { + + /** + * 管理一个全局map,保存连接进服务端的通道数量 + */ + private static final ConcurrentHashMap CHANNEL_MAP = new ConcurrentHashMap<>(); + + /** + * 有客户端连接服务器会触发此函数 + * 连接被建立并且准备进行通信时被调用 + */ + @Override + public void channelActive(ChannelHandlerContext ctx) { + InetSocketAddress insocket = (InetSocketAddress) ctx.channel().remoteAddress(); + String clientIp = insocket.getAddress().getHostAddress(); + int clientPort = insocket.getPort(); + //获取连接通道唯一标识 + ChannelId channelId = ctx.channel().id(); + //如果map中不包含此连接,就保存连接 + if (CHANNEL_MAP.containsKey(channelId)) { + log.info("Handler:{}, 客户端【{}】是连接状态,连接通道数量: {}", this.getClass().getSimpleName(), channelId, CHANNEL_MAP.size()); + } else { + //保存连接 + CHANNEL_MAP.put(channelId, ctx); + log.info("Handler:{}, 客户端【{}】, 连接netty服务器【IP:{}, PORT:{}】, 连接通道数量: {}", this.getClass().getSimpleName(), channelId, clientIp, clientPort, CHANNEL_MAP.size()); + } + } + + @Override + public void channelRead(ChannelHandlerContext ctx, Object msg) { + log.info("加载客户端报文=== channelId:{}, mag:{}", ctx.channel().id(), msg.toString()); + // if (!(msg instanceof ChargingPileMessage)) { + // return; + // } + + ChargingPileMessage message = (ChargingPileMessage) msg; + byte command = message.getCommand(); + + switch (command) { + case 0x11: + handleHeartbeat(ctx, message); + break; + case 0x12: + handleTimeRequest(ctx, message); + break; + case 0x15: + handleFirmwareUpgradeRequest(ctx, message); + break; + case (byte) 0xFA: + handleFirmwareUpgradeResponse(ctx, message); + break; + case 0x31: + handleReboot(ctx, message); + break; + case 0x32: + handleCommunicationModuleReboot(ctx, message); + break; + case 0x33: + handleClearUpgradeData(ctx, message); + break; + case 0x34: + handleChangeIPAddress(ctx, message); + break; + case 0x35: + handleSubdeviceVersionUpload(ctx, message); + break; + case 0x3B: + handleFSKParameterRequest(ctx, message); + break; + default: + log.info("Unknown command: " + String.format("0x%02X", command)); + } + } + + private void handleHeartbeat(ChannelHandlerContext ctx, ChargingPileMessage message) { + // 处理心跳包,无需回复 + log.info("Received heartbeat from device ID: " + message.getPhysicalId()); + // 可以在这里更新设备状态、记录日志等 + } + + private void handleTimeRequest(ChannelHandlerContext ctx, ChargingPileMessage message) { + // 处理时间请求 + long currentTime = Instant.now().getEpochSecond(); + byte[] timeBytes = ByteBuffer.allocate(4).order(ByteOrder.LITTLE_ENDIAN).putInt((int)currentTime).array(); + + ChargingPileMessage response = new ChargingPileMessage( + message.getPhysicalId(), + message.getMessageId(), + (byte) 0x12, + timeBytes + ); + + ctx.writeAndFlush(response); + } + + private void handleFirmwareUpgradeRequest(ChannelHandlerContext ctx, ChargingPileMessage message) { + // 处理固件升级请求 + // 这里需要实现固件升级的逻辑,包括发送固件数据包等 + log.info("Firmware upgrade requested from device ID: " + message.getPhysicalId()); + // TODO: 实现固件升级逻辑 + } + + private void handleFirmwareUpgradeResponse(ChannelHandlerContext ctx, ChargingPileMessage message) { + // 处理固件升级响应 + log.info("Firmware upgrade response from device ID: " + message.getPhysicalId()); + // TODO: 根据响应继续发送下一个固件包或结束升级过程 + } + + private void handleReboot(ChannelHandlerContext ctx, ChargingPileMessage message) { + // 处理重启主机指令 + log.info("Reboot command received for device ID: " + message.getPhysicalId()); + // 发送成功响应 + sendSimpleResponse(ctx, message, (byte) 0x31, (byte) 0x00); + } + + private void handleCommunicationModuleReboot(ChannelHandlerContext ctx, ChargingPileMessage message) { + // 处理重启通信模块指令 + log.info("Communication module reboot command received for device ID: " + message.getPhysicalId()); + // 发送成功响应 + sendSimpleResponse(ctx, message, (byte) 0x32, (byte) 0x00); + } + + private void handleClearUpgradeData(ChannelHandlerContext ctx, ChargingPileMessage message) { + // 处理清空升级分机数据指令 + log.info("Clear upgrade data command received for device ID: " + message.getPhysicalId()); + // 发送成功响应 + sendSimpleResponse(ctx, message, (byte) 0x33, (byte) 0x00); + } + + private void handleChangeIPAddress(ChannelHandlerContext ctx, ChargingPileMessage message) { + // 处理更改IP地址指令 + log.info("Change IP address command received for device ID: " + message.getPhysicalId()); + // TODO: 实现IP地址更改逻辑 + // 发送成功响应 + sendSimpleResponse(ctx, message, (byte) 0x34, (byte) 0x00); + } + + private void handleSubdeviceVersionUpload(ChannelHandlerContext ctx, ChargingPileMessage message) { + // 处理上传分机版本号与设备类型 + log.info("Subdevice version upload received from device ID: " + message.getPhysicalId()); + // TODO: 处理上传的分机版本信息 + // 此命令不需要响应 + } + + private void handleFSKParameterRequest(ChannelHandlerContext ctx, ChargingPileMessage message) { + // 处理请求服务器FSK主机参数 + log.info("FSK parameter request received from device ID: " + message.getPhysicalId()); + // TODO: 实现发送FSK参数的逻辑(使用0x3A指令) + } + + private void sendSimpleResponse(ChannelHandlerContext ctx, ChargingPileMessage originalMessage, byte command, byte result) { + ChargingPileMessage response = new ChargingPileMessage( + originalMessage.getPhysicalId(), + originalMessage.getMessageId(), + command, + new byte[]{result} + ); + ctx.writeAndFlush(response); + } + + @Override + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { + cause.printStackTrace(); + ctx.close(); + } +} \ No newline at end of file diff --git a/jsowell-netty/src/main/java/com/jsowell/netty/server/electricbicycles/ElectricBicyclesServerChannelInitializer.java b/jsowell-netty/src/main/java/com/jsowell/netty/server/electricbicycles/ElectricBicyclesServerChannelInitializer.java new file mode 100644 index 000000000..fd7b91cf7 --- /dev/null +++ b/jsowell-netty/src/main/java/com/jsowell/netty/server/electricbicycles/ElectricBicyclesServerChannelInitializer.java @@ -0,0 +1,37 @@ +package com.jsowell.netty.server.electricbicycles; + +import com.jsowell.netty.decoder.ChargingPileDecoder; +import com.jsowell.netty.decoder.ProtocolDnyDecoder; +import com.jsowell.netty.decoder.StartAndLengthFieldFrameDecoder; +import com.jsowell.netty.decoder.StartAndLengthFieldFrameDecoder2; +import io.netty.channel.ChannelInitializer; +import io.netty.channel.ChannelPipeline; +import io.netty.channel.socket.SocketChannel; +import io.netty.handler.codec.bytes.ByteArrayDecoder; +import io.netty.handler.timeout.IdleStateHandler; +import org.springframework.stereotype.Component; + +import javax.annotation.Resource; +import java.util.concurrent.TimeUnit; + +@Component +public class ElectricBicyclesServerChannelInitializer extends ChannelInitializer { + + @Resource + ElectricBicyclesServerHandler electricBicyclesServerHandler; + + @Resource + ChargingPileHandler chargingPileHandler; + + @Override + protected void initChannel(SocketChannel channel) throws Exception { + ChannelPipeline pipeline = channel.pipeline(); + pipeline.addLast("frameDecoder", new ChargingPileDecoder()); + pipeline.addLast("decoder", new ByteArrayDecoder()); + pipeline.addLast("encoder", new ByteArrayDecoder()); + //读超时时间设置为10s,0表示不监控 + pipeline.addLast(new IdleStateHandler(60, 0, 0, TimeUnit.SECONDS)); + pipeline.addLast("handler", chargingPileHandler); + } + +} diff --git a/jsowell-netty/src/main/java/com/jsowell/netty/server/electricbicycles/ElectricBicyclesServerHandler.java b/jsowell-netty/src/main/java/com/jsowell/netty/server/electricbicycles/ElectricBicyclesServerHandler.java new file mode 100644 index 000000000..159861ffa --- /dev/null +++ b/jsowell-netty/src/main/java/com/jsowell/netty/server/electricbicycles/ElectricBicyclesServerHandler.java @@ -0,0 +1,234 @@ +package com.jsowell.netty.server.electricbicycles; + +import com.google.common.collect.Lists; +import com.jsowell.common.core.domain.ykc.YKCFrameTypeCode; +import com.jsowell.common.enums.ykc.PileChannelEntity; +import com.jsowell.common.util.BytesUtil; +import com.jsowell.common.util.StringUtils; +import com.jsowell.common.util.YKCUtils; +import com.jsowell.netty.service.yunkuaichong.YKCBusinessService; +import io.netty.buffer.ByteBuf; +import io.netty.channel.*; +import io.netty.handler.timeout.IdleState; +import io.netty.handler.timeout.IdleStateEvent; +import io.netty.handler.timeout.ReadTimeoutException; +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.collections4.CollectionUtils; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; + +import java.net.InetSocketAddress; +import java.util.List; +import java.util.Objects; +import java.util.concurrent.ConcurrentHashMap; + +/** + * netty服务端处理类 + */ +@ChannelHandler.Sharable +@Slf4j +@Component +public class ElectricBicyclesServerHandler extends ChannelInboundHandlerAdapter { + + @Autowired + private YKCBusinessService ykcService; + + /** + * 管理一个全局map,保存连接进服务端的通道数量 + */ + private static final ConcurrentHashMap CHANNEL_MAP = new ConcurrentHashMap<>(); + + private final List notPrintFrameTypeList = Lists.newArrayList("0x03"); + + /** + * 有客户端连接服务器会触发此函数 + * 连接被建立并且准备进行通信时被调用 + */ + @Override + public void channelActive(ChannelHandlerContext ctx) { + InetSocketAddress insocket = (InetSocketAddress) ctx.channel().remoteAddress(); + String clientIp = insocket.getAddress().getHostAddress(); + int clientPort = insocket.getPort(); + //获取连接通道唯一标识 + ChannelId channelId = ctx.channel().id(); + //如果map中不包含此连接,就保存连接 + if (CHANNEL_MAP.containsKey(channelId)) { + log.info("Handler:{}, 客户端【{}】是连接状态,连接通道数量: {}", this.getClass().getSimpleName(), channelId, CHANNEL_MAP.size()); + } else { + //保存连接 + CHANNEL_MAP.put(channelId, ctx); + log.info("Handler:{}, 客户端【{}】, 连接netty服务器【IP:{}, PORT:{}】, 连接通道数量: {}", this.getClass().getSimpleName(), channelId, clientIp, clientPort, CHANNEL_MAP.size()); + } + } + + /** + * 有客户端发消息会触发此函数 + */ + @Override + public void channelRead(ChannelHandlerContext ctx, Object message) throws Exception { + // log.info("加载客户端报文=== channelId:" + ctx.channel().id() + ", msg:" + msg); + // 下面可以解析数据,保存数据,生成返回报文,将需要返回报文写入write函数 + byte[] msg = (byte[]) message; + + // 获取帧类型 + byte[] frameTypeBytes = BytesUtil.copyBytes(msg, 5, 1); + String frameType = YKCUtils.frameType2Str(frameTypeBytes); + // 获取序列号域 + int serialNumber = BytesUtil.bytesToIntLittle(BytesUtil.copyBytes(msg, 2, 2)); + // 获取channel + Channel channel = ctx.channel(); + + // new + // String hexString = DatatypeConverter.printHexBinary(msg); + + // 心跳包0x03日志太多,造成日志文件过大,改为不打印 + if (!CollectionUtils.containsAny(notPrintFrameTypeList, frameType)) { + // log.info("【<<<<<平台收到消息<<<<<】channel:{}, 帧类型:{}, 帧名称:{}, 序列号域:{}, 报文:{}, new报文:{}", + // channel.id(), frameType, YKCFrameTypeCode.getFrameTypeStr(frameType), serialNumber, + // BytesUtil.binary(msg, 16), hexString); + log.info("【<<<<<平台收到消息<<<<<】channel:{}, 帧类型:{}, 帧名称:{}, 序列号域:{}, 报文:{}", + channel.id(), frameType, YKCFrameTypeCode.getFrameTypeStr(frameType), serialNumber, + BytesUtil.binary(msg, 16)); + } + + // 处理数据 + byte[] response = ykcService.process(msg, channel); + if (Objects.nonNull(response)) { + // 响应客户端 + ByteBuf buffer = ctx.alloc().buffer().writeBytes(response); + this.channelWrite(channel.id(), buffer); + if (!CollectionUtils.containsAny(notPrintFrameTypeList, frameType)) { + // 应答帧类型 + byte[] responseFrameTypeBytes = YKCFrameTypeCode.PlatformAnswersRelation.getResponseFrameTypeBytes(frameTypeBytes); + String responseFrameType = YKCUtils.frameType2Str(responseFrameTypeBytes); + log.info("【>>>>>平台响应消息>>>>>】channel:{}, 响应帧类型:{}, 响应帧名称:{}, 原帧类型:{}, 原帧名称:{}, 序列号域:{}, response:{}", + channel.id(), responseFrameType, YKCFrameTypeCode.getFrameTypeStr(responseFrameType), + frameType, YKCFrameTypeCode.getFrameTypeStr(frameType), serialNumber, + BytesUtil.binary(response, 16)); + } + } + } + + /** + * 有客户端终止连接服务器会触发此函数 + */ + @Override + public void channelInactive(ChannelHandlerContext ctx) { + InetSocketAddress insocket = (InetSocketAddress) ctx.channel().remoteAddress(); + String clientIp = insocket.getAddress().getHostAddress(); + ChannelId channelId = ctx.channel().id(); + //包含此客户端才去删除 + if (CHANNEL_MAP.containsKey(channelId)) { + ykcService.exit(channelId); + //删除连接 + CHANNEL_MAP.remove(channelId); + log.info("客户端【{}】, 退出netty服务器【IP:{}, PORT:{}】, 连接通道数量: {}", channelId, clientIp, insocket.getPort(), CHANNEL_MAP.size()); + } + } + + @Override + public void handlerAdded(ChannelHandlerContext ctx) throws Exception { // (2) + // Channel incoming = ctx.channel(); + // log.info("handlerAdded: handler被添加到channel的pipeline connect:" + incoming.remoteAddress()); + } + + @Override + public void handlerRemoved(ChannelHandlerContext ctx) throws Exception { // (3) + // Channel incoming = ctx.channel(); + // log.info("handlerRemoved: handler从channel的pipeline中移除 connect:" + incoming.remoteAddress()); + // ChannelMapByEntity.removeChannel(incoming); + // ChannelMap.removeChannel(incoming); + } + + @Override + public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { + Channel channel = ctx.channel(); + // log.info("channel:【{}】读数据完成", channel.id()); + super.channelReadComplete(ctx); + } + + /** + * 服务端给客户端发送消息 + * + * @param channelId 连接通道唯一id + * @param msg 需要发送的消息内容 + */ + public void channelWrite(ChannelId channelId, Object msg) throws Exception { + ChannelHandlerContext ctx = CHANNEL_MAP.get(channelId); + if (ctx == null) { + log.info("通道【{}】不存在", channelId); + return; + } + if (msg == null || msg == "") { + log.info("服务端响应空的消息"); + return; + } + //将客户端的信息直接返回写入ctx + ctx.write(msg); + //刷新缓存区 + ctx.flush(); + } + + @Override + public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { + String socketString = ctx.channel().remoteAddress().toString(); + ChannelId channelId = ctx.channel().id(); + String pileSn = PileChannelEntity.getPileSnByChannelId(channelId.asLongText()); + if (evt instanceof IdleStateEvent) { // 超时事件 + IdleStateEvent event = (IdleStateEvent) evt; + boolean flag = false; + if (event.state() == IdleState.READER_IDLE) { // 读 + flag = true; + // log.error("Client-IP:【{}】, channelId:【{}】, pileSn:【{}】, READER_IDLE 读超时", socketString, channelId, pileSn); + } else if (event.state() == IdleState.WRITER_IDLE) { // 写 + flag = true; + // log.error("Client-IP:【{}】, channelId:【{}】, pileSn:【{}】, WRITER_IDLE 写超时", socketString, channelId, pileSn); + } else if (event.state() == IdleState.ALL_IDLE) { // 全部 + flag = true; + // log.error("Client-IP:【{}】, channelId:【{}】, pileSn:【{}】, ALL_IDLE 总超时", socketString, channelId, pileSn); + } + if (flag) { + ctx.channel().close(); + // close(channelId, pileSn); + } + } + } + + /** + * 发生异常会触发此函数 + */ + @Override + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { + ChannelId channelId = ctx.channel().id(); + String channelIdShortText = channelId.asShortText(); + String pileSn = PileChannelEntity.getPileSnByChannelId(channelIdShortText); + log.error("发生异常 channelId:{}, pileSn:{}", channelIdShortText, pileSn, cause); + cause.printStackTrace(); + // 如果桩连到平台,在1分钟内没有发送数据过来,会报ReadTimeoutException异常 + if (cause instanceof ReadTimeoutException) { + if (log.isTraceEnabled()) { + log.trace("Connection timeout 【{}】", ctx.channel().remoteAddress()); + } + log.error("【{}】发生了错误, pileSn:【{}】此连接被关闭, 此时连通数量: {}", channelId, pileSn, CHANNEL_MAP.size()); + ctx.channel().close(); + } + // close(channelId, pileSn); + } + + + // 公共方法 关闭连接 + private void closeConnection(String pileSn, ChannelHandlerContext ctx) { + Channel channel = ctx.channel(); + ChannelId channelId = channel.id(); + log.error("close方法-发生异常,关闭链接,channelId:{}, pileSn:{}", channelId.asShortText(), pileSn); + if (channel != null && !channel.isActive() && !channel.isOpen() && !channel.isWritable()) { + channel.close(); + // 删除连接 + CHANNEL_MAP.remove(channelId); + } + // 删除桩编号和channel的关系 + if (StringUtils.isNotBlank(pileSn)) { + PileChannelEntity.removeByPileSn(pileSn); + } + } +} \ No newline at end of file diff --git a/jsowell-netty/src/main/java/com/jsowell/netty/server/mqtt/MqttSever.java b/jsowell-netty/src/main/java/com/jsowell/netty/server/mqtt/MqttSever.java index 09649c12d..59e8395e0 100644 --- a/jsowell-netty/src/main/java/com/jsowell/netty/server/mqtt/MqttSever.java +++ b/jsowell-netty/src/main/java/com/jsowell/netty/server/mqtt/MqttSever.java @@ -19,8 +19,9 @@ import org.springframework.stereotype.Component; import java.net.InetSocketAddress; @Slf4j -@Component -@Order(5) +@Deprecated +// @Component +// @Order(7) public class MqttSever implements CommandLineRunner { @Override diff --git a/jsowell-netty/src/main/java/com/jsowell/netty/server/yunkuaichong/NettyServer.java b/jsowell-netty/src/main/java/com/jsowell/netty/server/yunkuaichong/NettyServer.java index 4ff2a2e5e..d1eb8d88b 100644 --- a/jsowell-netty/src/main/java/com/jsowell/netty/server/yunkuaichong/NettyServer.java +++ b/jsowell-netty/src/main/java/com/jsowell/netty/server/yunkuaichong/NettyServer.java @@ -16,8 +16,9 @@ import javax.annotation.Resource; import java.net.InetSocketAddress; @Slf4j -@Component -@Order(2) +@Deprecated +// @Component +// @Order(3) public class NettyServer implements CommandLineRunner { @Resource private NettyServerChannelInitializer nettyServerChannelInitializer; diff --git a/jsowell-netty/src/main/java/com/jsowell/netty/server/yunkuaichong/NettyServerChannelInitializer.java b/jsowell-netty/src/main/java/com/jsowell/netty/server/yunkuaichong/NettyServerChannelInitializer.java index 8551a6229..b343fde1b 100644 --- a/jsowell-netty/src/main/java/com/jsowell/netty/server/yunkuaichong/NettyServerChannelInitializer.java +++ b/jsowell-netty/src/main/java/com/jsowell/netty/server/yunkuaichong/NettyServerChannelInitializer.java @@ -21,7 +21,7 @@ public class NettyServerChannelInitializer extends ChannelInitializer>>>>平台响应消息>>>>>】channel:{}, 响应帧类型:{}, 响应帧名称:{}, 原帧类型:{}, 原帧名称:{}, 序列号域:{}, response:{}", channel.id(), responseFrameType, YKCFrameTypeCode.getFrameTypeStr(responseFrameType), @@ -109,6 +109,11 @@ public class NettyServerHandler extends ChannelInboundHandlerAdapter { } } + @Override + protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception { + log.info("channelRead0=== channelId:" + ctx.channel().id() + ", msg:" + msg); + } + /** * 有客户端终止连接服务器会触发此函数 */ diff --git a/jsowell-pile/src/main/java/com/jsowell/adapay/config/InitializeAdapayConfig.java b/jsowell-pile/src/main/java/com/jsowell/adapay/config/InitializeAdapayConfig.java index 5e206c758..1c9afba02 100644 --- a/jsowell-pile/src/main/java/com/jsowell/adapay/config/InitializeAdapayConfig.java +++ b/jsowell-pile/src/main/java/com/jsowell/adapay/config/InitializeAdapayConfig.java @@ -16,7 +16,7 @@ import java.util.Map; @Slf4j @Component -@Order(1) +@Order(2) public class InitializeAdapayConfig implements CommandLineRunner { @Value("${adapay.debugFlag}") @@ -36,7 +36,6 @@ public class InitializeAdapayConfig implements CommandLineRunner { @Override public void run(String... args) throws Exception { - log.info(">>>>>>>>>>>>>>>服务启动执行,执行加载数据等操作 InitializeAdapayConfig order 1 <<<<<<<<<<<<<"); /* 单商户 */ @@ -47,6 +46,7 @@ public class InitializeAdapayConfig implements CommandLineRunner { 目前有jsowell和xixiao */ multiMerchant(); + log.info(">>>>>>>>>>>>>>>服务启动执行,执行加载数据等操作 InitializeAdapayConfig order 2 <<<<<<<<<<<<<"); } /** diff --git a/jsowell-pile/src/main/java/com/jsowell/alipay/config/AliPayConfig.java b/jsowell-pile/src/main/java/com/jsowell/alipay/config/AliPayConfig.java index 968c0a227..b9c1b22d2 100644 --- a/jsowell-pile/src/main/java/com/jsowell/alipay/config/AliPayConfig.java +++ b/jsowell-pile/src/main/java/com/jsowell/alipay/config/AliPayConfig.java @@ -11,7 +11,7 @@ import org.springframework.stereotype.Component; @Slf4j @Component -@Order(2) +@Order(4) public class AliPayConfig implements CommandLineRunner { @Value("${alipay.gatewayHost}") private String gatewayHost; @@ -33,7 +33,6 @@ public class AliPayConfig implements CommandLineRunner { @Override public void run(String... args) throws Exception { - log.info(">>>>>>>>>>>>>>>服务启动执行,执行加载数据等操作 AliPayConfig order 2 <<<<<<<<<<<<<"); // 设置参数(全局只需设置一次) Config config = new Config(); config.protocol = Constants.HTTPS; @@ -57,5 +56,6 @@ public class AliPayConfig implements CommandLineRunner { // 可设置AES密钥,调用AES加解密相关接口时需要(可选) config.encryptKey = encryptKey; Factory.setOptions(config); + log.info(">>>>>>>>>>>>>>>服务启动执行,执行加载数据等操作 AliPayConfig order 4 <<<<<<<<<<<<<"); } } \ No newline at end of file diff --git a/jsowell-pile/src/main/java/com/jsowell/pile/domain/OrderBasicInfo.java b/jsowell-pile/src/main/java/com/jsowell/pile/domain/OrderBasicInfo.java index bf940bcaf..6e50736b0 100644 --- a/jsowell-pile/src/main/java/com/jsowell/pile/domain/OrderBasicInfo.java +++ b/jsowell-pile/src/main/java/com/jsowell/pile/domain/OrderBasicInfo.java @@ -76,7 +76,7 @@ public class OrderBasicInfo { private String vinCode; /** - * 启动方式(0-后管启动;1-用户app启动;2-卡启动;3-离线卡启动; 4-联联平台启动; 5-车辆vin码启动) + * 启动方式(0-后管启动;1-用户app启动;2-卡启动;3-离线卡启动; 4-联联平台启动; 5-车辆vin码启动; 6-个人桩预约启动) */ private String startMode; diff --git a/jsowell-pile/src/main/java/com/jsowell/pile/dto/GenerateOrderDTO.java b/jsowell-pile/src/main/java/com/jsowell/pile/dto/GenerateOrderDTO.java index e41c0f646..263c84c22 100644 --- a/jsowell-pile/src/main/java/com/jsowell/pile/dto/GenerateOrderDTO.java +++ b/jsowell-pile/src/main/java/com/jsowell/pile/dto/GenerateOrderDTO.java @@ -17,6 +17,14 @@ import java.math.BigDecimal; @Data public class GenerateOrderDTO extends BasicPileDTO{ private static final long serialVersionUID = -1374766807594759104L; + + /** + * 交易流水号 + * 无transactionCode, 表示由平台端启动充电, 平台自行生成交易流水号 + * 有transactionCode, 表示桩端主动启动充电, 平台使用桩端生成的交易流水号 + */ + private String transactionCode; + /** * 会员id */ diff --git a/jsowell-pile/src/main/java/com/jsowell/pile/dto/ReservationChargingStartupResult.java b/jsowell-pile/src/main/java/com/jsowell/pile/dto/ReservationChargingStartupResult.java index 089788305..e40695c09 100644 --- a/jsowell-pile/src/main/java/com/jsowell/pile/dto/ReservationChargingStartupResult.java +++ b/jsowell-pile/src/main/java/com/jsowell/pile/dto/ReservationChargingStartupResult.java @@ -5,6 +5,9 @@ import lombok.Builder; import lombok.Data; import lombok.NoArgsConstructor; +/** + * 0x65预约充电启动结果封装对象 + */ @Data @NoArgsConstructor @AllArgsConstructor @@ -31,7 +34,7 @@ public class ReservationChargingStartupResult { private String vinCode; /** - * 启动结果 + * 启动结果 0x00失败 0x01成功 */ private String startupResult; diff --git a/jsowell-pile/src/main/java/com/jsowell/pile/service/OrderBasicInfoService.java b/jsowell-pile/src/main/java/com/jsowell/pile/service/OrderBasicInfoService.java index b41f3aa4b..1af0a6555 100644 --- a/jsowell-pile/src/main/java/com/jsowell/pile/service/OrderBasicInfoService.java +++ b/jsowell-pile/src/main/java/com/jsowell/pile/service/OrderBasicInfoService.java @@ -471,4 +471,10 @@ public interface OrderBasicInfoService{ * @param endTime */ List getOrderDetailByStationIds(List stationIds, String startTime, String endTime); + + /** + * 创建预约启动充电订单 + * @param chargingStartupResult + */ + void createReservationOrder(ReservationChargingStartupResult chargingStartupResult); } diff --git a/jsowell-pile/src/main/java/com/jsowell/pile/service/impl/OrderBasicInfoServiceImpl.java b/jsowell-pile/src/main/java/com/jsowell/pile/service/impl/OrderBasicInfoServiceImpl.java index e272679d0..13d3c2a1e 100644 --- a/jsowell-pile/src/main/java/com/jsowell/pile/service/impl/OrderBasicInfoServiceImpl.java +++ b/jsowell-pile/src/main/java/com/jsowell/pile/service/impl/OrderBasicInfoServiceImpl.java @@ -3530,7 +3530,10 @@ public class OrderBasicInfoServiceImpl implements OrderBasicInfoService { @Override public OrderBasicInfo saveOrder2Database(GenerateOrderDTO dto) throws ParseException { String orderCode = generateNewOrderCode(); - String transactionCode = IdUtils.generateTransactionCode(dto.getPileSn(), dto.getConnectorCode()); + String transactionCode = dto.getTransactionCode(); + if (StringUtils.isBlank(transactionCode)) { + transactionCode = IdUtils.generateTransactionCode(dto.getPileSn(), dto.getConnectorCode()); + } if (StringUtils.isBlank(dto.getStartType())) { dto.setStartType(StartTypeEnum.NOW.getValue()); @@ -3593,31 +3596,35 @@ public class OrderBasicInfoServiceImpl implements OrderBasicInfoService { // 订单详情 BillingTemplateVO billingTemplate = dto.getBillingTemplate(); logger.info("订单使用的计费模板-orderCode:{}, billingTemplate:{}", orderCode, JSON.toJSONString(billingTemplate)); - BigDecimal sharpElectricityPrice = billingTemplate.getSharpElectricityPrice() != null ? billingTemplate.getSharpElectricityPrice() : BigDecimal.ZERO; - BigDecimal sharpServicePrice = billingTemplate.getSharpServicePrice() != null ? billingTemplate.getSharpServicePrice() : BigDecimal.ZERO; - BigDecimal peakElectricityPrice = billingTemplate.getPeakElectricityPrice() != null ? billingTemplate.getPeakElectricityPrice() : BigDecimal.ZERO; - BigDecimal peakServicePrice = billingTemplate.getPeakServicePrice() != null ? billingTemplate.getPeakServicePrice() : BigDecimal.ZERO; - BigDecimal flatElectricityPrice = billingTemplate.getFlatElectricityPrice() != null ? billingTemplate.getFlatElectricityPrice() : BigDecimal.ZERO; - BigDecimal flatServicePrice = billingTemplate.getFlatServicePrice() != null ? billingTemplate.getFlatServicePrice() : BigDecimal.ZERO; - BigDecimal valleyElectricityPrice = billingTemplate.getValleyElectricityPrice() != null ? billingTemplate.getValleyElectricityPrice() : BigDecimal.ZERO; - BigDecimal valleyServicePrice = billingTemplate.getValleyServicePrice() != null ? billingTemplate.getValleyServicePrice() : BigDecimal.ZERO; + OrderDetail orderDetail = null; + if (billingTemplate != null) { + BigDecimal sharpElectricityPrice = billingTemplate.getSharpElectricityPrice() != null ? billingTemplate.getSharpElectricityPrice() : BigDecimal.ZERO; + BigDecimal sharpServicePrice = billingTemplate.getSharpServicePrice() != null ? billingTemplate.getSharpServicePrice() : BigDecimal.ZERO; + BigDecimal peakElectricityPrice = billingTemplate.getPeakElectricityPrice() != null ? billingTemplate.getPeakElectricityPrice() : BigDecimal.ZERO; + BigDecimal peakServicePrice = billingTemplate.getPeakServicePrice() != null ? billingTemplate.getPeakServicePrice() : BigDecimal.ZERO; + BigDecimal flatElectricityPrice = billingTemplate.getFlatElectricityPrice() != null ? billingTemplate.getFlatElectricityPrice() : BigDecimal.ZERO; + BigDecimal flatServicePrice = billingTemplate.getFlatServicePrice() != null ? billingTemplate.getFlatServicePrice() : BigDecimal.ZERO; + BigDecimal valleyElectricityPrice = billingTemplate.getValleyElectricityPrice() != null ? billingTemplate.getValleyElectricityPrice() : BigDecimal.ZERO; + BigDecimal valleyServicePrice = billingTemplate.getValleyServicePrice() != null ? billingTemplate.getValleyServicePrice() : BigDecimal.ZERO; - OrderDetail orderDetail = OrderDetail.builder() - .orderCode(orderCode) - .sharpPrice(sharpElectricityPrice.add(sharpServicePrice)) - .sharpElectricityPrice(sharpElectricityPrice) - .sharpServicePrice(sharpServicePrice) - .peakPrice(peakElectricityPrice.add(peakServicePrice)) - .peakElectricityPrice(peakElectricityPrice) - .peakServicePrice(peakServicePrice) - .flatPrice(flatElectricityPrice.add(flatServicePrice)) - .flatElectricityPrice(flatElectricityPrice) - .flatServicePrice(flatServicePrice) - .valleyPrice(valleyElectricityPrice.add(valleyServicePrice)) - .valleyElectricityPrice(valleyElectricityPrice) - .valleyServicePrice(valleyServicePrice) - .build(); + orderDetail = OrderDetail.builder() + .orderCode(orderCode) + .sharpPrice(sharpElectricityPrice.add(sharpServicePrice)) + .sharpElectricityPrice(sharpElectricityPrice) + .sharpServicePrice(sharpServicePrice) + .peakPrice(peakElectricityPrice.add(peakServicePrice)) + .peakElectricityPrice(peakElectricityPrice) + .peakServicePrice(peakServicePrice) + .flatPrice(flatElectricityPrice.add(flatServicePrice)) + .flatElectricityPrice(flatElectricityPrice) + .flatServicePrice(flatServicePrice) + .valleyPrice(valleyElectricityPrice.add(valleyServicePrice)) + .valleyElectricityPrice(valleyElectricityPrice) + .valleyServicePrice(valleyServicePrice) + .build(); + } + // 保存到数据库 OrderTransactionDTO createOrderTransactionDTO = OrderTransactionDTO.builder() .orderBasicInfo(orderBasicInfo) .orderDetail(orderDetail) @@ -3745,5 +3752,44 @@ public class OrderBasicInfoServiceImpl implements OrderBasicInfoService { return orderBasicInfoMapper.getOrderDetailByStationIds(stationIds, startTime, endTime); } + /** + * 创建预约订单/预约充电订单/ + * @param chargingStartupResult + */ + @Override + public void createReservationOrder(ReservationChargingStartupResult chargingStartupResult) { + String orderCode = generateNewOrderCode(); + String transactionCode = chargingStartupResult.getTransactionCode(); + + String status = StringUtils.equals(chargingStartupResult.getStartupResult(), "00") + ? OrderStatusEnum.IN_THE_CHARGING.getValue() + : OrderStatusEnum.ORDER_CLOSE_TIMEOUT.getValue(); + + // 订单基本信息 + OrderBasicInfo orderBasicInfo = OrderBasicInfo.builder() + .orderCode(orderCode) + .transactionCode(transactionCode) + .orderStatus(status) + .pileSn(chargingStartupResult.getPileSn()) + .connectorCode(chargingStartupResult.getConnectorCode()) + .pileConnectorCode(chargingStartupResult.getPileSn() + chargingStartupResult.getConnectorCode()) + .startMode("6") + .payStatus(Constants.TWO) + .payMode(Constants.THREE) + .orderAmount(BigDecimal.ZERO) + .virtualAmount(BigDecimal.ZERO) + .settleAmount(BigDecimal.ZERO) + .startType(StartTypeEnum.RESERVED.getValue()) + .reason(chargingStartupResult.getFailReason()) + .build(); + + // 保存到数据库 + OrderTransactionDTO createOrderTransactionDTO = OrderTransactionDTO.builder() + .orderBasicInfo(orderBasicInfo) + .orderDetail(null) + .build(); + pileTransactionService.doCreateOrder(createOrderTransactionDTO); + } + } diff --git a/jsowell-pile/src/main/java/com/jsowell/pile/service/impl/PileBasicInfoServiceImpl.java b/jsowell-pile/src/main/java/com/jsowell/pile/service/impl/PileBasicInfoServiceImpl.java index 10a7f37fb..06013e9ce 100644 --- a/jsowell-pile/src/main/java/com/jsowell/pile/service/impl/PileBasicInfoServiceImpl.java +++ b/jsowell-pile/src/main/java/com/jsowell/pile/service/impl/PileBasicInfoServiceImpl.java @@ -90,6 +90,9 @@ public class PileBasicInfoServiceImpl implements PileBasicInfoService { @Autowired private WxAppletRemoteService wxAppletRemoteService; + @Autowired + private OrderBasicInfoService orderBasicInfoService; + /** * 查询设备管理 * @@ -1218,7 +1221,10 @@ public class PileBasicInfoServiceImpl implements PileBasicInfoService { */ @Override public void startupResult(ReservationChargingStartupResult chargingStartupResult) { + // 创建订单 + orderBasicInfoService.createReservationOrder(chargingStartupResult); + // 小程序通知 - wxAppletRemoteService.reservationStartupResultSendMsg(); + wxAppletRemoteService.reservationStartupResultSendMsg(chargingStartupResult); } } diff --git a/jsowell-pile/src/main/java/com/jsowell/pile/service/impl/PileReservationInfoServiceImpl.java b/jsowell-pile/src/main/java/com/jsowell/pile/service/impl/PileReservationInfoServiceImpl.java index 1aad1138e..b7c15ded2 100644 --- a/jsowell-pile/src/main/java/com/jsowell/pile/service/impl/PileReservationInfoServiceImpl.java +++ b/jsowell-pile/src/main/java/com/jsowell/pile/service/impl/PileReservationInfoServiceImpl.java @@ -184,7 +184,7 @@ public class PileReservationInfoServiceImpl implements PileReservationInfoServic .reservedEndTime(pileReservationInfo.getEndTime().toLocalTime()) .amount(Constants.WHITELIST_DEFAULT_AMOUNT) .build(); - pileRemoteService.reservationCharging(command); + // pileRemoteService.reservationCharging(command); } } @@ -235,7 +235,7 @@ public class PileReservationInfoServiceImpl implements PileReservationInfoServic .reservedEndTime(pileReservationInfo.getEndTime().toLocalTime()) .amount(Constants.WHITELIST_DEFAULT_AMOUNT) .build(); - pileRemoteService.reservationCharging(command); + // pileRemoteService.reservationCharging(command); } } @@ -413,12 +413,8 @@ public class PileReservationInfoServiceImpl implements PileReservationInfoServic return; } - /** - * 操作 - * 0x01:启动 0x02:取消 0x03:修改 - */ + // 操作 0x01:启动 0x02:取消 0x03:修改 String operation = "03"; - if (StringUtils.isNotBlank(dto.getStartTime())) { pileReservationInfo.setStartTime(Time.valueOf(dto.getStartTime())); operation = "03"; @@ -427,6 +423,10 @@ public class PileReservationInfoServiceImpl implements PileReservationInfoServic pileReservationInfo.setEndTime(Time.valueOf(dto.getEndTime())); operation = "03"; } + if (StringUtils.isNotBlank(dto.getVerifyIdentity())) { + pileReservationInfo.setVerifyIdentity(dto.getVerifyIdentity()); + operation = "03"; + } if (StringUtils.isNotBlank(dto.getStatus())) { pileReservationInfo.setStatus(dto.getStatus()); if (StringUtils.equals(dto.getStatus(), Constants.ZERO)) { @@ -437,12 +437,11 @@ public class PileReservationInfoServiceImpl implements PileReservationInfoServic operation = "01"; } } - if (StringUtils.isNotBlank(dto.getVerifyIdentity())) { - pileReservationInfo.setVerifyIdentity(dto.getVerifyIdentity()); - } pileReservationInfo.setUpdateBy(dto.getMemberId()); - this.insertOrUpdateSelective(pileReservationInfo); + /* + 先发送指令, 收到回复更新数据库 + */ // 查询会员的绑定vin列表 2024年7月30日11点04分 以当前请求会员的VIN为准 List plateNumberVOList = memberPlateNumberRelationService.selectMemberPlateNumberRelation(dto.getMemberId()); List vinCodes = Lists.newArrayList(); @@ -457,10 +456,7 @@ public class PileReservationInfoServiceImpl implements PileReservationInfoServic while (vinCodes.size() < 3) { vinCodes.add(""); } - String type = StringUtils.equals(pileReservationInfo.getReservationType(), "single") ? "00" : "01"; - - // 发送指令 ReservationChargingCommand command = ReservationChargingCommand.builder() .transactionCode(Constants.ILLEGAL_TRANSACTION_CODE) .pileSn(pileReservationInfo.getPileSn()) @@ -476,6 +472,11 @@ public class PileReservationInfoServiceImpl implements PileReservationInfoServic .amount(Constants.WHITELIST_DEFAULT_AMOUNT) .build(); pileRemoteService.reservationCharging(command); + + // 从redis中获取回复, 3秒没有获取到判为超时 + + + this.insertOrUpdateSelective(pileReservationInfo); } /** diff --git a/jsowell-pile/src/main/java/com/jsowell/pile/service/impl/YKCPushCommandServiceImpl.java b/jsowell-pile/src/main/java/com/jsowell/pile/service/impl/YKCPushCommandServiceImpl.java index 20867b799..cd5508d40 100644 --- a/jsowell-pile/src/main/java/com/jsowell/pile/service/impl/YKCPushCommandServiceImpl.java +++ b/jsowell-pile/src/main/java/com/jsowell/pile/service/impl/YKCPushCommandServiceImpl.java @@ -56,9 +56,9 @@ public class YKCPushCommandServiceImpl implements YKCPushCommandService { // 需要记录报文的数据帧类型 private final List frameTypeList = Lists.newArrayList( YKCUtils.frameType2Str(YKCFrameTypeCode.REMOTE_RESTART_CODE.getBytes()), - YKCUtils.frameType2Str(YKCFrameTypeCode.REMOTE_CONTROL_START_CODE.getBytes()), - YKCUtils.frameType2Str(YKCFrameTypeCode.REMOTE_STOP_CHARGING_CODE.getBytes()), - YKCUtils.frameType2Str(YKCFrameTypeCode.RESERVATION_CHARGING_CODE.getBytes()) + YKCUtils.frameType2Str(YKCFrameTypeCode.REMOTE_CONTROL_START_CHARGING_CODE.getBytes()), + YKCUtils.frameType2Str(YKCFrameTypeCode.REMOTE_CONTROL_STOP_CHARGING_CODE.getBytes()), + YKCUtils.frameType2Str(YKCFrameTypeCode.RESERVATION_CHARGING_SETUP_CODE.getBytes()) ); /** @@ -77,8 +77,8 @@ public class YKCPushCommandServiceImpl implements YKCPushCommandService { log.error("push命令[{}]失败, 桩号:{}无法获取到长连接, 请检查充电桩连接状态!", value, pileSn); return false; } - /** - * 拼接报文 + /* + 拼接报文 */ // 起始标志 byte[] head = new byte[]{0x68}; @@ -100,11 +100,12 @@ public class YKCPushCommandServiceImpl implements YKCPushCommandService { // 帧校验域 byte[] crc = BytesUtil.intToBytes(CRC16Util.calcCrc16(temp)); + // 返回报文 byte[] writeMsg = Bytes.concat(head, length, temp, crc); + // 返回完整的报文 string类型 String wholeMsg = BytesUtil.binary(writeMsg, 16); - // log.info("[" + channel.remoteAddress() + "] 主动发送push请求信息:{}", wholeMsg); ByteBuf byteBuf = channel.alloc().buffer().writeBytes(writeMsg); ChannelFuture channelFuture = channel.writeAndFlush(byteBuf); channelFuture.addListener((ChannelFutureListener) channelFutureListener -> { @@ -176,7 +177,7 @@ public class YKCPushCommandServiceImpl implements YKCPushCommandService { byte[] accountBalanceByteArr = YKCUtils.getPriceByte(chargeAmount.toString(), 2); byte[] msgBody = Bytes.concat(orderIdByteArr, pileSnByteArr, connectorCodeByteArr, logicCardNumByteArr, physicsCardNumByteArr, accountBalanceByteArr); - this.push(msgBody, pileSn, YKCFrameTypeCode.REMOTE_CONTROL_START_CODE); + this.push(msgBody, pileSn, YKCFrameTypeCode.REMOTE_CONTROL_START_CHARGING_CODE); log.info("【=====平台下发充电指令=====】:订单id:{}, 桩号:{}, 枪口号:{}, 逻辑卡号:{}, 物理卡号:{}, 账户余额:{}", transactionCode, pileSn, BytesUtil.bcd2Str(connectorCodeByteArr), logicCardNum, physicsCardNum, chargeAmount); } @@ -192,7 +193,7 @@ public class YKCPushCommandServiceImpl implements YKCPushCommandService { String connectorCode = command.getConnectorCode(); // 远程停机 byte[] msgBody = Bytes.concat(BytesUtil.str2Bcd(pileSn), BytesUtil.str2Bcd(connectorCode)); - this.push(msgBody, pileSn, YKCFrameTypeCode.REMOTE_STOP_CHARGING_CODE); + this.push(msgBody, pileSn, YKCFrameTypeCode.REMOTE_CONTROL_STOP_CHARGING_CODE); log.info("【=====平台下发指令=====】:远程停止充电,桩号:{},枪口号:{}", pileSn, connectorCode); } @@ -474,7 +475,7 @@ public class YKCPushCommandServiceImpl implements YKCPushCommandService { // 拼装msg信息 byte[] msg = Bytes.concat(pileSnByteArr, connectorCodeByteArr, operateByteArr, obligateByteArr); - this.push(msg, pileSn, YKCFrameTypeCode.REMOTE_CONTROL_GROUND_LOCK_LIFTING_CODE); + this.push(msg, pileSn, YKCFrameTypeCode.REMOTE_CONTROL_GROUND_LOCK_CODE); } /** @@ -539,7 +540,7 @@ public class YKCPushCommandServiceImpl implements YKCPushCommandService { reservationTypeByteArr, verifyIdentityByteArr, vin1ByteArr, vin2ByteArr, vin3ByteArr, reservedStartTimeByteArr, reservedEndTimeByteArr, amountByteArr); - this.push(msg, pileSn, YKCFrameTypeCode.RESERVATION_CHARGING_CODE); + this.push(msg, pileSn, YKCFrameTypeCode.RESERVATION_CHARGING_SETUP_CODE); log.info("【=====平台下发指令=====】: 预约充电指令, 交易流水号:{}, 桩编号:{}, 枪口号:{}, 操作:{}, 身份验证:{}, 开始时间:{}, 结束时间:{}, 启动金额:{}", transactionCode, pileSn, connectorCode, operation, verifyIdentity, DateUtils.formatDateTime(reservedStartTime), DateUtils.formatDateTime(reservedEndTime), amount); diff --git a/jsowell-pile/src/main/java/com/jsowell/pile/service/programlogic/AbstractProgramLogic.java b/jsowell-pile/src/main/java/com/jsowell/pile/service/programlogic/AbstractProgramLogic.java index a9ffca059..6eddcf9b7 100644 --- a/jsowell-pile/src/main/java/com/jsowell/pile/service/programlogic/AbstractProgramLogic.java +++ b/jsowell-pile/src/main/java/com/jsowell/pile/service/programlogic/AbstractProgramLogic.java @@ -242,6 +242,9 @@ public abstract class AbstractProgramLogic implements InitializingBean { BigDecimal orderAmount = orderBasicInfo.getOrderAmount(); // 更新订单详情 查询订单详情 修改订单数据 OrderDetail orderDetail = orderBasicInfoService.getOrderDetailByOrderCode(orderCode); + if (orderDetail == null) { + return null; + } try { // 总电费金额 BigDecimal totalElectricityAmount = BigDecimal.ZERO; @@ -612,6 +615,12 @@ public abstract class AbstractProgramLogic implements InitializingBean { String stationId = orderBasicInfo.getStationId(); // 站点id String merchantId = orderBasicInfo.getMerchantId(); // 运营商id + if (orderDetail == null) { + logger.info("计算订单折扣V2, OrderDetail为空(orderCode:{}, transactionCode:{}), 直接返回!", + orderBasicInfo.getOrderCode(), orderBasicInfo.getTransactionCode()); + return; + } + // 原始电费金额 BigDecimal originalTotalElectricityAmount = orderDetail.getTotalElectricityAmount(); // 原始服务费金额 diff --git a/jsowell-pile/src/main/java/com/jsowell/pile/service/programlogic/DelayMerchantProgramLogic.java b/jsowell-pile/src/main/java/com/jsowell/pile/service/programlogic/DelayMerchantProgramLogic.java index de933a2b3..afdb35846 100644 --- a/jsowell-pile/src/main/java/com/jsowell/pile/service/programlogic/DelayMerchantProgramLogic.java +++ b/jsowell-pile/src/main/java/com/jsowell/pile/service/programlogic/DelayMerchantProgramLogic.java @@ -71,7 +71,10 @@ public class DelayMerchantProgramLogic extends AbstractProgramLogic { orderBasicInfoService.analysisPileParameter(dto); // 校验充电桩相关的信息 - orderBasicInfoService.checkPileInfo(dto); + if (StringUtils.isBlank(dto.getTransactionCode())) { + // 无transactionCode, 表示由平台端启动充电, 需要验证充电桩状态 + orderBasicInfoService.checkPileInfo(dto); + } // 保存订单到数据库 saveOrder2Database return orderBasicInfoService.saveOrder2Database(dto); @@ -430,7 +433,8 @@ public class DelayMerchantProgramLogic extends AbstractProgramLogic { // 从redis中取出实时记录保存到表中 realTimeMonitorDataRedis2DB(orderBasicInfo.getTransactionCode(), orderBasicInfo.getOrderCode()); - logger.info("结算订单end:{} OrderTransactionDTO:{}", orderBasicInfo.getOrderCode(), JSON.toJSONString(dto)); + logger.info("结算订单end! orderCode:{}, transactionCode:{}, OrderTransactionDTO:{}", + orderBasicInfo.getOrderCode(), orderBasicInfo.getTransactionCode(), JSON.toJSONString(dto)); } /** diff --git a/jsowell-pile/src/main/java/com/jsowell/pile/service/programlogic/NotDelayMerchantProgramLogic.java b/jsowell-pile/src/main/java/com/jsowell/pile/service/programlogic/NotDelayMerchantProgramLogic.java index 147db42fd..926858719 100644 --- a/jsowell-pile/src/main/java/com/jsowell/pile/service/programlogic/NotDelayMerchantProgramLogic.java +++ b/jsowell-pile/src/main/java/com/jsowell/pile/service/programlogic/NotDelayMerchantProgramLogic.java @@ -72,7 +72,10 @@ public class NotDelayMerchantProgramLogic extends AbstractProgramLogic { orderBasicInfoService.analysisPileParameter(dto); // 校验充电桩相关的信息 - orderBasicInfoService.checkPileInfo(dto); + if (StringUtils.isBlank(dto.getTransactionCode())) { + // 无transactionCode, 表示由平台端启动充电, 需要验证充电桩状态 + orderBasicInfoService.checkPileInfo(dto); + } // 保存订单到数据库 saveOrder2Database return orderBasicInfoService.saveOrder2Database(dto); diff --git a/jsowell-pile/src/main/java/com/jsowell/wxpay/config/WechatPayConfig.java b/jsowell-pile/src/main/java/com/jsowell/wxpay/config/WechatPayConfig.java index 797075a5b..dd0d2391f 100644 --- a/jsowell-pile/src/main/java/com/jsowell/wxpay/config/WechatPayConfig.java +++ b/jsowell-pile/src/main/java/com/jsowell/wxpay/config/WechatPayConfig.java @@ -3,13 +3,15 @@ package com.jsowell.wxpay.config; import com.jsowell.wxpay.common.WeChatPayParameter; import com.jsowell.wxpay.utils.WechatPayUtils; +import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Value; import org.springframework.boot.CommandLineRunner; import org.springframework.core.annotation.Order; import org.springframework.stereotype.Component; -@Order(3) +@Slf4j @Component +@Order(5) public class WechatPayConfig implements CommandLineRunner { /** * 公众号appid @@ -78,7 +80,6 @@ public class WechatPayConfig implements CommandLineRunner { @Override public void run(String... args) throws Exception { - System.out.println(">>>>>>>>>>>>>>>服务启动执行,执行加载数据等操作 WechatPayConfig order 3 <<<<<<<<<<<<<"); //微信支付 WeChatPayParameter.mchId = wechatMchId; WeChatPayParameter.appId = wechatAppId; @@ -94,5 +95,6 @@ public class WechatPayConfig implements CommandLineRunner { WeChatPayParameter.mchSerialNo = mchSerialNo; //获取平台证书 WeChatPayParameter.certificateMap = WechatPayUtils.refreshCertificate(); + log.info(">>>>>>>>>>>>>>>服务启动执行,执行加载数据等操作 WechatPayConfig order 5 <<<<<<<<<<<<<"); } } diff --git a/jsowell-pile/src/main/java/com/jsowell/wxpay/service/WxAppletRemoteService.java b/jsowell-pile/src/main/java/com/jsowell/wxpay/service/WxAppletRemoteService.java index ee2f9f9e6..c96ab08cb 100644 --- a/jsowell-pile/src/main/java/com/jsowell/wxpay/service/WxAppletRemoteService.java +++ b/jsowell-pile/src/main/java/com/jsowell/wxpay/service/WxAppletRemoteService.java @@ -14,6 +14,7 @@ import com.jsowell.common.util.DateUtils; import com.jsowell.common.util.StringUtils; import com.jsowell.common.util.http.HttpUtils; import com.jsowell.pile.domain.MemberBasicInfo; +import com.jsowell.pile.dto.ReservationChargingStartupResult; import com.jsowell.pile.service.MemberBasicInfoService; import com.jsowell.pile.service.PileBillingTemplateService; import com.jsowell.pile.service.OrderBasicInfoService; @@ -277,7 +278,7 @@ public class WxAppletRemoteService { /** * 预约充电结果小程序服务通知 */ - public Map reservationStartupResultSendMsg() { + public Map reservationStartupResultSendMsg(ReservationChargingStartupResult chargingStartupResult) { AppletTemplateMessageSendDTO msgInfo = new AppletTemplateMessageSendDTO(); return uniAppSendMsg(msgInfo); } diff --git a/jsowell-pile/src/main/resources/mapper/pile/ThirdPartyStationRelationMapper.xml b/jsowell-pile/src/main/resources/mapper/pile/ThirdPartyStationRelationMapper.xml index 59b17feb5..3743e8a93 100644 --- a/jsowell-pile/src/main/resources/mapper/pile/ThirdPartyStationRelationMapper.xml +++ b/jsowell-pile/src/main/resources/mapper/pile/ThirdPartyStationRelationMapper.xml @@ -203,7 +203,7 @@ t2.data_secret as dataSecret, t2.data_secret_IV as dataSecretIv from thirdparty_station_relation t1 - join thirdparty_setting_info t2 on t1.third_party_type = t2.type + join thirdparty_setting_info t2 on t1.third_party_type = t2.type where t1.del_flag = '0' and t1.station_id = #{stationId,jdbcType=BIGINT} diff --git a/pom.xml b/pom.xml index 6773e0f75..44a95cca9 100644 --- a/pom.xml +++ b/pom.xml @@ -41,6 +41,7 @@ 4.39.79.ALL 2.2.3 3.1.87 + 1.8.0 @@ -323,6 +324,18 @@ ${huifu.version} + + io.protostuff + protostuff-core + ${protostuff.version} + + + + io.protostuff + protostuff-runtime + ${protostuff.version} + +
- * 协议开始的标准head_data,int类型,占据1个字节. - * 表示数据的长度contentLength,int类型,占据1个字节. - *