mirror of
https://codeup.aliyun.com/67c68d4e484ca2f0a13ac3c1/ydc/jsowell-charger-web.git
synced 2026-04-21 19:45:09 +08:00
同步获取响应数据
This commit is contained in:
@@ -0,0 +1,19 @@
|
||||
package com.jsowell.pile.rpc;
|
||||
|
||||
import io.netty.channel.ChannelInitializer;
|
||||
import io.netty.channel.ChannelPipeline;
|
||||
import io.netty.channel.socket.SocketChannel;
|
||||
|
||||
public class ClientChannelInitializer extends ChannelInitializer<SocketChannel> {
|
||||
|
||||
@Override
|
||||
protected void initChannel(SocketChannel socketChannel) throws Exception {
|
||||
ChannelPipeline pipeline = socketChannel.pipeline();
|
||||
|
||||
pipeline.addLast(new MessageEncode());
|
||||
pipeline.addLast(new MessageDecode());
|
||||
|
||||
pipeline.addLast(new RpcResponseHandler());
|
||||
}
|
||||
|
||||
}
|
||||
@@ -0,0 +1,47 @@
|
||||
package com.jsowell.pile.rpc;
|
||||
|
||||
import com.jsowell.common.protocol.Message;
|
||||
import com.jsowell.common.protocol.MessageConstant;
|
||||
import com.jsowell.common.util.bean.SerializationUtil;
|
||||
import io.netty.buffer.ByteBuf;
|
||||
import io.netty.channel.ChannelHandlerContext;
|
||||
import io.netty.handler.codec.ByteToMessageDecoder;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
public class MessageDecode extends ByteToMessageDecoder {
|
||||
|
||||
|
||||
@Override
|
||||
protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List<Object> list) throws Exception {
|
||||
|
||||
// 由于数据包的前4个字节用于记录总数据大小,如果数据不够4个字节,不进行读
|
||||
if(byteBuf.readableBytes() < 4) {
|
||||
return;
|
||||
}
|
||||
|
||||
// 标记开始读的位置
|
||||
byteBuf.markReaderIndex();
|
||||
|
||||
// 前四个字节记录了数据大小
|
||||
int dataSize = byteBuf.readInt();
|
||||
|
||||
// 查看剩余可读字节是否足够,如果不是,重置读取位置,等待下一次解析
|
||||
if(byteBuf.readableBytes() < dataSize) {
|
||||
byteBuf.resetReaderIndex();
|
||||
return;
|
||||
}
|
||||
|
||||
// 读取消息类型
|
||||
byte messageType = byteBuf.readByte();
|
||||
// 读取数据, 数组大小需要剔除1个字节的消息类型
|
||||
byte[] data = new byte[dataSize -1];
|
||||
|
||||
byteBuf.readBytes(data);
|
||||
|
||||
Message message = SerializationUtil.deserialize(MessageConstant.getMessageClass(messageType), data);
|
||||
|
||||
list.add(message);
|
||||
}
|
||||
|
||||
}
|
||||
@@ -0,0 +1,24 @@
|
||||
package com.jsowell.pile.rpc;
|
||||
|
||||
import com.jsowell.common.protocol.Message;
|
||||
import com.jsowell.common.util.bean.SerializationUtil;
|
||||
import io.netty.buffer.ByteBuf;
|
||||
import io.netty.channel.ChannelHandlerContext;
|
||||
import io.netty.handler.codec.MessageToByteEncoder;
|
||||
|
||||
public class MessageEncode extends MessageToByteEncoder<Message> {
|
||||
|
||||
@Override
|
||||
protected void encode(ChannelHandlerContext channelHandlerContext, Message message, ByteBuf byteBuf) throws Exception {
|
||||
// 将对象进行序列化
|
||||
byte[] data = SerializationUtil.serialize(message);
|
||||
|
||||
// 写数据长度,前4个字节用于记录数据总长度(对象 + 类型(1个字节))
|
||||
byteBuf.writeInt(data.length + 1);
|
||||
// 写记录消息类型,用于反序列选择类的类型
|
||||
byteBuf.writeByte(message.getMessageType());
|
||||
// 写对象
|
||||
byteBuf.writeBytes(data);
|
||||
}
|
||||
|
||||
}
|
||||
@@ -0,0 +1,52 @@
|
||||
package com.jsowell.pile.rpc;
|
||||
|
||||
import io.netty.bootstrap.Bootstrap;
|
||||
import io.netty.channel.*;
|
||||
import io.netty.channel.nio.NioEventLoopGroup;
|
||||
import io.netty.channel.socket.nio.NioSocketChannel;
|
||||
|
||||
public class RpcClient {
|
||||
|
||||
public Channel connect(String host, Integer port) {
|
||||
EventLoopGroup worker = new NioEventLoopGroup();
|
||||
Channel channel = null;
|
||||
|
||||
try {
|
||||
Bootstrap bootstrap = new Bootstrap();
|
||||
|
||||
bootstrap.group(worker)
|
||||
.channel(NioSocketChannel.class)
|
||||
.option(ChannelOption.AUTO_READ, true)
|
||||
.handler(new ClientChannelInitializer());
|
||||
|
||||
ChannelFuture channelFuture = bootstrap.connect(host, port).sync();
|
||||
|
||||
System.out.println("客户端启动");
|
||||
|
||||
channel = channelFuture.channel();
|
||||
|
||||
// 添加关闭监听器
|
||||
channel.closeFuture().addListener(new ChannelFutureListener() {
|
||||
@Override
|
||||
public void operationComplete(ChannelFuture channelFuture) throws Exception {
|
||||
System.out.println("关闭客户端");
|
||||
worker.shutdownGracefully();
|
||||
}
|
||||
});
|
||||
|
||||
} catch (Exception e) {
|
||||
e.printStackTrace();
|
||||
|
||||
if(channel == null || !channel.isActive()) {
|
||||
worker.shutdownGracefully();
|
||||
} else {
|
||||
channel.close();
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
return channel;
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
@@ -0,0 +1,38 @@
|
||||
package com.jsowell.pile.rpc;
|
||||
|
||||
import com.jsowell.common.protocol.RpcRequest;
|
||||
import com.jsowell.common.protocol.RpcResponse;
|
||||
import io.netty.channel.ChannelHandlerContext;
|
||||
import io.netty.channel.DefaultEventLoopGroup;
|
||||
import io.netty.channel.EventLoopGroup;
|
||||
import io.netty.channel.SimpleChannelInboundHandler;
|
||||
|
||||
public class RpcRequestHandler extends SimpleChannelInboundHandler<RpcRequest> {
|
||||
|
||||
private final static EventLoopGroup worker = new DefaultEventLoopGroup(Runtime.getRuntime().availableProcessors() + 1);
|
||||
|
||||
|
||||
@Override
|
||||
protected void channelRead0(ChannelHandlerContext ctx, RpcRequest msg) throws Exception {
|
||||
|
||||
// 为避免占用网络io,此处异步进行处理
|
||||
worker.submit(() -> {
|
||||
System.out.println("[RpcRequestHandler] "+ Thread.currentThread().getName() +" 处理请求,msg: " + msg);
|
||||
|
||||
// 模拟处理耗时
|
||||
try {
|
||||
Thread.sleep(3000);
|
||||
} catch (InterruptedException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
|
||||
RpcResponse rpcResponse = new RpcResponse();
|
||||
rpcResponse.setId(msg.getId());
|
||||
rpcResponse.setResult("处理" + msg.getParam());
|
||||
|
||||
ctx.writeAndFlush(rpcResponse);
|
||||
});
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
@@ -0,0 +1,26 @@
|
||||
package com.jsowell.pile.rpc;
|
||||
|
||||
import com.jsowell.common.protocol.RpcResponse;
|
||||
import com.jsowell.common.protocol.SyncPromise;
|
||||
import com.jsowell.common.util.RpcUtil;
|
||||
import io.netty.channel.ChannelHandlerContext;
|
||||
import io.netty.channel.SimpleChannelInboundHandler;
|
||||
|
||||
public class RpcResponseHandler extends SimpleChannelInboundHandler<RpcResponse> {
|
||||
|
||||
@Override
|
||||
protected void channelRead0(ChannelHandlerContext ctx, RpcResponse msg) throws Exception {
|
||||
// 根据请求id,在集合中找到与外部线程通信的SyncPromise对象
|
||||
SyncPromise syncPromise = RpcUtil.getSyncPromiseMap().get(msg.getId());
|
||||
|
||||
if(syncPromise != null) {
|
||||
// 设置响应结果
|
||||
// syncPromise.setRpcResponse(msg);
|
||||
|
||||
// 唤醒外部线程
|
||||
syncPromise.wake();
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
@@ -0,0 +1,55 @@
|
||||
package com.jsowell.pile.rpc;
|
||||
|
||||
import io.netty.bootstrap.ServerBootstrap;
|
||||
import io.netty.channel.*;
|
||||
import io.netty.channel.nio.NioEventLoopGroup;
|
||||
import io.netty.channel.socket.nio.NioServerSocketChannel;
|
||||
|
||||
public class RpcServer {
|
||||
|
||||
public void bind(Integer port) {
|
||||
|
||||
EventLoopGroup parent = new NioEventLoopGroup();
|
||||
EventLoopGroup child = new NioEventLoopGroup();
|
||||
Channel channel = null;
|
||||
|
||||
try{
|
||||
ServerBootstrap serverBootstrap = new ServerBootstrap();
|
||||
|
||||
serverBootstrap.group(parent, child)
|
||||
.channel(NioServerSocketChannel.class)
|
||||
.option(ChannelOption.SO_BACKLOG, 1024)
|
||||
.childHandler(new ServerChannelInitializer());
|
||||
|
||||
ChannelFuture channelFuture = serverBootstrap.bind(port).sync();
|
||||
|
||||
System.out.println("server启动");
|
||||
|
||||
// 非阻塞等待关闭
|
||||
channelFuture.channel().closeFuture().addListener(new ChannelFutureListener() {
|
||||
@Override
|
||||
public void operationComplete(ChannelFuture channelFuture) throws Exception {
|
||||
System.out.println("server关闭");
|
||||
parent.shutdownGracefully();
|
||||
child.shutdownGracefully();
|
||||
}
|
||||
});
|
||||
|
||||
channel = channelFuture.channel();
|
||||
|
||||
} catch (Exception e) {
|
||||
|
||||
e.printStackTrace();
|
||||
|
||||
if(channel == null || !channel.isActive()) {
|
||||
System.out.println("server关闭");
|
||||
parent.shutdownGracefully();
|
||||
child.shutdownGracefully();
|
||||
} else {
|
||||
channel.close();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
@@ -1,36 +0,0 @@
|
||||
package com.jsowell.pile.rpc;
|
||||
|
||||
import com.google.common.collect.Lists;
|
||||
import com.google.common.primitives.Bytes;
|
||||
import com.jsowell.common.core.domain.ykc.YKCFrameTypeCode;
|
||||
import com.jsowell.common.enums.ykc.PileChannelEntity;
|
||||
import com.jsowell.common.util.BytesUtil;
|
||||
import com.jsowell.common.util.CRC16Util;
|
||||
import com.jsowell.common.util.YKCUtils;
|
||||
import com.jsowell.pile.rpc.SyncPromise;
|
||||
import com.jsowell.pile.service.PileMsgRecordService;
|
||||
import io.netty.buffer.ByteBuf;
|
||||
import io.netty.channel.Channel;
|
||||
import io.netty.channel.ChannelFuture;
|
||||
import io.netty.channel.ChannelFutureListener;
|
||||
import io.netty.channel.ChannelHandlerContext;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Objects;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.TimeoutException;
|
||||
|
||||
@Slf4j
|
||||
public class RpcUtil {
|
||||
|
||||
private final static Map<String, SyncPromise> syncPromiseMap = new ConcurrentHashMap<>();
|
||||
|
||||
public static Map<String, SyncPromise> getSyncPromiseMap() {
|
||||
return syncPromiseMap;
|
||||
}
|
||||
|
||||
}
|
||||
@@ -0,0 +1,19 @@
|
||||
package com.jsowell.pile.rpc;
|
||||
|
||||
import io.netty.channel.ChannelInitializer;
|
||||
import io.netty.channel.ChannelPipeline;
|
||||
import io.netty.channel.socket.SocketChannel;
|
||||
|
||||
public class ServerChannelInitializer extends ChannelInitializer<SocketChannel> {
|
||||
|
||||
@Override
|
||||
protected void initChannel(SocketChannel socketChannel) throws Exception {
|
||||
ChannelPipeline pipeline = socketChannel.pipeline();
|
||||
|
||||
pipeline.addLast(new MessageEncode());
|
||||
pipeline.addLast(new MessageDecode());
|
||||
|
||||
pipeline.addLast(new RpcRequestHandler());
|
||||
}
|
||||
|
||||
}
|
||||
@@ -1,72 +0,0 @@
|
||||
package com.jsowell.pile.rpc;
|
||||
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
public class SyncPromise {
|
||||
|
||||
// 用于接收结果
|
||||
// private RpcResponse rpcResponse;
|
||||
|
||||
private byte[] rpcResult;
|
||||
|
||||
private final CountDownLatch countDownLatch = new CountDownLatch(1);
|
||||
|
||||
// 用于判断是否超时
|
||||
private boolean isTimeout = false;
|
||||
|
||||
public boolean isTimeout() {
|
||||
return isTimeout;
|
||||
}
|
||||
|
||||
public byte[] getRpcResult() {
|
||||
return rpcResult;
|
||||
}
|
||||
|
||||
public void setRpcResult(byte[] rpcResult) {
|
||||
this.rpcResult = rpcResult;
|
||||
}
|
||||
|
||||
/**
|
||||
* 同步等待返回结果
|
||||
*/
|
||||
// public RpcResponse get(long timeout, TimeUnit unit) throws InterruptedException {
|
||||
// // 等待阻塞,超时时间内countDownLatch减到0,将提前唤醒,以此作为是否超时判断
|
||||
// boolean earlyWakeUp = countDownLatch.await(timeout, unit);
|
||||
//
|
||||
// if(earlyWakeUp) {
|
||||
// // 超时时间内countDownLatch减到0,提前唤醒,说明已有结果
|
||||
// return rpcResponse;
|
||||
// } else {
|
||||
// // 超时时间内countDownLatch没有减到0,自动唤醒,说明超时时间内没有等到结果
|
||||
// isTimeout = true;
|
||||
// return null;
|
||||
// }
|
||||
// }
|
||||
|
||||
public byte[] get2(long timeout, TimeUnit unit) throws InterruptedException {
|
||||
// 等待阻塞,超时时间内countDownLatch减到0,将提前唤醒,以此作为是否超时判断
|
||||
boolean earlyWakeUp = countDownLatch.await(timeout, unit);
|
||||
|
||||
if(earlyWakeUp) {
|
||||
// 超时时间内countDownLatch减到0,提前唤醒,说明已有结果
|
||||
return rpcResult;
|
||||
} else {
|
||||
// 超时时间内countDownLatch没有减到0,自动唤醒,说明超时时间内没有等到结果
|
||||
isTimeout = true;
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
public void wake() {
|
||||
countDownLatch.countDown();
|
||||
}
|
||||
|
||||
// public RpcResponse getRpcResponse() {
|
||||
// return rpcResponse;
|
||||
// }
|
||||
// public void setRpcResponse(RpcResponse rpcResponse) {
|
||||
// this.rpcResponse = rpcResponse;
|
||||
// }
|
||||
|
||||
}
|
||||
@@ -0,0 +1,48 @@
|
||||
package com.jsowell.pile.rpc;
|
||||
|
||||
import com.jsowell.common.protocol.RpcRequest;
|
||||
|
||||
public class TestRpcClient {
|
||||
public static void main(String[] args) throws Exception{
|
||||
//Channel channel = new RpcClient().connect("127.0.0.1", 8888);
|
||||
|
||||
Thread thread1 = new Thread(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
RpcRequest rpcRequest = new RpcRequest();
|
||||
rpcRequest.setParam("参数1");
|
||||
|
||||
try {
|
||||
// System.out.println("thread1发送请求");
|
||||
// RpcResponse rpcResponse = RpcUtil.send(rpcRequest, 5, TimeUnit.SECONDS);
|
||||
// System.out.println("thread1处理结果:" + rpcResponse);
|
||||
} catch (Exception e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
Thread thread2 = new Thread(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
RpcRequest rpcRequest2 = new RpcRequest();
|
||||
rpcRequest2.setParam("参数2");
|
||||
|
||||
try {
|
||||
// System.out.println("thread2发送请求");
|
||||
// RpcResponse rpcResponse = RpcUtil.send(rpcRequest2, 2, TimeUnit.SECONDS);
|
||||
// System.out.println("thread2处理结果:" + rpcResponse);
|
||||
} catch (Exception e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
// 休眠一下,等待客户端与服务端进行连接
|
||||
Thread.sleep(1000);
|
||||
|
||||
thread1.start();
|
||||
thread2.start();
|
||||
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,7 @@
|
||||
package com.jsowell.pile.rpc;
|
||||
|
||||
public class TestRpcServer {
|
||||
public static void main(String[] args) {
|
||||
new RpcServer().bind(8888);
|
||||
}
|
||||
}
|
||||
@@ -2,9 +2,12 @@ package com.jsowell.pile.service;
|
||||
|
||||
import com.google.common.collect.Lists;
|
||||
import com.jsowell.common.constant.CacheConstants;
|
||||
import com.jsowell.common.core.domain.ykc.YKCDataProtocol;
|
||||
import com.jsowell.common.core.redis.RedisCache;
|
||||
import com.jsowell.common.enums.ykc.ChargingFailedReasonEnum;
|
||||
import com.jsowell.common.enums.ykc.ReturnCodeEnum;
|
||||
import com.jsowell.common.exception.BusinessException;
|
||||
import com.jsowell.common.util.BytesUtil;
|
||||
import com.jsowell.common.util.StringUtils;
|
||||
import com.jsowell.pile.domain.PileBillingTemplate;
|
||||
import com.jsowell.pile.domain.PileFirmwareInfo;
|
||||
@@ -296,6 +299,49 @@ public class PileRemoteService {
|
||||
// 解析结果
|
||||
if (Objects.isNull(bytes)) {
|
||||
result = "0";
|
||||
} else {
|
||||
YKCDataProtocol ykcDataProtocol = new YKCDataProtocol(bytes);
|
||||
byte[] msgBody = ykcDataProtocol.getMsgBody();
|
||||
int startIndex = 0;
|
||||
int length = 16;
|
||||
|
||||
// 交易流水号
|
||||
byte[] transactionCodeByteArr = BytesUtil.copyBytes(msgBody, startIndex, length);
|
||||
String transactionCode = BytesUtil.bcd2Str(transactionCodeByteArr);
|
||||
|
||||
// 桩编码
|
||||
startIndex += length;
|
||||
length = 7;
|
||||
byte[] pileSnByteArr = BytesUtil.copyBytes(msgBody, startIndex, length);
|
||||
String pileSn = BytesUtil.bcd2Str(pileSnByteArr);
|
||||
|
||||
// 枪口号
|
||||
startIndex += length;
|
||||
length = 1;
|
||||
byte[] connectorCodeByteArr = BytesUtil.copyBytes(msgBody, startIndex, length);
|
||||
String connectorCode = BytesUtil.bcd2Str(connectorCodeByteArr);
|
||||
|
||||
// 启动结果 0x00失败 0x01成功
|
||||
startIndex += length;
|
||||
length = 1;
|
||||
byte[] resultCodeByteArr = BytesUtil.copyBytes(msgBody, startIndex, length);
|
||||
String resultCode = BytesUtil.bcd2Str(resultCodeByteArr);
|
||||
|
||||
if (StringUtils.equals(resultCode, "00")) {
|
||||
result = "0";
|
||||
} else {
|
||||
result = "1";
|
||||
}
|
||||
|
||||
// 失败原因
|
||||
startIndex += length;
|
||||
length = 1;
|
||||
byte[] failedReasonByteArr = BytesUtil.copyBytes(msgBody, startIndex, length);
|
||||
String failedReason = BytesUtil.bcd2Str(failedReasonByteArr);
|
||||
String failedReasonMsg = ChargingFailedReasonEnum.getMsgByCode(Integer.parseInt(failedReason, 16));
|
||||
|
||||
log.info("0x59预约充电响应sync, 交易流水号:{}, 桩SN:{}, 枪口号:{}, 结果:{}, 失败原因:{}",
|
||||
transactionCode, pileSn, connectorCode, resultCode, failedReasonMsg);
|
||||
}
|
||||
|
||||
return result;
|
||||
|
||||
@@ -10,8 +10,8 @@ import com.jsowell.common.exception.BusinessException;
|
||||
import com.jsowell.common.util.*;
|
||||
import com.jsowell.common.util.Cp56Time2a.Cp56Time2aUtil;
|
||||
import com.jsowell.pile.domain.ykcCommond.*;
|
||||
import com.jsowell.pile.rpc.RpcUtil;
|
||||
import com.jsowell.pile.rpc.SyncPromise;
|
||||
import com.jsowell.common.util.RpcUtil;
|
||||
import com.jsowell.common.protocol.SyncPromise;
|
||||
import com.jsowell.pile.service.*;
|
||||
import com.jsowell.pile.vo.web.BillingTemplateVO;
|
||||
import com.jsowell.pile.vo.web.PileModelInfoVO;
|
||||
|
||||
Reference in New Issue
Block a user