完成文件下载,以及重启 Jar 包。

master
linbin 2024-09-17 19:39:23 +08:00
parent 0ab6a5a680
commit c1ba437555
13 changed files with 332 additions and 158 deletions

View File

@ -28,6 +28,9 @@ public class App
String value = (String) mapWithIndentStructure.get("active");
if (value.equals("agent"))
{
Map<String, Object> env = (Map<String, Object>) mapWithIndentStructure.get("env");
AgentConfig.hosId = (String) env.get("hosId");
AgentConfig.agentId = (String) env.get("agentId");
ApplicationContext context = ApplicationContext.boot(AgentConfig.class);
context.getBean(AgentConfig.class);
}

View File

@ -11,6 +11,9 @@ import com.jfirer.jnet.common.api.Pipeline;
import com.jfirer.jnet.common.decoder.TotalLengthFieldBasedFrameDecoder;
import com.jfirer.jnet.common.util.ChannelConfig;
import lombok.extern.slf4j.Slf4j;
import org.tianhe.agent.control.CommandHandler;
import org.tianhe.agent.control.DownloadFileHandler;
import org.tianhe.agent.control.PacketParseHandler;
import org.tianhe.common.PacketWriteHandler;
import org.tianhe.common.packet.ControlChannelHeartBeat;
import org.tianhe.common.packet.RegisterSelfControlChannel;
@ -31,6 +34,8 @@ public class AgentConfig implements AwareContextInited
public final static String PID = ManagementFactory.getRuntimeMXBean().getName().split("@")[0];
public final static boolean WINDOW = System.getProperty("os.name").toLowerCase().contains("win");
public static String DIR_PATH;
public static String hosId;
public static String agentId;
static
{
@ -48,11 +53,8 @@ public class AgentConfig implements AwareContextInited
private String tianhe_ip;
@PropertyRead("tianhe.port")
private String tianhe_port;
@PropertyRead("agent.hosId")
private String hosId;
@PropertyRead("agent.agentId")
private String agentId;
@PropertyRead("download.size")
private int segmentSize;
@Override
public void aware(ApplicationContext applicationContext)
{
@ -66,11 +68,11 @@ public class AgentConfig implements AwareContextInited
{
ClientChannel clientChannel = ClientChannel.newClient(connectToServerCfg, channelContext -> {
Pipeline pipeline = channelContext.pipeline();
pipeline.addReadProcessor(new TotalLengthFieldBasedFrameDecoder(0, 4, 4, Integer.MAX_VALUE, connectToServerCfg.getAllocator()));
pipeline.addReadProcessor(new PacketParseHandler());
pipeline.addReadProcessor(new CommandHandler());
pipeline.addWriteProcessor(new PacketWriteHandler().setAllocator(connectToServerCfg.getAllocator()));
pipeline.addReadProcessor(new TotalLengthFieldBasedFrameDecoder(0, 4, 4, Integer.MAX_VALUE, connectToServerCfg.getAllocator()));
pipeline.addReadProcessor(new PacketParseHandler());
pipeline.addReadProcessor(new DownloadFileHandler(segmentSize));
pipeline.addReadProcessor(new CommandHandler());
pipeline.addWriteProcessor(new PacketWriteHandler().setAllocator(connectToServerCfg.getAllocator()));
});
if (clientChannel.connect())
{

View File

@ -1,31 +0,0 @@
package org.tianhe.agent;
import lombok.Data;
import lombok.experimental.Accessors;
import org.tianhe.common.command.impl.UpdateJarVersion;
import java.io.File;
import java.io.FileOutputStream;
@Data
@Accessors(chain = true)
public class FileDownload
{
private String relativePath;
private String appName;
private String versionFileDate;
private String fileId;
private String md5;
private int size;
private File downloadTempFile;
private FileOutputStream fileOutputStream;
private int currentDownloadedSize;
public FileDownload(UpdateJarVersion updateJarVersion)
{
this.relativePath = updateJarVersion.getRelativePath();
this.appName = updateJarVersion.getAppName();
this.versionFileDate = updateJarVersion.getVersionFileDate();
this.fileId = updateJarVersion.getFileId();
}
}

View File

@ -1,105 +0,0 @@
package org.tianhe.agent;
import com.jfirer.baseutil.STR;
import com.jfirer.jfire.core.inject.notated.PropertyRead;
import com.jfirer.jnet.common.api.Pipeline;
import com.jfirer.jnet.common.api.ReadProcessor;
import com.jfirer.jnet.common.api.ReadProcessorNode;
import com.jfirer.jnet.common.buffer.buffer.IoBuffer;
import org.tianhe.common.packet.*;
import javax.annotation.Resource;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
@Resource
public class PacketParseHandler implements ReadProcessor<IoBuffer>
{
@PropertyRead("download.size")
private int segmentSize;
@Override
public void read(IoBuffer buffer, ReadProcessorNode next)
{
byte b = buffer.get();
try
{
switch (PacketType.valueOf(b))
{
case EXECUTE_COMMAND_REQ ->
{
ExecuteCommandReq commandReq = (ExecuteCommandReq) new ExecuteCommandReq().read(buffer);
next.fireRead(commandReq);
}
case FILE_INFO_RESP ->
{
FileInfoResp resp = (FileInfoResp) new FileInfoResp().read(buffer);
Object attach = next.pipeline().channelContext().getAttach();
if (!(attach instanceof FileDownload fileDownload))
{
next.pipeline().fireWrite(new LogReport().setContent(STR.format("收到了文件信息响应报文,但是当前通道没有文件下载的请求附件,放弃该文件信息")));
}
else
{
if (fileDownload.getFileId().equals(resp.getFileId()))
{
}
else
{
next.pipeline().fireWrite(new LogReport().setContent(STR.format("收到了文件:{}的信息响应报文但是当前通道的文件下载请求的文件id:{}与当前响应报文不一致,放弃该文件信息", resp.getFileId(), fileDownload.getFileId())));
}
}
}
case FILE_SEGMENT_RESP -> {}
}
}
finally
{
buffer.free();
}
}
@Override
public void channelClose(ReadProcessorNode next, Throwable e)
{
Object attach = next.pipeline().channelContext().getAttach();
if (attach instanceof FileDownload download)
{
FileOutputStream fileOutputStream = download.getFileOutputStream();
if (fileOutputStream != null)
{
try
{
fileOutputStream.close();
}
catch (IOException ignored)
{
;
}
}
}
next.fireChannelClose(e);
}
private void startDownload(FileInfoResp resp, FileDownload download, Pipeline pipeline)
{
String relativePath = download.getRelativePath();
String name = new File(AgentConfig.DIR_PATH, relativePath).getName();
new File(AgentConfig.DIR_PATH, "temp").mkdirs();
File tempDownFile = new File(AgentConfig.DIR_PATH + File.separator + "temp", name + System.currentTimeMillis());
try
{
tempDownFile.createNewFile();
}
catch (IOException e)
{
pipeline.fireWrite(new LogReport().setContent(STR.format("创建应用:{}的临时文件:{}失败,放弃下载", download.getAppName(), tempDownFile.getAbsolutePath())));
pipeline.channelContext().setAttach(null);
return;
}
pipeline.fireWrite(new LogReport().setContent(STR.format("收到了应用:{}的文件:{}的新版本查询信息,准备开始下载", resp.getAppName(), resp.getFileId())));
int end = segmentSize > resp.getSize() ? resp.getSize() : segmentSize;
pipeline.fireWrite(new FileSegmentReq().setFileId(resp.getFileId()).setStart(0).setEnd(end));
}
}

View File

@ -1,4 +1,4 @@
package org.tianhe.agent;
package org.tianhe.agent.control;
import com.jfirer.baseutil.STR;
import com.jfirer.baseutil.StringUtil;
@ -6,10 +6,12 @@ import com.jfirer.jnet.common.api.Pipeline;
import com.jfirer.jnet.common.api.ReadProcessor;
import com.jfirer.jnet.common.api.ReadProcessorNode;
import lombok.extern.slf4j.Slf4j;
import org.tianhe.agent.AgentConfig;
import org.tianhe.common.command.Command;
import org.tianhe.common.command.impl.ReBootJar;
import org.tianhe.common.command.impl.StopJar;
import org.tianhe.common.command.impl.UpdateJarVersion;
import org.tianhe.common.command.impl.UpdateJarVersionAndReboot;
import org.tianhe.common.packet.ExecuteCommandReq;
import org.tianhe.common.packet.FileInfoReq;
import org.tianhe.common.packet.LogReport;
@ -29,7 +31,6 @@ import static org.tianhe.agent.AgentConfig.*;
@Slf4j
public class CommandHandler implements ReadProcessor<Object>
{
@Override
public void read(Object o, ReadProcessorNode next)
{
@ -55,6 +56,13 @@ public class CommandHandler implements ReadProcessor<Object>
next.pipeline().channelContext().setAttach(new FileDownload(updateJarVersion));
next.pipeline().fireWrite(new FileInfoReq().setFileId(updateJarVersion.getFileId()));
}
case Command.UPDATE_JAR_VERSION_AND_REBOOT ->
{
UpdateJarVersionAndReboot updateJarVersionAndReboot = (UpdateJarVersionAndReboot) commandDTO;
next.pipeline().fireWrite(new LogReport().setContent(STR.format("对应用进行版本更新,应用名称:{},文件版本为:{}", updateJarVersionAndReboot.getAppName(), updateJarVersionAndReboot.getVersionFileDate())));
next.pipeline().channelContext().setAttach(new FileDownload(updateJarVersionAndReboot));
next.pipeline().fireWrite(new FileInfoReq().setFileId(updateJarVersionAndReboot.getFileId()));
}
}
}
else
@ -65,11 +73,11 @@ public class CommandHandler implements ReadProcessor<Object>
private void rebootJar(ReBootJar reBootJar, Pipeline pipeline)
{
String msg = STR.format("收到重启指令,应用名称:{},文件地址为:{}", reBootJar.getJarName(), reBootJar.getRelativePath());
String msg = STR.format("收到重启指令,应用名称:{},文件地址为:{}", reBootJar.getJarNameWithoutExtension(), reBootJar.getRelativePath());
log.debug(msg);
LogReport logReport = new LogReport().setContent(msg);
pipeline.fireWrite(logReport);
List<String> pidByName = getPidByName(reBootJar.getJarName(), pipeline);
List<String> pidByName = getPidByName(reBootJar.getJarNameWithoutExtension(), pipeline);
pidByName.forEach(pid -> killPid(pid, pipeline));
startJar(reBootJar.getRelativePath(), pipeline);
}
@ -144,7 +152,7 @@ public class CommandHandler implements ReadProcessor<Object>
public void startJar(String relativePath, Pipeline pipeline)
{
File targetJarFile = new File(DIR_PATH + File.separator + relativePath);
File targetJarFile = new File(DIR_PATH, relativePath);
if (!targetJarFile.exists())
{
pipeline.fireWrite(new LogReport().setContent(STR.format("文件:{}不存在,无法启动对应的 jar", targetJarFile.getAbsolutePath())));

View File

@ -0,0 +1,174 @@
package org.tianhe.agent.control;
import com.jfirer.baseutil.STR;
import com.jfirer.baseutil.encrypt.Md5Util;
import com.jfirer.jnet.common.api.Pipeline;
import com.jfirer.jnet.common.api.ReadProcessor;
import com.jfirer.jnet.common.api.ReadProcessorNode;
import lombok.Data;
import org.tianhe.agent.AgentConfig;
import org.tianhe.common.command.Command;
import org.tianhe.common.command.impl.ReBootJar;
import org.tianhe.common.packet.FileInfoResp;
import org.tianhe.common.packet.FileSegmentReq;
import org.tianhe.common.packet.FileSegmentResp;
import org.tianhe.common.packet.LogReport;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.StandardCopyOption;
import static org.tianhe.common.command.Command.UPDATE_JAR_VERSION_AND_REBOOT;
@Data
public class DownloadFileHandler implements ReadProcessor<Object>
{
private final int segmentSize;
@Override
public void read(Object o, ReadProcessorNode next)
{
if (o instanceof FileInfoResp resp)
{
Object attach = next.pipeline().channelContext().getAttach();
if (!(attach instanceof FileDownload fileDownload))
{
next.pipeline().fireWrite(new LogReport().setContent(STR.format("收到了文件信息响应报文,但是当前通道没有文件下载的请求附件,放弃该文件信息")));
}
else if (!fileDownload.getFileId().equals(resp.getFileId()))
{
next.pipeline().fireWrite(new LogReport().setContent(STR.format("收到了文件:{}的信息响应报文但是当前通道的文件下载请求的文件id:{}与当前响应报文不一致,放弃该文件信息", resp.getFileId(), fileDownload.getFileId())));
}
else
{
startDownload(resp, fileDownload, next.pipeline());
}
}
else if (o instanceof FileSegmentResp resp)
{
Pipeline pipeline = next.pipeline();
Object attach = pipeline.channelContext().getAttach();
if (!(attach instanceof FileDownload fileDownload))
{
pipeline.fireWrite(new LogReport().setContent(STR.format("收到了文件下载响应报文,但是当前通道没有文件下载的请求附件,放弃该文件信息")));
}
else if (!fileDownload.getFileId().equals(resp.getFileId()))
{
pipeline.fireWrite(new LogReport().setContent(STR.format("收到了文件:{}的片段响应报文但是当前通道的文件下载请求的文件id:{}与当前响应报文不一致,放弃该文件信息", resp.getFileId(), fileDownload.getFileId())));
}
else
{
continueDownload(resp, fileDownload, next.pipeline(), next);
}
}
else
{
next.fireRead(o);
}
}
private void startDownload(FileInfoResp resp, FileDownload download, Pipeline pipeline)
{
String relativePath = download.getRelativePath();
String name = new File(AgentConfig.DIR_PATH, relativePath).getName();
new File(AgentConfig.DIR_PATH, "temp").mkdirs();
File tempDownFile = new File(AgentConfig.DIR_PATH + File.separator + "temp", name + System.currentTimeMillis());
try
{
tempDownFile.createNewFile();
download.setDownloadTempFile(tempDownFile);
download.setFileOutputStream(new FileOutputStream(tempDownFile));
}
catch (IOException e)
{
pipeline.fireWrite(new LogReport().setContent(STR.format("创建应用:{}的临时文件:{}失败,放弃下载", download.getAppName(), tempDownFile.getAbsolutePath())));
pipeline.channelContext().setAttach(null);
return;
}
pipeline.fireWrite(new LogReport().setContent(STR.format("收到了应用:{}的文件:{}的新版本查询信息,准备开始下载", resp.getAppName(), resp.getFileId())));
int end = Math.min(segmentSize, resp.getSize());
pipeline.fireWrite(new FileSegmentReq().setFileId(resp.getFileId()).setStart(0).setEnd(end));
}
private void continueDownload(FileSegmentResp resp, FileDownload fileDownload, Pipeline pipeline, ReadProcessorNode next)
{
try
{
fileDownload.getFileOutputStream().write(resp.getSegment());
if (fileDownload.addDownloadSize(resp.getSegment().length))
{
pipeline.fireWrite(new LogReport().setContent(STR.format("应用:{}的文件:{}下载进度:{}%平均下载速度{}K。", fileDownload.getAppName(), fileDownload.getRelativePath(), fileDownload.getCurrentPercent(), fileDownload.getCurrentDownloadedSize() / 1024 / (System.currentTimeMillis() - fileDownload.getStartTime()))));
}
if (fileDownload.getCurrentPercent() == 100)
{
fileDownload.getFileOutputStream().close();
pipeline.fireWrite(new LogReport().setContent(STR.format("应用:{}的文件:{}下载完成,耗时:{}s", fileDownload.getAppName(), fileDownload.getRelativePath(), (System.currentTimeMillis() - fileDownload.getStartTime()) / 1000)));
String fileMD5 = Md5Util.md5(fileDownload.getDownloadTempFile());
if (!fileMD5.equals(fileDownload.getMd5()))
{
pipeline.fireWrite(new LogReport().setContent(STR.format("应用:{}的文件:{}下载失败MD5不一致放弃该文件信息", fileDownload.getAppName(), fileDownload.getRelativePath())));
pipeline.channelContext().setAttach(null);
return;
}
switch (fileDownload.getCommand())
{
case Command.UPDATE_JAR_VERSION, UPDATE_JAR_VERSION_AND_REBOOT ->
{
File targetFile = new File(AgentConfig.DIR_PATH, fileDownload.getRelativePath());
Files.copy(fileDownload.getDownloadTempFile().toPath(), targetFile.toPath(), StandardCopyOption.REPLACE_EXISTING);
fileDownload.getDownloadTempFile().delete();
pipeline.channelContext().setAttach(null);
pipeline.fireWrite(new LogReport().setContent(STR.format("文件:{}更新成功", fileDownload.getRelativePath())));
if (fileDownload.getCommand() == UPDATE_JAR_VERSION_AND_REBOOT)
{
ReBootJar reBootJar = new ReBootJar().setJarNameWithoutExtension(fileDownload.getJarNameWithoutExtension()).setRelativePath(fileDownload.getRelativePath());
next.fireRead(reBootJar);
}
}
}
}
else
{
int end = fileDownload.getCurrentDownloadedSize() + segmentSize <= fileDownload.getSize() ? fileDownload.getCurrentDownloadedSize() + segmentSize : fileDownload.getSize() - fileDownload.getCurrentDownloadedSize();
pipeline.fireWrite(new FileSegmentReq().setFileId(resp.getFileId()).setStart(fileDownload.getCurrentDownloadedSize()).setEnd(end));
}
}
catch (IOException e)
{
pipeline.fireWrite(new LogReport().setContent(STR.format("文件:{}写入文件片段失败,放弃该文件信息", fileDownload.getRelativePath())));
try
{
fileDownload.getFileOutputStream().close();
}
catch (IOException ignored)
{
;
}
pipeline.channelContext().setAttach(null);
}
}
@Override
public void channelClose(ReadProcessorNode next, Throwable e)
{
Object attach = next.pipeline().channelContext().getAttach();
if (attach instanceof FileDownload download)
{
FileOutputStream fileOutputStream = download.getFileOutputStream();
if (fileOutputStream != null)
{
try
{
fileOutputStream.close();
}
catch (IOException ignored)
{
;
}
}
}
next.fireChannelClose(e);
}
}

View File

@ -0,0 +1,56 @@
package org.tianhe.agent.control;
import lombok.Data;
import lombok.experimental.Accessors;
import org.tianhe.common.command.impl.UpdateJarVersion;
import org.tianhe.common.command.impl.UpdateJarVersionAndReboot;
import java.io.File;
import java.io.FileOutputStream;
@Data
@Accessors(chain = true)
public class FileDownload
{
private String relativePath;
private String appName;
private String jarNameWithoutExtension;
private String versionFileDate;
private String fileId;
private String md5;
private int size;
private File downloadTempFile;
private FileOutputStream fileOutputStream;
private int currentDownloadedSize;
private int prePercent = 0;
private int currentPercent = 0;
private final long startTime = System.currentTimeMillis();
private int command;
public FileDownload(UpdateJarVersion updateJarVersion)
{
this.relativePath = updateJarVersion.getRelativePath();
this.appName = updateJarVersion.getAppName();
this.versionFileDate = updateJarVersion.getVersionFileDate();
this.fileId = updateJarVersion.getFileId();
this.jarNameWithoutExtension = updateJarVersion.getJarNameWithoutExtension();
command = updateJarVersion.commandType();
}
public FileDownload(UpdateJarVersionAndReboot updateJarVersionAndReboot)
{
this.relativePath = updateJarVersionAndReboot.getRelativePath();
this.appName = updateJarVersionAndReboot.getAppName();
this.versionFileDate = updateJarVersionAndReboot.getVersionFileDate();
this.fileId = updateJarVersionAndReboot.getFileId();
this.jarNameWithoutExtension = updateJarVersionAndReboot.getJarNameWithoutExtension();
command = updateJarVersionAndReboot.commandType();
}
public boolean addDownloadSize(int segmentSize)
{
currentDownloadedSize += segmentSize;
currentPercent = currentDownloadedSize * 100 / size;
return currentPercent - prePercent >= 5 || currentPercent == 100;
}
}

View File

@ -0,0 +1,33 @@
package org.tianhe.agent.control;
import com.jfirer.jnet.common.api.ReadProcessor;
import com.jfirer.jnet.common.api.ReadProcessorNode;
import com.jfirer.jnet.common.buffer.buffer.IoBuffer;
import lombok.Data;
import org.tianhe.common.packet.ExecuteCommandReq;
import org.tianhe.common.packet.FileInfoResp;
import org.tianhe.common.packet.FileSegmentResp;
import org.tianhe.common.packet.PacketType;
@Data
public class PacketParseHandler implements ReadProcessor<IoBuffer>
{
@Override
public void read(IoBuffer buffer, ReadProcessorNode next)
{
byte b = buffer.get();
try
{
switch (PacketType.valueOf(b))
{
case EXECUTE_COMMAND_REQ -> next.fireRead(new ExecuteCommandReq().read(buffer));
case FILE_INFO_RESP -> next.fireRead(new FileInfoResp().read(buffer));
case FILE_SEGMENT_RESP -> next.fireRead(new FileSegmentResp().read(buffer));
}
}
finally
{
buffer.free();
}
}
}

View File

@ -8,10 +8,7 @@ import org.tianhe.common.command.Command;
@Accessors(chain = true)
public class ReBootJar implements Command
{
private String jarName;
/**
* /
*/
private String jarNameWithoutExtension;
private String relativePath;
@Override

View File

@ -10,6 +10,7 @@ public class UpdateJarVersion implements Command
{
private String relativePath;
private String appName;
private String jarNameWithoutExtension;
private String versionFileDate;
private String fileId;

View File

@ -0,0 +1,22 @@
package org.tianhe.common.command.impl;
import lombok.Data;
import lombok.experimental.Accessors;
import org.tianhe.common.command.Command;
@Accessors(chain = true)
@Data
public class UpdateJarVersionAndReboot implements Command
{
private String relativePath;
private String appName;
private String jarNameWithoutExtension;
private String versionFileDate;
private String fileId;
@Override
public int commandType()
{
return Command.UPDATE_JAR_VERSION_AND_REBOOT;
}
}

View File

@ -1,12 +1,17 @@
package org.tianhe.common.packet;
import com.jfirer.jnet.common.buffer.buffer.IoBuffer;
import lombok.Data;
import lombok.experimental.Accessors;
@Data
@Accessors(chain = true)
public class FileSegmentResp implements Packet
{
private int start;
private int end;
private byte[] segment;
private String fileId;
@Override
public int write(IoBuffer buffer)

View File

@ -3,6 +3,7 @@ package org.tianhe.common.packet;
import com.jfirer.jnet.common.buffer.buffer.IoBuffer;
import lombok.Data;
import lombok.experimental.Accessors;
import org.tianhe.agent.AgentConfig;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
@ -11,13 +12,17 @@ import java.time.format.DateTimeFormatter;
@Accessors(chain = true)
public class LogReport implements Packet
{
private final String date;
private String date;
private String content;
private static final DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
private String agentId;
private String hosId;
public LogReport()
{
date = formatter.format(LocalDateTime.now());
date = formatter.format(LocalDateTime.now());
agentId = AgentConfig.agentId;
hosId = AgentConfig.hosId;
}
@Override
@ -26,7 +31,9 @@ public class LogReport implements Packet
buffer.put(PacketType.LOG_REPORT.getCommand());
int len1 = writeShortString(buffer, date);
int len2 = writeShortString(buffer, content);
return 1 + 2 + len1 + 2 + len2;
int len3 = writeShortString(buffer, hosId);
int len4 = writeShortString(buffer, agentId);
return 1 + 2 + len1 + 2 + len2 + 2 + len3 + 2 + len4;
}
@Override
@ -34,6 +41,8 @@ public class LogReport implements Packet
{
date = readShortString(buffer);
content = readShortString(buffer);
hosId = readShortString(buffer);
agentId = readShortString(buffer);
return this;
}
}