Agent 的命令开发的差不多了。先开发下 Web 端的 http

main
linbin 2024-09-18 17:06:18 +08:00
parent 3a0f1c3877
commit f4efd0ceeb
20 changed files with 913 additions and 32 deletions

View File

@ -14,6 +14,11 @@
</properties> </properties>
<dependencies> <dependencies>
<dependency>
<groupId>com.jfirer</groupId>
<artifactId>baseutil</artifactId>
<version>1.1.10</version>
</dependency>
<dependency> <dependency>
<groupId>com.jfirer</groupId> <groupId>com.jfirer</groupId>
<artifactId>JfireSE</artifactId> <artifactId>JfireSE</artifactId>
@ -38,7 +43,7 @@
<dependency> <dependency>
<groupId>com.jfirer</groupId> <groupId>com.jfirer</groupId>
<artifactId>JfireBoot</artifactId> <artifactId>JfireBoot</artifactId>
<version>1.0.1</version> <version>1.1-SNAPSHOT</version>
</dependency> </dependency>
<dependency> <dependency>
<groupId>com.jfirer</groupId> <groupId>com.jfirer</groupId>
@ -155,7 +160,6 @@
<name>buildTime</name> <name>buildTime</name>
<pattern>yyyy-MM-dd HH:mm:ss</pattern> <pattern>yyyy-MM-dd HH:mm:ss</pattern>
<locale>Asia/Shanghai</locale> <locale>Asia/Shanghai</locale>
<timeZone>Asia/Shanghai</timeZone>
</configuration> </configuration>
</execution> </execution>
</executions> </executions>

View File

@ -1,20 +1,26 @@
package org.tianhe.agent.control; package org.tianhe.agent.control;
import com.jfirer.baseutil.IoUtil;
import com.jfirer.baseutil.STR; import com.jfirer.baseutil.STR;
import com.jfirer.jnet.common.api.Pipeline; import com.jfirer.jnet.common.api.Pipeline;
import com.jfirer.jnet.common.api.ReadProcessor; import com.jfirer.jnet.common.api.ReadProcessor;
import com.jfirer.jnet.common.api.ReadProcessorNode; import com.jfirer.jnet.common.api.ReadProcessorNode;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.tianhe.agent.PlatformHelper; import org.tianhe.agent.AgentConfig;
import org.tianhe.common.command.Command; import org.tianhe.common.command.Command;
import org.tianhe.common.command.impl.ReBootJar; import org.tianhe.common.command.impl.*;
import org.tianhe.common.command.impl.StopJar;
import org.tianhe.common.command.impl.UpdateJarVersion;
import org.tianhe.common.command.impl.UpdateJarVersionAndReboot;
import org.tianhe.common.packet.ExecuteCommandReq; import org.tianhe.common.packet.ExecuteCommandReq;
import org.tianhe.common.packet.FileInfoReq; import org.tianhe.common.packet.FileInfoReq;
import org.tianhe.common.packet.LogReport; import org.tianhe.common.packet.LogReport;
import org.tianhe.common.packet.ResultReq;
import org.tianhe.util.DbHelper;
import org.tianhe.util.PlatformHelper;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.List; import java.util.List;
@Slf4j @Slf4j
@ -54,6 +60,52 @@ public class CommandHandler implements ReadProcessor<Object>
} }
case Command.UPDATE_ZIP_VERSION -> case Command.UPDATE_ZIP_VERSION ->
{ {
UpdateZipVersion updateZipVersion = (UpdateZipVersion) commandDTO;
next.pipeline().fireWrite(new LogReport().setContent(STR.format("对应用进行版本更新,应用名称:{},文件版本为:{}", updateZipVersion.getAppName(), updateZipVersion.getVersionFileDate())));
next.pipeline().channelContext().setAttach(new FileDownload(updateZipVersion));
next.pipeline().fireWrite(new FileInfoReq().setFileId(updateZipVersion.getFileId()));
}
case Command.FETCH_DDL ->
{
FetchDDL fetchDDL = (FetchDDL) commandDTO;
next.pipeline().fireWrite(new LogReport().setContent(STR.format("收到DDL文件拉取指令数据库地址为:{}", fetchDDL.getUrl())));
String ddl = DbHelper.fetchDDL(fetchDDL.getUrl(), fetchDDL.getUsername(), fetchDDL.getPassword(), next.pipeline());
next.pipeline().fireWrite(new LogReport().setContent("数据库 DDL 获取完毕,准备回送"));
next.pipeline().fireWrite(new ResultReq().setResultId(fetchDDL.getResultId()).setCommand(Command.FETCH_DDL).setResultDTO(ddl));
}
case Command.UPDATE_DATABASE ->
{
UpdateDatabase updateDatabase = (UpdateDatabase) commandDTO;
DbHelper.updateDatabase(updateDatabase.getUrl(), updateDatabase.getUsername(), updateDatabase.getPassword(), updateDatabase.getDdl(), next.pipeline());
}
case Command.READ_TEXT_CONTENT ->
{
ReadTextContent readTextContent = (ReadTextContent) commandDTO;
File file = new File(AgentConfig.DIR_PATH, readTextContent.getRelativePath());
try (FileInputStream fileInputStream = new FileInputStream(file))
{
byte[] bytes = IoUtil.readAllBytes(fileInputStream);
next.pipeline().fireWrite(new LogReport().setContent(STR.format("读取文件:{}内容完毕", PlatformHelper.toAbsolutePath(readTextContent.getRelativePath()))));
next.pipeline().fireWrite(new ResultReq().setResultId(readTextContent.getResultId()).setCommand(Command.READ_TEXT_CONTENT).setResultDTO(new String(bytes, StandardCharsets.UTF_8)));
}
catch (IOException e)
{
next.pipeline().fireWrite(new LogReport().setContent(STR.format("读取文件:{}内容失败", PlatformHelper.toAbsolutePath(readTextContent.getRelativePath()), e)));
}
}
case Command.UPDATE_TEXT_CONTENT ->
{
UpdateTextContent updateTextContent = (UpdateTextContent) commandDTO;
File file = new File(PlatformHelper.toAbsolutePath(updateTextContent.getRelativePath()));
try (FileOutputStream outputStream = new FileOutputStream(file))
{
outputStream.write(updateTextContent.getContent().getBytes(StandardCharsets.UTF_8));
next.pipeline().fireWrite(new LogReport().setContent(STR.format("文件:{}更新完毕。", PlatformHelper.toAbsolutePath(updateTextContent.getRelativePath()))));
}
catch (IOException e)
{
next.pipeline().fireWrite(new LogReport().setContent(STR.format("文件:{}更新失败。", PlatformHelper.toAbsolutePath(updateTextContent.getRelativePath()), e)));
}
} }
} }
} }

View File

@ -7,21 +7,22 @@ import com.jfirer.jnet.common.api.ReadProcessor;
import com.jfirer.jnet.common.api.ReadProcessorNode; import com.jfirer.jnet.common.api.ReadProcessorNode;
import lombok.Data; import lombok.Data;
import org.tianhe.agent.AgentConfig; import org.tianhe.agent.AgentConfig;
import org.tianhe.agent.PlatformHelper; import org.tianhe.util.PlatformHelper;
import org.tianhe.common.command.Command; import org.tianhe.common.command.Command;
import org.tianhe.common.packet.FileInfoResp; import org.tianhe.common.packet.FileInfoResp;
import org.tianhe.common.packet.FileSegmentReq; import org.tianhe.common.packet.FileSegmentReq;
import org.tianhe.common.packet.FileSegmentResp; import org.tianhe.common.packet.FileSegmentResp;
import org.tianhe.common.packet.LogReport; import org.tianhe.common.packet.LogReport;
import java.io.File; import java.io.*;
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.file.Files; import java.nio.file.Files;
import java.nio.file.StandardCopyOption; import java.nio.file.StandardCopyOption;
import java.util.List; import java.util.List;
import java.util.zip.ZipEntry;
import java.util.zip.ZipInputStream;
import static org.tianhe.common.command.Command.UPDATE_JAR_VERSION_AND_REBOOT; import static org.tianhe.common.command.Command.UPDATE_JAR_VERSION_AND_REBOOT;
import static org.tianhe.common.command.Command.UPDATE_ZIP_VERSION;
@Data @Data
public class DownloadFileHandler implements ReadProcessor<Object> public class DownloadFileHandler implements ReadProcessor<Object>
@ -129,6 +130,38 @@ public class DownloadFileHandler implements ReadProcessor<Object>
PlatformHelper.startJar(fileDownload.getRelativePath(), pipeline); PlatformHelper.startJar(fileDownload.getRelativePath(), pipeline);
} }
} }
case UPDATE_ZIP_VERSION ->
{
File targetDir = new File(AgentConfig.DIR_PATH, fileDownload.getRelativePath());
targetDir.mkdirs();
try (FileInputStream inputStream = new FileInputStream(fileDownload.getDownloadTempFile()); ZipInputStream zipIn = new ZipInputStream(inputStream))
{
ZipEntry entry = zipIn.getNextEntry();
while (entry != null)
{
File zipTargetFile = new File(targetDir, entry.getName());
if (entry.isDirectory())
{
if (!zipTargetFile.exists())
{
zipTargetFile.mkdirs();
}
}
else
{
File parentFile = zipTargetFile.getParentFile();
if (!parentFile.exists())
{
parentFile.mkdirs();
}
extractFile(zipIn, zipTargetFile);
}
zipIn.closeEntry();
entry = zipIn.getNextEntry();
}
pipeline.fireWrite(new LogReport().setContent(STR.format("文件:{}解压完成", fileDownload.getRelativePath())));
}
}
} }
} }
else else
@ -173,4 +206,16 @@ public class DownloadFileHandler implements ReadProcessor<Object>
} }
next.fireChannelClose(e); next.fireChannelClose(e);
} }
/**
 * Copies the current zip entry's bytes from {@code zipIn} into {@code file}.
 * The caller owns {@code zipIn} (and calls closeEntry/getNextEntry around this).
 *
 * @throws IOException if reading the entry or writing the file fails
 */
private void extractFile(ZipInputStream zipIn, File file) throws IOException
{
    // try-with-resources guarantees the file handle is released even when the
    // copy loop throws (the old version leaked the stream on IOException).
    try (BufferedOutputStream bos = new BufferedOutputStream(new FileOutputStream(file)))
    {
        byte[] bytesIn = new byte[4096];
        int read;
        while ((read = zipIn.read(bytesIn)) != -1)
        {
            bos.write(bytesIn, 0, read);
        }
    }
}
} }

View File

@ -4,6 +4,7 @@ import lombok.Data;
import lombok.experimental.Accessors; import lombok.experimental.Accessors;
import org.tianhe.common.command.impl.UpdateJarVersion; import org.tianhe.common.command.impl.UpdateJarVersion;
import org.tianhe.common.command.impl.UpdateJarVersionAndReboot; import org.tianhe.common.command.impl.UpdateJarVersionAndReboot;
import org.tianhe.common.command.impl.UpdateZipVersion;
import java.io.File; import java.io.File;
import java.io.FileOutputStream; import java.io.FileOutputStream;
@ -47,6 +48,16 @@ public class FileDownload
command = updateJarVersionAndReboot.commandType(); command = updateJarVersionAndReboot.commandType();
} }
public FileDownload(UpdateZipVersion updateZipVersion)
{
this.relativePath = updateZipVersion.getRelativePath();
this.appName = updateZipVersion.getAppName();
this.versionFileDate = updateZipVersion.getVersionFileDate();
this.fileId = updateZipVersion.getFileId();
this.jarNameWithoutExtension = updateZipVersion.getJarNameWithoutExtension();
command = updateZipVersion.commandType();
}
public boolean addDownloadSize(int segmentSize) public boolean addDownloadSize(int segmentSize)
{ {
currentDownloadedSize += segmentSize; currentDownloadedSize += segmentSize;

View File

@ -2,11 +2,15 @@ package org.tianhe.common.command;
public interface Command public interface Command
{ {
int REBOOT_JAR = 1; byte REBOOT_JAR = 1;
int STOP_JAR = 2; byte STOP_JAR = 2;
int UPDATE_JAR_VERSION = 3; byte UPDATE_JAR_VERSION = 3;
int UPDATE_JAR_VERSION_AND_REBOOT = 4; byte UPDATE_JAR_VERSION_AND_REBOOT = 4;
int UPDATE_ZIP_VERSION = 5; byte UPDATE_ZIP_VERSION = 5;
byte FETCH_DDL = 6;
byte UPDATE_DATABASE = 7;
byte READ_TEXT_CONTENT = 8;
byte UPDATE_TEXT_CONTENT = 9;
int commandType(); byte commandType();
} }

View File

@ -0,0 +1,21 @@
package org.tianhe.common.command.impl;
import lombok.Data;
import lombok.experimental.Accessors;
import org.tianhe.common.command.Command;
/**
 * Command DTO: asks an agent to dump the full DDL of the database reachable
 * at {@code url} and send the script back under {@code resultId}
 * (handled via {@code DbHelper.fetchDDL} on the agent side).
 */
@Data
@Accessors(chain = true)
public class FetchDDL implements Command
{
    // JDBC url of the database whose schema should be dumped
    private String url;
    private String username;
    private String password;
    // correlation id the agent echoes back in the ResultReq
    private String resultId;

    @Override
    public byte commandType()
    {
        return FETCH_DDL;
    }
}

View File

@ -12,7 +12,7 @@ public class ReBootJar implements Command
private String relativePath; private String relativePath;
@Override @Override
public int commandType() public byte commandType()
{ {
return REBOOT_JAR; return REBOOT_JAR;
} }

View File

@ -0,0 +1,19 @@
package org.tianhe.common.command.impl;
import lombok.Data;
import lombok.experimental.Accessors;
import org.tianhe.common.command.Command;
/**
 * Command DTO: asks an agent to read a text file (resolved against the
 * agent's configured base directory) and return its UTF-8 content under
 * {@code resultId}.
 */
@Data
@Accessors(chain = true)
public class ReadTextContent implements Command
{
    // path relative to the agent's base directory (AgentConfig.DIR_PATH)
    private String relativePath;
    // correlation id echoed back in the ResultReq
    private String resultId;

    @Override
    public byte commandType()
    {
        return READ_TEXT_CONTENT;
    }
}

View File

@ -12,7 +12,7 @@ public class StopJar implements Command
private String jarFilePath; private String jarFilePath;
@Override @Override
public int commandType() public byte commandType()
{ {
return Command.STOP_JAR; return Command.STOP_JAR;
} }

View File

@ -0,0 +1,21 @@
package org.tianhe.common.command.impl;
import lombok.Data;
import lombok.experimental.Accessors;
import org.tianhe.common.command.Command;
/**
 * Command DTO: asks an agent to align the structure of the database at
 * {@code url} with the given DDL script (executed by
 * {@code DbHelper.updateDatabase} on the agent side).
 */
@Data
@Accessors(chain = true)
public class UpdateDatabase implements Command
{
    private String url;
    private String username;
    private String password;
    // full DDL script describing the desired target structure
    private String ddl;

    @Override
    public byte commandType()
    {
        return UPDATE_DATABASE;
    }
}

View File

@ -15,7 +15,7 @@ public class UpdateJarVersion implements Command
private String fileId; private String fileId;
@Override @Override
public int commandType() public byte commandType()
{ {
return UPDATE_JAR_VERSION; return UPDATE_JAR_VERSION;
} }

View File

@ -15,7 +15,7 @@ public class UpdateJarVersionAndReboot implements Command
private String fileId; private String fileId;
@Override @Override
public int commandType() public byte commandType()
{ {
return Command.UPDATE_JAR_VERSION_AND_REBOOT; return Command.UPDATE_JAR_VERSION_AND_REBOOT;
} }

View File

@ -0,0 +1,19 @@
package org.tianhe.common.command.impl;
import lombok.Data;
import lombok.experimental.Accessors;
import org.tianhe.common.command.Command;
/**
 * Command DTO: asks an agent to overwrite a text file with the given
 * content, written as UTF-8.
 */
@Data
@Accessors(chain = true)
public class UpdateTextContent implements Command
{
    // target file path; resolved by the agent (see CommandHandler)
    private String relativePath;
    // new full file content, written as UTF-8
    private String content;

    @Override
    public byte commandType()
    {
        return UPDATE_TEXT_CONTENT;
    }
}

View File

@ -14,7 +14,7 @@ public class UpdateZipVersion implements Command
private String versionFileDate; private String versionFileDate;
private String fileId; private String fileId;
@Override @Override
public int commandType() public byte commandType()
{ {
return UPDATE_ZIP_VERSION; return UPDATE_ZIP_VERSION;
} }

View File

@ -13,6 +13,8 @@ public enum PacketType
OPEN_DATA_CHANNEL_REQ(9),// OPEN_DATA_CHANNEL_REQ(9),//
EXECUTE_COMMAND_REQ(10),// EXECUTE_COMMAND_REQ(10),//
EXECUTE_COMMAND_RESP(11),// EXECUTE_COMMAND_RESP(11),//
RESULT_REQ(12),//
; ;
private byte command; private byte command;

View File

@ -0,0 +1,39 @@
package org.tianhe.common.packet;
import com.jfirer.jnet.common.buffer.buffer.IoBuffer;
import lombok.Data;
import lombok.experimental.Accessors;
import org.tianhe.util.JfireSEPool;
/**
 * Packet carrying a command's result payload back to the requester.
 * Wire layout: packet-type byte, command byte, short string resultId,
 * 4-byte payload length, JfireSE-serialized payload bytes.
 */
@Data
@Accessors(chain = true)
public class ResultReq implements Packet
{
    // correlation id chosen by the command sender; echoed back unchanged
    private String resultId;
    // Command.* type code this result answers
    private byte command;
    // payload serialized with JfireSE; concrete type depends on command
    private Object resultDTO;

    /**
     * Serializes this packet into {@code buffer}.
     *
     * @return total number of bytes written
     */
    @Override
    public int write(IoBuffer buffer)
    {
        buffer.put(PacketType.RESULT_REQ.getCommand());
        buffer.put(command);
        int len = writeShortString(buffer, resultId);
        byte[] serialize = JfireSEPool.getJfireSE().serialize(resultDTO);
        buffer.putInt(serialize.length);
        buffer.put(serialize);
        // 1 packet-type + 1 command + 2-byte string length + string bytes
        // + 4-byte payload length + payload.
        // NOTE(review): assumes writeShortString writes a 2-byte length prefix
        // and returns only the string's byte count — confirm against Packet.
        return 1 + 1 + 2 + len + 4 + serialize.length;
    }

    /**
     * Mirror of {@link #write}. The leading packet-type byte is presumably
     * consumed by the dispatcher before read() is invoked.
     */
    @Override
    public Packet read(IoBuffer buffer)
    {
        command = buffer.get();
        resultId = readShortString(buffer);
        int len = buffer.getInt();
        byte[] bytes = new byte[len];
        buffer.get(bytes);
        resultDTO = JfireSEPool.getJfireSE().deSerialize(bytes);
        return this;
    }
}

View File

@ -0,0 +1,562 @@
package org.tianhe.util;
import com.jfirer.baseutil.STR;
import com.jfirer.baseutil.StringUtil;
import com.jfirer.baseutil.reflect.ReflectUtil;
import com.jfirer.baseutil.time.Timewatch;
import com.jfirer.jnet.common.api.Pipeline;
import com.mysql.jdbc.Driver;
import lombok.extern.slf4j.Slf4j;
import org.tianhe.common.packet.LogReport;
import java.sql.*;
import java.util.*;
import java.util.stream.Collectors;
@Slf4j
public class DbHelper
{
static
{
try
{
Class.forName(Driver.class.getName());
}
catch (ClassNotFoundException e)
{
throw new RuntimeException(e);
}
}
private static final String COMPARE_CATALOG_NAME = "structure_compare";
/**
 * Dumps the DDL ("SHOW CREATE TABLE" output) of every table in the schema
 * {@code url} points at, one statement per table, joined with ";\r\n".
 *
 * @param url      JDBC url of the source database
 * @param username database user
 * @param password database password
 * @param pipeline used to report failures back to the server
 * @return the concatenated DDL script, or {@code null} when the connection
 *         or a query fails (the failure is reported on the pipeline)
 */
public static String fetchDDL(String url, String username, String password, Pipeline pipeline)
{
    try (Connection connection = DriverManager.getConnection(url, username, password))
    {
        Set<String> allTableNames = findAllTableNames(connection);
        StringBuilder builder = new StringBuilder();
        for (String each : allTableNames)
        {
            // Table names originate from information_schema, not user input,
            // so plain concatenation is acceptable here.
            try (PreparedStatement preparedStatement = connection.prepareStatement("SHOW CREATE TABLE " + each); ResultSet resultSet = preparedStatement.executeQuery())
            {
                if (resultSet.next())
                {
                    // MySQL labels this column "Create Table"; JDBC label
                    // lookup is case-insensitive.
                    String createTable = resultSet.getString("create table");
                    builder.append(createTable).append(";\r\n");
                }
            }
        }
        return builder.toString();
    }
    catch (SQLException e)
    {
        // Do NOT echo the password over the pipeline: LogReport content ends
        // up in remote logs. url/username suffice for diagnosis.
        pipeline.fireWrite(new LogReport().setContent(STR.format("数据库连接失败,请检查数据库配置是否正确,url:{},username:{}。异常信息为:{}", url, username, e.toString())));
        return null;
    }
}
/**
 * Lists every table name of the connection's current catalog, read from
 * information_schema. Any failure is logged and degrades to an empty set.
 */
private static Set<String> findAllTableNames(Connection connection) throws SQLException
{
    String catalog = connection.getCatalog();
    try (PreparedStatement statement = connection.prepareStatement("SELECT * FROM information_schema.tables WHERE table_schema = ?"))
    {
        statement.setString(1, catalog);
        Set<String> names = new HashSet<>();
        try (ResultSet rows = statement.executeQuery())
        {
            while (rows.next())
            {
                names.add(rows.getString("TABLE_NAME"));
            }
        }
        return names;
    }
    catch (Throwable e)
    {
        log.error("查询数据库下的表名称出现异常", e);
        return new HashSet<>();
    }
}
/**
 * Brings the target database's structure in line with the given DDL script.
 * Strategy: replay the script into a scratch schema (structure_compare) on
 * the same server, then diff scratch vs. target and apply missing tables,
 * missing/changed columns and index differences to the target.
 * Failures are logged and reported over the pipeline; nothing is rethrown.
 */
public static void updateDatabase(String url, String username, String password, String ddl, Pipeline pipeline)
{
    List<String> sqlCommands = filterSqlCommand(Arrays.stream(ddl.split("\n")).toList());
    // Two connections to the same server: one pinned to the scratch schema,
    // one staying on the real target schema.
    try (Connection compareConnection = DriverManager.getConnection(url, username, password); Connection targetConnection = DriverManager.getConnection(url, username, password))
    {
        compareConnection.setAutoCommit(false);
        targetConnection.setAutoCommit(false);
        createCompareStructure(compareConnection, sqlCommands, pipeline);
        createUnExistTable(compareConnection, targetConnection, pipeline);
        compareConnection.setCatalog(COMPARE_CATALOG_NAME);
        try (Statement compareStatement = compareConnection.createStatement(); Statement targetStatement = targetConnection.createStatement())
        {
            Set<String> allTableNames = fetchAllTableNames(compareStatement);
            Map<String, TableInfo> structureOfCompare = fetchStructure(compareConnection, allTableNames, COMPARE_CATALOG_NAME);
            Map<String, TableInfo> structureOfTarget = fetchStructure(targetConnection, allTableNames, targetConnection.getCatalog());
            updateUnExistColumn(structureOfCompare, structureOfTarget, targetStatement, pipeline);
            updateDifferentColumn(structureOfCompare, structureOfTarget, targetStatement, pipeline);
            updateIndex(structureOfCompare, structureOfTarget, targetStatement, pipeline);
        }
        pipeline.fireWrite(new LogReport().setContent("数据库结构更新完毕"));
        compareConnection.commit();
        targetConnection.commit();
        compareConnection.setAutoCommit(true);
        targetConnection.setAutoCommit(true);
    }
    catch (SQLException e)
    {
        // NOTE(review): MySQL DDL auto-commits regardless of
        // setAutoCommit(false), so a failure may leave a partial update —
        // the transaction bracket above is largely advisory; confirm intent.
        log.error("升级数据库结构发生异常", e);
        pipeline.fireWrite(new LogReport().setContent(STR.format("升级数据库结构发生异常.", e)));
    }
}
/**
 * Extracts executable SQL statements from the raw lines of a DDL script.
 * Standalone block-comment delimiter lines toggle a skip state, "--" line
 * comments and blank lines are dropped, and the remaining lines are joined
 * (separated by a single space) into statements terminated by ';'. The
 * terminating semicolon itself is stripped from each returned statement.
 *
 * Fixes over the previous version: lines are trimmed and joined with a
 * space, so a statement spanning several lines no longer has its adjacent
 * words glued together; the semicolon strip now works even when a line
 * carries trailing whitespace; "--" is detected after trimming.
 *
 * @param lines raw script lines, already split on '\n'
 * @return statements in script order, without trailing semicolons
 */
private static List<String> filterSqlCommand(List<String> lines)
{
    List<String> sqlCommand = new ArrayList<>();
    boolean inNote = false;
    StringBuilder builder = new StringBuilder();
    for (String each : lines)
    {
        String trimmed = each.trim();
        if (trimmed.equals("/*"))
        {
            inNote = true;
        }
        else if (trimmed.equals("*/"))
        {
            inNote = false;
        }
        else if (inNote || trimmed.startsWith("--") || trimmed.isEmpty())
        {
            // comment or blank line: nothing to collect
        }
        else
        {
            // Separate continuation lines so tokens from adjacent lines
            // are not concatenated into one word.
            if (builder.length() > 0)
            {
                builder.append(' ');
            }
            builder.append(trimmed);
            if (trimmed.endsWith(";"))
            {
                // Trimming first guarantees the char removed here is the ';'.
                builder.setLength(builder.length() - 1);
                sqlCommand.add(builder.toString());
                builder.setLength(0);
            }
        }
    }
    return sqlCommand;
}
/**
 * (Re)creates the scratch schema and replays the DDL script into it, so the
 * scratch schema ends up reflecting exactly the desired structure.
 */
private static void createCompareStructure(Connection connection, List<String> sqlCommands, Pipeline pipeline) throws SQLException
{
    // Start from a clean slate on every run.
    try (Statement statement = connection.createStatement())
    {
        for (String ddl : new String[]{"drop database if exists " + COMPARE_CATALOG_NAME, "CREATE database " + COMPARE_CATALOG_NAME})
        {
            statement.execute(ddl);
        }
    }
    connection.setCatalog(COMPARE_CATALOG_NAME);
    try (Statement statement = connection.createStatement())
    {
        sqlExecute(sqlCommands, statement, pipeline);
    }
    pipeline.fireWrite(new LogReport().setContent("导入 sql 语句完毕,对比数据库表创建完成.接下来准备创建缺失的表。"));
}
/**
 * Executes the statements through JDBC batching, flushing every 1000
 * statements, and reports per-flush timing over the pipeline.
 * Any failure is logged/reported and swallowed (best-effort import).
 */
private static void sqlExecute(List<String> sqlCommand, Statement statement, Pipeline pipeline)
{
    try
    {
        int count = 0; // statements queued in the current batch
        int id = 1;    // flush counter, used in progress messages
        Timewatch timewatch = new Timewatch();
        for (String each : sqlCommand)
        {
            statement.addBatch(each);
            if (++count > 999)
            {
                // Flush a full batch of 1000.
                count = 0;
                timewatch.start();
                statement.executeBatch();
                statement.clearBatch();
                timewatch.end();
                log.debug("提交1000条执行第{}次提交,耗时:{}毫秒", id, timewatch.getTotal());
                pipeline.fireWrite(new LogReport().setContent(STR.format("提交1000条执行第{}次提交,耗时:{}毫秒", id, timewatch.getTotal())));
                id++;
            }
        }
        // Flush the final partial batch, if any.
        if (count != 0)
        {
            timewatch.start();
            statement.executeBatch();
            statement.clearBatch();
            timewatch.end();
            log.debug("提交{}条,执行第{}次提交,耗时:{}毫秒", count, id, timewatch.getTotal());
            pipeline.fireWrite(new LogReport().setContent(STR.format("提交{}条,执行第{}次提交,耗时:{}毫秒", count, id, timewatch.getTotal())));
        }
        log.debug("sql执行完毕");
        pipeline.fireWrite(new LogReport().setContent("sql 执行完毕"));
    }
    catch (Throwable e)
    {
        log.error("sql执行过程中出现异常", e);
        // NOTE(review): this STR.format has no {} placeholder for e, so the
        // exception text is likely dropped from the remote report — confirm
        // STR's behavior with surplus arguments.
        pipeline.fireWrite(new LogReport().setContent(STR.format("sql执行过程中出现异常", e)));
    }
}
/**
 * Creates, in the target schema, every table that exists in the scratch
 * (compare) schema but not yet in the target. The CREATE TABLE statements
 * are taken verbatim from "SHOW CREATE TABLE" on the scratch schema.
 */
private static void createUnExistTable(Connection compareConnection, Connection targetConnection, Pipeline pipeline) throws SQLException
{
    compareConnection.setCatalog(COMPARE_CATALOG_NAME);
    try (Statement compareStatement = compareConnection.createStatement(); Statement targetStatement = targetConnection.createStatement())
    {
        Set<String> compareTableNames = fetchAllTableNames(compareStatement);
        Set<String> targetTableNames = fetchAllTableNames(targetStatement);
        // Tables present in the scratch schema only.
        Set<String> UnExistTableNames = compareTableNames.stream().filter(name -> targetTableNames.contains(name) == false).collect(Collectors.toSet());
        if (!UnExistTableNames.isEmpty())
        {
            Set<String> createUnExistTableSql = UnExistTableNames.stream().map(name -> {
                try (ResultSet resultSet = compareStatement.executeQuery("SHOW CREATE TABLE " + name))
                {
                    if (resultSet.next())
                    {
                        return resultSet.getString("Create Table");
                    }
                    else
                    {
                        throw new IllegalStateException("查询 create tables 不应该为空");
                    }
                }
                catch (SQLException e)
                {
                    // throwException presumably rethrows e unchecked, so this
                    // return is unreachable in practice.
                    ReflectUtil.throwException(e);
                    return null;
                }
            }).collect(Collectors.toSet());
            pipeline.fireWrite(new LogReport().setContent(STR.format("已经查询到缺失的表的建表语句,准备执行建表语句")));
            createUnExistTableSql.forEach(sql -> {
                try
                {
                    targetStatement.execute(sql);
                }
                catch (SQLException e)
                {
                    log.error("创建缺失表异常当前异常sql为:{}", sql, e);
                    ReflectUtil.throwException(e);
                }
            });
            pipeline.fireWrite(new LogReport().setContent(STR.format("缺失的表创建完成,共创建表:{}", UnExistTableNames)));
        }
        else
        {
            pipeline.fireWrite(new LogReport().setContent("本次没有需要新建的表"));
        }
    }
}
/**
 * Lists every table name visible in the statement's current schema.
 */
private static Set<String> fetchAllTableNames(Statement statement) throws SQLException
{
    Set<String> tableNames = new HashSet<>();
    try (ResultSet rows = statement.executeQuery("show tables"))
    {
        while (rows.next())
        {
            tableNames.add(rows.getString(1));
        }
    }
    return tableNames;
}
/**
 * Reads the full structure (columns and indexes) of each named table from
 * information_schema, reusing three prepared statements across tables.
 *
 * @param catalog schema name to query in information_schema
 * @return table name -> structural snapshot
 */
private static Map<String, TableInfo> fetchStructure(Connection connection, Set<String> allTableNames, String catalog) throws SQLException
{
    try (PreparedStatement queryColumn = connection.prepareStatement("SELECT COLUMN_TYPE, COLUMN_COMMENT,COLUMN_NAME,COLUMN_DEFAULT,IS_NULLABLE,EXTRA FROM information_schema.COLUMNS WHERE TABLE_SCHEMA = ? AND TABLE_NAME = ?");
    // index heads: one row per distinct index name
    PreparedStatement queryIndex = connection.prepareStatement("SELECT distinct (INDEX_NAME),NON_UNIQUE,INDEX_COMMENT FROM INFORMATION_SCHEMA.STATISTICS WHERE TABLE_SCHEMA = ? AND TABLE_NAME = ?");
    // member columns of one index, in SEQ_IN_INDEX order
    PreparedStatement queryIndexColumn = connection.prepareStatement("select COLUMN_NAME,SEQ_IN_INDEX FROM INFORMATION_SCHEMA.STATISTICS WHERE TABLE_SCHEMA = ? AND TABLE_NAME = ? and INDEX_NAME=? order by SEQ_IN_INDEX"))
    {
        Map<String, TableInfo> structureOfCatalog = new HashMap<>();
        for (String tableName : allTableNames)
        {
            Map<String, ColumnInfo> columns = queryColumns(catalog, tableName, queryColumn);
            List<Index> indexes = queryIndexs(catalog, tableName, queryIndex, queryIndexColumn);
            structureOfCatalog.put(tableName, new TableInfo(tableName, columns, indexes));
        }
        return structureOfCatalog;
    }
}
/**
 * Reads all columns of one table from information_schema.COLUMNS.
 * Nullable metadata (comment/default/extra) is normalized to "".
 */
private static Map<String, ColumnInfo> queryColumns(String catalog, String tableName, PreparedStatement queryColumn) throws SQLException
{
    // The statement is shared across tables; reset parameters first.
    queryColumn.clearParameters();
    queryColumn.setString(1, catalog);
    queryColumn.setString(2, tableName);
    Map<String, ColumnInfo> columns = new HashMap<>();
    try (ResultSet resultSet = queryColumn.executeQuery())
    {
        while (resultSet.next())
        {
            String type = resultSet.getString(1);
            String comment = StringUtil.notBlankOrDefault(resultSet.getString(2), "");
            String name = resultSet.getString(3);
            String defaultValue = StringUtil.notBlankOrDefault(resultSet.getString(4), "");
            // IS_NULLABLE is the literal string "YES"/"NO"
            boolean isNull = resultSet.getString(5).equalsIgnoreCase("YES");
            String extra = StringUtil.notBlankOrDefault(resultSet.getString(6), "");
            columns.put(name, new ColumnInfo(type, name, comment, tableName, defaultValue, isNull, extra));
        }
    }
    return columns;
}
/**
 * Reads every non-primary index of one table, with its member columns in
 * SEQ_IN_INDEX order. The PRIMARY key index is deliberately skipped.
 */
private static List<Index> queryIndexs(String catalog, String tableName, PreparedStatement queryIndex, PreparedStatement queryIndexColumn) throws SQLException
{
    queryIndex.clearParameters();
    queryIndex.setString(1, catalog);
    queryIndex.setString(2, tableName);
    List<Index> indexes = new ArrayList<>();
    try (ResultSet resultSet = queryIndex.executeQuery())
    {
        while (resultSet.next())
        {
            String indexName = resultSet.getString(1);
            if (indexName.equalsIgnoreCase("primary"))
            {
                continue;
            }
            // NON_UNIQUE = 0 means the index IS unique, hence the negation.
            boolean unique = !resultSet.getBoolean(2);
            String comment = resultSet.getString(3);
            queryIndexColumn.clearParameters();
            queryIndexColumn.setString(1, catalog);
            queryIndexColumn.setString(2, tableName);
            queryIndexColumn.setString(3, indexName);
            List<String> indexColumnInSeq = new ArrayList<>();
            try (ResultSet indexColumns = queryIndexColumn.executeQuery())
            {
                while (indexColumns.next())
                {
                    indexColumnInSeq.add(indexColumns.getString(1));
                }
            }
            // primary is always false here: PRIMARY was filtered above.
            indexes.add(new Index(indexName, tableName, comment, unique, indexName.equalsIgnoreCase("primary"), indexColumnInSeq.toArray(String[]::new)));
        }
    }
    return indexes;
}
record Index(String indexName, String table, String comment, boolean unique, boolean primary, String[] columns)
{
@Override
public int hashCode()
{
return indexName.hashCode();
}
@Override
public boolean equals(Object obj)
{
if (obj == null || obj instanceof Index == false)
{
return false;
}
Index obj1 = (Index) obj;
if (indexName.equals(obj1.indexName) && table.equals(obj1.table) && unique == obj1.unique && primary == obj1.primary && Arrays.equals(columns, obj1.columns))
{
return true;
}
else
{
return false;
}
}
}
// One column's definition as read from information_schema.COLUMNS;
// comment/defaultValue/extra are normalized to "" when absent (see queryColumns).
record ColumnInfo(String type, String name, String comment, String table, String defaultValue, boolean isNull, String extra)
{
}
// Full structural snapshot of one table: its columns keyed by name, plus
// its non-primary indexes.
record TableInfo(String name, Map<String, ColumnInfo> columns, List<Index> indexes)
{
}
/**
 * Adds to the target schema every column that exists in the compare schema
 * but is missing from the same-named target table, via
 * "ALTER TABLE ... add column". Per-column failures are logged and the loop
 * continues; a summary is reported over the pipeline.
 */
private static void updateUnExistColumn(Map<String, TableInfo> structureOfCompare, Map<String, TableInfo> structureOfTarget, Statement targetStatement, Pipeline pipeline)
{
    // Columns present in compare but absent from the target table.
    Set<ColumnInfo> unExistColumns = structureOfCompare.values().stream().flatMap(tableInfo -> tableInfo.columns.values().stream())//
            .filter(column -> !structureOfTarget.get(column.table).columns.containsKey(column.name)).collect(Collectors.toSet());
    if (unExistColumns.isEmpty())
    {
        pipeline.fireWrite(new LogReport().setContent("本次没有缺失字段需要补齐"));
    }
    else
    {
        unExistColumns.forEach(columnInfo -> {
            StringBuilder builder = new StringBuilder("ALTER TABLE ");
            builder.append(columnInfo.table).append(" add column ").append(columnInfo.name).append(" ").append(columnInfo.type).append(" ");
            if (StringUtil.isNotBlank(columnInfo.defaultValue()))
            {
                try
                {
                    // Numeric default -> emitted unquoted; otherwise quoted.
                    Double.valueOf(columnInfo.defaultValue());
                    builder.append("default ").append(columnInfo.defaultValue()).append(" ");
                }
                catch (Throwable e)
                {
                    builder.append("default '").append(columnInfo.defaultValue()).append("' ");
                }
            }
            if (StringUtil.isNotBlank(columnInfo.extra()))
            {
                builder.append(columnInfo.extra).append(" ");
            }
            if (columnInfo.isNull() == false)
            {
                builder.append("not null ");
            }
            if (StringUtil.isNotBlank(columnInfo.comment))
            {
                // NOTE(review): the comment text is spliced into the SQL
                // unescaped; a quote inside it would break the statement.
                builder.append("comment '").append(columnInfo.comment).append("'");
            }
            try
            {
                targetStatement.execute(builder.toString());
            }
            catch (SQLException e)
            {
                log.error("字段:{}补齐字段过程中发生异常", columnInfo.table + "." + columnInfo.name, e);
            }
        });
        pipeline.fireWrite(new LogReport().setContent(STR.format("缺失{}个字段补齐完毕,分别是:{}", unExistColumns.size(), unExistColumns.stream().map(columnInfo -> columnInfo.table + "." + columnInfo.name).collect(Collectors.joining(",")))));
    }
}
/**
 * Aligns columns that exist in both schemas but differ in type, comment,
 * default value, nullability or extra flags, issuing
 * "ALTER TABLE ... modify column" against the target for each mismatch.
 * Per-column failures are logged and the loop continues; a summary is
 * reported over the pipeline.
 */
private static void updateDifferentColumn(Map<String, TableInfo> structureOfCompare, Map<String, TableInfo> structureOfTarget, Statement targetStatement, Pipeline pipeline)
{
    Set<ColumnInfo> differentColumns = structureOfCompare.values().stream().flatMap(tableInfo -> tableInfo.columns.values().stream())//
            .filter(column -> structureOfTarget.get(column.table).columns.containsKey(column.name))//
            .filter(column -> {
                // Compare against the target's version of the same column.
                ColumnInfo target = structureOfTarget.get(column.table).columns.get(column.name);
                return !target.comment.equals(column.comment)//
                       || !target.defaultValue().equals(column.defaultValue())//
                       || !target.type.equals(column.type)//
                       || target.isNull != column.isNull()//
                       || !target.extra.equals(column.extra);
            })//
            // (the previous version carried a dead .peek() left over from
            // debugging; removed)
            .collect(Collectors.toSet());
    if (differentColumns.isEmpty())
    {
        pipeline.fireWrite(new LogReport().setContent("本次没有不同字段需要更新"));
    }
    else
    {
        differentColumns.forEach(differentColumn -> {
            StringBuilder builder = new StringBuilder();
            builder.append("ALTER TABLE ").append(differentColumn.table).append(" modify column ").append(differentColumn.name).append(' ').append(differentColumn.type).append(' ');
            if (StringUtil.isNotBlank(differentColumn.defaultValue()))
            {
                try
                {
                    // Numeric default -> unquoted; otherwise quoted.
                    Double.valueOf(differentColumn.defaultValue());
                    builder.append("default ").append(differentColumn.defaultValue()).append(" ");
                }
                catch (Throwable e)
                {
                    builder.append("default '").append(differentColumn.defaultValue()).append("' ");
                }
            }
            if (StringUtil.isNotBlank(differentColumn.extra()))
            {
                builder.append(differentColumn.extra).append(" ");
            }
            if (differentColumn.isNull() == false)
            {
                builder.append("not null ");
            }
            if (StringUtil.isNotBlank(differentColumn.comment))
            {
                builder.append("comment '").append(differentColumn.comment).append("'");
            }
            String sql = builder.toString();
            try
            {
                targetStatement.execute(sql);
            }
            catch (SQLException e)
            {
                // Keep the cause: the previous version logged the sql but dropped e.
                log.error("执行表更新语句:{}过程中发现异常", sql, e);
            }
        });
        pipeline.fireWrite(new LogReport().setContent(STR.format("不同字段:{}个更新完毕,更新的字段有:{}", differentColumns.size(), differentColumns.stream().map(columnInfo -> columnInfo.table + "." + columnInfo.name).collect(Collectors.joining(",")))));
    }
}
/**
 * Synchronizes indexes of the target database against the compare (reference) database:
 * first drops every target index that has no exact counterpart in the compare schema,
 * then creates every compare index missing from (or differing in) the target schema.
 * Progress and failures are reported back through the pipeline as LogReport packets.
 *
 * @param structureOfCompare reference schema, keyed by table name
 * @param structureOfTarget  schema of the database being updated, keyed by table name
 * @param targetStatement    JDBC statement on the target database, used to execute DDL
 * @param pipeline           channel used to report human-readable progress logs
 */
private static void updateIndex(Map<String, TableInfo> structureOfCompare, Map<String, TableInfo> structureOfTarget, Statement targetStatement, Pipeline pipeline)
{
    // Indexes present in the target that exactly match the compare schema
    // (name case-insensitively, uniqueness, comment, column list) — nothing to do for these.
    Set<Index> sameIndexes = structureOfTarget.values().stream().map(TableInfo::indexes).flatMap(Collection::stream)//
            .filter(index -> {
                TableInfo tableOfCompare = structureOfCompare.get(index.table);
                // Table missing from the compare schema: treat all its indexes as "different"
                // so they get dropped below. The original code NPE'd here.
                if (tableOfCompare == null)
                {
                    return false;
                }
                // Null-safe comment comparison: index metadata may legitimately lack a comment.
                return tableOfCompare.indexes.stream().anyMatch(indexOfCompare -> indexOfCompare.indexName.equalsIgnoreCase(index.indexName) //
                        && indexOfCompare.unique == index.unique //
                        && (indexOfCompare.comment == null ? index.comment == null : indexOfCompare.comment.equals(index.comment)) //
                        && Arrays.equals(indexOfCompare.columns, index.columns));
            }).collect(Collectors.toSet());
    // Everything in the target that is not an exact match must be dropped.
    Set<Index> allTargetIndexes = structureOfTarget.values().stream().map(TableInfo::indexes).flatMap(Collection::stream).collect(Collectors.toSet());
    allTargetIndexes.removeAll(sameIndexes);
    Set<Index> needDeletedIndexes = new HashSet<>(allTargetIndexes);
    if (needDeletedIndexes.isEmpty())
    {
        pipeline.fireWrite(new LogReport().setContent("本次没有需要删除的索引"));
    }
    else
    {
        Set<Index> error = new HashSet<>();
        for (Index needDeleteIndex : needDeletedIndexes)
        {
            String sql = STR.format("ALTER TABLE {} DROP INDEX {}", needDeleteIndex.table, needDeleteIndex.indexName);
            try
            {
                targetStatement.execute(sql);
            }
            catch (SQLException e)
            {
                error.add(needDeleteIndex);
                // Keep the full stack trace in the local log; the pipeline report stays unchanged.
                log.error("删除索引出现异常sql 为:{}", sql, e);
                pipeline.fireWrite(new LogReport().setContent(STR.format("删除索引 {} 出现异常", needDeleteIndex.table + "." + needDeleteIndex.indexName, e)));
            }
        }
        // Report only the indexes that were actually dropped.
        needDeletedIndexes.removeAll(error);
        pipeline.fireWrite(new LogReport().setContent(STR.format("删除不同的索引完毕,删除的索引有:{}", needDeletedIndexes.stream().map(index -> index.table + "." + index.indexName).collect(Collectors.joining(",")))));
    }
    // Everything in the compare schema that is not an exact match must be (re-)created in the target.
    Set<Index> allCompareIndexes = structureOfCompare.values().stream().map(TableInfo::indexes).flatMap(Collection::stream).collect(Collectors.toSet());
    allCompareIndexes.removeAll(sameIndexes);
    Set<Index> needCreateIndexes = new HashSet<>(allCompareIndexes);
    if (needCreateIndexes.isEmpty())
    {
        pipeline.fireWrite(new LogReport().setContent("没有不同的索引需要创建"));
    }
    else
    {
        Set<Index> error = new HashSet<>();
        for (Index index : needCreateIndexes)
        {
            StringBuilder builder = new StringBuilder();
            builder.append("alter table ").append(index.table).append(" add ");
            if (index.unique)
            {
                builder.append("unique ");
            }
            builder.append("index ").append(index.indexName).append("(").append(String.join(",", index.columns)).append(") ");
            if (StringUtil.isNotBlank(index.comment))
            {
                builder.append("comment '").append(index.comment).append("'");
            }
            String sql = builder.toString();
            try
            {
                targetStatement.execute(sql);
            }
            catch (SQLException e)
            {
                // Pass the throwable so the stack trace is logged; the original dropped it.
                log.error("创建索引出现异常sql 为:{}", sql, e);
                error.add(index);
                pipeline.fireWrite(new LogReport().setContent(STR.format("创建索引 {} 出现异常", index.table + "." + index.indexName, e)));
            }
        }
        needCreateIndexes.removeAll(error);
        pipeline.fireWrite(new LogReport().setContent(STR.format("创建不同的索引完毕,创建的索引有:{}", needCreateIndexes.stream().map(index -> index.table + "." + index.indexName).collect(Collectors.joining(",")))));
    }
}
}

View File

@ -1,9 +1,10 @@
package org.tianhe.agent; package org.tianhe.util;
import com.jfirer.baseutil.STR; import com.jfirer.baseutil.STR;
import com.jfirer.baseutil.StringUtil; import com.jfirer.baseutil.StringUtil;
import com.jfirer.jnet.common.api.Pipeline; import com.jfirer.jnet.common.api.Pipeline;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.tianhe.agent.AgentConfig;
import org.tianhe.common.packet.LogReport; import org.tianhe.common.packet.LogReport;
import java.io.BufferedReader; import java.io.BufferedReader;
@ -129,4 +130,9 @@ public class PlatformHelper
} }
}).start(); }).start();
} }
public static String toAbsolutePath(String relativePath)
{
return new File(DIR_PATH, relativePath).getAbsolutePath();
}
} }

View File

@ -5,17 +5,19 @@ import com.jfirer.jfire.core.AwareContextInited;
import com.jfirer.jfire.core.inject.notated.PropertyRead; import com.jfirer.jfire.core.inject.notated.PropertyRead;
import com.jfirer.jfire.core.prepare.annotation.ComponentScan; import com.jfirer.jfire.core.prepare.annotation.ComponentScan;
import com.jfirer.jfire.core.prepare.annotation.EnableAutoConfiguration; import com.jfirer.jfire.core.prepare.annotation.EnableAutoConfiguration;
import com.jfirer.jfire.core.prepare.annotation.configuration.Bean;
import com.jfirer.jfire.core.prepare.annotation.configuration.Configuration;
import com.jfirer.jnet.common.api.Pipeline; import com.jfirer.jnet.common.api.Pipeline;
import com.jfirer.jnet.common.decoder.TotalLengthFieldBasedFrameDecoder; import com.jfirer.jnet.common.decoder.TotalLengthFieldBasedFrameDecoder;
import com.jfirer.jnet.common.util.ChannelConfig; import com.jfirer.jnet.common.util.ChannelConfig;
import com.jfirer.jnet.server.AioServer; import com.jfirer.jnet.server.AioServer;
import com.zaxxer.hikari.HikariDataSource; import com.zaxxer.hikari.HikariDataSource;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.tianhe.web.service.LogReportService;
import org.tianhe.web.control.ControlChannelStruct;
import org.tianhe.web.control.AgentReqHandler;
import org.tianhe.common.PacketWriteHandler; import org.tianhe.common.PacketWriteHandler;
import org.tianhe.web.control.AgentReqHandler;
import org.tianhe.web.control.ControlChannelStruct;
import org.tianhe.web.control.RegisterSelfControlChannelHandler; import org.tianhe.web.control.RegisterSelfControlChannelHandler;
import org.tianhe.web.service.LogReportService;
import javax.annotation.Resource; import javax.annotation.Resource;
import javax.sql.DataSource; import javax.sql.DataSource;
@ -25,17 +27,20 @@ import java.util.concurrent.ConcurrentHashMap;
@EnableAutoConfiguration @EnableAutoConfiguration
@Slf4j @Slf4j
@Resource @Resource
@Configuration
public class WebConfig implements AwareContextInited public class WebConfig implements AwareContextInited
{ {
@PropertyRead("datasource.user") @PropertyRead("datasource.user")
private String username; private String username;
@PropertyRead("datasource.password") @PropertyRead("datasource.password")
private String password; private String password;
@PropertyRead("datasource.url") @PropertyRead("datasource.url")
private String url; private String url;
@Resource
private LogReportService logReportService;
@Resource @Resource
private LogReportService logReportService;
private ConcurrentHashMap<String, ControlChannelStruct> controlChannels;
@Bean
public DataSource dataSource() public DataSource dataSource()
{ {
HikariDataSource dataSource = new HikariDataSource(); HikariDataSource dataSource = new HikariDataSource();
@ -53,7 +58,7 @@ public class WebConfig implements AwareContextInited
channelConfig.setWorkerGroup(ChannelConfig.DEFAULT_WORKER_GROUP); channelConfig.setWorkerGroup(ChannelConfig.DEFAULT_WORKER_GROUP);
channelConfig.setChannelGroup(ChannelConfig.DEFAULT_CHANNEL_GROUP); channelConfig.setChannelGroup(ChannelConfig.DEFAULT_CHANNEL_GROUP);
channelConfig.setPort(39999); channelConfig.setPort(39999);
ConcurrentHashMap<String, ControlChannelStruct> controlChannels = new ConcurrentHashMap<>(); controlChannels = new ConcurrentHashMap<>();
AioServer aioServer = AioServer.newAioServer(channelConfig, channelContext -> { AioServer aioServer = AioServer.newAioServer(channelConfig, channelContext -> {
Pipeline pipeline = channelContext.pipeline(); Pipeline pipeline = channelContext.pipeline();
pipeline.addReadProcessor(new TotalLengthFieldBasedFrameDecoder(0, 4, 4, Integer.MAX_VALUE, channelConfig.getAllocator())); pipeline.addReadProcessor(new TotalLengthFieldBasedFrameDecoder(0, 4, 4, Integer.MAX_VALUE, channelConfig.getAllocator()));

View File

@ -0,0 +1,71 @@
package org.tianhe.web.http;
import com.jfirer.jfire.core.ApplicationContext;
import com.jfirer.jfire.core.AwareContextInited;
import com.jfirer.jfire.core.prepare.annotation.Import;
import com.jfirer.jfirer.boot.forward.path.PathRequestForwardProcessor;
import com.jfirer.jfirer.boot.http.NotFoundUrlProcessor;
import com.jfirer.jfirer.boot.http.OptionsProcessor;
import com.jfirer.jfirer.boot.http.ResourceProcessor;
import com.jfirer.jfirer.boot.http.ResponseDataToHttpResponse;
import com.jfirer.jnet.common.api.Pipeline;
import com.jfirer.jnet.common.buffer.allocator.BufferAllocator;
import com.jfirer.jnet.common.buffer.allocator.impl.CachedBufferAllocator;
import com.jfirer.jnet.common.buffer.allocator.impl.PooledBufferAllocator;
import com.jfirer.jnet.common.internal.DefaultWorkerGroup;
import com.jfirer.jnet.common.util.CapacityStat;
import com.jfirer.jnet.common.util.ChannelConfig;
import com.jfirer.jnet.extend.http.decode.HttpRequestDecoder;
import com.jfirer.jnet.extend.http.decode.HttpResponseEncoder;
import com.jfirer.jnet.server.AioServer;
import lombok.extern.slf4j.Slf4j;
import javax.annotation.Resource;
/**
 * HTTP server bootstrap for the web console. Starts an AIO server on port 40001 whose
 * pipeline serves static resources from "/dist", forwards path-based requests to
 * controllers, and answers unmatched URLs with a 404. Also launches a daemon thread
 * that periodically logs direct-buffer capacity statistics of the shared allocator.
 */
@Resource
@Import(PathRequestForwardProcessor.class)
@Slf4j
public class HttpServer implements AwareContextInited
{
    public static final BufferAllocator BUFFER_ALLOCATOR = CachedBufferAllocator.DEFAULT;

    @Override
    public void aware(ApplicationContext context)
    {
        PathRequestForwardProcessor pathRequestForwardProcessor = context.getBean(PathRequestForwardProcessor.class);
        ChannelConfig               channelConfig              = new ChannelConfig();
        channelConfig.setAllocator(BUFFER_ALLOCATOR);
        startAllocatorMonitor();
        channelConfig.setWorkerGroup(new DefaultWorkerGroup(Runtime.getRuntime().availableProcessors() * 4, "OperationMaster-worker-"));
        channelConfig.setChannelGroup(ChannelConfig.DEFAULT_CHANNEL_GROUP);
        channelConfig.setPort(40001);
        AioServer aioServer = AioServer.newAioServer(channelConfig, channelContext -> {
            Pipeline pipeline = channelContext.pipeline();
            // Read side: decode HTTP, answer OPTIONS pre-flight, try static resources,
            // then controller forwarding, finally fall back to the 404 handler.
            pipeline.addReadProcessor(new HttpRequestDecoder(channelConfig.getAllocator()));
            pipeline.addReadProcessor(new OptionsProcessor());
            ResourceProcessor resourceProcessor = new ResourceProcessor("/dist");
            pipeline.addReadProcessor(resourceProcessor);
            pipeline.addReadProcessor(pathRequestForwardProcessor);
            pipeline.addReadProcessor(new NotFoundUrlProcessor(resourceProcessor));
            // Write side: wrap controller results into HTTP responses and encode them.
            pipeline.addWriteProcessor(new ResponseDataToHttpResponse());
            pipeline.addWriteProcessor(new HttpResponseEncoder(channelConfig.getAllocator()));
        });
        aioServer.start();
    }

    /**
     * Starts a daemon thread that logs the allocator's direct-buffer capacity statistics
     * every 10 seconds. Fixes over the original version:
     * - the thread was fully constructed but never started (dead code);
     * - the unconditional cast to PooledBufferAllocator could throw ClassCastException,
     *   now guarded by an instanceof check;
     * - InterruptedException now restores the interrupt flag and exits the loop instead
     *   of being wrapped in a RuntimeException;
     * - the thread is marked daemon so the infinite loop cannot keep the JVM alive.
     */
    private void startAllocatorMonitor()
    {
        Thread monitor = new Thread(() -> {
            while (!Thread.currentThread().isInterrupted())
            {
                try
                {
                    Thread.sleep(10000);
                }
                catch (InterruptedException e)
                {
                    Thread.currentThread().interrupt();
                    return;
                }
                if (BUFFER_ALLOCATOR instanceof PooledBufferAllocator)
                {
                    CapacityStat stat = new CapacityStat();
                    ((PooledBufferAllocator) BUFFER_ALLOCATOR).directCapacityStat(stat);
                    log.trace("{}", stat);
                }
                // NOTE(review): explicit GC kept from the original (presumably to help reclaim
                // direct buffers while observing the stats) — consider removing in production.
                System.gc();
            }
        }, "buffer-allocator-monitor");
        monitor.setDaemon(true);
        monitor.start();
    }
}