开发中

master
linbin 2024-09-08 12:38:16 +08:00
parent abb0acfe35
commit d641525037
9 changed files with 123 additions and 40 deletions

View File

@ -20,4 +20,18 @@ public enum CommandType
{
return command;
}
public static CommandType valueOf(byte command)
{
return switch (command)
{
case 1 -> REGISTER_SELF_CONTROL_CHANNEL;
case 2 -> REGISTER_SELF_DATA_CHANNEL;
case 3 -> CONTROL_CHANNEL_HEART_BEAT;
case 4 -> LOG_REPORT;
case 5 -> FILE_INFO_REQ;
case 6 -> FILE_INFO_RESP;
default -> throw new IllegalStateException("Unexpected value: " + command);
};
}
}

View File

@ -3,8 +3,6 @@ package org.tianhe.common.packet;
import com.jfirer.jnet.common.buffer.buffer.IoBuffer;
import org.tianhe.common.CommandType;
import java.nio.charset.StandardCharsets;
public class FileInfoReq implements Packet
{
private String fileId;
@ -13,19 +11,13 @@ public class FileInfoReq implements Packet
public int write(IoBuffer buffer)
{
buffer.put(CommandType.FILE_INFO_REQ.getCommand());
byte[] bytes = fileId.getBytes(StandardCharsets.UTF_8);
buffer.putShort((short) bytes.length);
buffer.put(bytes);
return 3 + bytes.length;
return 3 + writeShortString(buffer, fileId);
}
@Override
public FileInfoReq read(IoBuffer buffer)
{
short len = buffer.getShort();
byte[] content = new byte[len];
buffer.get(content);
fileId = new String(content, StandardCharsets.UTF_8);
fileId = readShortString(buffer);
return this;
}
}

View File

@ -1,6 +1,7 @@
package org.tianhe.common.packet;
import com.jfirer.jnet.common.buffer.buffer.IoBuffer;
import org.tianhe.common.CommandType;
public class FileInfoResp implements Packet
{
@ -10,7 +11,10 @@ public class FileInfoResp implements Packet
@Override
public int write(IoBuffer buffer)
{
return 0;
buffer.put(CommandType.FILE_INFO_RESP.getCommand());
int len = writeShortString(buffer, md5);
buffer.putInt(size);
return 1 + len + 4;
}
@Override

View File

@ -4,8 +4,6 @@ import com.jfirer.jnet.common.buffer.buffer.IoBuffer;
import lombok.Data;
import org.tianhe.common.CommandType;
import java.nio.charset.StandardCharsets;
@Data
public class LogReport implements Packet
{
@ -16,27 +14,16 @@ public class LogReport implements Packet
public int write(IoBuffer buffer)
{
buffer.put(CommandType.LOG_REPORT.getCommand());
byte[] dateBytes = date.getBytes(StandardCharsets.UTF_8);
buffer.putShort((short) dateBytes.length);
buffer.put(dateBytes);
byte[] contentBytes = content.getBytes(StandardCharsets.UTF_8);
buffer.putShort((short) contentBytes.length);
buffer.put(contentBytes);
return 1 + 2 + dateBytes.length + 4 + contentBytes.length;
int len1 = writeShortString(buffer, date);
int len2 = writeShortString(buffer, content);
return 1 + 2 + len1 + 4 + len2;
}
@Override
public Packet read(IoBuffer buffer)
{
short len = buffer.getShort();
byte[] bytes = new byte[len];
buffer.get(bytes);
date = new String(bytes, StandardCharsets.UTF_8);
int contentLen = buffer.getInt();
bytes = new byte[contentLen];
buffer.get(bytes);
content = new String(bytes, StandardCharsets.UTF_8);
date = readShortString(buffer);
content = readShortString(buffer);
return this;
}
}

View File

@ -2,14 +2,46 @@ package org.tianhe.common.packet;
import com.jfirer.jnet.common.buffer.buffer.IoBuffer;
import java.nio.charset.StandardCharsets;
public interface Packet
{
/**
* buffer
*
* @param buffer
* @return
*/
int write(IoBuffer buffer);
/**
*
* @param buffer
* @return
*/
Packet read(IoBuffer buffer);
default int writeShortString(IoBuffer buffer, String content)
{
byte[] bytes = content.getBytes(StandardCharsets.UTF_8);
buffer.putShort((short) bytes.length);
buffer.put(bytes);
return bytes.length;
}
default void writeString(IoBuffer buffer){
default String readShortString(IoBuffer buffer)
{
short len = buffer.getShort();
byte[] content = new byte[len];
buffer.get(content);
return new String(content, StandardCharsets.UTF_8);
}
default int writeLongString(IoBuffer buffer, String content)
{
byte[] bytes = content.getBytes(StandardCharsets.UTF_8);
buffer.putInt(bytes.length);
buffer.put(bytes);
return bytes.length;
}
}

View File

@ -4,8 +4,6 @@ import com.jfirer.jnet.common.buffer.buffer.IoBuffer;
import lombok.Data;
import org.tianhe.common.CommandType;
import java.nio.charset.StandardCharsets;
@Data
public class RegisterSelfControlChannel implements Packet
{
@ -15,21 +13,16 @@ public class RegisterSelfControlChannel implements Packet
public int write(IoBuffer buffer)
{
buffer.put(CommandType.REGISTER_SELF_CONTROL_CHANNEL.getCommand());
byte[] bytes = hosId.getBytes(StandardCharsets.UTF_8);
buffer.putShort((short) bytes.length);
buffer.put(bytes);
int len = writeShortString(buffer, hosId);
buffer.put(agentId);
return 1 + 2 + bytes.length + 1;
return 1 + 2 + len + 1;
}
@Override
public RegisterSelfControlChannel read(IoBuffer buffer)
{
short len = buffer.getShort();
byte[] bytes = new byte[len];
buffer.get(bytes);
RegisterSelfControlChannel registerSelfControlChannel = new RegisterSelfControlChannel();
registerSelfControlChannel.setHosId(new String(bytes, StandardCharsets.UTF_8));
registerSelfControlChannel.setHosId(readShortString(buffer));
registerSelfControlChannel.setAgentId(buffer.get());
return this;
}

View File

@ -0,0 +1,9 @@
package org.tianhe.web.service;
import org.tianhe.common.packet.FileInfoReq;
import org.tianhe.common.packet.FileInfoResp;
public interface FileService
{
FileInfoResp find(FileInfoReq req);
}

View File

@ -0,0 +1,51 @@
package org.tianhe.web.tcp;
import com.jfirer.jnet.common.api.ReadProcessor;
import com.jfirer.jnet.common.api.ReadProcessorNode;
import com.jfirer.jnet.common.buffer.buffer.IoBuffer;
import lombok.Data;
import org.tianhe.common.CommandType;
import org.tianhe.common.packet.FileInfoReq;
import org.tianhe.common.packet.FileInfoResp;
import org.tianhe.common.packet.LogReport;
import org.tianhe.web.service.FileService;
import org.tianhe.web.service.LogReportService;
import javax.annotation.Resource;
import java.time.LocalDateTime;
@Data
public class CommandHandler implements ReadProcessor<IoBuffer>
{
private ControlChannelStruct struct;
@Resource
private LogReportService logReportService;
@Resource
private FileService fileService;
@Override
public void read(IoBuffer buffer, ReadProcessorNode next)
{
CommandType commandType = CommandType.valueOf(buffer.get());
switch (commandType)
{
case CONTROL_CHANNEL_HEART_BEAT ->
{
struct.setLastActiveTime(LocalDateTime.now());
buffer.free();
}
case LOG_REPORT ->
{
LogReport read = (LogReport) new LogReport().read(buffer);
logReportService.report(read.getContent(), read.getDate());
buffer.free();
}
case FILE_INFO_REQ ->
{
FileInfoReq infoReq = new FileInfoReq().read(buffer);
FileInfoResp fileInfoResp = fileService.find(infoReq);
}
}
}
}

View File

@ -33,6 +33,7 @@ public class RegisterSelfControlChannelHandler implements ReadProcessor<IoBuffer
RegisterSelfControlChannel from = new RegisterSelfControlChannel().read(data);
controlChannelStruct = new ControlChannelStruct(from.getHosId() + "_" + from.getAgentId(), next.pipeline(), LocalDateTime.now());
control_channels.put(controlChannelStruct.getKey(), controlChannelStruct);
heartBeatHandler.setControlChannelStruct(controlChannelStruct);
data.free();
}
else