开发中

master
linbin 2024-09-07 09:22:10 +08:00
parent 96eacc4b79
commit 8e0fd5bae3
10 changed files with 104 additions and 18 deletions

View File

@ -5,7 +5,9 @@ public enum CommandType
REGISTER_SELF_CONTROL_CHANNEL(1),//
REGISTER_SELF_DATA_CHANNEL(2),//
CONTROL_CHANNEL_HEART_BEAT(3),//
LOG_REPORT(4),
LOG_REPORT(4),//
FILE_INFO_REQ(5),//
FILE_INFO_RESP(6),
;
private byte command;

View File

@ -0,0 +1,31 @@
package org.tianhe.common.packet;
import com.jfirer.jnet.common.buffer.buffer.IoBuffer;
import org.tianhe.common.CommandType;
import java.nio.charset.StandardCharsets;
public class FileInfoReq implements Packet
{
private String fileId;
@Override
public int write(IoBuffer buffer)
{
buffer.put(CommandType.FILE_INFO_REQ.getCommand());
byte[] bytes = fileId.getBytes(StandardCharsets.UTF_8);
buffer.putShort((short) bytes.length);
buffer.put(bytes);
return 3 + bytes.length;
}
@Override
public FileInfoReq read(IoBuffer buffer)
{
short len = buffer.getShort();
byte[] content = new byte[len];
buffer.get(content);
fileId = new String(content, StandardCharsets.UTF_8);
return this;
}
}

View File

@ -0,0 +1,21 @@
package org.tianhe.common.packet;
import com.jfirer.jnet.common.buffer.buffer.IoBuffer;
public class FileInfoResp implements Packet
{
private String md5;
private int size;
@Override
public int write(IoBuffer buffer)
{
return 0;
}
@Override
public Packet read(IoBuffer buffer)
{
return null;
}
}

View File

@ -1,10 +1,12 @@
package org.tianhe.common.packet;
import com.jfirer.jnet.common.buffer.buffer.IoBuffer;
import lombok.Data;
import org.tianhe.common.CommandType;
import java.nio.charset.StandardCharsets;
@Data
public class LogReport implements Packet
{
private String date;

View File

@ -7,4 +7,9 @@ public interface Packet
int write(IoBuffer buffer);
Packet read(IoBuffer buffer);
default void writeString(IoBuffer buffer){
}
}

View File

@ -8,11 +8,13 @@ import java.time.LocalDateTime;
@Data
public class ControlChannelStruct
{
private String key;
private final Pipeline pipeline;
private volatile LocalDateTime lastActiveTime;
public ControlChannelStruct(Pipeline pipeline, LocalDateTime lastActiveTime)
public ControlChannelStruct(String key, Pipeline pipeline, LocalDateTime lastActiveTime)
{
this.key = key;
this.pipeline = pipeline;
this.lastActiveTime = lastActiveTime;
}

View File

@ -3,10 +3,12 @@ package org.tianhe.web.tcp;
import com.jfirer.jnet.common.api.ReadProcessor;
import com.jfirer.jnet.common.api.ReadProcessorNode;
import com.jfirer.jnet.common.buffer.buffer.IoBuffer;
import lombok.Data;
import org.tianhe.common.CommandType;
import java.time.LocalDateTime;
@Data
public class HeartBeatHandler implements ReadProcessor<IoBuffer>
{
private ControlChannelStruct controlChannelStruct;
@ -18,6 +20,7 @@ public class HeartBeatHandler implements ReadProcessor<IoBuffer>
if (b == CommandType.CONTROL_CHANNEL_HEART_BEAT.getCommand())
{
controlChannelStruct.setLastActiveTime(LocalDateTime.now());
data.free();
}
else
{

View File

@ -3,20 +3,26 @@ package org.tianhe.web.tcp;
import com.jfirer.jnet.common.api.ReadProcessor;
import com.jfirer.jnet.common.api.ReadProcessorNode;
import com.jfirer.jnet.common.buffer.buffer.IoBuffer;
import lombok.Data;
import lombok.experimental.Accessors;
import org.tianhe.common.CommandType;
import org.tianhe.common.packet.LogReport;
import org.tianhe.web.service.LogReportService;
@Data
@Accessors(chain = true)
public class LogReportHandler implements ReadProcessor<IoBuffer>
{
private LogReportService logReportService;
@Override
public void read(IoBuffer data, ReadProcessorNode next)
{
if (data.get() == CommandType.LOG_REPORT.getCommand())
{
LogReport read = (LogReport) new LogReport().read(data);
logReportService.report(read.getContent(), read.getDate());
data.free();
}
else
{

View File

@ -3,6 +3,8 @@ package org.tianhe.web.tcp;
import com.jfirer.jnet.common.api.ReadProcessor;
import com.jfirer.jnet.common.api.ReadProcessorNode;
import com.jfirer.jnet.common.buffer.buffer.IoBuffer;
import lombok.Data;
import lombok.experimental.Accessors;
import lombok.extern.slf4j.Slf4j;
import org.tianhe.common.CommandType;
import org.tianhe.common.packet.RegisterSelfControlChannel;
@ -11,17 +13,14 @@ import java.time.LocalDateTime;
import java.util.concurrent.ConcurrentHashMap;
@Slf4j
@Data
@Accessors(chain = true)
public class RegisterSelfControlChannelHandler implements ReadProcessor<IoBuffer>
{
private final ConcurrentHashMap<String, ControlChannelStruct> control_channels;
private boolean unDefined = true;
private final HeartBeatHandler heartBeatHandler;
public RegisterSelfControlChannelHandler(ConcurrentHashMap<String, ControlChannelStruct> controlChannels, HeartBeatHandler heartBeatHandler)
{
control_channels = controlChannels;
this.heartBeatHandler = heartBeatHandler;
}
private boolean unDefined = true;
private HeartBeatHandler heartBeatHandler;
private ConcurrentHashMap<String, ControlChannelStruct> control_channels;
private ControlChannelStruct controlChannelStruct;
@Override
public void read(IoBuffer data, ReadProcessorNode next)
@ -31,12 +30,15 @@ public class RegisterSelfControlChannelHandler implements ReadProcessor<IoBuffer
if (data.get() == CommandType.REGISTER_SELF_CONTROL_CHANNEL.getCommand())
{
unDefined = false;
RegisterSelfControlChannel from =new RegisterSelfControlChannel().read(data);
control_channels.put(from.getHosId() + "_" + from.getAgentId(), new ControlChannelStruct(next.pipeline(), LocalDateTime.now()));
RegisterSelfControlChannel from = new RegisterSelfControlChannel().read(data);
controlChannelStruct = new ControlChannelStruct(from.getHosId() + "_" + from.getAgentId(), next.pipeline(), LocalDateTime.now());
control_channels.put(controlChannelStruct.getKey(), controlChannelStruct);
data.free();
}
else
{
log.error("未能识别的初始命令,关闭连接");
data.free();
next.pipeline().channelContext().close();
}
}
@ -45,4 +47,11 @@ public class RegisterSelfControlChannelHandler implements ReadProcessor<IoBuffer
next.fireRead(data);
}
}
@Override
public void channelClose(ReadProcessorNode next, Throwable e)
{
control_channels.remove(controlChannelStruct.getKey(), controlChannelStruct);
next.fireChannelClose(e);
}
}

View File

@ -7,6 +7,7 @@ import com.jfirer.jnet.common.decoder.TotalLengthFieldBasedFrameDecoder;
import com.jfirer.jnet.common.util.ChannelConfig;
import com.jfirer.jnet.server.AioServer;
import lombok.extern.slf4j.Slf4j;
import org.tianhe.web.service.LogReportService;
import javax.annotation.Resource;
import java.util.concurrent.ConcurrentHashMap;
@ -15,22 +16,26 @@ import java.util.concurrent.ConcurrentHashMap;
@Slf4j
public class WerbTcpServer implements AwareContextInited
{
@Resource
private LogReportService logReportService;
@Override
public void aware(ApplicationContext applicationContext)
{
ChannelConfig channelConfig = new ChannelConfig();
channelConfig.setMsOfReadTimeout(1000*30);
channelConfig.setMsOfReadTimeout(1000 * 30);
channelConfig.setWorkerGroup(ChannelConfig.DEFAULT_WORKER_GROUP);
channelConfig.setChannelGroup(ChannelConfig.DEFAULT_CHANNEL_GROUP);
channelConfig.setPort(39999);
ConcurrentHashMap<String,ControlChannelStruct> controlChannels = new ConcurrentHashMap<>();
ConcurrentHashMap<String, ControlChannelStruct> controlChannels = new ConcurrentHashMap<>();
AioServer aioServer = AioServer.newAioServer(channelConfig, channelContext -> {
Pipeline pipeline = channelContext.pipeline();
pipeline.addReadProcessor(new TotalLengthFieldBasedFrameDecoder(0, 4, 4, Integer.MAX_VALUE, channelConfig.getAllocator()));
HeartBeatHandler heartBeatHandler = new HeartBeatHandler();
RegisterSelfControlChannelHandler registerSelfControlChannelHandler = new RegisterSelfControlChannelHandler(controlChannels, heartBeatHandler);
HeartBeatHandler heartBeatHandler = new HeartBeatHandler();
RegisterSelfControlChannelHandler registerSelfControlChannelHandler = new RegisterSelfControlChannelHandler().setHeartBeatHandler(heartBeatHandler).setControl_channels(controlChannels);
pipeline.addReadProcessor(registerSelfControlChannelHandler);
pipeline.addReadProcessor(heartBeatHandler);
pipeline.addReadProcessor(new LogReportHandler().setLogReportService(logReportService));
});
aioServer.start();
log.debug("运维服务器穿透模块启动");