From eed9a7bc2cda5170cc1479da4e1efc0ba393e208 Mon Sep 17 00:00:00 2001 From: Guoqs <123@jsowell.com> Date: Wed, 31 Jul 2024 16:48:29 +0800 Subject: [PATCH] =?UTF-8?q?=E5=90=8C=E6=AD=A5=E8=8E=B7=E5=8F=96=E5=93=8D?= =?UTF-8?q?=E5=BA=94=E6=95=B0=E6=8D=AE?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- jsowell-admin/pom.xml | 21 +++++ .../test/java/SpringBootTestController.java | 2 - .../test/java/demo/ChargingPileClient.java | 80 ------------------- .../java/demo/ChargingPileController.java | 34 -------- .../test/java/demo/ChargingPileServer.java | 50 ------------ .../java/demo/ChargingPileServerHandler.java | 31 ------- .../src/test/java/demo/ProtocolUtil.java | 77 ------------------ .../java/rpc/ClientChannelInitializer.java | 19 +++++ jsowell-admin/src/test/java/rpc/Message.java | 11 +++ .../src/test/java/rpc/MessageConstant.java | 22 +++++ .../src/test/java/rpc/MessageDecode.java | 44 ++++++++++ .../src/test/java/rpc/MessageEncode.java | 22 +++++ .../src/test/java/rpc/RpcClient.java | 52 ++++++++++++ .../src/test/java/rpc/RpcRequest.java | 21 +++++ .../src/test/java/rpc/RpcRequestHandler.java | 36 +++++++++ .../src/test/java/rpc/RpcResponse.java | 18 +++++ .../src/test/java/rpc/RpcResponseHandler.java | 23 ++++++ .../src/test/java/rpc/RpcServer.java | 55 +++++++++++++ jsowell-admin/src/test/java/rpc/RpcUtil.java | 63 +++++++++++++++ .../src/test/java/rpc/SerializationUtil.java | 53 ++++++++++++ .../java/rpc/ServerChannelInitializer.java | 19 +++++ .../src/test/java/rpc/SyncPromise.java | 49 ++++++++++++ .../src/test/java/rpc/TestRpcClient.java | 48 +++++++++++ .../src/test/java/rpc/TestRpcServer.java | 7 ++ .../yunkuaichong/NettyServerHandler.java | 9 ++- 25 files changed, 590 insertions(+), 276 deletions(-) delete mode 100644 jsowell-admin/src/test/java/demo/ChargingPileClient.java delete mode 100644 jsowell-admin/src/test/java/demo/ChargingPileController.java delete mode 100644 jsowell-admin/src/test/java/demo/ChargingPileServer.java delete mode 100644 jsowell-admin/src/test/java/demo/ChargingPileServerHandler.java delete mode 100644 jsowell-admin/src/test/java/demo/ProtocolUtil.java create mode 100644 jsowell-admin/src/test/java/rpc/ClientChannelInitializer.java create mode 100644 jsowell-admin/src/test/java/rpc/Message.java create mode 100644 jsowell-admin/src/test/java/rpc/MessageConstant.java create mode 100644 jsowell-admin/src/test/java/rpc/MessageDecode.java create mode 100644 jsowell-admin/src/test/java/rpc/MessageEncode.java create mode 100644 jsowell-admin/src/test/java/rpc/RpcClient.java create mode 100644 jsowell-admin/src/test/java/rpc/RpcRequest.java create mode 100644 jsowell-admin/src/test/java/rpc/RpcRequestHandler.java create mode 100644 jsowell-admin/src/test/java/rpc/RpcResponse.java create mode 100644 jsowell-admin/src/test/java/rpc/RpcResponseHandler.java create mode 100644 jsowell-admin/src/test/java/rpc/RpcServer.java create mode 100644 jsowell-admin/src/test/java/rpc/RpcUtil.java create mode 100644 jsowell-admin/src/test/java/rpc/SerializationUtil.java create mode 100644 jsowell-admin/src/test/java/rpc/ServerChannelInitializer.java create mode 100644 jsowell-admin/src/test/java/rpc/SyncPromise.java create mode 100644 jsowell-admin/src/test/java/rpc/TestRpcClient.java create mode 100644 jsowell-admin/src/test/java/rpc/TestRpcServer.java diff --git a/jsowell-admin/pom.xml b/jsowell-admin/pom.xml index 8cf2bb63c..858c2d94b 100644 --- a/jsowell-admin/pom.xml +++ b/jsowell-admin/pom.xml @@ -107,6 +107,27 @@ jsowell-thirdparty + + org.projectlombok + lombok + test + + + + io.protostuff + protostuff-core + 1.8.0 + test + + + + io.protostuff + protostuff-runtime + 1.8.0 + test + + + diff --git a/jsowell-admin/src/test/java/SpringBootTestController.java b/jsowell-admin/src/test/java/SpringBootTestController.java index f00bcc8e1..cad30e446 100644 --- a/jsowell-admin/src/test/java/SpringBootTestController.java +++ b/jsowell-admin/src/test/java/SpringBootTestController.java @@ -85,8 +85,6 @@ import com.jsowell.wxpay.common.WeChatPayParameter; import com.jsowell.wxpay.dto.AppletTemplateMessageSendDTO; import com.jsowell.wxpay.response.WechatPayRefundRequest; import com.jsowell.wxpay.service.WxAppletRemoteService; -import demo.ChargingPileServerHandler; -import demo.ProtocolUtil; import org.apache.commons.collections4.CollectionUtils; import org.apache.commons.lang3.RandomStringUtils; import org.junit.Test; diff --git a/jsowell-admin/src/test/java/demo/ChargingPileClient.java b/jsowell-admin/src/test/java/demo/ChargingPileClient.java deleted file mode 100644 index 29f2ee4c2..000000000 --- a/jsowell-admin/src/test/java/demo/ChargingPileClient.java +++ /dev/null @@ -1,80 +0,0 @@ -package demo; - -import io.netty.bootstrap.Bootstrap; -import io.netty.channel.*; -import io.netty.channel.nio.NioEventLoopGroup; -import io.netty.channel.socket.SocketChannel; -import io.netty.channel.socket.nio.NioSocketChannel; -import io.netty.handler.codec.bytes.ByteArrayDecoder; -import io.netty.handler.codec.bytes.ByteArrayEncoder; - -public class ChargingPileClient { - private final String host; - private final int port; - - public ChargingPileClient(String host, int port) { - this.host = host; - this.port = port; - } - - public void start() throws Exception { - EventLoopGroup group = new NioEventLoopGroup(); - try { - Bootstrap b = new Bootstrap(); - b.group(group) - .channel(NioSocketChannel.class) - .handler(new ChannelInitializer() { - @Override - public void initChannel(SocketChannel ch) throws Exception { - ChannelPipeline pipeline = ch.pipeline(); - pipeline.addLast(new ByteArrayDecoder()); - pipeline.addLast(new ByteArrayEncoder()); - pipeline.addLast(new ChargingPileClientHandler()); - } - }); - - ChannelFuture f = b.connect(host, port).sync(); - System.out.println("充电桩客户端已连接到服务器"); - f.channel().closeFuture().sync(); - } finally { - group.shutdownGracefully(); - } - } - - public static void main(String[] args) throws Exception { - String host = "localhost"; - int port = 9011; - new ChargingPileClient(host, port).start(); - } -} - -class ChargingPileClientHandler extends ChannelInboundHandlerAdapter { - @Override - public void channelRead(ChannelHandlerContext ctx, Object msg) { - byte[] command = (byte[]) msg; - System.out.println("收到服务器指令: " + ProtocolUtil.bytesToHex(command)); - - // 处理工作参数设置指令 - if (command[5] == 0x52) { - String pileId = ProtocolUtil.bytesToHex(command).substring(12, 26); - boolean allowWork = (command[19] == 0x00); - int maxPower = command[20] & 0xFF; - - System.out.println("收到工作参数设置:"); - System.out.println("桩编号: " + pileId); - System.out.println("是否允许工作: " + (allowWork ? "允许" : "不允许")); - System.out.println("最大允许输出功率: " + maxPower + "%"); - - // 模拟设置成功 - byte[] response = ProtocolUtil.createSetWorkParamsResponseFrame(pileId, true); - ctx.writeAndFlush(response); - System.out.println("发送响应: " + ProtocolUtil.bytesToHex(response)); - } - } - - @Override - public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { - cause.printStackTrace(); - ctx.close(); - } -} \ No newline at end of file diff --git a/jsowell-admin/src/test/java/demo/ChargingPileController.java b/jsowell-admin/src/test/java/demo/ChargingPileController.java deleted file mode 100644 index f29477b15..000000000 --- a/jsowell-admin/src/test/java/demo/ChargingPileController.java +++ /dev/null @@ -1,34 +0,0 @@ -package demo; - -import org.springframework.web.bind.annotation.*; -import io.netty.channel.ChannelHandlerContext; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.TimeUnit; - -@RestController -@RequestMapping("/api/charging") -public class ChargingPileController { - - private final ChannelHandlerContext chargingPileChannel; - - public ChargingPileController(ChannelHandlerContext chargingPileChannel) { - this.chargingPileChannel = chargingPileChannel; - } - - @PostMapping("/setWorkParams") - public String setWorkParams(@RequestParam String pileId, - @RequestParam boolean allowWork, - @RequestParam int maxPower) { - try { - byte[] command = ProtocolUtil.createSetWorkParamsFrame(pileId, allowWork, maxPower); - CompletableFuture future = ChargingPileServerHandler.sendCommand(chargingPileChannel, command); - byte[] response = future.get(5, TimeUnit.SECONDS); - - // 解析响应 - boolean success = (response[18] == 0x01); - return "设置" + (success ? "成功" : "失败"); - } catch (Exception e) { - return "发送指令失败: " + e.getMessage(); - } - } -} \ No newline at end of file diff --git a/jsowell-admin/src/test/java/demo/ChargingPileServer.java b/jsowell-admin/src/test/java/demo/ChargingPileServer.java deleted file mode 100644 index 0ddc9d1b1..000000000 --- a/jsowell-admin/src/test/java/demo/ChargingPileServer.java +++ /dev/null @@ -1,50 +0,0 @@ -package demo; - -import io.netty.bootstrap.ServerBootstrap; -import io.netty.channel.*; -import io.netty.channel.nio.NioEventLoopGroup; -import io.netty.channel.socket.SocketChannel; -import io.netty.channel.socket.nio.NioServerSocketChannel; -import io.netty.handler.codec.bytes.ByteArrayDecoder; -import io.netty.handler.codec.bytes.ByteArrayEncoder; - -public class ChargingPileServer { - private final int port; - - public ChargingPileServer(int port) { - this.port = port; - } - - public void start() throws Exception { - EventLoopGroup bossGroup = new NioEventLoopGroup(1); - EventLoopGroup workerGroup = new NioEventLoopGroup(); - try { - ServerBootstrap b = new ServerBootstrap(); - b.group(bossGroup, workerGroup) - .channel(NioServerSocketChannel.class) - .childHandler(new ChannelInitializer() { - @Override - public void initChannel(SocketChannel ch) throws Exception { - ChannelPipeline pipeline = ch.pipeline(); - pipeline.addLast(new ByteArrayDecoder()); - pipeline.addLast(new ByteArrayEncoder()); - pipeline.addLast(new ChargingPileServerHandler()); - } - }) - .option(ChannelOption.SO_BACKLOG, 128) - .childOption(ChannelOption.SO_KEEPALIVE, true); - - ChannelFuture f = b.bind(port).sync(); - System.out.println("充电桩服务器启动,监听端口: " + port); - f.channel().closeFuture().sync(); - } finally { - workerGroup.shutdownGracefully(); - bossGroup.shutdownGracefully(); - } - } - - public static void main(String[] args) throws Exception { - int port = 9011; - new ChargingPileServer(port).start(); - } -} \ No newline at end of file diff --git a/jsowell-admin/src/test/java/demo/ChargingPileServerHandler.java b/jsowell-admin/src/test/java/demo/ChargingPileServerHandler.java deleted file mode 100644 index 4981ca69d..000000000 --- a/jsowell-admin/src/test/java/demo/ChargingPileServerHandler.java +++ /dev/null @@ -1,31 +0,0 @@ -package demo; - -import io.netty.channel.ChannelHandlerContext; -import io.netty.channel.ChannelInboundHandlerAdapter; -import java.util.concurrent.CompletableFuture; - -public class ChargingPileServerHandler extends ChannelInboundHandlerAdapter { - private static CompletableFuture responseFuture; - - @Override - public void channelRead(ChannelHandlerContext ctx, Object msg) { - byte[] response = (byte[]) msg; - System.out.println("接收到充电桩响应: " + ProtocolUtil.bytesToHex(response)); - if (responseFuture != null) { - responseFuture.complete(response); - } - } - - @Override - public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { - cause.printStackTrace(); - ctx.close(); - } - - public static CompletableFuture sendCommand(ChannelHandlerContext ctx, byte[] command) { - responseFuture = new CompletableFuture<>(); - ctx.writeAndFlush(command); - System.out.println("发送指令到充电桩: " + ProtocolUtil.bytesToHex(command)); - return responseFuture; - } -} \ No newline at end of file diff --git a/jsowell-admin/src/test/java/demo/ProtocolUtil.java b/jsowell-admin/src/test/java/demo/ProtocolUtil.java deleted file mode 100644 index ba1774c8c..000000000 --- a/jsowell-admin/src/test/java/demo/ProtocolUtil.java +++ /dev/null @@ -1,77 +0,0 @@ -package demo; - -import java.nio.ByteBuffer; - -public class ProtocolUtil { - - public static byte[] createSetWorkParamsFrame(String pileId, boolean allowWork, int maxPower) { - ByteBuffer buffer = ByteBuffer.allocate(21); - buffer.put((byte) 0x68); // 起始标志 - buffer.put((byte) 0x0D); // 数据长度 - buffer.putShort((short) 0x0008); // 序列号域 - buffer.put((byte) 0x00); // 加密标志 - buffer.put((byte) 0x52); // 帧类型码 - - // 桩编码 - buffer.put(hexStringToByteArray(pileId)); - - // 是否允许工作 - buffer.put((byte) (allowWork ? 0x00 : 0x01)); - - // 充电桩最大允许输出功率 - buffer.put((byte) maxPower); - - // 计算校验和 - short checksum = calculateChecksum(buffer.array(), 1, 19); - buffer.putShort(checksum); - - return buffer.array(); - } - - public static byte[] createSetWorkParamsResponseFrame(String pileId, boolean success) { - ByteBuffer buffer = ByteBuffer.allocate(20); - buffer.put((byte) 0x68); // 起始标志 - buffer.put((byte) 0x0C); // 数据长度 - buffer.putShort((short) 0x0008); // 序列号域 - buffer.put((byte) 0x00); // 加密标志 - buffer.put((byte) 0x51); // 帧类型码 - - // 桩编码 - buffer.put(hexStringToByteArray(pileId)); - - // 设置结果 - buffer.put((byte) (success ? 0x01 : 0x00)); - - // 计算校验和 - short checksum = calculateChecksum(buffer.array(), 1, 18); - buffer.putShort(checksum); - - return buffer.array(); - } - - public static short calculateChecksum(byte[] data, int offset, int length) { - int sum = 0; - for (int i = offset; i < offset + length; i++) { - sum += (data[i] & 0xFF); - } - return (short) (sum & 0xFFFF); - } - - public static byte[] hexStringToByteArray(String s) { - int len = s.length(); - byte[] data = new byte[len / 2]; - for (int i = 0; i < len; i += 2) { - data[i / 2] = (byte) ((Character.digit(s.charAt(i), 16) << 4) - + Character.digit(s.charAt(i+1), 16)); - } - return data; - } - - public static String bytesToHex(byte[] bytes) { - StringBuilder sb = new StringBuilder(); - for (byte b : bytes) { - sb.append(String.format("%02X ", b)); - } - return sb.toString().trim(); - } -} \ No newline at end of file diff --git a/jsowell-admin/src/test/java/rpc/ClientChannelInitializer.java b/jsowell-admin/src/test/java/rpc/ClientChannelInitializer.java new file mode 100644 index 000000000..d086c5383 --- /dev/null +++ b/jsowell-admin/src/test/java/rpc/ClientChannelInitializer.java @@ -0,0 +1,19 @@ +package rpc; + +import io.netty.channel.ChannelInitializer; +import io.netty.channel.ChannelPipeline; +import io.netty.channel.socket.SocketChannel; + +public class ClientChannelInitializer extends ChannelInitializer { + + @Override + protected void initChannel(SocketChannel socketChannel) throws Exception { + ChannelPipeline pipeline = socketChannel.pipeline(); + + pipeline.addLast(new MessageEncode()); + pipeline.addLast(new MessageDecode()); + + pipeline.addLast(new RpcResponseHandler()); + } + +} \ No newline at end of file diff --git a/jsowell-admin/src/test/java/rpc/Message.java b/jsowell-admin/src/test/java/rpc/Message.java new file mode 100644 index 000000000..3323b79ea --- /dev/null +++ b/jsowell-admin/src/test/java/rpc/Message.java @@ -0,0 +1,11 @@ +package rpc; + +import lombok.Data; + +@Data +public abstract class Message { + + protected Byte messageType; + + +} \ No newline at end of file diff --git a/jsowell-admin/src/test/java/rpc/MessageConstant.java b/jsowell-admin/src/test/java/rpc/MessageConstant.java new file mode 100644 index 000000000..f3f78453b --- /dev/null +++ b/jsowell-admin/src/test/java/rpc/MessageConstant.java @@ -0,0 +1,22 @@ +package rpc; + +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +public class MessageConstant { + + public final static Byte rpcRequest = 1; + public final static Byte rpcResponse = 2; + + public static Map> messageTypeMap = new ConcurrentHashMap<>(); + + static { + messageTypeMap.put(rpcRequest, RpcRequest.class); + messageTypeMap.put(rpcResponse, RpcResponse.class); + } + + public static Class getMessageClass(Byte messageType){ + return messageTypeMap.get(messageType); + } + +} \ No newline at end of file diff --git a/jsowell-admin/src/test/java/rpc/MessageDecode.java b/jsowell-admin/src/test/java/rpc/MessageDecode.java new file mode 100644 index 000000000..029a74178 --- /dev/null +++ b/jsowell-admin/src/test/java/rpc/MessageDecode.java @@ -0,0 +1,44 @@ +package rpc; + +import io.netty.buffer.ByteBuf; +import io.netty.channel.ChannelHandlerContext; +import io.netty.handler.codec.ByteToMessageDecoder; + +import java.util.List; + +public class MessageDecode extends ByteToMessageDecoder { + + + @Override + protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List list) throws Exception { + + // 由于数据包的前4个字节用于记录总数据大小,如果数据不够4个字节,不进行读 + if(byteBuf.readableBytes() < 4) { + return; + } + + // 标记开始读的位置 + byteBuf.markReaderIndex(); + + // 前四个字节记录了数据大小 + int dataSize = byteBuf.readInt(); + + // 查看剩余可读字节是否足够,如果不是,重置读取位置,等待下一次解析 + if(byteBuf.readableBytes() < dataSize) { + byteBuf.resetReaderIndex(); + return; + } + + // 读取消息类型 + byte messageType = byteBuf.readByte(); + // 读取数据, 数组大小需要剔除1个字节的消息类型 + byte[] data = new byte[dataSize -1]; + + byteBuf.readBytes(data); + + Message message = SerializationUtil.deserialize(MessageConstant.getMessageClass(messageType), data); + + list.add(message); + } + +} \ No newline at end of file diff --git a/jsowell-admin/src/test/java/rpc/MessageEncode.java b/jsowell-admin/src/test/java/rpc/MessageEncode.java new file mode 100644 index 000000000..1e1610dc7 --- /dev/null +++ b/jsowell-admin/src/test/java/rpc/MessageEncode.java @@ -0,0 +1,22 @@ +package rpc; + +import io.netty.buffer.ByteBuf; +import io.netty.channel.ChannelHandlerContext; +import io.netty.handler.codec.MessageToByteEncoder; + +public class MessageEncode extends MessageToByteEncoder { + + @Override + protected void encode(ChannelHandlerContext channelHandlerContext, Message message, ByteBuf byteBuf) throws Exception { + // 将对象进行序列化 + byte[] data = SerializationUtil.serialize(message); + + // 写数据长度,前4个字节用于记录数据总长度(对象 + 类型(1个字节)) + byteBuf.writeInt(data.length + 1); + // 写记录消息类型,用于反序列选择类的类型 + byteBuf.writeByte(message.getMessageType()); + // 写对象 + byteBuf.writeBytes(data); + } + +} \ No newline at end of file diff --git a/jsowell-admin/src/test/java/rpc/RpcClient.java b/jsowell-admin/src/test/java/rpc/RpcClient.java new file mode 100644 index 000000000..92b615f73 --- /dev/null +++ b/jsowell-admin/src/test/java/rpc/RpcClient.java @@ -0,0 +1,52 @@ +package rpc; + +import io.netty.bootstrap.Bootstrap; +import io.netty.channel.*; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.socket.nio.NioSocketChannel; + +public class RpcClient { + + public Channel connect(String host, Integer port) { + EventLoopGroup worker = new NioEventLoopGroup(); + Channel channel = null; + + try { + Bootstrap bootstrap = new Bootstrap(); + + bootstrap.group(worker) + .channel(NioSocketChannel.class) + .option(ChannelOption.AUTO_READ, true) + .handler(new ClientChannelInitializer()); + + ChannelFuture channelFuture = bootstrap.connect(host, port).sync(); + + System.out.println("客户端启动"); + + channel = channelFuture.channel(); + + // 添加关闭监听器 + channel.closeFuture().addListener(new ChannelFutureListener() { + @Override + public void operationComplete(ChannelFuture channelFuture) throws Exception { + System.out.println("关闭客户端"); + worker.shutdownGracefully(); + } + }); + + } catch (Exception e) { + e.printStackTrace(); + + if(channel == null || !channel.isActive()) { + worker.shutdownGracefully(); + } else { + channel.close(); + } + + } + + return channel; + } + + +} \ No newline at end of file diff --git a/jsowell-admin/src/test/java/rpc/RpcRequest.java b/jsowell-admin/src/test/java/rpc/RpcRequest.java new file mode 100644 index 000000000..f55e85b71 --- /dev/null +++ b/jsowell-admin/src/test/java/rpc/RpcRequest.java @@ -0,0 +1,21 @@ +package rpc; + +import lombok.Data; +import lombok.ToString; + +import java.util.UUID; + +@Data +@ToString +public class RpcRequest extends Message{ + + private String id; + + private String param; + + public RpcRequest() { + this.id = UUID.randomUUID().toString(); + super.messageType = MessageConstant.rpcRequest; + } + +} \ No newline at end of file diff --git a/jsowell-admin/src/test/java/rpc/RpcRequestHandler.java b/jsowell-admin/src/test/java/rpc/RpcRequestHandler.java new file mode 100644 index 000000000..74b432bca --- /dev/null +++ b/jsowell-admin/src/test/java/rpc/RpcRequestHandler.java @@ -0,0 +1,36 @@ +package rpc; + +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.DefaultEventLoopGroup; +import io.netty.channel.EventLoopGroup; +import io.netty.channel.SimpleChannelInboundHandler; + +public class RpcRequestHandler extends SimpleChannelInboundHandler { + + private final static EventLoopGroup worker = new DefaultEventLoopGroup(Runtime.getRuntime().availableProcessors() + 1); + + + @Override + protected void channelRead0(ChannelHandlerContext ctx, RpcRequest msg) throws Exception { + + // 为避免占用网络io,此处异步进行处理 + worker.submit(() -> { + System.out.println("[RpcRequestHandler] "+ Thread.currentThread().getName() +" 处理请求,msg: " + msg); + + // 模拟处理耗时 + try { + Thread.sleep(3000); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + + RpcResponse rpcResponse = new RpcResponse(); + rpcResponse.setId(msg.getId()); + rpcResponse.setResult("处理" + msg.getParam()); + + ctx.writeAndFlush(rpcResponse); + }); + + } + +} \ No newline at end of file diff --git a/jsowell-admin/src/test/java/rpc/RpcResponse.java b/jsowell-admin/src/test/java/rpc/RpcResponse.java new file mode 100644 index 000000000..43e5a7ad8 --- /dev/null +++ b/jsowell-admin/src/test/java/rpc/RpcResponse.java @@ -0,0 +1,18 @@ +package rpc; + +import lombok.Data; +import lombok.ToString; + +@Data +@ToString +public class RpcResponse extends Message{ + + private String id; + + private String result; + + public RpcResponse() { + super.messageType = MessageConstant.rpcResponse; + } + +} \ No newline at end of file diff --git a/jsowell-admin/src/test/java/rpc/RpcResponseHandler.java b/jsowell-admin/src/test/java/rpc/RpcResponseHandler.java new file mode 100644 index 000000000..085c5bf31 --- /dev/null +++ b/jsowell-admin/src/test/java/rpc/RpcResponseHandler.java @@ -0,0 +1,23 @@ +package rpc; + +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.SimpleChannelInboundHandler; + +public class RpcResponseHandler extends SimpleChannelInboundHandler { + + @Override + protected void channelRead0(ChannelHandlerContext ctx, RpcResponse msg) throws Exception { + // 根据请求id,在集合中找到与外部线程通信的SyncPromise对象 + SyncPromise syncPromise = RpcUtil.getSyncPromiseMap().get(msg.getId()); + + if(syncPromise != null) { + // 设置响应结果 + syncPromise.setRpcResponse(msg); + + // 唤醒外部线程 + syncPromise.wake(); + } + + } + +} \ No newline at end of file diff --git a/jsowell-admin/src/test/java/rpc/RpcServer.java b/jsowell-admin/src/test/java/rpc/RpcServer.java new file mode 100644 index 000000000..a1b55da49 --- /dev/null +++ b/jsowell-admin/src/test/java/rpc/RpcServer.java @@ -0,0 +1,55 @@ +package rpc; + +import io.netty.bootstrap.ServerBootstrap; +import io.netty.channel.*; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.socket.nio.NioServerSocketChannel; + +public class RpcServer { + + public void bind(Integer port) { + + EventLoopGroup parent = new NioEventLoopGroup(); + EventLoopGroup child = new NioEventLoopGroup(); + Channel channel = null; + + try{ + ServerBootstrap serverBootstrap = new ServerBootstrap(); + + serverBootstrap.group(parent, child) + .channel(NioServerSocketChannel.class) + .option(ChannelOption.SO_BACKLOG, 1024) + .childHandler(new ServerChannelInitializer()); + + ChannelFuture channelFuture = serverBootstrap.bind(port).sync(); + + System.out.println("server启动"); + + // 非阻塞等待关闭 + channelFuture.channel().closeFuture().addListener(new ChannelFutureListener() { + @Override + public void operationComplete(ChannelFuture channelFuture) throws Exception { + System.out.println("server关闭"); + parent.shutdownGracefully(); + child.shutdownGracefully(); + } + }); + + channel = channelFuture.channel(); + + } catch (Exception e) { + + e.printStackTrace(); + + if(channel == null || !channel.isActive()) { + System.out.println("server关闭"); + parent.shutdownGracefully(); + child.shutdownGracefully(); + } else { + channel.close(); + } + } + } + + +} \ No newline at end of file diff --git a/jsowell-admin/src/test/java/rpc/RpcUtil.java b/jsowell-admin/src/test/java/rpc/RpcUtil.java new file mode 100644 index 000000000..5d8c4bf89 --- /dev/null +++ b/jsowell-admin/src/test/java/rpc/RpcUtil.java @@ -0,0 +1,63 @@ +package rpc; + +import io.netty.channel.Channel; + +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +public class RpcUtil { + + private final static Map syncPromiseMap = new ConcurrentHashMap<>(); + + private final static Channel channel; + + static{ + channel = new RpcClient().connect("127.0.0.1", 8888); + } + + public static RpcResponse send(RpcRequest rpcRequest, long timeout, TimeUnit unit) throws Exception{ + + if(channel == null) { + throw new NullPointerException("channel"); + } + + if(rpcRequest == null) { + throw new NullPointerException("rpcRequest"); + } + + if(timeout <= 0) { + throw new IllegalArgumentException("timeout must greater than 0"); + } + + // 创造一个容器,用于存放当前线程与rpcClient中的线程交互 + SyncPromise syncPromise = new SyncPromise(); + syncPromiseMap.put(rpcRequest.getId(), syncPromise); + + // 发送消息,此处如果发送玩消息并且在get之前返回了结果,下一行的get将不会进入阻塞,也可以顺利拿到结果 + channel.writeAndFlush(rpcRequest); + + // 等待获取结果 + RpcResponse rpcResponse = syncPromise.get(timeout, unit); + + if(rpcResponse == null) { + if(syncPromise.isTimeout()) { + throw new TimeoutException("等待响应结果超时"); + } else{ + throw new Exception("其他异常"); + } + } + + // 移除容器 + syncPromiseMap.remove(rpcRequest.getId()); + + return rpcResponse; + } + + public static Map getSyncPromiseMap(){ + return syncPromiseMap; + } + + +} \ No newline at end of file diff --git a/jsowell-admin/src/test/java/rpc/SerializationUtil.java b/jsowell-admin/src/test/java/rpc/SerializationUtil.java new file mode 100644 index 000000000..a658b59a5 --- /dev/null +++ b/jsowell-admin/src/test/java/rpc/SerializationUtil.java @@ -0,0 +1,53 @@ +package rpc; + +import io.protostuff.LinkedBuffer; +import io.protostuff.ProtostuffIOUtil; +import io.protostuff.Schema; +import io.protostuff.runtime.RuntimeSchema; + +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +public class SerializationUtil { + + private final static Map, Schema> schemaCache = new ConcurrentHashMap<>(); + + /** + * 序列化 + */ + public static byte[] serialize(T object){ + LinkedBuffer buffer = LinkedBuffer.allocate(LinkedBuffer.DEFAULT_BUFFER_SIZE); + + try { + Class cls = (Class) object.getClass(); + Schema schema = getSchema(cls); + + return ProtostuffIOUtil.toByteArray(object, schema, buffer); + } catch (Exception e) { + throw e; + } finally { + buffer.clear(); + } + } + + /** + * 反序列化 + */ + public static T deserialize(Class cls, byte[] data) { + Schema schema = getSchema(cls); + T message = schema.newMessage(); + ProtostuffIOUtil.mergeFrom(data, message, schema); + return message; + } + + public static Schema getSchema(Class cls) { + Schema schema = (Schema) schemaCache.get(cls); + + if(schema == null) { + schema = RuntimeSchema.getSchema(cls); + schemaCache.put(cls, schema); + } + return schema; + } + +} \ No newline at end of file diff --git a/jsowell-admin/src/test/java/rpc/ServerChannelInitializer.java b/jsowell-admin/src/test/java/rpc/ServerChannelInitializer.java new file mode 100644 index 000000000..283d00b5d --- /dev/null +++ b/jsowell-admin/src/test/java/rpc/ServerChannelInitializer.java @@ -0,0 +1,19 @@ +package rpc; + +import io.netty.channel.ChannelInitializer; +import io.netty.channel.ChannelPipeline; +import io.netty.channel.socket.SocketChannel; + +public class ServerChannelInitializer extends ChannelInitializer { + + @Override + protected void initChannel(SocketChannel socketChannel) throws Exception { + ChannelPipeline pipeline = socketChannel.pipeline(); + + pipeline.addLast(new MessageEncode()); + pipeline.addLast(new MessageDecode()); + + pipeline.addLast(new RpcRequestHandler()); + } + +} \ No newline at end of file diff --git a/jsowell-admin/src/test/java/rpc/SyncPromise.java b/jsowell-admin/src/test/java/rpc/SyncPromise.java new file mode 100644 index 000000000..4bcdb13fa --- /dev/null +++ b/jsowell-admin/src/test/java/rpc/SyncPromise.java @@ -0,0 +1,49 @@ +package rpc; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +public class SyncPromise { + + // 用于接收结果 + private RpcResponse rpcResponse; + + private final CountDownLatch countDownLatch = new CountDownLatch(1); + + // 用于判断是否超时 + private boolean isTimeout = false; + + /** + * 同步等待返回结果 + */ + public RpcResponse get(long timeout, TimeUnit unit) throws InterruptedException { + // 等待阻塞,超时时间内countDownLatch减到0,将提前唤醒,以此作为是否超时判断 + boolean earlyWakeUp = countDownLatch.await(timeout, unit); + + if(earlyWakeUp) { + // 超时时间内countDownLatch减到0,提前唤醒,说明已有结果 + return rpcResponse; + } else { + // 超时时间内countDownLatch没有减到0,自动唤醒,说明超时时间内没有等到结果 + isTimeout = true; + return null; + } + } + + public void wake() { + countDownLatch.countDown(); + } + + public RpcResponse getRpcResponse() { + return rpcResponse; + } + + public void setRpcResponse(RpcResponse rpcResponse) { + this.rpcResponse = rpcResponse; + } + + public boolean isTimeout() { + return isTimeout; + } + +} \ No newline at end of file diff --git a/jsowell-admin/src/test/java/rpc/TestRpcClient.java b/jsowell-admin/src/test/java/rpc/TestRpcClient.java new file mode 100644 index 000000000..aeba06cbb --- /dev/null +++ b/jsowell-admin/src/test/java/rpc/TestRpcClient.java @@ -0,0 +1,48 @@ +package rpc; + +import java.util.concurrent.TimeUnit; + +public class TestRpcClient { + public static void main(String[] args) throws Exception{ + //Channel channel = new RpcClient().connect("127.0.0.1", 8888); + + Thread thread1 = new Thread(new Runnable() { + @Override + public void run() { + RpcRequest rpcRequest = new RpcRequest(); + rpcRequest.setParam("参数1"); + + try { + System.out.println("thread1发送请求"); + RpcResponse rpcResponse = RpcUtil.send(rpcRequest, 5, TimeUnit.SECONDS); + System.out.println("thread1处理结果:" + rpcResponse); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + }); + + Thread thread2 = new Thread(new Runnable() { + @Override + public void run() { + RpcRequest rpcRequest2 = new RpcRequest(); + rpcRequest2.setParam("参数2"); + + try { + System.out.println("thread2发送请求"); + RpcResponse rpcResponse = RpcUtil.send(rpcRequest2, 2, TimeUnit.SECONDS); + System.out.println("thread2处理结果:" + rpcResponse); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + }); + + // 休眠一下,等待客户端与服务端进行连接 + Thread.sleep(1000); + + thread1.start(); + thread2.start(); + + } +} diff --git a/jsowell-admin/src/test/java/rpc/TestRpcServer.java b/jsowell-admin/src/test/java/rpc/TestRpcServer.java new file mode 100644 index 000000000..c15d9e211 --- /dev/null +++ b/jsowell-admin/src/test/java/rpc/TestRpcServer.java @@ -0,0 +1,7 @@ +package rpc; + +public class TestRpcServer { + public static void main(String[] args) { + new RpcServer().bind(8888); + } +} diff --git a/jsowell-netty/src/main/java/com/jsowell/netty/server/yunkuaichong/NettyServerHandler.java b/jsowell-netty/src/main/java/com/jsowell/netty/server/yunkuaichong/NettyServerHandler.java index e0621c246..35c668f62 100644 --- a/jsowell-netty/src/main/java/com/jsowell/netty/server/yunkuaichong/NettyServerHandler.java +++ b/jsowell-netty/src/main/java/com/jsowell/netty/server/yunkuaichong/NettyServerHandler.java @@ -28,7 +28,7 @@ import java.util.concurrent.ConcurrentHashMap; @ChannelHandler.Sharable @Slf4j @Component -public class NettyServerHandler extends ChannelInboundHandlerAdapter { +public class NettyServerHandler extends SimpleChannelInboundHandler { @Autowired private YKCBusinessService ykcService; @@ -66,7 +66,7 @@ public class NettyServerHandler extends ChannelInboundHandlerAdapter { */ @Override public void channelRead(ChannelHandlerContext ctx, Object message) throws Exception { - // log.info("加载客户端报文=== channelId:" + ctx.channel().id() + ", msg:" + msg); + log.info("加载客户端报文channelRead=== channelId:" + ctx.channel().id() + ", msg:" + message); // 下面可以解析数据,保存数据,生成返回报文,将需要返回报文写入write函数 byte[] msg = (byte[]) message; @@ -109,6 +109,11 @@ public class NettyServerHandler extends ChannelInboundHandlerAdapter { } } + @Override + protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception { + log.info("channelRead0=== channelId:" + ctx.channel().id() + ", msg:" + msg); + } + /** * 有客户端终止连接服务器会触发此函数 */