diff --git a/pom.xml b/pom.xml index cfa15db..1965797 100644 --- a/pom.xml +++ b/pom.xml @@ -14,6 +14,11 @@ + + com.jfirer + baseutil + 1.1.10 + com.jfirer JfireSE @@ -38,7 +43,7 @@ com.jfirer JfireBoot - 1.0.1 + 1.1-SNAPSHOT com.jfirer @@ -155,7 +160,6 @@ buildTime yyyy-MM-dd HH:mm:ss Asia/Shanghai - Asia/Shanghai diff --git a/src/main/java/org/tianhe/agent/control/CommandHandler.java b/src/main/java/org/tianhe/agent/control/CommandHandler.java index ab703f6..7ff4149 100644 --- a/src/main/java/org/tianhe/agent/control/CommandHandler.java +++ b/src/main/java/org/tianhe/agent/control/CommandHandler.java @@ -1,20 +1,26 @@ package org.tianhe.agent.control; +import com.jfirer.baseutil.IoUtil; import com.jfirer.baseutil.STR; import com.jfirer.jnet.common.api.Pipeline; import com.jfirer.jnet.common.api.ReadProcessor; import com.jfirer.jnet.common.api.ReadProcessorNode; import lombok.extern.slf4j.Slf4j; -import org.tianhe.agent.PlatformHelper; +import org.tianhe.agent.AgentConfig; import org.tianhe.common.command.Command; -import org.tianhe.common.command.impl.ReBootJar; -import org.tianhe.common.command.impl.StopJar; -import org.tianhe.common.command.impl.UpdateJarVersion; -import org.tianhe.common.command.impl.UpdateJarVersionAndReboot; +import org.tianhe.common.command.impl.*; import org.tianhe.common.packet.ExecuteCommandReq; import org.tianhe.common.packet.FileInfoReq; import org.tianhe.common.packet.LogReport; +import org.tianhe.common.packet.ResultReq; +import org.tianhe.util.DbHelper; +import org.tianhe.util.PlatformHelper; +import java.io.File; +import java.io.FileInputStream; +import java.io.FileOutputStream; +import java.io.IOException; +import java.nio.charset.StandardCharsets; import java.util.List; @Slf4j @@ -54,6 +60,52 @@ public class CommandHandler implements ReadProcessor } case Command.UPDATE_ZIP_VERSION -> { + UpdateZipVersion updateZipVersion = (UpdateZipVersion) commandDTO; + next.pipeline().fireWrite(new LogReport().setContent(STR.format("对应用进行版本更新,应用名称:{},文件版本为:{}", updateZipVersion.getAppName(), updateZipVersion.getVersionFileDate()))); + next.pipeline().channelContext().setAttach(new FileDownload(updateZipVersion)); + next.pipeline().fireWrite(new FileInfoReq().setFileId(updateZipVersion.getFileId())); + } + case Command.FETCH_DDL -> + { + FetchDDL fetchDDL = (FetchDDL) commandDTO; + next.pipeline().fireWrite(new LogReport().setContent(STR.format("收到DDL文件拉取指令,数据库地址为:{}", fetchDDL.getUrl()))); + String ddl = DbHelper.fetchDDL(fetchDDL.getUrl(), fetchDDL.getUsername(), fetchDDL.getPassword(), next.pipeline()); + next.pipeline().fireWrite(new LogReport().setContent("数据库 DDL 获取完毕,准备回送")); + next.pipeline().fireWrite(new ResultReq().setResultId(fetchDDL.getResultId()).setCommand(Command.FETCH_DDL).setResultDTO(ddl)); + } + case Command.UPDATE_DATABASE -> + { + UpdateDatabase updateDatabase = (UpdateDatabase) commandDTO; + DbHelper.updateDatabase(updateDatabase.getUrl(), updateDatabase.getUsername(), updateDatabase.getPassword(), updateDatabase.getDdl(), next.pipeline()); + } + case Command.READ_TEXT_CONTENT -> + { + ReadTextContent readTextContent = (ReadTextContent) commandDTO; + File file = new File(AgentConfig.DIR_PATH, readTextContent.getRelativePath()); + try (FileInputStream fileInputStream = new FileInputStream(file)) + { + byte[] bytes = IoUtil.readAllBytes(fileInputStream); + next.pipeline().fireWrite(new LogReport().setContent(STR.format("读取文件:{}内容完毕", PlatformHelper.toAbsolutePath(readTextContent.getRelativePath())))); + next.pipeline().fireWrite(new ResultReq().setResultId(readTextContent.getResultId()).setCommand(Command.READ_TEXT_CONTENT).setResultDTO(new String(bytes, StandardCharsets.UTF_8))); + } + catch (IOException e) + { + next.pipeline().fireWrite(new LogReport().setContent(STR.format("读取文件:{}内容失败", PlatformHelper.toAbsolutePath(readTextContent.getRelativePath()), e))); + } + } + case Command.UPDATE_TEXT_CONTENT -> + { + UpdateTextContent updateTextContent = (UpdateTextContent) commandDTO; + File file = new File(PlatformHelper.toAbsolutePath(updateTextContent.getRelativePath())); + try (FileOutputStream outputStream = new FileOutputStream(file)) + { + outputStream.write(updateTextContent.getContent().getBytes(StandardCharsets.UTF_8)); + next.pipeline().fireWrite(new LogReport().setContent(STR.format("文件:{}更新完毕。", PlatformHelper.toAbsolutePath(updateTextContent.getRelativePath())))); + } + catch (IOException e) + { + next.pipeline().fireWrite(new LogReport().setContent(STR.format("文件:{}更新失败。", PlatformHelper.toAbsolutePath(updateTextContent.getRelativePath()), e))); + } } } } diff --git a/src/main/java/org/tianhe/agent/control/DownloadFileHandler.java b/src/main/java/org/tianhe/agent/control/DownloadFileHandler.java index d879f5f..191bd2a 100644 --- a/src/main/java/org/tianhe/agent/control/DownloadFileHandler.java +++ b/src/main/java/org/tianhe/agent/control/DownloadFileHandler.java @@ -7,21 +7,22 @@ import com.jfirer.jnet.common.api.ReadProcessor; import com.jfirer.jnet.common.api.ReadProcessorNode; import lombok.Data; import org.tianhe.agent.AgentConfig; -import org.tianhe.agent.PlatformHelper; +import org.tianhe.util.PlatformHelper; import org.tianhe.common.command.Command; import org.tianhe.common.packet.FileInfoResp; import org.tianhe.common.packet.FileSegmentReq; import org.tianhe.common.packet.FileSegmentResp; import org.tianhe.common.packet.LogReport; -import java.io.File; -import java.io.FileOutputStream; -import java.io.IOException; +import java.io.*; import java.nio.file.Files; import java.nio.file.StandardCopyOption; import java.util.List; +import java.util.zip.ZipEntry; +import java.util.zip.ZipInputStream; import static org.tianhe.common.command.Command.UPDATE_JAR_VERSION_AND_REBOOT; +import static org.tianhe.common.command.Command.UPDATE_ZIP_VERSION; @Data public class DownloadFileHandler implements ReadProcessor @@ -129,6 +130,38 @@ public class DownloadFileHandler implements ReadProcessor PlatformHelper.startJar(fileDownload.getRelativePath(), pipeline); } } + case UPDATE_ZIP_VERSION -> + { + File targetDir = new File(AgentConfig.DIR_PATH, fileDownload.getRelativePath()); + targetDir.mkdirs(); + try (FileInputStream inputStream = new FileInputStream(fileDownload.getDownloadTempFile()); ZipInputStream zipIn = new ZipInputStream(inputStream)) + { + ZipEntry entry = zipIn.getNextEntry(); + while (entry != null) + { + File zipTargetFile = new File(targetDir, entry.getName()); + if (entry.isDirectory()) + { + if (!zipTargetFile.exists()) + { + zipTargetFile.mkdirs(); + } + } + else + { + File parentFile = zipTargetFile.getParentFile(); + if (!parentFile.exists()) + { + parentFile.mkdirs(); + } + extractFile(zipIn, zipTargetFile); + } + zipIn.closeEntry(); + entry = zipIn.getNextEntry(); + } + pipeline.fireWrite(new LogReport().setContent(STR.format("文件:{}解压完成", fileDownload.getRelativePath()))); + } + } } } else @@ -173,4 +206,16 @@ public class DownloadFileHandler implements ReadProcessor } next.fireChannelClose(e); } + + private void extractFile(ZipInputStream zipIn, File file) throws IOException + { + BufferedOutputStream bos = new BufferedOutputStream(new FileOutputStream(file)); + byte[] bytesIn = new byte[4096]; + int read; + while ((read = zipIn.read(bytesIn)) != -1) + { + bos.write(bytesIn, 0, read); + } + bos.close(); + } } diff --git a/src/main/java/org/tianhe/agent/control/FileDownload.java b/src/main/java/org/tianhe/agent/control/FileDownload.java index 3fa6830..2268ed3 100644 --- a/src/main/java/org/tianhe/agent/control/FileDownload.java +++ b/src/main/java/org/tianhe/agent/control/FileDownload.java @@ -4,6 +4,7 @@ import lombok.Data; import lombok.experimental.Accessors; import org.tianhe.common.command.impl.UpdateJarVersion; import org.tianhe.common.command.impl.UpdateJarVersionAndReboot; +import org.tianhe.common.command.impl.UpdateZipVersion; import java.io.File; import java.io.FileOutputStream; @@ -47,6 +48,16 @@ public class FileDownload command = updateJarVersionAndReboot.commandType(); } + public FileDownload(UpdateZipVersion updateZipVersion) + { + this.relativePath = updateZipVersion.getRelativePath(); + this.appName = updateZipVersion.getAppName(); + this.versionFileDate = updateZipVersion.getVersionFileDate(); + this.fileId = updateZipVersion.getFileId(); + this.jarNameWithoutExtension = updateZipVersion.getJarNameWithoutExtension(); + command = updateZipVersion.commandType(); + } + public boolean addDownloadSize(int segmentSize) { currentDownloadedSize += segmentSize; diff --git a/src/main/java/org/tianhe/common/command/Command.java b/src/main/java/org/tianhe/common/command/Command.java index c027f5a..21041d6 100644 --- a/src/main/java/org/tianhe/common/command/Command.java +++ b/src/main/java/org/tianhe/common/command/Command.java @@ -2,11 +2,15 @@ package org.tianhe.common.command; public interface Command { - int REBOOT_JAR = 1; - int STOP_JAR = 2; - int UPDATE_JAR_VERSION = 3; - int UPDATE_JAR_VERSION_AND_REBOOT = 4; - int UPDATE_ZIP_VERSION = 5; + byte REBOOT_JAR = 1; + byte STOP_JAR = 2; + byte UPDATE_JAR_VERSION = 3; + byte UPDATE_JAR_VERSION_AND_REBOOT = 4; + byte UPDATE_ZIP_VERSION = 5; + byte FETCH_DDL = 6; + byte UPDATE_DATABASE = 7; + byte READ_TEXT_CONTENT = 8; + byte UPDATE_TEXT_CONTENT = 9; - int commandType(); + byte commandType(); } diff --git a/src/main/java/org/tianhe/common/command/impl/FetchDDL.java b/src/main/java/org/tianhe/common/command/impl/FetchDDL.java new file mode 100644 index 0000000..4467ef3 --- /dev/null +++ b/src/main/java/org/tianhe/common/command/impl/FetchDDL.java @@ -0,0 +1,21 @@ +package org.tianhe.common.command.impl; + +import lombok.Data; +import lombok.experimental.Accessors; +import org.tianhe.common.command.Command; + +@Data +@Accessors(chain = true) +public class FetchDDL implements Command +{ + private String url; + private String username; + private String password; + private String resultId; + + @Override + public byte commandType() + { + return FETCH_DDL; + } +} diff --git a/src/main/java/org/tianhe/common/command/impl/ReBootJar.java b/src/main/java/org/tianhe/common/command/impl/ReBootJar.java index 85a4ba8..37ce568 100644 --- a/src/main/java/org/tianhe/common/command/impl/ReBootJar.java +++ b/src/main/java/org/tianhe/common/command/impl/ReBootJar.java @@ -12,7 +12,7 @@ public class ReBootJar implements Command private String relativePath; @Override - public int commandType() + public byte commandType() { return REBOOT_JAR; } diff --git a/src/main/java/org/tianhe/common/command/impl/ReadTextContent.java b/src/main/java/org/tianhe/common/command/impl/ReadTextContent.java new file mode 100644 index 0000000..16903b1 --- /dev/null +++ b/src/main/java/org/tianhe/common/command/impl/ReadTextContent.java @@ -0,0 +1,19 @@ +package org.tianhe.common.command.impl; + +import lombok.Data; +import lombok.experimental.Accessors; +import org.tianhe.common.command.Command; + +@Data +@Accessors(chain = true) +public class ReadTextContent implements Command +{ + private String relativePath; + private String resultId; + + @Override + public byte commandType() + { + return READ_TEXT_CONTENT; + } +} diff --git a/src/main/java/org/tianhe/common/command/impl/StopJar.java b/src/main/java/org/tianhe/common/command/impl/StopJar.java index aa8e421..969a72d 100644 --- a/src/main/java/org/tianhe/common/command/impl/StopJar.java +++ b/src/main/java/org/tianhe/common/command/impl/StopJar.java @@ -12,7 +12,7 @@ public class StopJar implements Command private String jarFilePath; @Override - public int commandType() + public byte commandType() { return Command.STOP_JAR; } diff --git a/src/main/java/org/tianhe/common/command/impl/UpdateDatabase.java b/src/main/java/org/tianhe/common/command/impl/UpdateDatabase.java new file mode 100644 index 0000000..2b971c0 --- /dev/null +++ b/src/main/java/org/tianhe/common/command/impl/UpdateDatabase.java @@ -0,0 +1,21 @@ +package org.tianhe.common.command.impl; + +import lombok.Data; +import lombok.experimental.Accessors; +import org.tianhe.common.command.Command; + +@Data +@Accessors(chain = true) +public class UpdateDatabase implements Command +{ + private String url; + private String username; + private String password; + private String ddl; + + @Override + public byte commandType() + { + return UPDATE_DATABASE; + } +} diff --git a/src/main/java/org/tianhe/common/command/impl/UpdateJarVersion.java b/src/main/java/org/tianhe/common/command/impl/UpdateJarVersion.java index ba3c621..a3c9a77 100644 --- a/src/main/java/org/tianhe/common/command/impl/UpdateJarVersion.java +++ b/src/main/java/org/tianhe/common/command/impl/UpdateJarVersion.java @@ -15,7 +15,7 @@ public class UpdateJarVersion implements Command private String fileId; @Override - public int commandType() + public byte commandType() { return UPDATE_JAR_VERSION; } diff --git a/src/main/java/org/tianhe/common/command/impl/UpdateJarVersionAndReboot.java b/src/main/java/org/tianhe/common/command/impl/UpdateJarVersionAndReboot.java index 975290d..d84cecd 100644 --- a/src/main/java/org/tianhe/common/command/impl/UpdateJarVersionAndReboot.java +++ b/src/main/java/org/tianhe/common/command/impl/UpdateJarVersionAndReboot.java @@ -15,7 +15,7 @@ public class UpdateJarVersionAndReboot implements Command private String fileId; @Override - public int commandType() + public byte commandType() { return Command.UPDATE_JAR_VERSION_AND_REBOOT; } diff --git a/src/main/java/org/tianhe/common/command/impl/UpdateTextContent.java b/src/main/java/org/tianhe/common/command/impl/UpdateTextContent.java new file mode 100644 index 0000000..2edbc06 --- /dev/null +++ b/src/main/java/org/tianhe/common/command/impl/UpdateTextContent.java @@ -0,0 +1,19 @@ +package org.tianhe.common.command.impl; + +import lombok.Data; +import lombok.experimental.Accessors; +import org.tianhe.common.command.Command; + +@Data +@Accessors(chain = true) +public class UpdateTextContent implements Command +{ + private String relativePath; + private String content; + + @Override + public byte commandType() + { + return UPDATE_TEXT_CONTENT; + } +} diff --git a/src/main/java/org/tianhe/common/command/impl/UpdateZipVersion.java b/src/main/java/org/tianhe/common/command/impl/UpdateZipVersion.java index 477f9d6..97a0901 100644 --- a/src/main/java/org/tianhe/common/command/impl/UpdateZipVersion.java +++ b/src/main/java/org/tianhe/common/command/impl/UpdateZipVersion.java @@ -14,7 +14,7 @@ public class UpdateZipVersion implements Command private String versionFileDate; private String fileId; @Override - public int commandType() + public byte commandType() { return UPDATE_ZIP_VERSION; } diff --git a/src/main/java/org/tianhe/common/packet/PacketType.java b/src/main/java/org/tianhe/common/packet/PacketType.java index 4c0f004..9d66c8b 100644 --- a/src/main/java/org/tianhe/common/packet/PacketType.java +++ b/src/main/java/org/tianhe/common/packet/PacketType.java @@ -13,6 +13,8 @@ public enum PacketType OPEN_DATA_CHANNEL_REQ(9),// EXECUTE_COMMAND_REQ(10),// EXECUTE_COMMAND_RESP(11),// + RESULT_REQ(12),// + ; private byte command; diff --git a/src/main/java/org/tianhe/common/packet/ResultReq.java b/src/main/java/org/tianhe/common/packet/ResultReq.java new file mode 100644 index 0000000..3e6b2ad --- /dev/null +++ b/src/main/java/org/tianhe/common/packet/ResultReq.java @@ -0,0 +1,39 @@ +package org.tianhe.common.packet; + +import com.jfirer.jnet.common.buffer.buffer.IoBuffer; +import lombok.Data; +import lombok.experimental.Accessors; +import org.tianhe.util.JfireSEPool; + +@Data +@Accessors(chain = true) +public class ResultReq implements Packet +{ + private String resultId; + private byte command; + private Object resultDTO; + + @Override + public int write(IoBuffer buffer) + { + buffer.put(PacketType.RESULT_REQ.getCommand()); + buffer.put(command); + int len = writeShortString(buffer, resultId); + byte[] serialize = JfireSEPool.getJfireSE().serialize(resultDTO); + buffer.putInt(serialize.length); + buffer.put(serialize); + return 1 + 1 + 2 + len + 4 + serialize.length; + } + + @Override + public Packet read(IoBuffer buffer) + { + command = buffer.get(); + resultId = readShortString(buffer); + int len = buffer.getInt(); + byte[] bytes = new byte[len]; + buffer.get(bytes); + resultDTO = JfireSEPool.getJfireSE().deSerialize(bytes); + return this; + } +} diff --git a/src/main/java/org/tianhe/util/DbHelper.java b/src/main/java/org/tianhe/util/DbHelper.java new file mode 100644 index 0000000..3ee628f --- /dev/null +++ b/src/main/java/org/tianhe/util/DbHelper.java @@ -0,0 +1,562 @@ +package org.tianhe.util; + +import com.jfirer.baseutil.STR; +import com.jfirer.baseutil.StringUtil; +import com.jfirer.baseutil.reflect.ReflectUtil; +import com.jfirer.baseutil.time.Timewatch; +import com.jfirer.jnet.common.api.Pipeline; +import com.mysql.jdbc.Driver; +import lombok.extern.slf4j.Slf4j; +import org.tianhe.common.packet.LogReport; + +import java.sql.*; +import java.util.*; +import java.util.stream.Collectors; + +@Slf4j +public class DbHelper +{ + static + { + try + { + Class.forName(Driver.class.getName()); + } + catch (ClassNotFoundException e) + { + throw new RuntimeException(e); + } + } + + private static final String COMPARE_CATALOG_NAME = "structure_compare"; + + public static String fetchDDL(String url, String username, String password, Pipeline pipeline) + { + try (Connection connection = DriverManager.getConnection(url, username, password)) + { + Set allTableNames = findAllTableNames(connection); + StringBuilder builder = new StringBuilder(); + for (String each : allTableNames) + { + try (PreparedStatement preparedStatement = connection.prepareStatement("SHOW CREATE TABLE " + each); ResultSet resultSet = preparedStatement.executeQuery()) + { + if (resultSet.next()) + { + String createTable = resultSet.getString("create table"); + builder.append(createTable).append(";\r\n"); + } + } + } + return builder.toString(); + } + catch (SQLException e) + { + pipeline.fireWrite(new LogReport().setContent(STR.format("数据库连接失败,请检查数据库配置是否正确,url:{},username:{},password:{}。异常信息为:{}", url, username, password, e.toString()))); + return null; + } + } + + private static Set findAllTableNames(Connection connection) throws SQLException + { + String catalog = connection.getCatalog(); + try (PreparedStatement preparedStatement = connection.prepareStatement("SELECT * FROM information_schema.tables WHERE table_schema = ?")) + { + Set tableNames = new HashSet<>(); + preparedStatement.setString(1, catalog); + try (ResultSet resultSet = preparedStatement.executeQuery()) + { + while (resultSet.next()) + { + tableNames.add(resultSet.getString("TABLE_NAME")); + } + } + return tableNames; + } + catch (Throwable e) + { + log.error("查询数据库下的表名称出现异常", e); + return new HashSet<>(); + } + } + + public static void updateDatabase(String url, String username, String password, String ddl, Pipeline pipeline) + { + List sqlCommands = filterSqlCommand(Arrays.stream(ddl.split("\n")).toList()); + try (Connection compareConnection = DriverManager.getConnection(url, username, password); Connection targetConnection = DriverManager.getConnection(url, username, password)) + { + compareConnection.setAutoCommit(false); + targetConnection.setAutoCommit(false); + createCompareStructure(compareConnection, sqlCommands, pipeline); + createUnExistTable(compareConnection, targetConnection, pipeline); + compareConnection.setCatalog(COMPARE_CATALOG_NAME); + try (Statement compareStatement = compareConnection.createStatement(); Statement targetStatement = targetConnection.createStatement()) + { + Set allTableNames = fetchAllTableNames(compareStatement); + Map structureOfCompare = fetchStructure(compareConnection, allTableNames, COMPARE_CATALOG_NAME); + Map structureOfTarget = fetchStructure(targetConnection, allTableNames, targetConnection.getCatalog()); + updateUnExistColumn(structureOfCompare, structureOfTarget, targetStatement, pipeline); + updateDifferentColumn(structureOfCompare, structureOfTarget, targetStatement, pipeline); + updateIndex(structureOfCompare, structureOfTarget, targetStatement, pipeline); + } + pipeline.fireWrite(new LogReport().setContent("数据库结构更新完毕")); + compareConnection.commit(); + targetConnection.commit(); + compareConnection.setAutoCommit(true); + targetConnection.setAutoCommit(true); + } + catch (SQLException e) + { + log.error("升级数据库结构发生异常", e); + pipeline.fireWrite(new LogReport().setContent(STR.format("升级数据库结构发生异常.", e))); + } + } + + private static List filterSqlCommand(List lines) + { + List sqlCommand = new ArrayList<>(); + boolean inNote = false; + StringBuilder builder = new StringBuilder(); + for (String each : lines) + { + if (each.trim().equals("/*")) + { + inNote = true; + } + else if (each.trim().equals("*/")) + { + inNote = false; + } + else if (inNote) + { + ; + } + else if (each.startsWith("--")) + { + ; + } + else if (StringUtil.isNotBlank(each)) + { + builder.append(each); + if (each.trim().endsWith(";")) + { + builder.setLength(builder.length() - 1); + sqlCommand.add(builder.toString()); + builder.setLength(0); + } + } + } + return sqlCommand; + } + + private static void createCompareStructure(Connection connection, List sqlCommands, Pipeline pipeline) throws SQLException + { + try (Statement statement = connection.createStatement()) + { + statement.execute("drop database if exists " + COMPARE_CATALOG_NAME); + statement.execute("CREATE database " + COMPARE_CATALOG_NAME); + } + connection.setCatalog(COMPARE_CATALOG_NAME); + try (Statement statement = connection.createStatement()) + { + sqlExecute(sqlCommands, statement, pipeline); + } + pipeline.fireWrite(new LogReport().setContent("导入 sql 语句完毕,对比数据库表创建完成.接下来准备创建缺失的表。")); + } + + private static void sqlExecute(List sqlCommand, Statement statement, Pipeline pipeline) + { + try + { + int count = 0; + int id = 1; + Timewatch timewatch = new Timewatch(); + for (String each : sqlCommand) + { + statement.addBatch(each); + if (++count > 999) + { + count = 0; + timewatch.start(); + statement.executeBatch(); + statement.clearBatch(); + timewatch.end(); + log.debug("提交1000条,执行第{}次提交,耗时:{}毫秒", id, timewatch.getTotal()); + pipeline.fireWrite(new LogReport().setContent(STR.format("提交1000条,执行第{}次提交,耗时:{}毫秒", id, timewatch.getTotal()))); + id++; + } + } + if (count != 0) + { + timewatch.start(); + statement.executeBatch(); + statement.clearBatch(); + timewatch.end(); + log.debug("提交{}条,执行第{}次提交,耗时:{}毫秒", count, id, timewatch.getTotal()); + pipeline.fireWrite(new LogReport().setContent(STR.format("提交{}条,执行第{}次提交,耗时:{}毫秒", count, id, timewatch.getTotal()))); + } + log.debug("sql执行完毕"); + pipeline.fireWrite(new LogReport().setContent("sql 执行完毕")); + } + catch (Throwable e) + { + log.error("sql执行过程中出现异常", e); + pipeline.fireWrite(new LogReport().setContent(STR.format("sql执行过程中出现异常", e))); + } + } + + private static void createUnExistTable(Connection compareConnection, Connection targetConnection, Pipeline pipeline) throws SQLException + { + compareConnection.setCatalog(COMPARE_CATALOG_NAME); + try (Statement compareStatement = compareConnection.createStatement(); Statement targetStatement = targetConnection.createStatement()) + { + Set compareTableNames = fetchAllTableNames(compareStatement); + Set targetTableNames = fetchAllTableNames(targetStatement); + Set UnExistTableNames = compareTableNames.stream().filter(name -> targetTableNames.contains(name) == false).collect(Collectors.toSet()); + if (!UnExistTableNames.isEmpty()) + { + Set createUnExistTableSql = UnExistTableNames.stream().map(name -> { + try (ResultSet resultSet = compareStatement.executeQuery("SHOW CREATE TABLE " + name)) + { + if (resultSet.next()) + { + return resultSet.getString("Create Table"); + } + else + { + throw new IllegalStateException("查询 create tables 不应该为空"); + } + } + catch (SQLException e) + { + ReflectUtil.throwException(e); + return null; + } + }).collect(Collectors.toSet()); + pipeline.fireWrite(new LogReport().setContent(STR.format("已经查询到缺失的表的建表语句,准备执行建表语句"))); + createUnExistTableSql.forEach(sql -> { + try + { + targetStatement.execute(sql); + } + catch (SQLException e) + { + log.error("创建缺失表异常,当前异常sql为:{}", sql, e); + ReflectUtil.throwException(e); + } + }); + pipeline.fireWrite(new LogReport().setContent(STR.format("缺失的表创建完成,共创建表:{}", UnExistTableNames))); + } + else + { + pipeline.fireWrite(new LogReport().setContent("本次没有需要新建的表")); + } + } + } + + private static Set fetchAllTableNames(Statement statement) throws SQLException + { + try (ResultSet resultSet = statement.executeQuery("show tables");) + { + Set set = new HashSet<>(); + while (resultSet.next()) + { + set.add(resultSet.getString(1)); + } + return set; + } + } + + private static Map fetchStructure(Connection connection, Set allTableNames, String catalog) throws SQLException + { + try (PreparedStatement queryColumn = connection.prepareStatement("SELECT COLUMN_TYPE, COLUMN_COMMENT,COLUMN_NAME,COLUMN_DEFAULT,IS_NULLABLE,EXTRA FROM information_schema.COLUMNS WHERE TABLE_SCHEMA = ? AND TABLE_NAME = ?"); + // + PreparedStatement queryIndex = connection.prepareStatement("SELECT distinct (INDEX_NAME),NON_UNIQUE,INDEX_COMMENT FROM INFORMATION_SCHEMA.STATISTICS WHERE TABLE_SCHEMA = ? AND TABLE_NAME = ?"); + // + PreparedStatement queryIndexColumn = connection.prepareStatement("select COLUMN_NAME,SEQ_IN_INDEX FROM INFORMATION_SCHEMA.STATISTICS WHERE TABLE_SCHEMA = ? AND TABLE_NAME = ? and INDEX_NAME=? order by SEQ_IN_INDEX")) + { + Map structureOfCatalog = new HashMap<>(); + for (String tableName : allTableNames) + { + Map columns = queryColumns(catalog, tableName, queryColumn); + List indexes = queryIndexs(catalog, tableName, queryIndex, queryIndexColumn); + structureOfCatalog.put(tableName, new TableInfo(tableName, columns, indexes)); + } + return structureOfCatalog; + } + } + + private static Map queryColumns(String catalog, String tableName, PreparedStatement queryColumn) throws SQLException + { + queryColumn.clearParameters(); + queryColumn.setString(1, catalog); + queryColumn.setString(2, tableName); + Map columns = new HashMap<>(); + try (ResultSet resultSet = queryColumn.executeQuery()) + { + while (resultSet.next()) + { + String type = resultSet.getString(1); + String comment = StringUtil.notBlankOrDefault(resultSet.getString(2), ""); + String name = resultSet.getString(3); + String defaultValue = StringUtil.notBlankOrDefault(resultSet.getString(4), ""); + boolean isNull = resultSet.getString(5).equalsIgnoreCase("YES"); + String extra = StringUtil.notBlankOrDefault(resultSet.getString(6), ""); + columns.put(name, new ColumnInfo(type, name, comment, tableName, defaultValue, isNull, extra)); + } + } + return columns; + } + + private static List queryIndexs(String catalog, String tableName, PreparedStatement queryIndex, PreparedStatement queryIndexColumn) throws SQLException + { + queryIndex.clearParameters(); + queryIndex.setString(1, catalog); + queryIndex.setString(2, tableName); + List indexes = new ArrayList<>(); + try (ResultSet resultSet = queryIndex.executeQuery()) + { + while (resultSet.next()) + { + String indexName = resultSet.getString(1); + if (indexName.equalsIgnoreCase("primary")) + { + continue; + } + boolean unique = !resultSet.getBoolean(2); + String comment = resultSet.getString(3); + queryIndexColumn.clearParameters(); + queryIndexColumn.setString(1, catalog); + queryIndexColumn.setString(2, tableName); + queryIndexColumn.setString(3, indexName); + List indexColumnInSeq = new ArrayList<>(); + try (ResultSet indexColumns = queryIndexColumn.executeQuery()) + { + while (indexColumns.next()) + { + indexColumnInSeq.add(indexColumns.getString(1)); + } + } + indexes.add(new Index(indexName, tableName, comment, unique, indexName.equalsIgnoreCase("primary"), indexColumnInSeq.toArray(String[]::new))); + } + } + return indexes; + } + + record Index(String indexName, String table, String comment, boolean unique, boolean primary, String[] columns) + { + @Override + public int hashCode() + { + return indexName.hashCode(); + } + + @Override + public boolean equals(Object obj) + { + if (obj == null || obj instanceof Index == false) + { + return false; + } + Index obj1 = (Index) obj; + if (indexName.equals(obj1.indexName) && table.equals(obj1.table) && unique == obj1.unique && primary == obj1.primary && Arrays.equals(columns, obj1.columns)) + { + return true; + } + else + { + return false; + } + } + } + + record ColumnInfo(String type, String name, String comment, String table, String defaultValue, boolean isNull, String extra) + { + } + + record TableInfo(String name, Map columns, List indexes) + { + } + + private static void updateUnExistColumn(Map structureOfCompare, Map structureOfTarget, Statement targetStatement, Pipeline pipeline) + { + Set unExistColumns = structureOfCompare.values().stream().flatMap(tableInfo -> tableInfo.columns.values().stream())// + .filter(column -> !structureOfTarget.get(column.table).columns.containsKey(column.name)).collect(Collectors.toSet()); + if (unExistColumns.isEmpty()) + { + pipeline.fireWrite(new LogReport().setContent("本次没有缺失字段需要补齐")); + } + else + { + unExistColumns.forEach(columnInfo -> { + StringBuilder builder = new StringBuilder("ALTER TABLE "); + builder.append(columnInfo.table).append(" add column ").append(columnInfo.name).append(" ").append(columnInfo.type).append(" "); + if (StringUtil.isNotBlank(columnInfo.defaultValue())) + { + try + { + Double.valueOf(columnInfo.defaultValue()); + builder.append("default ").append(columnInfo.defaultValue()).append(" "); + } + catch (Throwable e) + { + builder.append("default '").append(columnInfo.defaultValue()).append("' "); + } + } + if (StringUtil.isNotBlank(columnInfo.extra())) + { + builder.append(columnInfo.extra).append(" "); + } + if (columnInfo.isNull() == false) + { + builder.append("not null "); + } + if (StringUtil.isNotBlank(columnInfo.comment)) + { + builder.append("comment '").append(columnInfo.comment).append("'"); + } + try + { + targetStatement.execute(builder.toString()); + } + catch (SQLException e) + { + log.error("字段:{}补齐字段过程中发生异常", columnInfo.table + "." + columnInfo.name, e); + } + }); + pipeline.fireWrite(new LogReport().setContent(STR.format("缺失{}个字段补齐完毕,分别是:{}", unExistColumns.size(), unExistColumns.stream().map(columnInfo -> columnInfo.table + "." + columnInfo.name).collect(Collectors.joining(","))))); + } + } + + private static void updateDifferentColumn(Map structureOfCompare, Map structureOfTarget, Statement targetStatement, Pipeline pipeline) + { + Set differentColumns = structureOfCompare.values().stream().flatMap(tableInfo -> tableInfo.columns.values().stream())// + .filter(column -> structureOfTarget.get(column.table).columns.containsKey(column.name))// + .filter(column -> !structureOfTarget.get(column.table).columns.get(column.name).comment.equals(column.comment)// + || !structureOfTarget.get(column.table).columns.get(column.name).defaultValue().equals(column.defaultValue())// + || !structureOfTarget.get(column.table).columns.get(column.name).type.equals(column.type)// + || !structureOfTarget.get(column.table).columns.get(column.name).isNull == column.isNull()// + || !structureOfTarget.get(column.table).columns.get(column.name).extra.equals(column.extra))// + .peek(columnInfo -> { + ColumnInfo columnInfo1 = structureOfTarget.get(columnInfo.table).columns.get(columnInfo.name); + })// + .collect(Collectors.toSet()); + if (differentColumns.isEmpty()) + { + pipeline.fireWrite(new LogReport().setContent("本次没有不同字段需要更新")); + } + else + { + differentColumns.forEach(differentColumn -> { + StringBuilder builder = new StringBuilder(); + builder.append("ALTER TABLE ").append(differentColumn.table).append(" modify column ").append(differentColumn.name).append(' ').append(differentColumn.type).append(' '); + if (StringUtil.isNotBlank(differentColumn.defaultValue())) + { + try + { + Double.valueOf(differentColumn.defaultValue()); + builder.append("default ").append(differentColumn.defaultValue()).append(" "); + } + catch (Throwable e) + { + builder.append("default '").append(differentColumn.defaultValue()).append("' "); + } + } + if (StringUtil.isNotBlank(differentColumn.extra())) + { + builder.append(differentColumn.extra).append(" "); + } + if (differentColumn.isNull() == false) + { + builder.append("not null "); + } + if (StringUtil.isNotBlank(differentColumn.comment)) + { + builder.append("comment '").append(differentColumn.comment).append("'"); + } + String sql = builder.toString(); + try + { + targetStatement.execute(sql); + } + catch (SQLException e) + { + log.error("执行表更新语句:{}过程中发现异常", sql); + } + }); + pipeline.fireWrite(new LogReport().setContent(STR.format("不同字段:{}个更新完毕,更新的字段有:{}", differentColumns.size(), differentColumns.stream().map(columnInfo -> columnInfo.table + "." + columnInfo.name).collect(Collectors.joining(","))))); + } + } + + private static void updateIndex(Map structureOfCompare, Map structureOfTarget, Statement targetStatement, Pipeline pipeline) + { + Set sameIndexes = structureOfTarget.values().stream().map(TableInfo::indexes).flatMap(Collection::stream)// + .filter(index -> // + structureOfCompare.get(index.table).indexes.stream().anyMatch(// + indexOfCompare -> indexOfCompare.indexName.equalsIgnoreCase(index.indexName) && indexOfCompare.unique == index.unique && indexOfCompare.comment.equals(index.comment) && Arrays.equals(indexOfCompare.columns, index.columns))).collect(Collectors.toSet()); + // + Set allTargetIndexes = structureOfTarget.values().stream().map(TableInfo::indexes).flatMap(Collection::stream).collect(Collectors.toSet()); + allTargetIndexes.removeAll(sameIndexes); + Set needDeletedIndexes = new HashSet<>(allTargetIndexes); + if (needDeletedIndexes.isEmpty()) + { + pipeline.fireWrite(new LogReport().setContent("本次没有需要删除的索引")); + } + else + { + Set error = new HashSet<>(); + for (Index needDeleteIndex : needDeletedIndexes) + { + String sql = STR.format("ALTER TABLE {} DROP INDEX {}", needDeleteIndex.table, needDeleteIndex.indexName); + try + { + targetStatement.execute(sql); + } + catch (SQLException e) + { + error.add(needDeleteIndex); + pipeline.fireWrite(new LogReport().setContent(STR.format("删除索引 {} 出现异常", needDeleteIndex.table + "." + needDeleteIndex.indexName, e))); + } + } + needDeletedIndexes.removeAll(error); + pipeline.fireWrite(new LogReport().setContent(STR.format("删除不同的索引完毕,删除的索引有:{}", needDeletedIndexes.stream().map(index -> index.table + "." + index.indexName).collect(Collectors.joining(","))))); + } + Set allCompareIndexes = structureOfCompare.values().stream().map(TableInfo::indexes).flatMap(Collection::stream).collect(Collectors.toSet()); + allCompareIndexes.removeAll(sameIndexes); + Set needCreateIndexes = new HashSet<>(allCompareIndexes); + if (needCreateIndexes.isEmpty()) + { + pipeline.fireWrite(new LogReport().setContent("没有不同的索引需要创建")); + } + else + { + Set error = new HashSet<>(); + for (Index index : needCreateIndexes) + { + StringBuilder builder = new StringBuilder(); + builder.append("alter table ").append(index.table).append(" add "); + if (index.unique) + { + builder.append("unique "); + } + builder.append("index ").append(index.indexName).append("(").append(String.join(",", index.columns)).append(") "); + if (StringUtil.isNotBlank(index.comment)) + { + builder.append("comment '").append(index.comment).append("'"); + } + String sql = builder.toString(); + try + { + targetStatement.execute(sql); + } + catch (SQLException e) + { + log.error("创建索引出现异常,sql 为:{}", sql); + error.add(index); + pipeline.fireWrite(new LogReport().setContent(STR.format("创建索引 {} 出现异常", index.table + "." + index.indexName, e))); + } + } + needCreateIndexes.removeAll(error); + pipeline.fireWrite(new LogReport().setContent(STR.format("创建不同的索引完毕,创建的索引有:{}", needCreateIndexes.stream().map(index -> index.table + "." + index.indexName).collect(Collectors.joining(","))))); + } + } +} diff --git a/src/main/java/org/tianhe/agent/PlatformHelper.java b/src/main/java/org/tianhe/util/PlatformHelper.java similarity index 96% rename from src/main/java/org/tianhe/agent/PlatformHelper.java rename to src/main/java/org/tianhe/util/PlatformHelper.java index ae7e24c..8108e31 100644 --- a/src/main/java/org/tianhe/agent/PlatformHelper.java +++ b/src/main/java/org/tianhe/util/PlatformHelper.java @@ -1,9 +1,10 @@ -package org.tianhe.agent; +package org.tianhe.util; import com.jfirer.baseutil.STR; import com.jfirer.baseutil.StringUtil; import com.jfirer.jnet.common.api.Pipeline; import lombok.extern.slf4j.Slf4j; +import org.tianhe.agent.AgentConfig; import org.tianhe.common.packet.LogReport; import java.io.BufferedReader; @@ -129,4 +130,9 @@ public class PlatformHelper } }).start(); } + + public static String toAbsolutePath(String relativePath) + { + return new File(DIR_PATH, relativePath).getAbsolutePath(); + } } diff --git a/src/main/java/org/tianhe/web/WebConfig.java b/src/main/java/org/tianhe/web/WebConfig.java index c2c778d..ffc486c 100644 --- a/src/main/java/org/tianhe/web/WebConfig.java +++ b/src/main/java/org/tianhe/web/WebConfig.java @@ -5,17 +5,19 @@ import com.jfirer.jfire.core.AwareContextInited; import com.jfirer.jfire.core.inject.notated.PropertyRead; import com.jfirer.jfire.core.prepare.annotation.ComponentScan; import com.jfirer.jfire.core.prepare.annotation.EnableAutoConfiguration; +import com.jfirer.jfire.core.prepare.annotation.configuration.Bean; +import com.jfirer.jfire.core.prepare.annotation.configuration.Configuration; import com.jfirer.jnet.common.api.Pipeline; import com.jfirer.jnet.common.decoder.TotalLengthFieldBasedFrameDecoder; import com.jfirer.jnet.common.util.ChannelConfig; import com.jfirer.jnet.server.AioServer; import com.zaxxer.hikari.HikariDataSource; import lombok.extern.slf4j.Slf4j; -import org.tianhe.web.service.LogReportService; -import org.tianhe.web.control.ControlChannelStruct; -import org.tianhe.web.control.AgentReqHandler; import org.tianhe.common.PacketWriteHandler; +import org.tianhe.web.control.AgentReqHandler; +import org.tianhe.web.control.ControlChannelStruct; import org.tianhe.web.control.RegisterSelfControlChannelHandler; +import org.tianhe.web.service.LogReportService; import javax.annotation.Resource; import javax.sql.DataSource; @@ -25,17 +27,20 @@ import java.util.concurrent.ConcurrentHashMap; @EnableAutoConfiguration @Slf4j @Resource +@Configuration public class WebConfig implements AwareContextInited { @PropertyRead("datasource.user") - private String username; + private String username; @PropertyRead("datasource.password") - private String password; + private String password; @PropertyRead("datasource.url") - private String url; - @Resource - private LogReportService logReportService; + private String url; @Resource + private LogReportService logReportService; + private ConcurrentHashMap controlChannels; + + @Bean public DataSource dataSource() { HikariDataSource dataSource = new HikariDataSource(); @@ -53,7 +58,7 @@ public class WebConfig implements AwareContextInited channelConfig.setWorkerGroup(ChannelConfig.DEFAULT_WORKER_GROUP); channelConfig.setChannelGroup(ChannelConfig.DEFAULT_CHANNEL_GROUP); channelConfig.setPort(39999); - ConcurrentHashMap controlChannels = new ConcurrentHashMap<>(); + controlChannels = new ConcurrentHashMap<>(); AioServer aioServer = AioServer.newAioServer(channelConfig, channelContext -> { Pipeline pipeline = channelContext.pipeline(); pipeline.addReadProcessor(new TotalLengthFieldBasedFrameDecoder(0, 4, 4, Integer.MAX_VALUE, channelConfig.getAllocator())); diff --git a/src/main/java/org/tianhe/web/http/HttpServer.java b/src/main/java/org/tianhe/web/http/HttpServer.java new file mode 100644 index 0000000..75de7c8 --- /dev/null +++ b/src/main/java/org/tianhe/web/http/HttpServer.java @@ -0,0 +1,71 @@ +package org.tianhe.web.http; + +import com.jfirer.jfire.core.ApplicationContext; +import com.jfirer.jfire.core.AwareContextInited; +import com.jfirer.jfire.core.prepare.annotation.Import; +import com.jfirer.jfirer.boot.forward.path.PathRequestForwardProcessor; +import com.jfirer.jfirer.boot.http.NotFoundUrlProcessor; +import com.jfirer.jfirer.boot.http.OptionsProcessor; +import com.jfirer.jfirer.boot.http.ResourceProcessor; +import com.jfirer.jfirer.boot.http.ResponseDataToHttpResponse; +import com.jfirer.jnet.common.api.Pipeline; +import com.jfirer.jnet.common.buffer.allocator.BufferAllocator; +import com.jfirer.jnet.common.buffer.allocator.impl.CachedBufferAllocator; +import com.jfirer.jnet.common.buffer.allocator.impl.PooledBufferAllocator; +import com.jfirer.jnet.common.internal.DefaultWorkerGroup; +import com.jfirer.jnet.common.util.CapacityStat; +import com.jfirer.jnet.common.util.ChannelConfig; +import com.jfirer.jnet.extend.http.decode.HttpRequestDecoder; +import com.jfirer.jnet.extend.http.decode.HttpResponseEncoder; +import com.jfirer.jnet.server.AioServer; +import lombok.extern.slf4j.Slf4j; + +import javax.annotation.Resource; + +@Resource +@Import(PathRequestForwardProcessor.class) +@Slf4j +public class HttpServer implements AwareContextInited +{ + public static final BufferAllocator BUFFER_ALLOCATOR = CachedBufferAllocator.DEFAULT; + + @Override + public void aware(ApplicationContext context) + { + PathRequestForwardProcessor pathRequestForwardProcessor = context.getBean(PathRequestForwardProcessor.class); + ChannelConfig channelConfig = new ChannelConfig(); + channelConfig.setAllocator(BUFFER_ALLOCATOR); + new Thread(() -> { + while (true) + { + try + { + Thread.sleep(10000); + CapacityStat stat = new CapacityStat(); + ((PooledBufferAllocator) BUFFER_ALLOCATOR).directCapacityStat(stat); + log.trace("{}", stat); + System.gc(); + } + catch (InterruptedException e) + { + throw new RuntimeException(e); + } + } + }); + channelConfig.setWorkerGroup(new DefaultWorkerGroup(Runtime.getRuntime().availableProcessors() * 4, "OperationMaster-worker-")); + channelConfig.setChannelGroup(ChannelConfig.DEFAULT_CHANNEL_GROUP); + channelConfig.setPort(40001); + AioServer aioServer = AioServer.newAioServer(channelConfig, channelContext -> { + Pipeline pipeline = channelContext.pipeline(); + pipeline.addReadProcessor(new HttpRequestDecoder(channelConfig.getAllocator())); + pipeline.addReadProcessor(new OptionsProcessor()); + ResourceProcessor resourceProcessor = new ResourceProcessor("/dist"); + pipeline.addReadProcessor(resourceProcessor); + pipeline.addReadProcessor(pathRequestForwardProcessor); + pipeline.addReadProcessor(new NotFoundUrlProcessor(resourceProcessor)); + pipeline.addWriteProcessor(new ResponseDataToHttpResponse()); + pipeline.addWriteProcessor(new HttpResponseEncoder(channelConfig.getAllocator())); + }); + aioServer.start(); + } +}