态势感知流程图
2019-09-18 10:35:05 1 举报
态势安全分析
作者其他创作
大纲/内容
Create_Thread_Executor()
调用launchRunningJobs() 向Pipline调用接口获取采集的服务器信息
Process execute(String excuteId)
initResource();加载核心配置项EsperUtil、SimpleEs(返回ES客户端)、TaskManagerImpl、IpSearchUtil
case STOPED
while (true)
否
initUploadHdfsTask(); 开启hdfs定时上传任务加入线程池
KafkaSink
是
run
case WAITING
sendHeartbeat();调用Pipline,注册为extract类型任务,并发送心跳包
outputGobbler.start()
taskListFromHTTP()调用Pipline拉取任务内容
startOutTask()输出任务启动 startOutTask设置对应hbase、kafka、ES配置信息
switch (action)
IpSearchUtil构造器完成类初始化配置读取
processor.process(record.value())根据不同类型调用process写入数据
getExtractionTask() 根据job id获取提取器内容
TaskManagerImpl.getInstance().addTask();
case RUNNING
logparser项目LogparserOut.java main方法
startParseTask()开启解析任务
formatExtraction()按照extract类型初始化解析解析器json文件
接口 response 是否有返回信息
activate()
采集模块
EsSink
结束
ReceiveTaskJob.execute()定时5秒执行一次
default: break;
是否有数据
清洗模块LogparserExtract单独模块用蓝色字体表示,LogparserOut单独模块用紫色字体表示,共用方法、模块黑色字体表示
initTopics();加载系统配置的topic文件
parserChannels()定义方法模板
new HDFSSink
createNewFileName()创建本地创建配置文件名称
stop
switch (command)
否 Thread.sleep(10000);
switch (collectType)
new EsSink
createConfigFile()
startKafkaStreams()
serviceExecutor.run()
balancePartitionNum()配置对应topic的partition个数
initLogback() 加载并配置本地logback
logparser项目LogparserExtract.java main方法
parseExtraction() 解析提取器regex、splitChar、json、jsonRegex
initThreadHbase()
HDFSSink
schedule()
scheduleRunningJobs()
注册成功
run() 调用Pipline进行当前任务线程心跳通知
new KafkaSink
HbaseSink
createStreamTask()
调用collectorRegister() 采集本机IP等信息向mysql注册为采集任务的服务器(后续需要北冥后台进行页面配置添加任务启动Flume进行采集数据)
根据collect字段内容来进行不同种类采集任务调度
lazyStartParseTask()
deleteTask()
bootstrap项目ApplicationShell.java main方法
case STOPPING
restart先stop再进行start
是根据collectType的值进启动不同的采集任务
\"extrat\".equals(System.getProperty(\"app_type\"))
SubmitTask.execute()
调用Pipline接口实时获取最新任务数据,若当前节点有新分配的任务,将进行新的任务调度
new HbaseSink
调用Piplineupdate-collect-task更新任务状态
updateTaskStatus执行完任务后,调用Pipline更新任务状态
InnerActuator.run()
System.exit(1);
batchQueryExtracts()调用Pipline批量查询所有任务extract信息
EsperUtil静态代码块完成对关键词计数
initThreadKafka()
initThreadHDFS()
initThreadEs()
TaskManagerImpl
收藏
收藏
0 条评论
下一页