mirror of
https://codeup.aliyun.com/67c68d4e484ca2f0a13ac3c1/ydc/jsowell-charger-web.git
synced 2026-04-21 11:35:12 +08:00
Merge branch 'dev' of http://192.168.2.46:8099/jsowell/jsowell-charger-web into dev
This commit is contained in:
@@ -0,0 +1,19 @@
|
||||
package com.jsowell.pile.rpc;
|
||||
|
||||
import io.netty.channel.ChannelInitializer;
|
||||
import io.netty.channel.ChannelPipeline;
|
||||
import io.netty.channel.socket.SocketChannel;
|
||||
|
||||
public class ClientChannelInitializer extends ChannelInitializer<SocketChannel> {
|
||||
|
||||
@Override
|
||||
protected void initChannel(SocketChannel socketChannel) throws Exception {
|
||||
ChannelPipeline pipeline = socketChannel.pipeline();
|
||||
|
||||
pipeline.addLast(new MessageEncode());
|
||||
pipeline.addLast(new MessageDecode());
|
||||
|
||||
pipeline.addLast(new RpcResponseHandler());
|
||||
}
|
||||
|
||||
}
|
||||
@@ -0,0 +1,47 @@
|
||||
package com.jsowell.pile.rpc;
|
||||
|
||||
import com.jsowell.common.protocol.Message;
|
||||
import com.jsowell.common.protocol.MessageConstant;
|
||||
import com.jsowell.common.util.bean.SerializationUtil;
|
||||
import io.netty.buffer.ByteBuf;
|
||||
import io.netty.channel.ChannelHandlerContext;
|
||||
import io.netty.handler.codec.ByteToMessageDecoder;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
public class MessageDecode extends ByteToMessageDecoder {
|
||||
|
||||
|
||||
@Override
|
||||
protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List<Object> list) throws Exception {
|
||||
|
||||
// 由于数据包的前4个字节用于记录总数据大小,如果数据不够4个字节,不进行读
|
||||
if(byteBuf.readableBytes() < 4) {
|
||||
return;
|
||||
}
|
||||
|
||||
// 标记开始读的位置
|
||||
byteBuf.markReaderIndex();
|
||||
|
||||
// 前四个字节记录了数据大小
|
||||
int dataSize = byteBuf.readInt();
|
||||
|
||||
// 查看剩余可读字节是否足够,如果不是,重置读取位置,等待下一次解析
|
||||
if(byteBuf.readableBytes() < dataSize) {
|
||||
byteBuf.resetReaderIndex();
|
||||
return;
|
||||
}
|
||||
|
||||
// 读取消息类型
|
||||
byte messageType = byteBuf.readByte();
|
||||
// 读取数据, 数组大小需要剔除1个字节的消息类型
|
||||
byte[] data = new byte[dataSize -1];
|
||||
|
||||
byteBuf.readBytes(data);
|
||||
|
||||
Message message = SerializationUtil.deserialize(MessageConstant.getMessageClass(messageType), data);
|
||||
|
||||
list.add(message);
|
||||
}
|
||||
|
||||
}
|
||||
@@ -0,0 +1,24 @@
|
||||
package com.jsowell.pile.rpc;
|
||||
|
||||
import com.jsowell.common.protocol.Message;
|
||||
import com.jsowell.common.util.bean.SerializationUtil;
|
||||
import io.netty.buffer.ByteBuf;
|
||||
import io.netty.channel.ChannelHandlerContext;
|
||||
import io.netty.handler.codec.MessageToByteEncoder;
|
||||
|
||||
public class MessageEncode extends MessageToByteEncoder<Message> {
|
||||
|
||||
@Override
|
||||
protected void encode(ChannelHandlerContext channelHandlerContext, Message message, ByteBuf byteBuf) throws Exception {
|
||||
// 将对象进行序列化
|
||||
byte[] data = SerializationUtil.serialize(message);
|
||||
|
||||
// 写数据长度,前4个字节用于记录数据总长度(对象 + 类型(1个字节))
|
||||
byteBuf.writeInt(data.length + 1);
|
||||
// 写记录消息类型,用于反序列选择类的类型
|
||||
byteBuf.writeByte(message.getMessageType());
|
||||
// 写对象
|
||||
byteBuf.writeBytes(data);
|
||||
}
|
||||
|
||||
}
|
||||
@@ -0,0 +1,52 @@
|
||||
package com.jsowell.pile.rpc;
|
||||
|
||||
import io.netty.bootstrap.Bootstrap;
|
||||
import io.netty.channel.*;
|
||||
import io.netty.channel.nio.NioEventLoopGroup;
|
||||
import io.netty.channel.socket.nio.NioSocketChannel;
|
||||
|
||||
public class RpcClient {
|
||||
|
||||
public Channel connect(String host, Integer port) {
|
||||
EventLoopGroup worker = new NioEventLoopGroup();
|
||||
Channel channel = null;
|
||||
|
||||
try {
|
||||
Bootstrap bootstrap = new Bootstrap();
|
||||
|
||||
bootstrap.group(worker)
|
||||
.channel(NioSocketChannel.class)
|
||||
.option(ChannelOption.AUTO_READ, true)
|
||||
.handler(new ClientChannelInitializer());
|
||||
|
||||
ChannelFuture channelFuture = bootstrap.connect(host, port).sync();
|
||||
|
||||
System.out.println("客户端启动");
|
||||
|
||||
channel = channelFuture.channel();
|
||||
|
||||
// 添加关闭监听器
|
||||
channel.closeFuture().addListener(new ChannelFutureListener() {
|
||||
@Override
|
||||
public void operationComplete(ChannelFuture channelFuture) throws Exception {
|
||||
System.out.println("关闭客户端");
|
||||
worker.shutdownGracefully();
|
||||
}
|
||||
});
|
||||
|
||||
} catch (Exception e) {
|
||||
e.printStackTrace();
|
||||
|
||||
if(channel == null || !channel.isActive()) {
|
||||
worker.shutdownGracefully();
|
||||
} else {
|
||||
channel.close();
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
return channel;
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
@@ -0,0 +1,38 @@
|
||||
package com.jsowell.pile.rpc;
|
||||
|
||||
import com.jsowell.common.protocol.RpcRequest;
|
||||
import com.jsowell.common.protocol.RpcResponse;
|
||||
import io.netty.channel.ChannelHandlerContext;
|
||||
import io.netty.channel.DefaultEventLoopGroup;
|
||||
import io.netty.channel.EventLoopGroup;
|
||||
import io.netty.channel.SimpleChannelInboundHandler;
|
||||
|
||||
public class RpcRequestHandler extends SimpleChannelInboundHandler<RpcRequest> {
|
||||
|
||||
private final static EventLoopGroup worker = new DefaultEventLoopGroup(Runtime.getRuntime().availableProcessors() + 1);
|
||||
|
||||
|
||||
@Override
|
||||
protected void channelRead0(ChannelHandlerContext ctx, RpcRequest msg) throws Exception {
|
||||
|
||||
// 为避免占用网络io,此处异步进行处理
|
||||
worker.submit(() -> {
|
||||
System.out.println("[RpcRequestHandler] "+ Thread.currentThread().getName() +" 处理请求,msg: " + msg);
|
||||
|
||||
// 模拟处理耗时
|
||||
try {
|
||||
Thread.sleep(3000);
|
||||
} catch (InterruptedException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
|
||||
RpcResponse rpcResponse = new RpcResponse();
|
||||
rpcResponse.setId(msg.getId());
|
||||
rpcResponse.setResult("处理" + msg.getParam());
|
||||
|
||||
ctx.writeAndFlush(rpcResponse);
|
||||
});
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
@@ -0,0 +1,26 @@
|
||||
package com.jsowell.pile.rpc;
|
||||
|
||||
import com.jsowell.common.protocol.RpcResponse;
|
||||
import com.jsowell.common.protocol.SyncPromise;
|
||||
import com.jsowell.common.util.RpcUtil;
|
||||
import io.netty.channel.ChannelHandlerContext;
|
||||
import io.netty.channel.SimpleChannelInboundHandler;
|
||||
|
||||
public class RpcResponseHandler extends SimpleChannelInboundHandler<RpcResponse> {
|
||||
|
||||
@Override
|
||||
protected void channelRead0(ChannelHandlerContext ctx, RpcResponse msg) throws Exception {
|
||||
// 根据请求id,在集合中找到与外部线程通信的SyncPromise对象
|
||||
SyncPromise syncPromise = RpcUtil.getSyncPromiseMap().get(msg.getId());
|
||||
|
||||
if(syncPromise != null) {
|
||||
// 设置响应结果
|
||||
// syncPromise.setRpcResponse(msg);
|
||||
|
||||
// 唤醒外部线程
|
||||
syncPromise.wake();
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
@@ -0,0 +1,55 @@
|
||||
package com.jsowell.pile.rpc;
|
||||
|
||||
import io.netty.bootstrap.ServerBootstrap;
|
||||
import io.netty.channel.*;
|
||||
import io.netty.channel.nio.NioEventLoopGroup;
|
||||
import io.netty.channel.socket.nio.NioServerSocketChannel;
|
||||
|
||||
public class RpcServer {
|
||||
|
||||
public void bind(Integer port) {
|
||||
|
||||
EventLoopGroup parent = new NioEventLoopGroup();
|
||||
EventLoopGroup child = new NioEventLoopGroup();
|
||||
Channel channel = null;
|
||||
|
||||
try{
|
||||
ServerBootstrap serverBootstrap = new ServerBootstrap();
|
||||
|
||||
serverBootstrap.group(parent, child)
|
||||
.channel(NioServerSocketChannel.class)
|
||||
.option(ChannelOption.SO_BACKLOG, 1024)
|
||||
.childHandler(new ServerChannelInitializer());
|
||||
|
||||
ChannelFuture channelFuture = serverBootstrap.bind(port).sync();
|
||||
|
||||
System.out.println("server启动");
|
||||
|
||||
// 非阻塞等待关闭
|
||||
channelFuture.channel().closeFuture().addListener(new ChannelFutureListener() {
|
||||
@Override
|
||||
public void operationComplete(ChannelFuture channelFuture) throws Exception {
|
||||
System.out.println("server关闭");
|
||||
parent.shutdownGracefully();
|
||||
child.shutdownGracefully();
|
||||
}
|
||||
});
|
||||
|
||||
channel = channelFuture.channel();
|
||||
|
||||
} catch (Exception e) {
|
||||
|
||||
e.printStackTrace();
|
||||
|
||||
if(channel == null || !channel.isActive()) {
|
||||
System.out.println("server关闭");
|
||||
parent.shutdownGracefully();
|
||||
child.shutdownGracefully();
|
||||
} else {
|
||||
channel.close();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
@@ -0,0 +1,19 @@
|
||||
package com.jsowell.pile.rpc;
|
||||
|
||||
import io.netty.channel.ChannelInitializer;
|
||||
import io.netty.channel.ChannelPipeline;
|
||||
import io.netty.channel.socket.SocketChannel;
|
||||
|
||||
public class ServerChannelInitializer extends ChannelInitializer<SocketChannel> {
|
||||
|
||||
@Override
|
||||
protected void initChannel(SocketChannel socketChannel) throws Exception {
|
||||
ChannelPipeline pipeline = socketChannel.pipeline();
|
||||
|
||||
pipeline.addLast(new MessageEncode());
|
||||
pipeline.addLast(new MessageDecode());
|
||||
|
||||
pipeline.addLast(new RpcRequestHandler());
|
||||
}
|
||||
|
||||
}
|
||||
@@ -0,0 +1,48 @@
|
||||
package com.jsowell.pile.rpc;
|
||||
|
||||
import com.jsowell.common.protocol.RpcRequest;
|
||||
|
||||
public class TestRpcClient {
|
||||
public static void main(String[] args) throws Exception{
|
||||
//Channel channel = new RpcClient().connect("127.0.0.1", 8888);
|
||||
|
||||
Thread thread1 = new Thread(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
RpcRequest rpcRequest = new RpcRequest();
|
||||
rpcRequest.setParam("参数1");
|
||||
|
||||
try {
|
||||
// System.out.println("thread1发送请求");
|
||||
// RpcResponse rpcResponse = RpcUtil.send(rpcRequest, 5, TimeUnit.SECONDS);
|
||||
// System.out.println("thread1处理结果:" + rpcResponse);
|
||||
} catch (Exception e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
Thread thread2 = new Thread(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
RpcRequest rpcRequest2 = new RpcRequest();
|
||||
rpcRequest2.setParam("参数2");
|
||||
|
||||
try {
|
||||
// System.out.println("thread2发送请求");
|
||||
// RpcResponse rpcResponse = RpcUtil.send(rpcRequest2, 2, TimeUnit.SECONDS);
|
||||
// System.out.println("thread2处理结果:" + rpcResponse);
|
||||
} catch (Exception e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
// 休眠一下,等待客户端与服务端进行连接
|
||||
Thread.sleep(1000);
|
||||
|
||||
thread1.start();
|
||||
thread2.start();
|
||||
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,7 @@
|
||||
package com.jsowell.pile.rpc;
|
||||
|
||||
public class TestRpcServer {
|
||||
public static void main(String[] args) {
|
||||
new RpcServer().bind(8888);
|
||||
}
|
||||
}
|
||||
@@ -2,9 +2,12 @@ package com.jsowell.pile.service;
|
||||
|
||||
import com.google.common.collect.Lists;
|
||||
import com.jsowell.common.constant.CacheConstants;
|
||||
import com.jsowell.common.core.domain.ykc.YKCDataProtocol;
|
||||
import com.jsowell.common.core.redis.RedisCache;
|
||||
import com.jsowell.common.enums.ykc.ChargingFailedReasonEnum;
|
||||
import com.jsowell.common.enums.ykc.ReturnCodeEnum;
|
||||
import com.jsowell.common.exception.BusinessException;
|
||||
import com.jsowell.common.util.BytesUtil;
|
||||
import com.jsowell.common.util.StringUtils;
|
||||
import com.jsowell.pile.domain.PileBillingTemplate;
|
||||
import com.jsowell.pile.domain.PileFirmwareInfo;
|
||||
@@ -26,6 +29,7 @@ import java.net.InetAddress;
|
||||
import java.net.UnknownHostException;
|
||||
import java.util.Date;
|
||||
import java.util.List;
|
||||
import java.util.Objects;
|
||||
|
||||
@Service
|
||||
public class PileRemoteService {
|
||||
@@ -287,8 +291,60 @@ public class PileRemoteService {
|
||||
|
||||
/**
|
||||
* 预约充电指令/预约指令
|
||||
* @return result: 1-成功; 0-失败
|
||||
*/
|
||||
public void reservationCharging(ReservationChargingCommand command) {
|
||||
ykcPushCommandService.pushReservationChargingCommand(command);
|
||||
public String reservationCharging(ReservationChargingCommand command) {
|
||||
String result = "1";
|
||||
byte[] bytes = ykcPushCommandService.pushReservationChargingCommand(command);
|
||||
// 解析结果
|
||||
if (Objects.isNull(bytes)) {
|
||||
result = "0";
|
||||
} else {
|
||||
YKCDataProtocol ykcDataProtocol = new YKCDataProtocol(bytes);
|
||||
byte[] msgBody = ykcDataProtocol.getMsgBody();
|
||||
int startIndex = 0;
|
||||
int length = 16;
|
||||
|
||||
// 交易流水号
|
||||
byte[] transactionCodeByteArr = BytesUtil.copyBytes(msgBody, startIndex, length);
|
||||
String transactionCode = BytesUtil.bcd2Str(transactionCodeByteArr);
|
||||
|
||||
// 桩编码
|
||||
startIndex += length;
|
||||
length = 7;
|
||||
byte[] pileSnByteArr = BytesUtil.copyBytes(msgBody, startIndex, length);
|
||||
String pileSn = BytesUtil.bcd2Str(pileSnByteArr);
|
||||
|
||||
// 枪口号
|
||||
startIndex += length;
|
||||
length = 1;
|
||||
byte[] connectorCodeByteArr = BytesUtil.copyBytes(msgBody, startIndex, length);
|
||||
String connectorCode = BytesUtil.bcd2Str(connectorCodeByteArr);
|
||||
|
||||
// 启动结果 0x00失败 0x01成功
|
||||
startIndex += length;
|
||||
length = 1;
|
||||
byte[] resultCodeByteArr = BytesUtil.copyBytes(msgBody, startIndex, length);
|
||||
String resultCode = BytesUtil.bcd2Str(resultCodeByteArr);
|
||||
|
||||
if (StringUtils.equals(resultCode, "00")) {
|
||||
result = "0";
|
||||
} else {
|
||||
result = "1";
|
||||
}
|
||||
|
||||
// 失败原因
|
||||
startIndex += length;
|
||||
length = 1;
|
||||
byte[] failedReasonByteArr = BytesUtil.copyBytes(msgBody, startIndex, length);
|
||||
String failedReason = BytesUtil.bcd2Str(failedReasonByteArr);
|
||||
String failedReasonMsg = ChargingFailedReasonEnum.getMsgByCode(Integer.parseInt(failedReason, 16));
|
||||
|
||||
log.info("0x59预约充电响应sync, 交易流水号:{}, 桩SN:{}, 枪口号:{}, 结果:{}, 失败原因:{}",
|
||||
transactionCode, pileSn, connectorCode, resultCode, failedReasonMsg);
|
||||
}
|
||||
|
||||
return result;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@@ -66,7 +66,7 @@ public interface PileReservationInfoService {
|
||||
|
||||
void updateReservationStatus(PileReservationDTO dto);
|
||||
|
||||
void updateReservation(PileReservationDTO dto);
|
||||
int updateReservation(PileReservationDTO dto);
|
||||
|
||||
void personPileStopCharging(PersonPileStopChargingDTO dto);
|
||||
|
||||
|
||||
@@ -82,5 +82,6 @@ public interface YKCPushCommandService {
|
||||
* 发送预约充电命令
|
||||
* @param command
|
||||
*/
|
||||
void pushReservationChargingCommand(ReservationChargingCommand command);
|
||||
byte[] pushReservationChargingCommand(ReservationChargingCommand command);
|
||||
|
||||
}
|
||||
|
||||
@@ -3765,17 +3765,25 @@ public class OrderBasicInfoServiceImpl implements OrderBasicInfoService {
|
||||
? OrderStatusEnum.IN_THE_CHARGING.getValue()
|
||||
: OrderStatusEnum.ORDER_CLOSE_TIMEOUT.getValue();
|
||||
|
||||
PileBasicInfo pileBasicInfo = pileBasicInfoService.selectPileBasicInfoBySN(chargingStartupResult.getPileSn());
|
||||
|
||||
BigDecimal payAmount = Constants.WHITELIST_DEFAULT_AMOUNT;
|
||||
|
||||
// 订单基本信息
|
||||
OrderBasicInfo orderBasicInfo = OrderBasicInfo.builder()
|
||||
.orderCode(orderCode)
|
||||
.transactionCode(transactionCode)
|
||||
.orderStatus(status)
|
||||
.merchantId(pileBasicInfo.getMerchantId() + "")
|
||||
.stationId(pileBasicInfo.getStationId() + "")
|
||||
.pileSn(chargingStartupResult.getPileSn())
|
||||
.connectorCode(chargingStartupResult.getConnectorCode())
|
||||
.pileConnectorCode(chargingStartupResult.getPileSn() + chargingStartupResult.getConnectorCode())
|
||||
.startMode("6")
|
||||
.payStatus(Constants.TWO)
|
||||
.payStatus(Constants.ONE)
|
||||
.payMode(Constants.THREE)
|
||||
.payAmount(payAmount)
|
||||
.payTime(DateUtils.getNowDate())
|
||||
.orderAmount(BigDecimal.ZERO)
|
||||
.virtualAmount(BigDecimal.ZERO)
|
||||
.settleAmount(BigDecimal.ZERO)
|
||||
@@ -3789,6 +3797,17 @@ public class OrderBasicInfoServiceImpl implements OrderBasicInfoService {
|
||||
.orderDetail(null)
|
||||
.build();
|
||||
pileTransactionService.doCreateOrder(createOrderTransactionDTO);
|
||||
|
||||
OrderPayRecord principalPayRecord = OrderPayRecord.builder()
|
||||
.orderCode(orderCode)
|
||||
.payMode(OrderPayRecordEnum.WHITELIST_PAYMENT.getValue())
|
||||
.payAmount(payAmount)
|
||||
.acquirer(AcquirerEnum.LOCAL.getValue())
|
||||
.createBy("system")
|
||||
.createTime(DateUtils.getNowDate())
|
||||
.delFlag(DelFlagEnum.NORMAL.getValue())
|
||||
.build();
|
||||
orderPayRecordService.batchInsert(Lists.newArrayList(principalPayRecord));
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@@ -407,10 +407,10 @@ public class PileReservationInfoServiceImpl implements PileReservationInfoServic
|
||||
* @param dto
|
||||
*/
|
||||
@Override
|
||||
public void updateReservation(PileReservationDTO dto) {
|
||||
public int updateReservation(PileReservationDTO dto) {
|
||||
PileReservationInfo pileReservationInfo = pileReservationInfoMapper.selectByPrimaryKey(Integer.valueOf(dto.getReservedId()));
|
||||
if (pileReservationInfo == null) {
|
||||
return;
|
||||
return 0;
|
||||
}
|
||||
|
||||
// 操作 0x01:启动 0x02:取消 0x03:修改
|
||||
@@ -471,12 +471,11 @@ public class PileReservationInfoServiceImpl implements PileReservationInfoServic
|
||||
.reservedEndTime(pileReservationInfo.getEndTime().toLocalTime())
|
||||
.amount(Constants.WHITELIST_DEFAULT_AMOUNT)
|
||||
.build();
|
||||
pileRemoteService.reservationCharging(command);
|
||||
|
||||
// 从redis中获取回复, 3秒没有获取到判为超时
|
||||
|
||||
|
||||
this.insertOrUpdateSelective(pileReservationInfo);
|
||||
String result = pileRemoteService.reservationCharging(command);
|
||||
if (StringUtils.equals(result, Constants.ONE)) {
|
||||
return this.insertOrUpdateSelective(pileReservationInfo);
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
/**
|
||||
|
||||
@@ -10,12 +10,13 @@ import com.jsowell.common.exception.BusinessException;
|
||||
import com.jsowell.common.util.*;
|
||||
import com.jsowell.common.util.Cp56Time2a.Cp56Time2aUtil;
|
||||
import com.jsowell.pile.domain.ykcCommond.*;
|
||||
import com.jsowell.common.util.RpcUtil;
|
||||
import com.jsowell.common.protocol.SyncPromise;
|
||||
import com.jsowell.pile.service.*;
|
||||
import com.jsowell.pile.vo.web.BillingTemplateVO;
|
||||
import com.jsowell.pile.vo.web.PileModelInfoVO;
|
||||
import com.jsowell.pile.vo.web.PileStationVO;
|
||||
import io.netty.buffer.ByteBuf;
|
||||
import io.netty.channel.Channel;
|
||||
import io.netty.channel.ChannelFuture;
|
||||
import io.netty.channel.ChannelFutureListener;
|
||||
import io.netty.channel.ChannelHandlerContext;
|
||||
@@ -29,6 +30,8 @@ import java.time.LocalTime;
|
||||
import java.util.Date;
|
||||
import java.util.List;
|
||||
import java.util.Objects;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.TimeoutException;
|
||||
|
||||
/**
|
||||
* 向充电桩发送指令service
|
||||
@@ -36,6 +39,7 @@ import java.util.Objects;
|
||||
@Slf4j
|
||||
@Service
|
||||
public class YKCPushCommandServiceImpl implements YKCPushCommandService {
|
||||
|
||||
@Autowired
|
||||
private PileBillingTemplateService pileBillingTemplateService;
|
||||
|
||||
@@ -70,14 +74,95 @@ public class YKCPushCommandServiceImpl implements YKCPushCommandService {
|
||||
* @param frameTypeCode
|
||||
* @return
|
||||
*/
|
||||
public boolean push(byte[] msg, String pileSn, Enum<YKCFrameTypeCode> frameTypeCode) {
|
||||
// public boolean push(byte[] msg, String pileSn, Enum<YKCFrameTypeCode> frameTypeCode) {
|
||||
// // 通过桩编号获取channel
|
||||
// ChannelHandlerContext ctx = PileChannelEntity.getChannelByPileSn(pileSn);
|
||||
// String value = ((YKCFrameTypeCode) frameTypeCode).getValue();
|
||||
// if (Objects.isNull(ctx)) {
|
||||
// log.error("push命令[{}]失败, 桩号:{}无法获取到长连接, 请检查充电桩连接状态!", value, pileSn);
|
||||
// return false;
|
||||
// }
|
||||
// /*
|
||||
// 拼接报文
|
||||
// */
|
||||
// // 起始标志
|
||||
// byte[] head = new byte[]{0x68};
|
||||
//
|
||||
// // 序列号域
|
||||
// byte[] serialNumber = new byte[]{0x00, 0x00};
|
||||
//
|
||||
// // 加密标志
|
||||
// byte[] encryptFlag = new byte[]{0x00};
|
||||
//
|
||||
// // 帧类型标志
|
||||
// byte[] frameType = new byte[]{(byte) ((YKCFrameTypeCode) frameTypeCode).getCode()};
|
||||
//
|
||||
// // 序列号域+加密标志+帧类型标志+消息体
|
||||
// byte[] temp = Bytes.concat(serialNumber, encryptFlag, frameType, msg);
|
||||
//
|
||||
// // 数据长度
|
||||
// byte[] length = BytesUtil.intToBytes(temp.length, 1);
|
||||
//
|
||||
// // 帧校验域
|
||||
// byte[] crc = BytesUtil.intToBytes(CRC16Util.calcCrc16(temp));
|
||||
//
|
||||
// // 返回报文
|
||||
// byte[] writeMsg = Bytes.concat(head, length, temp, crc);
|
||||
//
|
||||
// // 返回完整的报文 string类型
|
||||
// String wholeMsg = BytesUtil.binary(writeMsg, 16);
|
||||
// ByteBuf byteBuf = ctx.channel().alloc().buffer().writeBytes(writeMsg);
|
||||
// ChannelFuture channelFuture = ctx.channel().writeAndFlush(byteBuf);
|
||||
// channelFuture.addListener((ChannelFutureListener) channelFutureListener -> {
|
||||
// // 检查操作的状态
|
||||
// if (channelFutureListener.isSuccess()) {
|
||||
// log.info("【push结果===>成功】, pileSn:{}, remoteAddress:{}, channelId:{}, 帧类型:{}, 报文:{}",
|
||||
// pileSn, ctx.channel().remoteAddress(), ctx.channel().id(), value, wholeMsg);
|
||||
// } else {
|
||||
// // 如果发生错误,则访问描述原因的Throwable
|
||||
// Throwable cause = channelFutureListener.cause();
|
||||
// log.info("【push结果===>失败】, pileSn:{}, remoteAddress:{}, channelId:{}, 帧类型:{}, 报文:{}",
|
||||
// pileSn, ctx.channel().remoteAddress(), ctx.channel().id(), value, wholeMsg);
|
||||
// log.error("push发送命令失败, pileSn:{}", pileSn, cause);
|
||||
// }
|
||||
// });
|
||||
//
|
||||
// // 保存报文
|
||||
// String frameTypeStr = YKCUtils.frameType2Str(((YKCFrameTypeCode) frameTypeCode).getBytes());
|
||||
// if (frameTypeList.contains(frameTypeStr)) {
|
||||
// pileMsgRecordService.save(pileSn, null, frameTypeStr, null, wholeMsg);
|
||||
// }
|
||||
// return true;
|
||||
// }
|
||||
|
||||
public byte[] send(byte[] msg, String pileSn, Enum<YKCFrameTypeCode> frameTypeCode) throws Exception {
|
||||
return this.send(msg, pileSn, frameTypeCode, 5, TimeUnit.SECONDS);
|
||||
}
|
||||
|
||||
public byte[] send(byte[] msg, String pileSn, Enum<YKCFrameTypeCode> frameTypeCode, long timeout, TimeUnit unit) throws Exception {
|
||||
// 通过桩编号获取channel
|
||||
ChannelHandlerContext ctx = PileChannelEntity.getChannelByPileSn(pileSn);
|
||||
String value = ((YKCFrameTypeCode) frameTypeCode).getValue();
|
||||
String value = ((YKCFrameTypeCode) frameTypeCode).getValue(); // 帧类型名称
|
||||
if (Objects.isNull(ctx)) {
|
||||
log.error("push命令[{}]失败, 桩号:{}无法获取到长连接, 请检查充电桩连接状态!", value, pileSn);
|
||||
return false;
|
||||
throw new NullPointerException("channel");
|
||||
}
|
||||
if(msg == null) {
|
||||
throw new NullPointerException("msg");
|
||||
}
|
||||
if(timeout <= 0) {
|
||||
throw new IllegalArgumentException("timeout must greater than 0");
|
||||
}
|
||||
|
||||
// 创造一个容器,用于存放当前线程与rpcClient中的线程交互
|
||||
SyncPromise syncPromise = new SyncPromise();
|
||||
|
||||
// 消息id = channelId + 帧类型(例如: "0x34")
|
||||
String msgId = ctx.channel().id().toString() + "_" + YKCUtils.frameType2Str(((YKCFrameTypeCode) frameTypeCode).getBytes());
|
||||
// log.info("同步获取响应数据-发送消息, msgId:{}", msgId);
|
||||
RpcUtil.getSyncPromiseMap().put(msgId, syncPromise);
|
||||
|
||||
// 发送消息,此处如果发送玩消息并且在get之前返回了结果,下一行的get将不会进入阻塞,也可以顺利拿到结果
|
||||
/*
|
||||
拼接报文
|
||||
*/
|
||||
@@ -123,12 +208,26 @@ public class YKCPushCommandServiceImpl implements YKCPushCommandService {
|
||||
}
|
||||
});
|
||||
|
||||
// 等待获取结果
|
||||
byte[] rpcResponse = syncPromise.get2(timeout, unit);
|
||||
|
||||
if(rpcResponse == null) {
|
||||
if(syncPromise.isTimeout()) {
|
||||
throw new TimeoutException("等待响应结果超时");
|
||||
} else{
|
||||
throw new Exception("其他异常");
|
||||
}
|
||||
}
|
||||
|
||||
// 移除容器
|
||||
RpcUtil.getSyncPromiseMap().remove(msgId);
|
||||
|
||||
// 保存报文
|
||||
String frameTypeStr = YKCUtils.frameType2Str(((YKCFrameTypeCode) frameTypeCode).getBytes());
|
||||
if (frameTypeList.contains(frameTypeStr)) {
|
||||
pileMsgRecordService.save(pileSn, null, frameTypeStr, null, wholeMsg);
|
||||
}
|
||||
return true;
|
||||
return rpcResponse;
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -178,7 +277,11 @@ public class YKCPushCommandServiceImpl implements YKCPushCommandService {
|
||||
byte[] accountBalanceByteArr = YKCUtils.getPriceByte(chargeAmount.toString(), 2);
|
||||
|
||||
byte[] msgBody = Bytes.concat(orderIdByteArr, pileSnByteArr, connectorCodeByteArr, logicCardNumByteArr, physicsCardNumByteArr, accountBalanceByteArr);
|
||||
this.push(msgBody, pileSn, YKCFrameTypeCode.REMOTE_CONTROL_START_CHARGING_CODE);
|
||||
try {
|
||||
this.send(msgBody, pileSn, YKCFrameTypeCode.REMOTE_CONTROL_START_CHARGING_CODE);
|
||||
} catch (Exception e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
log.info("【=====平台下发充电指令=====】:订单id:{}, 桩号:{}, 枪口号:{}, 逻辑卡号:{}, 物理卡号:{}, 账户余额:{}",
|
||||
transactionCode, pileSn, BytesUtil.bcd2Str(connectorCodeByteArr), logicCardNum, physicsCardNum, chargeAmount);
|
||||
}
|
||||
@@ -194,7 +297,11 @@ public class YKCPushCommandServiceImpl implements YKCPushCommandService {
|
||||
String connectorCode = command.getConnectorCode();
|
||||
// 远程停机
|
||||
byte[] msgBody = Bytes.concat(BytesUtil.str2Bcd(pileSn), BytesUtil.str2Bcd(connectorCode));
|
||||
this.push(msgBody, pileSn, YKCFrameTypeCode.REMOTE_CONTROL_STOP_CHARGING_CODE);
|
||||
try {
|
||||
this.send(msgBody, pileSn, YKCFrameTypeCode.REMOTE_CONTROL_STOP_CHARGING_CODE);
|
||||
} catch (Exception e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
log.info("【=====平台下发指令=====】:远程停止充电,桩号:{},枪口号:{}", pileSn, connectorCode);
|
||||
}
|
||||
|
||||
@@ -203,7 +310,11 @@ public class YKCPushCommandServiceImpl implements YKCPushCommandService {
|
||||
String pileSn = command.getPileSn();
|
||||
String connectorCode = command.getConnectorCode();
|
||||
byte[] msg = BytesUtil.str2Bcd(pileSn + connectorCode);
|
||||
this.push(msg, pileSn, YKCFrameTypeCode.READ_REAL_TIME_MONITOR_DATA_CODE);
|
||||
try {
|
||||
this.send(msg, pileSn, YKCFrameTypeCode.READ_REAL_TIME_MONITOR_DATA_CODE);
|
||||
} catch (Exception e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
log.info("【=====平台下发指令=====】:获取充电桩:{} 的 {} 枪口实时数据信息", pileSn, connectorCode);
|
||||
}
|
||||
|
||||
@@ -212,7 +323,11 @@ public class YKCPushCommandServiceImpl implements YKCPushCommandService {
|
||||
String pileSn = command.getPileSn();
|
||||
byte[] msg = BytesUtil.str2Bcd(pileSn + Constants.ZERO_ONE);
|
||||
log.info("【=====平台下发指令=====】:重启充电桩:,{}", pileSn);
|
||||
this.push(msg, pileSn, YKCFrameTypeCode.REMOTE_RESTART_CODE);
|
||||
try {
|
||||
this.send(msg, pileSn, YKCFrameTypeCode.REMOTE_RESTART_CODE);
|
||||
} catch (Exception e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@@ -254,7 +369,11 @@ public class YKCPushCommandServiceImpl implements YKCPushCommandService {
|
||||
byte[] msg = Bytes.concat(pileSnByteArr, qrCodeTypeByteArr, qrCodePrefixLengthByteArr, qrCodePrefixByteArr);
|
||||
|
||||
// push消息
|
||||
boolean result = this.push(msg, pileSn, YKCFrameTypeCode.REMOTE_ISSUE_QRCODE_CODE);
|
||||
try {
|
||||
this.send(msg, pileSn, YKCFrameTypeCode.REMOTE_ISSUE_QRCODE_CODE);
|
||||
} catch (Exception e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
log.info("【=====平台下发指令=====】 pileSn:{}, 下发二维码,地址为:{}", pileSn, qrCodePrefix);
|
||||
}
|
||||
|
||||
@@ -276,7 +395,11 @@ public class YKCPushCommandServiceImpl implements YKCPushCommandService {
|
||||
// 拼装msg
|
||||
byte[] msg = Bytes.concat(pileSnByteArr, dateBytes);
|
||||
|
||||
this.push(msg, pileSn, YKCFrameTypeCode.TIME_CHECK_SETTING_CODE);
|
||||
try {
|
||||
this.send(msg, pileSn, YKCFrameTypeCode.TIME_CHECK_SETTING_CODE);
|
||||
} catch (Exception e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
log.info("[充电桩:{}对时, 时间:{}, CP56Time2a:{}]", pileSn, DateUtils.parseDateToStr(DateUtils.YYYY_MM_DD_HH_MM_SS, date), BytesUtil.binary(dateBytes, 16));
|
||||
}
|
||||
|
||||
@@ -293,18 +416,14 @@ public class YKCPushCommandServiceImpl implements YKCPushCommandService {
|
||||
byte[] messageBody = pileBillingTemplateService.generateBillingTemplateMsgBody(pileSn, billingTemplateVO);
|
||||
// 发送
|
||||
if (messageBody != null) {
|
||||
this.push(messageBody, pileSn, YKCFrameTypeCode.BILLING_TEMPLATE_SETTING_CODE);
|
||||
try {
|
||||
this.send(messageBody, pileSn, YKCFrameTypeCode.BILLING_TEMPLATE_SETTING_CODE);
|
||||
} catch (Exception e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public static void main(String[] args) {
|
||||
System.out.println(Constants.updateServerPort);
|
||||
String numHex = Integer.toHexString(21);
|
||||
byte[] bytes = BytesUtil.hexString2Bytes(numHex);
|
||||
System.out.println(bytes);
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public void pushUpdateFileCommand(UpdateFirmwareCommand command) {
|
||||
List<String> pileSns = command.getPileSnList();
|
||||
@@ -360,7 +479,11 @@ public class YKCPushCommandServiceImpl implements YKCPushCommandService {
|
||||
byte[] msgBody = Bytes.concat(pileSnByteArr, pileModelType, ratedPowerByteArr, updateServerAddressByteArr,
|
||||
updateServerPortByteArr, userNameByteArr, passwordByteArr, filePathByteArr, performTypeByteArr, overTimeByteArr);
|
||||
|
||||
this.push(msgBody, pileModelInfoVO.getPileSn(), YKCFrameTypeCode.REMOTE_UPDATE_CODE);
|
||||
try {
|
||||
this.send(msgBody, pileModelInfoVO.getPileSn(), YKCFrameTypeCode.REMOTE_UPDATE_CODE);
|
||||
} catch (Exception e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
log.info("【=====平台下发指令=====】:远程更新, 桩号:{}, 类型:{}, 额定功率:{}, 服务器地址:{}, 端口号:{}, 用户名:{}, 密码:{}, 文件路径:{}",
|
||||
pileModelInfoVO.getPileSn(), pileModelType, BytesUtil.bcd2Str(ratedPowerByteArr), BytesUtil.binary(updateServerAddressByteArr, 16),
|
||||
BytesUtil.binary(updateServerPortByteArr, 16), BytesUtil.binary(userNameByteArr, 16), BytesUtil.binary(passwordByteArr, 16),
|
||||
@@ -390,7 +513,11 @@ public class YKCPushCommandServiceImpl implements YKCPushCommandService {
|
||||
|
||||
byte[] msg = Bytes.concat(pileSnByteArr, workingStateByteArr, maxPowerByteArr);
|
||||
|
||||
this.push(msg, pileSn, YKCFrameTypeCode.CHARGING_PILE_WORKING_PARAMETER_SETTING_CODE);
|
||||
try {
|
||||
this.send(msg, pileSn, YKCFrameTypeCode.CHARGING_PILE_WORKING_PARAMETER_SETTING_CODE);
|
||||
} catch (Exception e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -417,7 +544,11 @@ public class YKCPushCommandServiceImpl implements YKCPushCommandService {
|
||||
// 拼装msg信息
|
||||
byte[] msg = Bytes.concat(pileSnByteArr, pileType);
|
||||
|
||||
this.push(msg, pileSn, YKCFrameTypeCode.QUERY_PILE_WORK_PARAMS_CODE);
|
||||
try {
|
||||
this.send(msg, pileSn, YKCFrameTypeCode.QUERY_PILE_WORK_PARAMS_CODE);
|
||||
} catch (Exception e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -443,7 +574,11 @@ public class YKCPushCommandServiceImpl implements YKCPushCommandService {
|
||||
// 拼装msg信息
|
||||
byte[] msg = Bytes.concat(pileSnByteArr, connectorCodeByteArr, logicByteArr, priceByte);
|
||||
|
||||
this.push(msg, pileSn, YKCFrameTypeCode.REMOTE_ACCOUNT_BALANCE_UPDATE_CODE);
|
||||
try {
|
||||
this.send(msg, pileSn, YKCFrameTypeCode.REMOTE_ACCOUNT_BALANCE_UPDATE_CODE);
|
||||
} catch (Exception e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -476,16 +611,21 @@ public class YKCPushCommandServiceImpl implements YKCPushCommandService {
|
||||
// 拼装msg信息
|
||||
byte[] msg = Bytes.concat(pileSnByteArr, connectorCodeByteArr, operateByteArr, obligateByteArr);
|
||||
|
||||
this.push(msg, pileSn, YKCFrameTypeCode.REMOTE_CONTROL_GROUND_LOCK_CODE);
|
||||
try {
|
||||
this.send(msg, pileSn, YKCFrameTypeCode.REMOTE_CONTROL_GROUND_LOCK_CODE);
|
||||
} catch (Exception e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 发送预约充电命令
|
||||
*
|
||||
* @param command
|
||||
* @return
|
||||
*/
|
||||
@Override
|
||||
public void pushReservationChargingCommand(ReservationChargingCommand command) {
|
||||
public byte[] pushReservationChargingCommand(ReservationChargingCommand command) {
|
||||
// 交易流水号
|
||||
String transactionCode = command.getTransactionCode();
|
||||
byte[] transactionCodeArr = BytesUtil.str2Bcd(transactionCode);
|
||||
@@ -541,9 +681,17 @@ public class YKCPushCommandServiceImpl implements YKCPushCommandService {
|
||||
reservationTypeByteArr, verifyIdentityByteArr, vin1ByteArr, vin2ByteArr, vin3ByteArr,
|
||||
reservedStartTimeByteArr, reservedEndTimeByteArr, amountByteArr);
|
||||
|
||||
this.push(msg, pileSn, YKCFrameTypeCode.RESERVATION_CHARGING_SETUP_CODE);
|
||||
byte[] response;
|
||||
try {
|
||||
response = this.send(msg, pileSn, YKCFrameTypeCode.RESERVATION_CHARGING_SETUP_CODE);
|
||||
} catch (Exception e) {
|
||||
log.error("发送消息异常", e);
|
||||
response = null;
|
||||
}
|
||||
|
||||
log.info("【=====平台下发指令=====】: 预约充电指令, 交易流水号:{}, 桩编号:{}, 枪口号:{}, 操作:{}, 身份验证:{}, 开始时间:{}, 结束时间:{}, 启动金额:{}",
|
||||
transactionCode, pileSn, connectorCode, operation, verifyIdentity, DateUtils.formatDateTime(reservedStartTime), DateUtils.formatDateTime(reservedEndTime), amount);
|
||||
return response;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user