继续开发,当前开发中 Agent 下载文件

master
linbin 2024-09-17 04:42:45 +08:00
parent d641525037
commit 0ab6a5a680
34 changed files with 1006 additions and 208 deletions

25
pom.xml
View File

@ -9,12 +9,22 @@
<name>tianhe</name>
<url>http://maven.apache.org</url>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
<dependencies>
<dependency>
<groupId>com.jfirer</groupId>
<artifactId>JfireSE</artifactId>
<version>1.0-SNAPSHOT</version>
<exclusions>
<exclusion>
<groupId>com.jfirer</groupId>
<artifactId>baseutil</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>com.h2database</groupId>
<artifactId>h2</artifactId>
@ -28,17 +38,12 @@
<dependency>
<groupId>com.jfirer</groupId>
<artifactId>JfireBoot</artifactId>
<version>1.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>com.jfirer</groupId>
<artifactId>baseutil</artifactId>
<version>1.1.7-SNAPSHOT</version>
<version>1.0.1</version>
</dependency>
<dependency>
<groupId>com.jfirer</groupId>
<artifactId>jfire-jsql-starter</artifactId>
<version>1.0-SNAPSHOT</version>
<version>1.0</version>
</dependency>
<dependency>
<groupId>junit</groupId>
@ -159,8 +164,8 @@
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>14</source>
<target>14</target>
<source>21</source>
<target>21</target>
</configuration>
</plugin>
</plugins>

View File

@ -1,13 +1,44 @@
package org.tianhe;
/**
* Hello world!
*
*/
public class App
import com.jfirer.baseutil.IoUtil;
import com.jfirer.baseutil.YamlReader;
import com.jfirer.jfire.core.ApplicationContext;
import org.tianhe.agent.AgentConfig;
import org.tianhe.web.WebConfig;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.util.Map;
public class App
{
public static void main( String[] args )
public static void main(String[] args) throws URISyntaxException, IOException
{
System.out.println( "Hello World!" );
URL location = App.class.getProtectionDomain().getCodeSource().getLocation();
URI uri = location.toURI();
File file = new File(new File(uri).getParentFile(), "tianhe.yml");
byte[] bytes = IoUtil.readAllBytes(new FileInputStream(file));
YamlReader yamlReader = new YamlReader(new String(bytes, StandardCharsets.UTF_8));
Map<String, Object> mapWithIndentStructure = yamlReader.getMapWithIndentStructure();
String value = (String) mapWithIndentStructure.get("active");
if (value.equals("agent"))
{
ApplicationContext context = ApplicationContext.boot(AgentConfig.class);
context.getBean(AgentConfig.class);
}
else if (value.equals("web"))
{
ApplicationContext context = ApplicationContext.boot(WebConfig.class);
context.getBean(WebConfig.class);
}
else
{
throw new IllegalArgumentException("无法识别的配置 active:" + value);
}
}
}

View File

@ -0,0 +1,113 @@
package org.tianhe.agent;
import com.jfirer.jfire.core.ApplicationContext;
import com.jfirer.jfire.core.AwareContextInited;
import com.jfirer.jfire.core.inject.notated.PropertyRead;
import com.jfirer.jfire.core.prepare.annotation.ComponentScan;
import com.jfirer.jfire.core.prepare.annotation.EnableAutoConfiguration;
import com.jfirer.jfire.core.prepare.annotation.configuration.Configuration;
import com.jfirer.jnet.client.ClientChannel;
import com.jfirer.jnet.common.api.Pipeline;
import com.jfirer.jnet.common.decoder.TotalLengthFieldBasedFrameDecoder;
import com.jfirer.jnet.common.util.ChannelConfig;
import lombok.extern.slf4j.Slf4j;
import org.tianhe.common.PacketWriteHandler;
import org.tianhe.common.packet.ControlChannelHeartBeat;
import org.tianhe.common.packet.RegisterSelfControlChannel;
import java.io.File;
import java.lang.management.ManagementFactory;
import java.net.URISyntaxException;
import java.nio.channels.ClosedChannelException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.LockSupport;
@ComponentScan("org.tianhe.agent")
@Configuration
@EnableAutoConfiguration
@Slf4j
public class AgentConfig implements AwareContextInited
{
public final static String PID = ManagementFactory.getRuntimeMXBean().getName().split("@")[0];
public final static boolean WINDOW = System.getProperty("os.name").toLowerCase().contains("win");
public static String DIR_PATH;
static
{
try
{
DIR_PATH = new File(AgentConfig.class.getProtectionDomain().getCodeSource().getLocation().toURI().getPath()).getParentFile().getAbsolutePath();
}
catch (URISyntaxException e)
{
throw new RuntimeException(e);
}
}
@PropertyRead("tianhe.ip")
private String tianhe_ip;
@PropertyRead("tianhe.port")
private String tianhe_port;
@PropertyRead("agent.hosId")
private String hosId;
@PropertyRead("agent.agentId")
private String agentId;
@Override
public void aware(ApplicationContext applicationContext)
{
new Thread(() -> {
ChannelConfig connectToServerCfg = new ChannelConfig();
connectToServerCfg.setIp(tianhe_ip);
connectToServerCfg.setPort(Integer.parseInt(tianhe_port));
connectToServerCfg.setChannelGroup(ChannelConfig.DEFAULT_CHANNEL_GROUP);
connectToServerCfg.setWorkerGroup(ChannelConfig.DEFAULT_WORKER_GROUP);
while (true)
{
ClientChannel clientChannel = ClientChannel.newClient(connectToServerCfg, channelContext -> {
Pipeline pipeline = channelContext.pipeline();
pipeline.addReadProcessor(new TotalLengthFieldBasedFrameDecoder(0, 4, 4, Integer.MAX_VALUE, connectToServerCfg.getAllocator()));
pipeline.addReadProcessor(new PacketParseHandler());
pipeline.addReadProcessor(new CommandHandler());
pipeline.addWriteProcessor(new PacketWriteHandler().setAllocator(connectToServerCfg.getAllocator()));
});
if (clientChannel.connect())
{
RegisterSelfControlChannel registerSelfControlChannel = new RegisterSelfControlChannel();
registerSelfControlChannel.setHosId(hosId).setAgentId(Byte.parseByte(agentId));
try
{
clientChannel.write(registerSelfControlChannel);
}
catch (ClosedChannelException e)
{
log.error("连接天河服务端注册自身失败,休眠 5 秒后重试");
LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(5));
continue;
}
while (true)
{
ControlChannelHeartBeat beat = new ControlChannelHeartBeat();
try
{
clientChannel.write(beat);
}
catch (ClosedChannelException e)
{
log.error("向天河服务端发送心跳失败,休眠 5 秒后重试");
LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(5));
continue;
}
LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(10));
}
}
else
{
log.error("连接天河服务端注册自身失败,休眠 5 秒后重试");
LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(5));
}
}
}).start();
}
}

View File

@ -0,0 +1,195 @@
package org.tianhe.agent;
import com.jfirer.baseutil.STR;
import com.jfirer.baseutil.StringUtil;
import com.jfirer.jnet.common.api.Pipeline;
import com.jfirer.jnet.common.api.ReadProcessor;
import com.jfirer.jnet.common.api.ReadProcessorNode;
import lombok.extern.slf4j.Slf4j;
import org.tianhe.common.command.Command;
import org.tianhe.common.command.impl.ReBootJar;
import org.tianhe.common.command.impl.StopJar;
import org.tianhe.common.command.impl.UpdateJarVersion;
import org.tianhe.common.packet.ExecuteCommandReq;
import org.tianhe.common.packet.FileInfoReq;
import org.tianhe.common.packet.LogReport;
import java.io.BufferedReader;
import java.io.File;
import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.StandardCopyOption;
import java.util.ArrayList;
import java.util.List;
import static org.tianhe.agent.AgentConfig.*;
@Slf4j
public class CommandHandler implements ReadProcessor<Object>
{
@Override
public void read(Object o, ReadProcessorNode next)
{
if (o instanceof ExecuteCommandReq commandReq)
{
Command commandDTO = commandReq.getCommandDTO();
switch (commandDTO.commandType())
{
case Command.REBOOT_JAR ->
{
ReBootJar reBootJar = (ReBootJar) commandDTO;
rebootJar(reBootJar, next.pipeline());
}
case Command.STOP_JAR ->
{
StopJar stopJar = (StopJar) commandDTO;
stopJar(stopJar, next.pipeline());
}
case Command.UPDATE_JAR_VERSION ->
{
UpdateJarVersion updateJarVersion = (UpdateJarVersion) commandDTO;
next.pipeline().fireWrite(new LogReport().setContent(STR.format("对应用进行版本更新,应用名称:{},文件版本为:{}", updateJarVersion.getAppName(), updateJarVersion.getVersionFileDate())));
next.pipeline().channelContext().setAttach(new FileDownload(updateJarVersion));
next.pipeline().fireWrite(new FileInfoReq().setFileId(updateJarVersion.getFileId()));
}
}
}
else
{
next.fireRead(o);
}
}
private void rebootJar(ReBootJar reBootJar, Pipeline pipeline)
{
String msg = STR.format("收到重启指令,应用名称:{},文件地址为:{}", reBootJar.getJarName(), reBootJar.getRelativePath());
log.debug(msg);
LogReport logReport = new LogReport().setContent(msg);
pipeline.fireWrite(logReport);
List<String> pidByName = getPidByName(reBootJar.getJarName(), pipeline);
pidByName.forEach(pid -> killPid(pid, pipeline));
startJar(reBootJar.getRelativePath(), pipeline);
}
private void stopJar(StopJar stopJar, Pipeline pipeline)
{
String msg = STR.format("收到停止指令,应用名称:{},文件地址为:{}", stopJar.getJarName(), stopJar.getJarFilePath());
log.debug(msg);
LogReport logReport = new LogReport().setContent(msg);
pipeline.fireWrite(logReport);
List<String> pidByName = getPidByName(stopJar.getJarName(), pipeline);
pidByName.forEach(pid -> killPid(pid, pipeline));
}
private List<String> getPidByName(String appName, Pipeline pipeline)
{
List<String> list = new ArrayList<>();
ProcessBuilder processBuilder = AgentConfig.WINDOW ?//
new ProcessBuilder("cmd.exe", "/c", "wmic process where \"name='java.exe' and CommandLine like '%%" + appName + "%%'\" get ProcessId /value | findstr \"=\"")//
: new ProcessBuilder("sh", "-c", "ps aux | grep " + appName + " | grep -v grep | awk '{print $2}'");
try
{
Process process = processBuilder.start();
try (BufferedReader input = new BufferedReader(new InputStreamReader(process.getInputStream(), StandardCharsets.UTF_8)))
{
String line;
while ((line = input.readLine()) != null)
{
log.debug("命令输出:{}", line);
if (StringUtil.isNotBlank(line))
{
String pid = WINDOW ? line.substring(10) : line;
if (!PID.equalsIgnoreCase(pid))
{
list.add(pid);
}
}
}
}
process.destroy();
String msg = list.isEmpty() ? STR.format("通过应用名称:{}检查,未能发现进程", appName) : STR.format("通过应用名称:{}查询,一共发现 PID:{}", appName, String.join(",", list));
log.debug(msg);
pipeline.fireWrite(new LogReport().setContent(msg));
return list;
}
catch (Throwable e)
{
log.error("通过应用名称:{}获取应用出现异常", appName, e);
pipeline.fireWrite(new LogReport().setContent(STR.format("通过应用名称:{}获取应用出现异常。异常信息:{}", appName, e)));
return list;
}
}
public void killPid(String pid, Pipeline pipeline)
{
ProcessBuilder builder = WINDOW ? new ProcessBuilder("cmd.exe", "/C", "taskkill /F /PID " + pid) : new ProcessBuilder("kill", "-9", pid);
try
{
Process process = builder.start();
process.waitFor();
process.destroy();
String msg = STR.format("终止进程:{}", pid);
log.debug(msg);
pipeline.fireWrite(new LogReport().setContent(msg));
}
catch (Throwable e)
{
log.error("终止进程:{}出现异常", pid, e);
pipeline.fireWrite(new LogReport().setContent(STR.format("终止进程:{}出现异常。异常信息:{}", pid, e)));
}
}
public void startJar(String relativePath, Pipeline pipeline)
{
File targetJarFile = new File(DIR_PATH + File.separator + relativePath);
if (!targetJarFile.exists())
{
pipeline.fireWrite(new LogReport().setContent(STR.format("文件:{}不存在,无法启动对应的 jar", targetJarFile.getAbsolutePath())));
return;
}
String fileName = targetJarFile.getName();
int index = fileName.lastIndexOf(".");
String fileNameWithoutExtension = fileName.substring(0, index);
String extension = fileName.substring(index);
File copyFile = new File(targetJarFile.getParentFile(), fileNameWithoutExtension + "_copy" + extension);
try
{
Files.copy(targetJarFile.toPath(), copyFile.toPath(), StandardCopyOption.REPLACE_EXISTING);
}
catch (Throwable e)
{
pipeline.fireWrite(new LogReport().setContent(STR.format("复制文件:{}失败,异常信息:{}", copyFile.getAbsolutePath(), e)));
return;
}
pipeline.fireWrite(new LogReport().setContent(STR.format("复制文件:{}成功,现在准备启动", copyFile.getAbsolutePath())));
ProcessBuilder builder = WINDOW ? new ProcessBuilder("java", "-jar", copyFile.getAbsolutePath()) : new ProcessBuilder("nohup", "java", "-jar", copyFile.getAbsolutePath(), "&");
new Thread(() -> {
Process process = null;
try
{
process = builder.start();
try (BufferedReader reader = process.inputReader(StandardCharsets.UTF_8))
{
while (reader.readLine() != null)
{
;
}
}
}
catch (IOException e)
{
throw new RuntimeException(e);
}
finally
{
if (process != null)
{
process.destroy();
}
}
}).start();
}
}

View File

@ -0,0 +1,31 @@
package org.tianhe.agent;
import lombok.Data;
import lombok.experimental.Accessors;
import org.tianhe.common.command.impl.UpdateJarVersion;
import java.io.File;
import java.io.FileOutputStream;
@Data
@Accessors(chain = true)
public class FileDownload
{
private String relativePath;
private String appName;
private String versionFileDate;
private String fileId;
private String md5;
private int size;
private File downloadTempFile;
private FileOutputStream fileOutputStream;
private int currentDownloadedSize;
public FileDownload(UpdateJarVersion updateJarVersion)
{
this.relativePath = updateJarVersion.getRelativePath();
this.appName = updateJarVersion.getAppName();
this.versionFileDate = updateJarVersion.getVersionFileDate();
this.fileId = updateJarVersion.getFileId();
}
}

View File

@ -0,0 +1,105 @@
package org.tianhe.agent;
import com.jfirer.baseutil.STR;
import com.jfirer.jfire.core.inject.notated.PropertyRead;
import com.jfirer.jnet.common.api.Pipeline;
import com.jfirer.jnet.common.api.ReadProcessor;
import com.jfirer.jnet.common.api.ReadProcessorNode;
import com.jfirer.jnet.common.buffer.buffer.IoBuffer;
import org.tianhe.common.packet.*;
import javax.annotation.Resource;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
@Resource
public class PacketParseHandler implements ReadProcessor<IoBuffer>
{
@PropertyRead("download.size")
private int segmentSize;
@Override
public void read(IoBuffer buffer, ReadProcessorNode next)
{
byte b = buffer.get();
try
{
switch (PacketType.valueOf(b))
{
case EXECUTE_COMMAND_REQ ->
{
ExecuteCommandReq commandReq = (ExecuteCommandReq) new ExecuteCommandReq().read(buffer);
next.fireRead(commandReq);
}
case FILE_INFO_RESP ->
{
FileInfoResp resp = (FileInfoResp) new FileInfoResp().read(buffer);
Object attach = next.pipeline().channelContext().getAttach();
if (!(attach instanceof FileDownload fileDownload))
{
next.pipeline().fireWrite(new LogReport().setContent(STR.format("收到了文件信息响应报文,但是当前通道没有文件下载的请求附件,放弃该文件信息")));
}
else
{
if (fileDownload.getFileId().equals(resp.getFileId()))
{
}
else
{
next.pipeline().fireWrite(new LogReport().setContent(STR.format("收到了文件:{}的信息响应报文但是当前通道的文件下载请求的文件id:{}与当前响应报文不一致,放弃该文件信息", resp.getFileId(), fileDownload.getFileId())));
}
}
}
case FILE_SEGMENT_RESP -> {}
}
}
finally
{
buffer.free();
}
}
@Override
public void channelClose(ReadProcessorNode next, Throwable e)
{
Object attach = next.pipeline().channelContext().getAttach();
if (attach instanceof FileDownload download)
{
FileOutputStream fileOutputStream = download.getFileOutputStream();
if (fileOutputStream != null)
{
try
{
fileOutputStream.close();
}
catch (IOException ignored)
{
;
}
}
}
next.fireChannelClose(e);
}
private void startDownload(FileInfoResp resp, FileDownload download, Pipeline pipeline)
{
String relativePath = download.getRelativePath();
String name = new File(AgentConfig.DIR_PATH, relativePath).getName();
new File(AgentConfig.DIR_PATH, "temp").mkdirs();
File tempDownFile = new File(AgentConfig.DIR_PATH + File.separator + "temp", name + System.currentTimeMillis());
try
{
tempDownFile.createNewFile();
}
catch (IOException e)
{
pipeline.fireWrite(new LogReport().setContent(STR.format("创建应用:{}的临时文件:{}失败,放弃下载", download.getAppName(), tempDownFile.getAbsolutePath())));
pipeline.channelContext().setAttach(null);
return;
}
pipeline.fireWrite(new LogReport().setContent(STR.format("收到了应用:{}的文件:{}的新版本查询信息,准备开始下载", resp.getAppName(), resp.getFileId())));
int end = segmentSize > resp.getSize() ? resp.getSize() : segmentSize;
pipeline.fireWrite(new FileSegmentReq().setFileId(resp.getFileId()).setStart(0).setEnd(end));
}
}

View File

@ -1,39 +0,0 @@
package org.tianhe.common;
public interface Command
{
/**
*
*
* @return
*/
CommandName name();
/**
* id
*
* @return
*/
String envId();
/**
* id id
*
* @return
*/
String agentId();
/**
*
*
* @return
*/
String commandId();
/**
* jfireSE 使 Base64便 http
*
* @return
*/
String content();
}

View File

@ -1,6 +0,0 @@
package org.tianhe.common;
public enum CommandName
{
}

View File

@ -0,0 +1,33 @@
package org.tianhe.common;
import com.jfirer.jnet.common.api.WriteProcessor;
import com.jfirer.jnet.common.api.WriteProcessorNode;
import com.jfirer.jnet.common.buffer.allocator.BufferAllocator;
import com.jfirer.jnet.common.buffer.buffer.IoBuffer;
import lombok.Data;
import lombok.experimental.Accessors;
import org.tianhe.common.packet.Packet;
@Data
@Accessors(chain = true)
public class PacketWriteHandler implements WriteProcessor<Object>
{
private BufferAllocator allocator;
@Override
public void write(Object data, WriteProcessorNode next)
{
if (data instanceof Packet packet)
{
IoBuffer ioBuffer = allocator.ioBuffer(128);
ioBuffer.addWritePosi(4);
int len = packet.write(ioBuffer);
ioBuffer.putInt(len, 0);
next.fireWrite(ioBuffer);
}
else
{
next.fireWrite(data);
}
}
}

View File

@ -0,0 +1,12 @@
package org.tianhe.common.command;
public interface Command
{
int REBOOT_JAR = 1;
int STOP_JAR = 2;
int UPDATE_JAR_VERSION = 3;
int UPDATE_JAR_VERSION_AND_REBOOT = 4;
int commandType();
}

View File

@ -0,0 +1,22 @@
package org.tianhe.common.command.impl;
import lombok.Data;
import lombok.experimental.Accessors;
import org.tianhe.common.command.Command;
@Data
@Accessors(chain = true)
public class ReBootJar implements Command
{
private String jarName;
/**
* /
*/
private String relativePath;
@Override
public int commandType()
{
return REBOOT_JAR;
}
}

View File

@ -0,0 +1,19 @@
package org.tianhe.common.command.impl;
import lombok.Data;
import lombok.experimental.Accessors;
import org.tianhe.common.command.Command;
@Data
@Accessors(chain = true)
public class StopJar implements Command
{
private String jarName;
private String jarFilePath;
@Override
public int commandType()
{
return Command.STOP_JAR;
}
}

View File

@ -0,0 +1,21 @@
package org.tianhe.common.command.impl;
import lombok.Data;
import lombok.experimental.Accessors;
import org.tianhe.common.command.Command;
@Data
@Accessors(chain = true)
public class UpdateJarVersion implements Command
{
private String relativePath;
private String appName;
private String versionFileDate;
private String fileId;
@Override
public int commandType()
{
return UPDATE_JAR_VERSION;
}
}

View File

@ -0,0 +1,19 @@
package org.tianhe.common.packet;
import com.jfirer.jnet.common.buffer.buffer.IoBuffer;
public class ControlChannelHeartBeat implements Packet
{
@Override
public int write(IoBuffer buffer)
{
buffer.put(PacketType.CONTROL_CHANNEL_HEART_BEAT.getCommand());
return 1;
}
@Override
public Packet read(IoBuffer buffer)
{
return this;
}
}

View File

@ -0,0 +1,36 @@
package org.tianhe.common.packet;
import com.jfirer.jnet.common.buffer.buffer.IoBuffer;
import com.jfirer.se2.JfireSE;
import lombok.Data;
import lombok.experimental.Accessors;
import org.tianhe.common.command.Command;
import org.tianhe.util.JfireSEPool;
@Data
@Accessors(chain = true)
public class ExecuteCommandReq implements Packet
{
private Command commandDTO;
@Override
public int write(IoBuffer buffer)
{
buffer.put(PacketType.EXECUTE_COMMAND_REQ.getCommand());
JfireSE jfireSE = JfireSEPool.getJfireSE();
byte[] serialize = jfireSE.serialize(commandDTO);
buffer.putInt(serialize.length);
buffer.put(serialize);
return 1 + 4 + serialize.length;
}
@Override
public Packet read(IoBuffer buffer)
{
int len = buffer.getInt();
byte[] bytes = new byte[len];
buffer.get(bytes);
commandDTO = (Command) JfireSEPool.getJfireSE().deSerialize(bytes);
return this;
}
}

View File

@ -0,0 +1,18 @@
package org.tianhe.common.packet;
import com.jfirer.jnet.common.buffer.buffer.IoBuffer;
public class ExecuteCommandResp implements Packet
{
@Override
public int write(IoBuffer buffer)
{
return 0;
}
@Override
public Packet read(IoBuffer buffer)
{
return null;
}
}

View File

@ -1,8 +1,11 @@
package org.tianhe.common.packet;
import com.jfirer.jnet.common.buffer.buffer.IoBuffer;
import org.tianhe.common.CommandType;
import lombok.Data;
import lombok.experimental.Accessors;
@Data
@Accessors(chain = true)
public class FileInfoReq implements Packet
{
private String fileId;
@ -10,8 +13,8 @@ public class FileInfoReq implements Packet
@Override
public int write(IoBuffer buffer)
{
buffer.put(CommandType.FILE_INFO_REQ.getCommand());
return 3 + writeShortString(buffer, fileId);
buffer.put(PacketType.FILE_INFO_REQ.getCommand());
return 1 + 2 + writeShortString(buffer, fileId);
}
@Override

View File

@ -1,25 +1,33 @@
package org.tianhe.common.packet;
import com.jfirer.jnet.common.buffer.buffer.IoBuffer;
import org.tianhe.common.CommandType;
import lombok.Data;
import lombok.experimental.Accessors;
@Data
@Accessors(chain = true)
public class FileInfoResp implements Packet
{
private String fileId;
private String relativePath;
private String appName;
private String md5;
private int size;
@Override
public int write(IoBuffer buffer)
{
buffer.put(CommandType.FILE_INFO_RESP.getCommand());
buffer.put(PacketType.FILE_INFO_RESP.getCommand());
int len = writeShortString(buffer, md5);
buffer.putInt(size);
return 1 + len + 4;
return 1 + 2 + len + 4;
}
@Override
public Packet read(IoBuffer buffer)
{
return null;
md5 = readShortString(buffer);
size = buffer.getInt();
return this;
}
}

View File

@ -0,0 +1,39 @@
package org.tianhe.common.packet;
import com.jfirer.jnet.common.buffer.buffer.IoBuffer;
import lombok.Data;
import lombok.experimental.Accessors;
@Data
@Accessors(chain = true)
public class FileSegmentReq implements Packet
{
private String fileId;
/**
*
*/
private int start;
/**
*
*/
private int end;
@Override
public int write(IoBuffer buffer)
{
buffer.put(PacketType.FILE_SEGMENT_REQ.getCommand());
int len = writeShortString(buffer, fileId);
buffer.putInt(start);
buffer.putInt(end);
return 1 + 2 + len + 4 + 4;
}
@Override
public Packet read(IoBuffer buffer)
{
fileId = readShortString(buffer);
start = buffer.getInt();
end = buffer.getInt();
return this;
}
}

View File

@ -0,0 +1,32 @@
package org.tianhe.common.packet;
import com.jfirer.jnet.common.buffer.buffer.IoBuffer;
public class FileSegmentResp implements Packet
{
private int start;
private int end;
private byte[] segment;
@Override
public int write(IoBuffer buffer)
{
buffer.put(PacketType.FILE_SEGMENT_RESP.getCommand());
buffer.putInt(start);
buffer.putInt(end);
buffer.putInt(segment.length);
buffer.put(segment);
return 1 + 4 + 4 + 4 + segment.length;
}
@Override
public Packet read(IoBuffer buffer)
{
start = buffer.getInt();
end = buffer.getInt();
int len = buffer.getInt();
segment = new byte[len];
buffer.get(segment);
return this;
}
}

View File

@ -2,21 +2,31 @@ package org.tianhe.common.packet;
import com.jfirer.jnet.common.buffer.buffer.IoBuffer;
import lombok.Data;
import org.tianhe.common.CommandType;
import lombok.experimental.Accessors;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
@Data
@Accessors(chain = true)
public class LogReport implements Packet
{
private String date;
private String content;
private final String date;
private String content;
private static final DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
public LogReport()
{
date = formatter.format(LocalDateTime.now());
}
@Override
public int write(IoBuffer buffer)
{
buffer.put(CommandType.LOG_REPORT.getCommand());
buffer.put(PacketType.LOG_REPORT.getCommand());
int len1 = writeShortString(buffer, date);
int len2 = writeShortString(buffer, content);
return 1 + 2 + len1 + 4 + len2;
return 1 + 2 + len1 + 2 + len2;
}
@Override

View File

@ -0,0 +1,31 @@
package org.tianhe.common.packet;
import com.jfirer.jnet.common.buffer.buffer.IoBuffer;
import lombok.Data;
@Data
public class OpenDataChannelReq implements Packet
{
private String channelId;
private String localAddress;
private int localPort;
@Override
public int write(IoBuffer buffer)
{
buffer.put(PacketType.OPEN_DATA_CHANNEL_REQ.getCommand());
int len1 = writeShortString(buffer, channelId);
int len2 = writeShortString(buffer, localAddress);
buffer.putInt(localPort);
return 1 + 2 + len1 + 2 + len2 + 4;
}
@Override
public Packet read(IoBuffer buffer)
{
channelId = readShortString(buffer);
localAddress = readShortString(buffer);
localPort = buffer.getInt();
return this;
}
}

View File

@ -1,17 +1,22 @@
package org.tianhe.common;
package org.tianhe.common.packet;
public enum CommandType
public enum PacketType
{
REGISTER_SELF_CONTROL_CHANNEL(1),//
REGISTER_SELF_DATA_CHANNEL(2),//
CONTROL_CHANNEL_HEART_BEAT(3),//
LOG_REPORT(4),//
FILE_INFO_REQ(5),//
FILE_INFO_RESP(6),
FILE_INFO_RESP(6),//
FILE_SEGMENT_REQ(7),//
FILE_SEGMENT_RESP(8),//
OPEN_DATA_CHANNEL_REQ(9),//
EXECUTE_COMMAND_REQ(10),//
EXECUTE_COMMAND_RESP(11),//
;
private byte command;
CommandType(int i)
PacketType(int i)
{
command = (byte) i;
}
@ -21,7 +26,7 @@ public enum CommandType
return command;
}
public static CommandType valueOf(byte command)
public static PacketType valueOf(byte command)
{
return switch (command)
{

View File

@ -2,9 +2,10 @@ package org.tianhe.common.packet;
import com.jfirer.jnet.common.buffer.buffer.IoBuffer;
import lombok.Data;
import org.tianhe.common.CommandType;
import lombok.experimental.Accessors;
@Data
@Accessors(chain = true)
public class RegisterSelfControlChannel implements Packet
{
private String hosId;
@ -12,7 +13,7 @@ public class RegisterSelfControlChannel implements Packet
public int write(IoBuffer buffer)
{
buffer.put(CommandType.REGISTER_SELF_CONTROL_CHANNEL.getCommand());
buffer.put(PacketType.REGISTER_SELF_CONTROL_CHANNEL.getCommand());
int len = writeShortString(buffer, hosId);
buffer.put(agentId);
return 1 + 2 + len + 1;

View File

@ -0,0 +1,36 @@
package org.tianhe.util;
import com.jfirer.se2.JfireSE;
public class JfireSEPool
{
private static final JfireSE[] JFIRE_SE = new JfireSE[findNextPowerOfTwo(Runtime.getRuntime().availableProcessors())];
private static int shift = JFIRE_SE.length - 1;
public static int findNextPowerOfTwo(int n)
{
if (n <= 0)
{
throw new IllegalArgumentException("Input must be a positive integer.");
}
// 特殊情况处理:如果 n 已经是 2 的次方幂
if ((n & (n - 1)) == 0)
{
return n;
}
// 计算大于 n 的最近的 2 的次方
int position = 0;
while (n != 0)
{
n >>= 1;
position++;
}
// 返回结果
return 1 << position;
}
public static JfireSE getJfireSE()
{
return JFIRE_SE[(int) (Thread.currentThread().threadId() & shift)];
}
}

View File

@ -0,0 +1,69 @@
package org.tianhe.web;
import com.jfirer.jfire.core.ApplicationContext;
import com.jfirer.jfire.core.AwareContextInited;
import com.jfirer.jfire.core.inject.notated.PropertyRead;
import com.jfirer.jfire.core.prepare.annotation.ComponentScan;
import com.jfirer.jfire.core.prepare.annotation.EnableAutoConfiguration;
import com.jfirer.jnet.common.api.Pipeline;
import com.jfirer.jnet.common.decoder.TotalLengthFieldBasedFrameDecoder;
import com.jfirer.jnet.common.util.ChannelConfig;
import com.jfirer.jnet.server.AioServer;
import com.zaxxer.hikari.HikariDataSource;
import lombok.extern.slf4j.Slf4j;
import org.tianhe.web.service.LogReportService;
import org.tianhe.web.control.ControlChannelStruct;
import org.tianhe.web.control.AgentReqHandler;
import org.tianhe.common.PacketWriteHandler;
import org.tianhe.web.control.RegisterSelfControlChannelHandler;
import javax.annotation.Resource;
import javax.sql.DataSource;
import java.util.concurrent.ConcurrentHashMap;
@ComponentScan("org.tianhe.web")
@EnableAutoConfiguration
@Slf4j
@Resource
public class WebConfig implements AwareContextInited
{
@PropertyRead("datasource.user")
private String username;
@PropertyRead("datasource.password")
private String password;
@PropertyRead("datasource.url")
private String url;
@Resource
private LogReportService logReportService;
@Resource
public DataSource dataSource()
{
HikariDataSource dataSource = new HikariDataSource();
dataSource.setUsername(username);
dataSource.setPassword(password);
dataSource.setJdbcUrl(url);
return dataSource;
}
@Override
public void aware(ApplicationContext applicationContext)
{
ChannelConfig channelConfig = new ChannelConfig();
channelConfig.setMsOfReadTimeout(1000 * 30);
channelConfig.setWorkerGroup(ChannelConfig.DEFAULT_WORKER_GROUP);
channelConfig.setChannelGroup(ChannelConfig.DEFAULT_CHANNEL_GROUP);
channelConfig.setPort(39999);
ConcurrentHashMap<String, ControlChannelStruct> controlChannels = new ConcurrentHashMap<>();
AioServer aioServer = AioServer.newAioServer(channelConfig, channelContext -> {
Pipeline pipeline = channelContext.pipeline();
pipeline.addReadProcessor(new TotalLengthFieldBasedFrameDecoder(0, 4, 4, Integer.MAX_VALUE, channelConfig.getAllocator()));
AgentReqHandler agentReqHandler = new AgentReqHandler();
RegisterSelfControlChannelHandler registerSelfControlChannelHandler = new RegisterSelfControlChannelHandler().setAgentReqHandler(agentReqHandler).setControl_channels(controlChannels);
pipeline.addReadProcessor(registerSelfControlChannelHandler);
pipeline.addReadProcessor(agentReqHandler);
pipeline.addWriteProcessor(new PacketWriteHandler().setAllocator(channelConfig.getAllocator()));
});
aioServer.start();
log.info("启动天河运维web端");
}
}

View File

@ -0,0 +1,59 @@
package org.tianhe.web.control;
import com.jfirer.jnet.common.api.ReadProcessor;
import com.jfirer.jnet.common.api.ReadProcessorNode;
import com.jfirer.jnet.common.buffer.allocator.BufferAllocator;
import com.jfirer.jnet.common.buffer.buffer.IoBuffer;
import lombok.Data;
import org.tianhe.common.packet.PacketType;
import org.tianhe.common.packet.*;
import org.tianhe.web.service.FileService;
import org.tianhe.web.service.LogReportService;
import javax.annotation.Resource;
import java.time.LocalDateTime;
@Data
public class AgentReqHandler implements ReadProcessor<IoBuffer>
{
private BufferAllocator allocator;
private ControlChannelStruct struct;
@Resource
private LogReportService logReportService;
@Resource
private FileService fileService;
@Override
public void read(IoBuffer buffer, ReadProcessorNode next)
{
try
{
PacketType packetType = PacketType.valueOf(buffer.get());
switch (packetType)
{
case CONTROL_CHANNEL_HEART_BEAT -> struct.setLastActiveTime(LocalDateTime.now());
case LOG_REPORT ->
{
LogReport read = (LogReport) new LogReport().read(buffer);
logReportService.report(read.getContent(), read.getDate());
}
case FILE_INFO_REQ ->
{
FileInfoReq infoReq = new FileInfoReq().read(buffer);
FileInfoResp fileInfoResp = fileService.find(infoReq);
next.pipeline().fireWrite(fileInfoResp);
}
case FILE_SEGMENT_REQ ->
{
FileSegmentReq req = (FileSegmentReq) new FileSegmentReq().read(buffer);
FileSegmentResp resp = fileService.find(req);
next.pipeline().fireWrite(resp);
}
}
}
finally
{
buffer.free();
}
}
}

View File

@ -1,4 +1,4 @@
package org.tianhe.web.tcp;
package org.tianhe.web.control;
import com.jfirer.jnet.common.api.Pipeline;
import lombok.Data;

View File

@ -1,4 +1,4 @@
package org.tianhe.web.tcp;
package org.tianhe.web.control;
import com.jfirer.jnet.common.api.ReadProcessor;
import com.jfirer.jnet.common.api.ReadProcessorNode;
@ -6,7 +6,7 @@ import com.jfirer.jnet.common.buffer.buffer.IoBuffer;
import lombok.Data;
import lombok.experimental.Accessors;
import lombok.extern.slf4j.Slf4j;
import org.tianhe.common.CommandType;
import org.tianhe.common.packet.PacketType;
import org.tianhe.common.packet.RegisterSelfControlChannel;
import java.time.LocalDateTime;
@ -18,7 +18,7 @@ import java.util.concurrent.ConcurrentHashMap;
public class RegisterSelfControlChannelHandler implements ReadProcessor<IoBuffer>
{
private boolean unDefined = true;
private HeartBeatHandler heartBeatHandler;
private AgentReqHandler agentReqHandler;
private ConcurrentHashMap<String, ControlChannelStruct> control_channels;
private ControlChannelStruct controlChannelStruct;
@ -27,13 +27,13 @@ public class RegisterSelfControlChannelHandler implements ReadProcessor<IoBuffer
{
if (unDefined)
{
if (data.get() == CommandType.REGISTER_SELF_CONTROL_CHANNEL.getCommand())
if (data.get() == PacketType.REGISTER_SELF_CONTROL_CHANNEL.getCommand())
{
unDefined = false;
RegisterSelfControlChannel from = new RegisterSelfControlChannel().read(data);
controlChannelStruct = new ControlChannelStruct(from.getHosId() + "_" + from.getAgentId(), next.pipeline(), LocalDateTime.now());
control_channels.put(controlChannelStruct.getKey(), controlChannelStruct);
heartBeatHandler.setControlChannelStruct(controlChannelStruct);
agentReqHandler.setStruct(controlChannelStruct);
data.free();
}
else

View File

@ -1,4 +1,4 @@
package org.tianhe.web.tcp;
package org.tianhe.web.control;
import com.jfirer.jfire.core.ApplicationContext;
import com.jfirer.jfire.core.AwareContextInited;
@ -7,6 +7,7 @@ import com.jfirer.jnet.common.decoder.TotalLengthFieldBasedFrameDecoder;
import com.jfirer.jnet.common.util.ChannelConfig;
import com.jfirer.jnet.server.AioServer;
import lombok.extern.slf4j.Slf4j;
import org.tianhe.common.PacketWriteHandler;
import org.tianhe.web.service.LogReportService;
import javax.annotation.Resource;
@ -31,13 +32,13 @@ public class WerbTcpServer implements AwareContextInited
AioServer aioServer = AioServer.newAioServer(channelConfig, channelContext -> {
Pipeline pipeline = channelContext.pipeline();
pipeline.addReadProcessor(new TotalLengthFieldBasedFrameDecoder(0, 4, 4, Integer.MAX_VALUE, channelConfig.getAllocator()));
HeartBeatHandler heartBeatHandler = new HeartBeatHandler();
RegisterSelfControlChannelHandler registerSelfControlChannelHandler = new RegisterSelfControlChannelHandler().setHeartBeatHandler(heartBeatHandler).setControl_channels(controlChannels);
AgentReqHandler agentReqHandler = new AgentReqHandler();
RegisterSelfControlChannelHandler registerSelfControlChannelHandler = new RegisterSelfControlChannelHandler().setAgentReqHandler(agentReqHandler).setControl_channels(controlChannels);
pipeline.addReadProcessor(registerSelfControlChannelHandler);
pipeline.addReadProcessor(heartBeatHandler);
pipeline.addReadProcessor(new LogReportHandler().setLogReportService(logReportService));
pipeline.addReadProcessor(agentReqHandler);
pipeline.addWriteProcessor(new PacketWriteHandler().setAllocator(channelConfig.getAllocator()));
});
aioServer.start();
log.debug("运维服务器穿透模块启动");
log.info("启动天河运维 Web 端");
}
}

View File

@ -2,8 +2,12 @@ package org.tianhe.web.service;
import org.tianhe.common.packet.FileInfoReq;
import org.tianhe.common.packet.FileInfoResp;
import org.tianhe.common.packet.FileSegmentReq;
import org.tianhe.common.packet.FileSegmentResp;
public interface FileService
{
FileInfoResp find(FileInfoReq req);
FileSegmentResp find(FileSegmentReq req);
}

View File

@ -1,51 +0,0 @@
package org.tianhe.web.tcp;
import com.jfirer.jnet.common.api.ReadProcessor;
import com.jfirer.jnet.common.api.ReadProcessorNode;
import com.jfirer.jnet.common.buffer.buffer.IoBuffer;
import lombok.Data;
import org.tianhe.common.CommandType;
import org.tianhe.common.packet.FileInfoReq;
import org.tianhe.common.packet.FileInfoResp;
import org.tianhe.common.packet.LogReport;
import org.tianhe.web.service.FileService;
import org.tianhe.web.service.LogReportService;
import javax.annotation.Resource;
import java.time.LocalDateTime;
@Data
public class CommandHandler implements ReadProcessor<IoBuffer>
{
private ControlChannelStruct struct;
@Resource
private LogReportService logReportService;
@Resource
private FileService fileService;
@Override
public void read(IoBuffer buffer, ReadProcessorNode next)
{
CommandType commandType = CommandType.valueOf(buffer.get());
switch (commandType)
{
case CONTROL_CHANNEL_HEART_BEAT ->
{
struct.setLastActiveTime(LocalDateTime.now());
buffer.free();
}
case LOG_REPORT ->
{
LogReport read = (LogReport) new LogReport().read(buffer);
logReportService.report(read.getContent(), read.getDate());
buffer.free();
}
case FILE_INFO_REQ ->
{
FileInfoReq infoReq = new FileInfoReq().read(buffer);
FileInfoResp fileInfoResp = fileService.find(infoReq);
}
}
}
}

View File

@ -1,31 +0,0 @@
package org.tianhe.web.tcp;
import com.jfirer.jnet.common.api.ReadProcessor;
import com.jfirer.jnet.common.api.ReadProcessorNode;
import com.jfirer.jnet.common.buffer.buffer.IoBuffer;
import lombok.Data;
import org.tianhe.common.CommandType;
import java.time.LocalDateTime;
@Data
public class HeartBeatHandler implements ReadProcessor<IoBuffer>
{
private ControlChannelStruct controlChannelStruct;
@Override
public void read(IoBuffer data, ReadProcessorNode next)
{
byte b = data.get();
if (b == CommandType.CONTROL_CHANNEL_HEART_BEAT.getCommand())
{
controlChannelStruct.setLastActiveTime(LocalDateTime.now());
data.free();
}
else
{
data.addReadPosi(-1);
next.fireRead(data);
}
}
}

View File

@ -1,33 +0,0 @@
package org.tianhe.web.tcp;
import com.jfirer.jnet.common.api.ReadProcessor;
import com.jfirer.jnet.common.api.ReadProcessorNode;
import com.jfirer.jnet.common.buffer.buffer.IoBuffer;
import lombok.Data;
import lombok.experimental.Accessors;
import org.tianhe.common.CommandType;
import org.tianhe.common.packet.LogReport;
import org.tianhe.web.service.LogReportService;
@Data
@Accessors(chain = true)
public class LogReportHandler implements ReadProcessor<IoBuffer>
{
private LogReportService logReportService;
@Override
public void read(IoBuffer data, ReadProcessorNode next)
{
if (data.get() == CommandType.LOG_REPORT.getCommand())
{
LogReport read = (LogReport) new LogReport().read(data);
logReportService.report(read.getContent(), read.getDate());
data.free();
}
else
{
data.addReadPosi(-1);
next.fireRead(data);
}
}
}