Preface
In earlier posts I covered how to build and deploy jxwaf, but I never got to the part that makes it truly usable in production: log storage. For log storage the options basically come down to three: the ubiquitous ES, the increasingly popular ClickHouse, and Splunk, which wins you over once you try it. I'll write a separate article on each of the three for reference.
The star of this article is ClickHouse; if you're not familiar with it, read the earlier article first to get up to speed.
docker-compose integration:
https://github.com/yingshang/jxwaf-docker-clickhouse.git
Deployment architecture
ClickHouse accepts native client connections (port 9000) as well as a RESTful HTTP API (port 8123), which leads to two possible transport architectures:
jxwaf(tcp)->logstash(input)->logstash(filter)->logstash(output)->kafka->clickhouse
jxwaf(tcp)->logstash(input)->logstash(filter)->logstash(output)->http->clickhouse
In practical testing, the HTTP route starts throwing errors once concurrency exceeds 100, while the Kafka route holds up very well: Logstash acts as the producer and pushes logs into Kafka, and ClickHouse consumes them from the topic. The payoff is a very low log-loss rate, and ClickHouse is far less likely to fall over.
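The article doesn't spell out the output stage of that first pipeline, so here is a minimal sketch of the Logstash-to-Kafka leg using the stock kafka output plugin. The broker address and topic name are assumptions for illustration, not values from the original setup:

output {
  kafka {
    bootstrap_servers => "kafka:9092"   # assumed broker address; point this at your Kafka cluster
    topic_id => "jxwaf"                 # assumed topic name
    codec => json                       # one JSON object per Kafka message, matching the input codec
  }
}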
Deployment notes
I've already written up the jxwaf installation, so I won't repeat much here. The most recent update is that the management console has been open-sourced, which means you no longer need Docker to run it; for everything else, refer to the earlier installation article. Because the web console is now open source, the author moved the semantic-detection signature content to the cloud for download, and a download button will be added to the web console later (after the National Day holiday). As of this writing I still have to trigger the download manually; if that sounds like too much hassle, just wait for the author's update.
First, configure jxwaf to ship its logs to Logstash; I set the Logstash input port to 5000. Since the logs jxwaf sends over are JSON, the Logstash codec is set to json as well:
input {
  tcp {
    port => 5000
    codec => json
  }
}
Start Logstash and the captured log looks like the dump below, with keys and values mapped one to one. In older versions of jxwaf, the request headers were emitted as a nested JSON object, which is a problem if you choose ClickHouse: you never know what new key-value pairs the headers might contain, yet a ClickHouse table has a fixed, predefined set of columns and cannot grow elastically. The latest version therefore serializes the headers into a single string field (raw_header).
{ "bytes_received" => "453", "server_uuid" => "cda8b94b-f328-45f9-87c8-f970522bec3f", "@version" => "1", "protection_info" => "block", "method" => "GET", "host" => "test.com", "client_ip" => "172.20.0.1", "user_agent" => "Mozilla/5.0 (X11; Linux x86_64; rv:68.0) Gecko/20100101 Firefox/68.0", "query_string" => "id=1%27%20union%20select%201,2,3,4,5%20union%20all", "uri" => "/index.php", "request_time" => "2020-09-26 08:19:28", "connections_active" => "1", "upstream_bytes_sent" => "-", "port" => 33362, "log_type" => "owasp_attack", "scheme" => "http", "request_process_time" => "0.000", "body" => "", "upstream_addr" => "-", "@timestamp" => 2020-09-26T08:19:28.354Z, "status" => "403", "upstream_bytes_received" => "-", "bytes_sent" => "320", "uuid" => "6618b4d1-950a-47a9-937d-ec1543976624", "connections_waiting" => "0", "version" => "1.1", "upstream_status" => "-", "upstream_response_time" => "-", "protection_type" => "jxwaf-sql_check", "raw_header" => "Host: test.com\r\nUser-Agent: Mozilla/5.0 (X11; Linux x86_64; rv:68.0) Gecko/20100101 Firefox/68.0\r\nAccept: text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8\r\nAccept-Language: en-US,en;q=0.5\r\nAccept-Encoding: gzip, deflate\r\nConnection: keep-alive\r\nCookie: PHPSESSID=abauuaistng65l8hecafuu2l54; showhints=1\r\nUpgrade-Insecure-Requests: 1\r\nCache-Control: max-age=0\r\n\r\n" }
Next comes Logstash's filter stage: of the keys arriving from the input, some I don't want at all and some I want to rename, and this is where both are handled. Here, request_time is renamed to timestamp to serve as the timestamp column of the ClickHouse table, and the unneeded @timestamp and @version fields are removed:
filter {
  mutate {
    rename => { "[request_time]" => "timestamp" }
    remove_field => ["@timestamp", "@version"]
  }
}
After the filter runs, the output looks like the dump above, except that request_time has become timestamp and @timestamp/@version are gone.
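Putting it together: the input, filter, and output blocks live in a single pipeline file. Assuming a standard install and an illustrative file name (not from the original article), Logstash can be started against it like this:

bin/logstash -f /etc/logstash/conf.d/jxwaf.conf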
Now that the field names are known, we can define the ClickHouse table. I use the MergeTree engine, with timestamp as the partition key:
CREATE TABLE jxwaf
(
    timestamp DateTime,
    bytes_received String NULL,
    server_uuid String NULL,
    protection_info String NULL,
    method String NULL,
    host String NULL,
    client_ip String NULL,
    user_agent String NULL,
    query_string String NULL,
    uri String NULL,
    connections_active String NULL,
    upstream_bytes_sent String NULL,
    port UInt16 NULL,  -- UInt16: client ports go up to 65535 (the sample log shows 33362)
    log_type String NULL,
    scheme String NULL,
    request_process_time String NULL,
    body String NULL,
    upstream_addr String NULL,
    status String NULL,
    upstream_bytes_received String NULL,
    bytes_sent String NULL,
    uuid String NULL,
    connections_waiting String NULL,
    version String NULL,
    upstream_status String NULL,
    upstream_response_time String NULL,
    protection_type String NULL,
    raw_header String NULL
)
ENGINE = MergeTree()
PARTITION BY toYYYYMMDD(timestamp)
ORDER BY (timestamp);
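The remaining link is getting ClickHouse to actually consume the Kafka topic. The usual pattern for that is a Kafka-engine table plus a materialized view that streams every consumed batch into the MergeTree table above. This is a sketch under the same assumptions as the Logstash output earlier (broker kafka:9092, topic jxwaf; the consumer group name is also made up):

-- Kafka-engine "queue" table: its columns mirror the jxwaf table and it
-- reads one JSON object per message from the topic.
CREATE TABLE jxwaf_queue
(
    timestamp DateTime,
    bytes_received String, server_uuid String, protection_info String,
    method String, host String, client_ip String, user_agent String,
    query_string String, uri String, connections_active String,
    upstream_bytes_sent String, port UInt16, log_type String, scheme String,
    request_process_time String, body String, upstream_addr String,
    status String, upstream_bytes_received String, bytes_sent String,
    uuid String, connections_waiting String, version String,
    upstream_status String, upstream_response_time String,
    protection_type String, raw_header String
)
ENGINE = Kafka
SETTINGS kafka_broker_list = 'kafka:9092',       -- assumed broker address
         kafka_topic_list  = 'jxwaf',            -- assumed topic name
         kafka_group_name  = 'clickhouse_jxwaf', -- assumed consumer group
         kafka_format      = 'JSONEachRow';

-- The materialized view is the "consumer": each batch read from the queue
-- table is inserted into the MergeTree table jxwaf defined above.
CREATE MATERIALIZED VIEW jxwaf_mv TO jxwaf AS
SELECT * FROM jxwaf_queue;

Once the view is in place, ingestion can be sanity-checked with an ordinary query such as SELECT count() FROM jxwaf WHERE log_type = 'owasp_attack'.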
Source: freebuf.com 2020-09-16 22:22:02 by: 陌度