mirror of
https://codeup.aliyun.com/67c68d4e484ca2f0a13ac3c1/ydc/jsowell-charger-web.git
synced 2026-04-20 02:55:04 +08:00
同步获取响应数据
This commit is contained in:
@@ -107,6 +107,27 @@
|
|||||||
<artifactId>jsowell-thirdparty</artifactId>
|
<artifactId>jsowell-thirdparty</artifactId>
|
||||||
</dependency>
|
</dependency>
|
||||||
|
|
||||||
|
<dependency>
|
||||||
|
<groupId>org.projectlombok</groupId>
|
||||||
|
<artifactId>lombok</artifactId>
|
||||||
|
<scope>test</scope>
|
||||||
|
</dependency>
|
||||||
|
|
||||||
|
<dependency>
|
||||||
|
<groupId>io.protostuff</groupId>
|
||||||
|
<artifactId>protostuff-core</artifactId>
|
||||||
|
<version>1.8.0</version>
|
||||||
|
<scope>test</scope>
|
||||||
|
</dependency>
|
||||||
|
|
||||||
|
<dependency>
|
||||||
|
<groupId>io.protostuff</groupId>
|
||||||
|
<artifactId>protostuff-runtime</artifactId>
|
||||||
|
<version>1.8.0</version>
|
||||||
|
<scope>test</scope>
|
||||||
|
</dependency>
|
||||||
|
|
||||||
|
|
||||||
</dependencies>
|
</dependencies>
|
||||||
|
|
||||||
<build>
|
<build>
|
||||||
|
|||||||
@@ -85,8 +85,6 @@ import com.jsowell.wxpay.common.WeChatPayParameter;
|
|||||||
import com.jsowell.wxpay.dto.AppletTemplateMessageSendDTO;
|
import com.jsowell.wxpay.dto.AppletTemplateMessageSendDTO;
|
||||||
import com.jsowell.wxpay.response.WechatPayRefundRequest;
|
import com.jsowell.wxpay.response.WechatPayRefundRequest;
|
||||||
import com.jsowell.wxpay.service.WxAppletRemoteService;
|
import com.jsowell.wxpay.service.WxAppletRemoteService;
|
||||||
import demo.ChargingPileServerHandler;
|
|
||||||
import demo.ProtocolUtil;
|
|
||||||
import org.apache.commons.collections4.CollectionUtils;
|
import org.apache.commons.collections4.CollectionUtils;
|
||||||
import org.apache.commons.lang3.RandomStringUtils;
|
import org.apache.commons.lang3.RandomStringUtils;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
|||||||
@@ -1,80 +0,0 @@
|
|||||||
package demo;
|
|
||||||
|
|
||||||
import io.netty.bootstrap.Bootstrap;
|
|
||||||
import io.netty.channel.*;
|
|
||||||
import io.netty.channel.nio.NioEventLoopGroup;
|
|
||||||
import io.netty.channel.socket.SocketChannel;
|
|
||||||
import io.netty.channel.socket.nio.NioSocketChannel;
|
|
||||||
import io.netty.handler.codec.bytes.ByteArrayDecoder;
|
|
||||||
import io.netty.handler.codec.bytes.ByteArrayEncoder;
|
|
||||||
|
|
||||||
public class ChargingPileClient {
|
|
||||||
private final String host;
|
|
||||||
private final int port;
|
|
||||||
|
|
||||||
public ChargingPileClient(String host, int port) {
|
|
||||||
this.host = host;
|
|
||||||
this.port = port;
|
|
||||||
}
|
|
||||||
|
|
||||||
public void start() throws Exception {
|
|
||||||
EventLoopGroup group = new NioEventLoopGroup();
|
|
||||||
try {
|
|
||||||
Bootstrap b = new Bootstrap();
|
|
||||||
b.group(group)
|
|
||||||
.channel(NioSocketChannel.class)
|
|
||||||
.handler(new ChannelInitializer<SocketChannel>() {
|
|
||||||
@Override
|
|
||||||
public void initChannel(SocketChannel ch) throws Exception {
|
|
||||||
ChannelPipeline pipeline = ch.pipeline();
|
|
||||||
pipeline.addLast(new ByteArrayDecoder());
|
|
||||||
pipeline.addLast(new ByteArrayEncoder());
|
|
||||||
pipeline.addLast(new ChargingPileClientHandler());
|
|
||||||
}
|
|
||||||
});
|
|
||||||
|
|
||||||
ChannelFuture f = b.connect(host, port).sync();
|
|
||||||
System.out.println("充电桩客户端已连接到服务器");
|
|
||||||
f.channel().closeFuture().sync();
|
|
||||||
} finally {
|
|
||||||
group.shutdownGracefully();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
public static void main(String[] args) throws Exception {
|
|
||||||
String host = "localhost";
|
|
||||||
int port = 9011;
|
|
||||||
new ChargingPileClient(host, port).start();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
class ChargingPileClientHandler extends ChannelInboundHandlerAdapter {
|
|
||||||
@Override
|
|
||||||
public void channelRead(ChannelHandlerContext ctx, Object msg) {
|
|
||||||
byte[] command = (byte[]) msg;
|
|
||||||
System.out.println("收到服务器指令: " + ProtocolUtil.bytesToHex(command));
|
|
||||||
|
|
||||||
// 处理工作参数设置指令
|
|
||||||
if (command[5] == 0x52) {
|
|
||||||
String pileId = ProtocolUtil.bytesToHex(command).substring(12, 26);
|
|
||||||
boolean allowWork = (command[19] == 0x00);
|
|
||||||
int maxPower = command[20] & 0xFF;
|
|
||||||
|
|
||||||
System.out.println("收到工作参数设置:");
|
|
||||||
System.out.println("桩编号: " + pileId);
|
|
||||||
System.out.println("是否允许工作: " + (allowWork ? "允许" : "不允许"));
|
|
||||||
System.out.println("最大允许输出功率: " + maxPower + "%");
|
|
||||||
|
|
||||||
// 模拟设置成功
|
|
||||||
byte[] response = ProtocolUtil.createSetWorkParamsResponseFrame(pileId, true);
|
|
||||||
ctx.writeAndFlush(response);
|
|
||||||
System.out.println("发送响应: " + ProtocolUtil.bytesToHex(response));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
|
|
||||||
cause.printStackTrace();
|
|
||||||
ctx.close();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
@@ -1,34 +0,0 @@
|
|||||||
package demo;
|
|
||||||
|
|
||||||
import org.springframework.web.bind.annotation.*;
|
|
||||||
import io.netty.channel.ChannelHandlerContext;
|
|
||||||
import java.util.concurrent.CompletableFuture;
|
|
||||||
import java.util.concurrent.TimeUnit;
|
|
||||||
|
|
||||||
@RestController
|
|
||||||
@RequestMapping("/api/charging")
|
|
||||||
public class ChargingPileController {
|
|
||||||
|
|
||||||
private final ChannelHandlerContext chargingPileChannel;
|
|
||||||
|
|
||||||
public ChargingPileController(ChannelHandlerContext chargingPileChannel) {
|
|
||||||
this.chargingPileChannel = chargingPileChannel;
|
|
||||||
}
|
|
||||||
|
|
||||||
@PostMapping("/setWorkParams")
|
|
||||||
public String setWorkParams(@RequestParam String pileId,
|
|
||||||
@RequestParam boolean allowWork,
|
|
||||||
@RequestParam int maxPower) {
|
|
||||||
try {
|
|
||||||
byte[] command = ProtocolUtil.createSetWorkParamsFrame(pileId, allowWork, maxPower);
|
|
||||||
CompletableFuture<byte[]> future = ChargingPileServerHandler.sendCommand(chargingPileChannel, command);
|
|
||||||
byte[] response = future.get(5, TimeUnit.SECONDS);
|
|
||||||
|
|
||||||
// 解析响应
|
|
||||||
boolean success = (response[18] == 0x01);
|
|
||||||
return "设置" + (success ? "成功" : "失败");
|
|
||||||
} catch (Exception e) {
|
|
||||||
return "发送指令失败: " + e.getMessage();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
@@ -1,50 +0,0 @@
|
|||||||
package demo;
|
|
||||||
|
|
||||||
import io.netty.bootstrap.ServerBootstrap;
|
|
||||||
import io.netty.channel.*;
|
|
||||||
import io.netty.channel.nio.NioEventLoopGroup;
|
|
||||||
import io.netty.channel.socket.SocketChannel;
|
|
||||||
import io.netty.channel.socket.nio.NioServerSocketChannel;
|
|
||||||
import io.netty.handler.codec.bytes.ByteArrayDecoder;
|
|
||||||
import io.netty.handler.codec.bytes.ByteArrayEncoder;
|
|
||||||
|
|
||||||
public class ChargingPileServer {
|
|
||||||
private final int port;
|
|
||||||
|
|
||||||
public ChargingPileServer(int port) {
|
|
||||||
this.port = port;
|
|
||||||
}
|
|
||||||
|
|
||||||
public void start() throws Exception {
|
|
||||||
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
|
|
||||||
EventLoopGroup workerGroup = new NioEventLoopGroup();
|
|
||||||
try {
|
|
||||||
ServerBootstrap b = new ServerBootstrap();
|
|
||||||
b.group(bossGroup, workerGroup)
|
|
||||||
.channel(NioServerSocketChannel.class)
|
|
||||||
.childHandler(new ChannelInitializer<SocketChannel>() {
|
|
||||||
@Override
|
|
||||||
public void initChannel(SocketChannel ch) throws Exception {
|
|
||||||
ChannelPipeline pipeline = ch.pipeline();
|
|
||||||
pipeline.addLast(new ByteArrayDecoder());
|
|
||||||
pipeline.addLast(new ByteArrayEncoder());
|
|
||||||
pipeline.addLast(new ChargingPileServerHandler());
|
|
||||||
}
|
|
||||||
})
|
|
||||||
.option(ChannelOption.SO_BACKLOG, 128)
|
|
||||||
.childOption(ChannelOption.SO_KEEPALIVE, true);
|
|
||||||
|
|
||||||
ChannelFuture f = b.bind(port).sync();
|
|
||||||
System.out.println("充电桩服务器启动,监听端口: " + port);
|
|
||||||
f.channel().closeFuture().sync();
|
|
||||||
} finally {
|
|
||||||
workerGroup.shutdownGracefully();
|
|
||||||
bossGroup.shutdownGracefully();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
public static void main(String[] args) throws Exception {
|
|
||||||
int port = 9011;
|
|
||||||
new ChargingPileServer(port).start();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
@@ -1,31 +0,0 @@
|
|||||||
package demo;
|
|
||||||
|
|
||||||
import io.netty.channel.ChannelHandlerContext;
|
|
||||||
import io.netty.channel.ChannelInboundHandlerAdapter;
|
|
||||||
import java.util.concurrent.CompletableFuture;
|
|
||||||
|
|
||||||
public class ChargingPileServerHandler extends ChannelInboundHandlerAdapter {
|
|
||||||
private static CompletableFuture<byte[]> responseFuture;
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void channelRead(ChannelHandlerContext ctx, Object msg) {
|
|
||||||
byte[] response = (byte[]) msg;
|
|
||||||
System.out.println("接收到充电桩响应: " + ProtocolUtil.bytesToHex(response));
|
|
||||||
if (responseFuture != null) {
|
|
||||||
responseFuture.complete(response);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
|
|
||||||
cause.printStackTrace();
|
|
||||||
ctx.close();
|
|
||||||
}
|
|
||||||
|
|
||||||
public static CompletableFuture<byte[]> sendCommand(ChannelHandlerContext ctx, byte[] command) {
|
|
||||||
responseFuture = new CompletableFuture<>();
|
|
||||||
ctx.writeAndFlush(command);
|
|
||||||
System.out.println("发送指令到充电桩: " + ProtocolUtil.bytesToHex(command));
|
|
||||||
return responseFuture;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
@@ -1,77 +0,0 @@
|
|||||||
package demo;
|
|
||||||
|
|
||||||
import java.nio.ByteBuffer;
|
|
||||||
|
|
||||||
public class ProtocolUtil {
|
|
||||||
|
|
||||||
public static byte[] createSetWorkParamsFrame(String pileId, boolean allowWork, int maxPower) {
|
|
||||||
ByteBuffer buffer = ByteBuffer.allocate(21);
|
|
||||||
buffer.put((byte) 0x68); // 起始标志
|
|
||||||
buffer.put((byte) 0x0D); // 数据长度
|
|
||||||
buffer.putShort((short) 0x0008); // 序列号域
|
|
||||||
buffer.put((byte) 0x00); // 加密标志
|
|
||||||
buffer.put((byte) 0x52); // 帧类型码
|
|
||||||
|
|
||||||
// 桩编码
|
|
||||||
buffer.put(hexStringToByteArray(pileId));
|
|
||||||
|
|
||||||
// 是否允许工作
|
|
||||||
buffer.put((byte) (allowWork ? 0x00 : 0x01));
|
|
||||||
|
|
||||||
// 充电桩最大允许输出功率
|
|
||||||
buffer.put((byte) maxPower);
|
|
||||||
|
|
||||||
// 计算校验和
|
|
||||||
short checksum = calculateChecksum(buffer.array(), 1, 19);
|
|
||||||
buffer.putShort(checksum);
|
|
||||||
|
|
||||||
return buffer.array();
|
|
||||||
}
|
|
||||||
|
|
||||||
public static byte[] createSetWorkParamsResponseFrame(String pileId, boolean success) {
|
|
||||||
ByteBuffer buffer = ByteBuffer.allocate(20);
|
|
||||||
buffer.put((byte) 0x68); // 起始标志
|
|
||||||
buffer.put((byte) 0x0C); // 数据长度
|
|
||||||
buffer.putShort((short) 0x0008); // 序列号域
|
|
||||||
buffer.put((byte) 0x00); // 加密标志
|
|
||||||
buffer.put((byte) 0x51); // 帧类型码
|
|
||||||
|
|
||||||
// 桩编码
|
|
||||||
buffer.put(hexStringToByteArray(pileId));
|
|
||||||
|
|
||||||
// 设置结果
|
|
||||||
buffer.put((byte) (success ? 0x01 : 0x00));
|
|
||||||
|
|
||||||
// 计算校验和
|
|
||||||
short checksum = calculateChecksum(buffer.array(), 1, 18);
|
|
||||||
buffer.putShort(checksum);
|
|
||||||
|
|
||||||
return buffer.array();
|
|
||||||
}
|
|
||||||
|
|
||||||
public static short calculateChecksum(byte[] data, int offset, int length) {
|
|
||||||
int sum = 0;
|
|
||||||
for (int i = offset; i < offset + length; i++) {
|
|
||||||
sum += (data[i] & 0xFF);
|
|
||||||
}
|
|
||||||
return (short) (sum & 0xFFFF);
|
|
||||||
}
|
|
||||||
|
|
||||||
public static byte[] hexStringToByteArray(String s) {
|
|
||||||
int len = s.length();
|
|
||||||
byte[] data = new byte[len / 2];
|
|
||||||
for (int i = 0; i < len; i += 2) {
|
|
||||||
data[i / 2] = (byte) ((Character.digit(s.charAt(i), 16) << 4)
|
|
||||||
+ Character.digit(s.charAt(i+1), 16));
|
|
||||||
}
|
|
||||||
return data;
|
|
||||||
}
|
|
||||||
|
|
||||||
public static String bytesToHex(byte[] bytes) {
|
|
||||||
StringBuilder sb = new StringBuilder();
|
|
||||||
for (byte b : bytes) {
|
|
||||||
sb.append(String.format("%02X ", b));
|
|
||||||
}
|
|
||||||
return sb.toString().trim();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
@@ -0,0 +1,19 @@
|
|||||||
|
package rpc;
|
||||||
|
|
||||||
|
import io.netty.channel.ChannelInitializer;
|
||||||
|
import io.netty.channel.ChannelPipeline;
|
||||||
|
import io.netty.channel.socket.SocketChannel;
|
||||||
|
|
||||||
|
public class ClientChannelInitializer extends ChannelInitializer<SocketChannel> {
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected void initChannel(SocketChannel socketChannel) throws Exception {
|
||||||
|
ChannelPipeline pipeline = socketChannel.pipeline();
|
||||||
|
|
||||||
|
pipeline.addLast(new MessageEncode());
|
||||||
|
pipeline.addLast(new MessageDecode());
|
||||||
|
|
||||||
|
pipeline.addLast(new RpcResponseHandler());
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
11
jsowell-admin/src/test/java/rpc/Message.java
Normal file
11
jsowell-admin/src/test/java/rpc/Message.java
Normal file
@@ -0,0 +1,11 @@
|
|||||||
|
package rpc;
|
||||||
|
|
||||||
|
import lombok.Data;
|
||||||
|
|
||||||
|
@Data
|
||||||
|
public abstract class Message {
|
||||||
|
|
||||||
|
protected Byte messageType;
|
||||||
|
|
||||||
|
|
||||||
|
}
|
||||||
22
jsowell-admin/src/test/java/rpc/MessageConstant.java
Normal file
22
jsowell-admin/src/test/java/rpc/MessageConstant.java
Normal file
@@ -0,0 +1,22 @@
|
|||||||
|
package rpc;
|
||||||
|
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
|
|
||||||
|
public class MessageConstant {
|
||||||
|
|
||||||
|
public final static Byte rpcRequest = 1;
|
||||||
|
public final static Byte rpcResponse = 2;
|
||||||
|
|
||||||
|
public static Map<Byte, Class<? extends Message>> messageTypeMap = new ConcurrentHashMap<>();
|
||||||
|
|
||||||
|
static {
|
||||||
|
messageTypeMap.put(rpcRequest, RpcRequest.class);
|
||||||
|
messageTypeMap.put(rpcResponse, RpcResponse.class);
|
||||||
|
}
|
||||||
|
|
||||||
|
public static Class<? extends Message> getMessageClass(Byte messageType){
|
||||||
|
return messageTypeMap.get(messageType);
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
44
jsowell-admin/src/test/java/rpc/MessageDecode.java
Normal file
44
jsowell-admin/src/test/java/rpc/MessageDecode.java
Normal file
@@ -0,0 +1,44 @@
|
|||||||
|
package rpc;
|
||||||
|
|
||||||
|
import io.netty.buffer.ByteBuf;
|
||||||
|
import io.netty.channel.ChannelHandlerContext;
|
||||||
|
import io.netty.handler.codec.ByteToMessageDecoder;
|
||||||
|
|
||||||
|
import java.util.List;
|
||||||
|
|
||||||
|
public class MessageDecode extends ByteToMessageDecoder {
|
||||||
|
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List<Object> list) throws Exception {
|
||||||
|
|
||||||
|
// 由于数据包的前4个字节用于记录总数据大小,如果数据不够4个字节,不进行读
|
||||||
|
if(byteBuf.readableBytes() < 4) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
// 标记开始读的位置
|
||||||
|
byteBuf.markReaderIndex();
|
||||||
|
|
||||||
|
// 前四个字节记录了数据大小
|
||||||
|
int dataSize = byteBuf.readInt();
|
||||||
|
|
||||||
|
// 查看剩余可读字节是否足够,如果不是,重置读取位置,等待下一次解析
|
||||||
|
if(byteBuf.readableBytes() < dataSize) {
|
||||||
|
byteBuf.resetReaderIndex();
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
// 读取消息类型
|
||||||
|
byte messageType = byteBuf.readByte();
|
||||||
|
// 读取数据, 数组大小需要剔除1个字节的消息类型
|
||||||
|
byte[] data = new byte[dataSize -1];
|
||||||
|
|
||||||
|
byteBuf.readBytes(data);
|
||||||
|
|
||||||
|
Message message = SerializationUtil.deserialize(MessageConstant.getMessageClass(messageType), data);
|
||||||
|
|
||||||
|
list.add(message);
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
22
jsowell-admin/src/test/java/rpc/MessageEncode.java
Normal file
22
jsowell-admin/src/test/java/rpc/MessageEncode.java
Normal file
@@ -0,0 +1,22 @@
|
|||||||
|
package rpc;
|
||||||
|
|
||||||
|
import io.netty.buffer.ByteBuf;
|
||||||
|
import io.netty.channel.ChannelHandlerContext;
|
||||||
|
import io.netty.handler.codec.MessageToByteEncoder;
|
||||||
|
|
||||||
|
public class MessageEncode extends MessageToByteEncoder<Message> {
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected void encode(ChannelHandlerContext channelHandlerContext, Message message, ByteBuf byteBuf) throws Exception {
|
||||||
|
// 将对象进行序列化
|
||||||
|
byte[] data = SerializationUtil.serialize(message);
|
||||||
|
|
||||||
|
// 写数据长度,前4个字节用于记录数据总长度(对象 + 类型(1个字节))
|
||||||
|
byteBuf.writeInt(data.length + 1);
|
||||||
|
// 写记录消息类型,用于反序列选择类的类型
|
||||||
|
byteBuf.writeByte(message.getMessageType());
|
||||||
|
// 写对象
|
||||||
|
byteBuf.writeBytes(data);
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
52
jsowell-admin/src/test/java/rpc/RpcClient.java
Normal file
52
jsowell-admin/src/test/java/rpc/RpcClient.java
Normal file
@@ -0,0 +1,52 @@
|
|||||||
|
package rpc;
|
||||||
|
|
||||||
|
import io.netty.bootstrap.Bootstrap;
|
||||||
|
import io.netty.channel.*;
|
||||||
|
import io.netty.channel.nio.NioEventLoopGroup;
|
||||||
|
import io.netty.channel.socket.nio.NioSocketChannel;
|
||||||
|
|
||||||
|
public class RpcClient {
|
||||||
|
|
||||||
|
public Channel connect(String host, Integer port) {
|
||||||
|
EventLoopGroup worker = new NioEventLoopGroup();
|
||||||
|
Channel channel = null;
|
||||||
|
|
||||||
|
try {
|
||||||
|
Bootstrap bootstrap = new Bootstrap();
|
||||||
|
|
||||||
|
bootstrap.group(worker)
|
||||||
|
.channel(NioSocketChannel.class)
|
||||||
|
.option(ChannelOption.AUTO_READ, true)
|
||||||
|
.handler(new ClientChannelInitializer());
|
||||||
|
|
||||||
|
ChannelFuture channelFuture = bootstrap.connect(host, port).sync();
|
||||||
|
|
||||||
|
System.out.println("客户端启动");
|
||||||
|
|
||||||
|
channel = channelFuture.channel();
|
||||||
|
|
||||||
|
// 添加关闭监听器
|
||||||
|
channel.closeFuture().addListener(new ChannelFutureListener() {
|
||||||
|
@Override
|
||||||
|
public void operationComplete(ChannelFuture channelFuture) throws Exception {
|
||||||
|
System.out.println("关闭客户端");
|
||||||
|
worker.shutdownGracefully();
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
} catch (Exception e) {
|
||||||
|
e.printStackTrace();
|
||||||
|
|
||||||
|
if(channel == null || !channel.isActive()) {
|
||||||
|
worker.shutdownGracefully();
|
||||||
|
} else {
|
||||||
|
channel.close();
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
return channel;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
}
|
||||||
21
jsowell-admin/src/test/java/rpc/RpcRequest.java
Normal file
21
jsowell-admin/src/test/java/rpc/RpcRequest.java
Normal file
@@ -0,0 +1,21 @@
|
|||||||
|
package rpc;
|
||||||
|
|
||||||
|
import lombok.Data;
|
||||||
|
import lombok.ToString;
|
||||||
|
|
||||||
|
import java.util.UUID;
|
||||||
|
|
||||||
|
@Data
|
||||||
|
@ToString
|
||||||
|
public class RpcRequest extends Message{
|
||||||
|
|
||||||
|
private String id;
|
||||||
|
|
||||||
|
private String param;
|
||||||
|
|
||||||
|
public RpcRequest() {
|
||||||
|
this.id = UUID.randomUUID().toString();
|
||||||
|
super.messageType = MessageConstant.rpcRequest;
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
36
jsowell-admin/src/test/java/rpc/RpcRequestHandler.java
Normal file
36
jsowell-admin/src/test/java/rpc/RpcRequestHandler.java
Normal file
@@ -0,0 +1,36 @@
|
|||||||
|
package rpc;
|
||||||
|
|
||||||
|
import io.netty.channel.ChannelHandlerContext;
|
||||||
|
import io.netty.channel.DefaultEventLoopGroup;
|
||||||
|
import io.netty.channel.EventLoopGroup;
|
||||||
|
import io.netty.channel.SimpleChannelInboundHandler;
|
||||||
|
|
||||||
|
public class RpcRequestHandler extends SimpleChannelInboundHandler<RpcRequest> {
|
||||||
|
|
||||||
|
private final static EventLoopGroup worker = new DefaultEventLoopGroup(Runtime.getRuntime().availableProcessors() + 1);
|
||||||
|
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected void channelRead0(ChannelHandlerContext ctx, RpcRequest msg) throws Exception {
|
||||||
|
|
||||||
|
// 为避免占用网络io,此处异步进行处理
|
||||||
|
worker.submit(() -> {
|
||||||
|
System.out.println("[RpcRequestHandler] "+ Thread.currentThread().getName() +" 处理请求,msg: " + msg);
|
||||||
|
|
||||||
|
// 模拟处理耗时
|
||||||
|
try {
|
||||||
|
Thread.sleep(3000);
|
||||||
|
} catch (InterruptedException e) {
|
||||||
|
throw new RuntimeException(e);
|
||||||
|
}
|
||||||
|
|
||||||
|
RpcResponse rpcResponse = new RpcResponse();
|
||||||
|
rpcResponse.setId(msg.getId());
|
||||||
|
rpcResponse.setResult("处理" + msg.getParam());
|
||||||
|
|
||||||
|
ctx.writeAndFlush(rpcResponse);
|
||||||
|
});
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
18
jsowell-admin/src/test/java/rpc/RpcResponse.java
Normal file
18
jsowell-admin/src/test/java/rpc/RpcResponse.java
Normal file
@@ -0,0 +1,18 @@
|
|||||||
|
package rpc;
|
||||||
|
|
||||||
|
import lombok.Data;
|
||||||
|
import lombok.ToString;
|
||||||
|
|
||||||
|
@Data
|
||||||
|
@ToString
|
||||||
|
public class RpcResponse extends Message{
|
||||||
|
|
||||||
|
private String id;
|
||||||
|
|
||||||
|
private String result;
|
||||||
|
|
||||||
|
public RpcResponse() {
|
||||||
|
super.messageType = MessageConstant.rpcResponse;
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
23
jsowell-admin/src/test/java/rpc/RpcResponseHandler.java
Normal file
23
jsowell-admin/src/test/java/rpc/RpcResponseHandler.java
Normal file
@@ -0,0 +1,23 @@
|
|||||||
|
package rpc;
|
||||||
|
|
||||||
|
import io.netty.channel.ChannelHandlerContext;
|
||||||
|
import io.netty.channel.SimpleChannelInboundHandler;
|
||||||
|
|
||||||
|
public class RpcResponseHandler extends SimpleChannelInboundHandler<RpcResponse> {
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected void channelRead0(ChannelHandlerContext ctx, RpcResponse msg) throws Exception {
|
||||||
|
// 根据请求id,在集合中找到与外部线程通信的SyncPromise对象
|
||||||
|
SyncPromise syncPromise = RpcUtil.getSyncPromiseMap().get(msg.getId());
|
||||||
|
|
||||||
|
if(syncPromise != null) {
|
||||||
|
// 设置响应结果
|
||||||
|
syncPromise.setRpcResponse(msg);
|
||||||
|
|
||||||
|
// 唤醒外部线程
|
||||||
|
syncPromise.wake();
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
55
jsowell-admin/src/test/java/rpc/RpcServer.java
Normal file
55
jsowell-admin/src/test/java/rpc/RpcServer.java
Normal file
@@ -0,0 +1,55 @@
|
|||||||
|
package rpc;
|
||||||
|
|
||||||
|
import io.netty.bootstrap.ServerBootstrap;
|
||||||
|
import io.netty.channel.*;
|
||||||
|
import io.netty.channel.nio.NioEventLoopGroup;
|
||||||
|
import io.netty.channel.socket.nio.NioServerSocketChannel;
|
||||||
|
|
||||||
|
public class RpcServer {
|
||||||
|
|
||||||
|
public void bind(Integer port) {
|
||||||
|
|
||||||
|
EventLoopGroup parent = new NioEventLoopGroup();
|
||||||
|
EventLoopGroup child = new NioEventLoopGroup();
|
||||||
|
Channel channel = null;
|
||||||
|
|
||||||
|
try{
|
||||||
|
ServerBootstrap serverBootstrap = new ServerBootstrap();
|
||||||
|
|
||||||
|
serverBootstrap.group(parent, child)
|
||||||
|
.channel(NioServerSocketChannel.class)
|
||||||
|
.option(ChannelOption.SO_BACKLOG, 1024)
|
||||||
|
.childHandler(new ServerChannelInitializer());
|
||||||
|
|
||||||
|
ChannelFuture channelFuture = serverBootstrap.bind(port).sync();
|
||||||
|
|
||||||
|
System.out.println("server启动");
|
||||||
|
|
||||||
|
// 非阻塞等待关闭
|
||||||
|
channelFuture.channel().closeFuture().addListener(new ChannelFutureListener() {
|
||||||
|
@Override
|
||||||
|
public void operationComplete(ChannelFuture channelFuture) throws Exception {
|
||||||
|
System.out.println("server关闭");
|
||||||
|
parent.shutdownGracefully();
|
||||||
|
child.shutdownGracefully();
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
channel = channelFuture.channel();
|
||||||
|
|
||||||
|
} catch (Exception e) {
|
||||||
|
|
||||||
|
e.printStackTrace();
|
||||||
|
|
||||||
|
if(channel == null || !channel.isActive()) {
|
||||||
|
System.out.println("server关闭");
|
||||||
|
parent.shutdownGracefully();
|
||||||
|
child.shutdownGracefully();
|
||||||
|
} else {
|
||||||
|
channel.close();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
}
|
||||||
63
jsowell-admin/src/test/java/rpc/RpcUtil.java
Normal file
63
jsowell-admin/src/test/java/rpc/RpcUtil.java
Normal file
@@ -0,0 +1,63 @@
|
|||||||
|
package rpc;
|
||||||
|
|
||||||
|
import io.netty.channel.Channel;
|
||||||
|
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
import java.util.concurrent.TimeoutException;
|
||||||
|
|
||||||
|
public class RpcUtil {
|
||||||
|
|
||||||
|
private final static Map<String, SyncPromise> syncPromiseMap = new ConcurrentHashMap<>();
|
||||||
|
|
||||||
|
private final static Channel channel;
|
||||||
|
|
||||||
|
static{
|
||||||
|
channel = new RpcClient().connect("127.0.0.1", 8888);
|
||||||
|
}
|
||||||
|
|
||||||
|
public static RpcResponse send(RpcRequest rpcRequest, long timeout, TimeUnit unit) throws Exception{
|
||||||
|
|
||||||
|
if(channel == null) {
|
||||||
|
throw new NullPointerException("channel");
|
||||||
|
}
|
||||||
|
|
||||||
|
if(rpcRequest == null) {
|
||||||
|
throw new NullPointerException("rpcRequest");
|
||||||
|
}
|
||||||
|
|
||||||
|
if(timeout <= 0) {
|
||||||
|
throw new IllegalArgumentException("timeout must greater than 0");
|
||||||
|
}
|
||||||
|
|
||||||
|
// 创造一个容器,用于存放当前线程与rpcClient中的线程交互
|
||||||
|
SyncPromise syncPromise = new SyncPromise();
|
||||||
|
syncPromiseMap.put(rpcRequest.getId(), syncPromise);
|
||||||
|
|
||||||
|
// 发送消息,此处如果发送玩消息并且在get之前返回了结果,下一行的get将不会进入阻塞,也可以顺利拿到结果
|
||||||
|
channel.writeAndFlush(rpcRequest);
|
||||||
|
|
||||||
|
// 等待获取结果
|
||||||
|
RpcResponse rpcResponse = syncPromise.get(timeout, unit);
|
||||||
|
|
||||||
|
if(rpcResponse == null) {
|
||||||
|
if(syncPromise.isTimeout()) {
|
||||||
|
throw new TimeoutException("等待响应结果超时");
|
||||||
|
} else{
|
||||||
|
throw new Exception("其他异常");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// 移除容器
|
||||||
|
syncPromiseMap.remove(rpcRequest.getId());
|
||||||
|
|
||||||
|
return rpcResponse;
|
||||||
|
}
|
||||||
|
|
||||||
|
public static Map<String, SyncPromise> getSyncPromiseMap(){
|
||||||
|
return syncPromiseMap;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
}
|
||||||
53
jsowell-admin/src/test/java/rpc/SerializationUtil.java
Normal file
53
jsowell-admin/src/test/java/rpc/SerializationUtil.java
Normal file
@@ -0,0 +1,53 @@
|
|||||||
|
package rpc;
|
||||||
|
|
||||||
|
import io.protostuff.LinkedBuffer;
|
||||||
|
import io.protostuff.ProtostuffIOUtil;
|
||||||
|
import io.protostuff.Schema;
|
||||||
|
import io.protostuff.runtime.RuntimeSchema;
|
||||||
|
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
|
|
||||||
|
public class SerializationUtil {
|
||||||
|
|
||||||
|
private final static Map<Class<?>, Schema<?>> schemaCache = new ConcurrentHashMap<>();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 序列化
|
||||||
|
*/
|
||||||
|
public static <T> byte[] serialize(T object){
|
||||||
|
LinkedBuffer buffer = LinkedBuffer.allocate(LinkedBuffer.DEFAULT_BUFFER_SIZE);
|
||||||
|
|
||||||
|
try {
|
||||||
|
Class<T> cls = (Class<T>) object.getClass();
|
||||||
|
Schema<T> schema = getSchema(cls);
|
||||||
|
|
||||||
|
return ProtostuffIOUtil.toByteArray(object, schema, buffer);
|
||||||
|
} catch (Exception e) {
|
||||||
|
throw e;
|
||||||
|
} finally {
|
||||||
|
buffer.clear();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 反序列化
|
||||||
|
*/
|
||||||
|
public static <T> T deserialize(Class<T> cls, byte[] data) {
|
||||||
|
Schema<T> schema = getSchema(cls);
|
||||||
|
T message = schema.newMessage();
|
||||||
|
ProtostuffIOUtil.mergeFrom(data, message, schema);
|
||||||
|
return message;
|
||||||
|
}
|
||||||
|
|
||||||
|
public static <T> Schema<T> getSchema(Class<T> cls) {
|
||||||
|
Schema<T> schema = (Schema<T>) schemaCache.get(cls);
|
||||||
|
|
||||||
|
if(schema == null) {
|
||||||
|
schema = RuntimeSchema.getSchema(cls);
|
||||||
|
schemaCache.put(cls, schema);
|
||||||
|
}
|
||||||
|
return schema;
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
@@ -0,0 +1,19 @@
|
|||||||
|
package rpc;
|
||||||
|
|
||||||
|
import io.netty.channel.ChannelInitializer;
|
||||||
|
import io.netty.channel.ChannelPipeline;
|
||||||
|
import io.netty.channel.socket.SocketChannel;
|
||||||
|
|
||||||
|
public class ServerChannelInitializer extends ChannelInitializer<SocketChannel> {
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected void initChannel(SocketChannel socketChannel) throws Exception {
|
||||||
|
ChannelPipeline pipeline = socketChannel.pipeline();
|
||||||
|
|
||||||
|
pipeline.addLast(new MessageEncode());
|
||||||
|
pipeline.addLast(new MessageDecode());
|
||||||
|
|
||||||
|
pipeline.addLast(new RpcRequestHandler());
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
49
jsowell-admin/src/test/java/rpc/SyncPromise.java
Normal file
49
jsowell-admin/src/test/java/rpc/SyncPromise.java
Normal file
@@ -0,0 +1,49 @@
|
|||||||
|
package rpc;
|
||||||
|
|
||||||
|
import java.util.concurrent.CountDownLatch;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
|
public class SyncPromise {
|
||||||
|
|
||||||
|
// 用于接收结果
|
||||||
|
private RpcResponse rpcResponse;
|
||||||
|
|
||||||
|
private final CountDownLatch countDownLatch = new CountDownLatch(1);
|
||||||
|
|
||||||
|
// 用于判断是否超时
|
||||||
|
private boolean isTimeout = false;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 同步等待返回结果
|
||||||
|
*/
|
||||||
|
public RpcResponse get(long timeout, TimeUnit unit) throws InterruptedException {
|
||||||
|
// 等待阻塞,超时时间内countDownLatch减到0,将提前唤醒,以此作为是否超时判断
|
||||||
|
boolean earlyWakeUp = countDownLatch.await(timeout, unit);
|
||||||
|
|
||||||
|
if(earlyWakeUp) {
|
||||||
|
// 超时时间内countDownLatch减到0,提前唤醒,说明已有结果
|
||||||
|
return rpcResponse;
|
||||||
|
} else {
|
||||||
|
// 超时时间内countDownLatch没有减到0,自动唤醒,说明超时时间内没有等到结果
|
||||||
|
isTimeout = true;
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public void wake() {
|
||||||
|
countDownLatch.countDown();
|
||||||
|
}
|
||||||
|
|
||||||
|
public RpcResponse getRpcResponse() {
|
||||||
|
return rpcResponse;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setRpcResponse(RpcResponse rpcResponse) {
|
||||||
|
this.rpcResponse = rpcResponse;
|
||||||
|
}
|
||||||
|
|
||||||
|
public boolean isTimeout() {
|
||||||
|
return isTimeout;
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
48
jsowell-admin/src/test/java/rpc/TestRpcClient.java
Normal file
48
jsowell-admin/src/test/java/rpc/TestRpcClient.java
Normal file
@@ -0,0 +1,48 @@
|
|||||||
|
package rpc;
|
||||||
|
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
|
public class TestRpcClient {
|
||||||
|
public static void main(String[] args) throws Exception{
|
||||||
|
//Channel channel = new RpcClient().connect("127.0.0.1", 8888);
|
||||||
|
|
||||||
|
Thread thread1 = new Thread(new Runnable() {
|
||||||
|
@Override
|
||||||
|
public void run() {
|
||||||
|
RpcRequest rpcRequest = new RpcRequest();
|
||||||
|
rpcRequest.setParam("参数1");
|
||||||
|
|
||||||
|
try {
|
||||||
|
System.out.println("thread1发送请求");
|
||||||
|
RpcResponse rpcResponse = RpcUtil.send(rpcRequest, 5, TimeUnit.SECONDS);
|
||||||
|
System.out.println("thread1处理结果:" + rpcResponse);
|
||||||
|
} catch (Exception e) {
|
||||||
|
throw new RuntimeException(e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
Thread thread2 = new Thread(new Runnable() {
|
||||||
|
@Override
|
||||||
|
public void run() {
|
||||||
|
RpcRequest rpcRequest2 = new RpcRequest();
|
||||||
|
rpcRequest2.setParam("参数2");
|
||||||
|
|
||||||
|
try {
|
||||||
|
System.out.println("thread2发送请求");
|
||||||
|
RpcResponse rpcResponse = RpcUtil.send(rpcRequest2, 2, TimeUnit.SECONDS);
|
||||||
|
System.out.println("thread2处理结果:" + rpcResponse);
|
||||||
|
} catch (Exception e) {
|
||||||
|
throw new RuntimeException(e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
// 休眠一下,等待客户端与服务端进行连接
|
||||||
|
Thread.sleep(1000);
|
||||||
|
|
||||||
|
thread1.start();
|
||||||
|
thread2.start();
|
||||||
|
|
||||||
|
}
|
||||||
|
}
|
||||||
7
jsowell-admin/src/test/java/rpc/TestRpcServer.java
Normal file
7
jsowell-admin/src/test/java/rpc/TestRpcServer.java
Normal file
@@ -0,0 +1,7 @@
|
|||||||
|
package rpc;
|
||||||
|
|
||||||
|
public class TestRpcServer {
|
||||||
|
public static void main(String[] args) {
|
||||||
|
new RpcServer().bind(8888);
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -28,7 +28,7 @@ import java.util.concurrent.ConcurrentHashMap;
|
|||||||
@ChannelHandler.Sharable
|
@ChannelHandler.Sharable
|
||||||
@Slf4j
|
@Slf4j
|
||||||
@Component
|
@Component
|
||||||
public class NettyServerHandler extends ChannelInboundHandlerAdapter {
|
public class NettyServerHandler extends SimpleChannelInboundHandler {
|
||||||
|
|
||||||
@Autowired
|
@Autowired
|
||||||
private YKCBusinessService ykcService;
|
private YKCBusinessService ykcService;
|
||||||
@@ -66,7 +66,7 @@ public class NettyServerHandler extends ChannelInboundHandlerAdapter {
|
|||||||
*/
|
*/
|
||||||
@Override
|
@Override
|
||||||
public void channelRead(ChannelHandlerContext ctx, Object message) throws Exception {
|
public void channelRead(ChannelHandlerContext ctx, Object message) throws Exception {
|
||||||
// log.info("加载客户端报文=== channelId:" + ctx.channel().id() + ", msg:" + msg);
|
log.info("加载客户端报文channelRead=== channelId:" + ctx.channel().id() + ", msg:" + message);
|
||||||
// 下面可以解析数据,保存数据,生成返回报文,将需要返回报文写入write函数
|
// 下面可以解析数据,保存数据,生成返回报文,将需要返回报文写入write函数
|
||||||
byte[] msg = (byte[]) message;
|
byte[] msg = (byte[]) message;
|
||||||
|
|
||||||
@@ -109,6 +109,11 @@ public class NettyServerHandler extends ChannelInboundHandlerAdapter {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
|
||||||
|
log.info("channelRead0=== channelId:" + ctx.channel().id() + ", msg:" + msg);
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 有客户端终止连接服务器会触发此函数
|
* 有客户端终止连接服务器会触发此函数
|
||||||
*/
|
*/
|
||||||
|
|||||||
Reference in New Issue
Block a user