ELK之Logstash安装与配置及使用

1、Logstash介绍

Logstash 是开源的服务器端数据处理管道,能够同时 从多个来源采集数据、转换数据,然后将数据发送到您最喜欢的 “存储库” 中。(我们的存储库当然是 Elasticsearch。)

2、安装jdk

# yum -y install java-1.8.0  
# java -version  
java version "1.8.0_51"  
Java(TM) SE Runtime Environment (build 1.8.0_51-b16)  
Java HotSpot(TM) 64-Bit Server VM (build 25.51-b03, mixed mode)  

3、安装logstash

# wget https://artifacts.elastic.co/downloads/logstash/logstash-6.0.0.tar.gz  
# tar zxf logstash-6.0.0.tar.gz -C /Data/apps/  

配置logstash的环境变量

# echo "export PATH=$PATH:/Data/apps/logstash-6.0.0/bin" > /etc/profile.d/logstash.sh  
# . /etc/profile  

4、查看帮助

# logstash --help

未分类

5、logstash常用参数

-e :指定logstash的配置信息,可以用于快速测试;
-f :指定logstash的配置文件;可以用于生产环境;

6、启动logstash

6.1 通过-e参数指定logstash的配置信息,用于快速测试,直接输出到屏幕。–quiet:日志输出安静模式

$ logstash -e "input {stdin{}} output {stdout{}}" --quiet

6.2

$ logstash -e 'input{stdin{}}output{stdout{codec=>rubydebug}}'

7、logstash以配置文件方式启动

$ vim logstash.conf  
input { stdin {} }  
output {  
   stdout { codec=> rubydebug }  
}  
$ logstash -f logstash.conf --quie  
yes ,i can  
{  
      "@version" => "1",  
          "host" => "wechat1-dev.bj1.xxxx.net",  
    "@timestamp" => 2017-11-25T10:28:38.763Z,  
       "message" => "yes ,i can"  
}  

8、更多样例

请参考官方文档样例:https://www.elastic.co/guide/en/logstash/current/config-examples.html

8.1 样例 elasticsearch

input { stdin { } }  
output {  
  elasticsearch { hosts => ["localhost:9200"] }  
  stdout { codec => rubydebug }  
}  

8.2 样例 access_log

input {  
  file {  
    path => "/tmp/access_log"  
    start_position => "beginning"  
  }  
}  


filter {  
  if [path] =~ "access" {  
    mutate { replace => { "type" => "apache_access" } }  
    grok {  
      match => { "message" => "%{COMBINEDAPACHELOG}" }  
    }  
  }  
  date {  
    match => [ "timestamp" , "dd/MMM/yyyy:HH:mm:ss Z" ]  
  }  
}  


output {  
  elasticsearch {  
    hosts => ["localhost:9200"]  
  }  
  stdout { codec => rubydebug }  
}  

8.3 写入redis

input { stdin { } }  
output {  
    stdout { codec => rubydebug }  
    redis {  
        host => '192.168.1.104'  
        data_type => 'list'  
        key => 'logstash:redis'  
    }  
}  

logstash笔记(二)——grok之match

官方文档:

https://www.elastic.co/guide/en/logstash/current/plugins-filters-grok.html

基本语法:

%{SYNTAX:SEMANTIC}

SYNTAX:定义的正则表达式名字(系统插件自带的默认位置:$HOME/vendor/bundle/jruby/1.9/gems/logstash-patterns-core-2.0.2/patterns)

SEMANTIC:匹配结果的标识

grok{
  match=>{
    "message"=>"%{IP:clientip}"
  }
}

输入结果

{
  "message" => "192.168.1.1 abc",
  "@version" => "1",
  "@timestamp" => "2016-03-30T02:15:31.242Z",
  "host" => "master",
  "clientip" => "192.168.1.1"
}

clientip就是semantic

每个%{IP:clientip}表达式只能匹配到message中第一次出现的结果,可用如下方式匹配多个相同类型结果

%{IP:clientip}s+%{IP:clientip1}…,如果SEMANTIC定义的相同名字,结果为数组形式,如:

{
  "message" => "12.12.12.12 32.32.32.32",
  "@version" => "1",
  "@timestamp" => "2016-03-30T02:26:31.077Z",
  "host" => "master",
  "clientip" => [
    [0] "12.12.12.12",
    [1] "32.32.32.32"
  ]
}

自定义grok表达式

语法:(?the pattern here)

eg:

grok{
  match=>{
    "message"=>"%{IP:clientip}s+(?<mypattern>[A-Z]+)"
  }
}

rs:

{
  "message" => "12.12.12.12 ABC",
  "@version" => "1",
  "@timestamp" => "2016-03-30T03:22:04.466Z",
  "host" => "master",
  "clientip" => "12.12.12.12",
  "mypattern" => "ABC"
}

创建自定义grok文件

在/home/hadoop/mylogstash/mypatterns_dir创建文件mypatterns_file,内容如下:

MY_PATTERN [A-Z]+

保存!

修改filter

grok{
  patterns_dir=>["/home/hadoop/mylogstash/mypatterns_dir"]
  match=>{
    "message"=>"%{IP:clientip}s+%{MY_PATTERN:mypattern}"
  }
}

结果同上。

logstash解析naxsi日志的问题

目前在用naxsi防火墙,使用elk来做一个日志分析,遇到问题如下:

naxsi作为waf会产生error日志,目前我打开了NAXSI_EXLOG日志选项,因为这个选项可以看到具体的请求内容。

对于同一个请求,naxsi会产生2行或者3行的日志,格式如下:

2017/10/23 17:45:36 [error] 744#0: *19 NAXSI_EXLOG: ip=192.168.141.232&server=192.168.182.141&uri=/sqli-labs/Less-11/&id=1009&zone=BODY&var_name=passwd&content=admin'%20or%20'1'='1'%20xxxxxxxxxx, client: 192.168.141.232, server: _, request: "POST /sqli-labs/Less-11/ HTTP/1.1", host: "192.168.182.141:8000", referrer: "1.1.1.1"
2017/10/23 17:45:36 [error] 744#0: *19 NAXSI_EXLOG: ip=192.168.141.232&server=192.168.182.141&uri=/sqli-labs/Less-11/&id=1013&zone=BODY&var_name=passwd&content=admin'%20or%20'1'='1'%20xxxxxxxxxx, client: 192.168.141.232, server: _, request: "POST /sqli-labs/Less-11/ HTTP/1.1", host: "192.168.182.141:8000", referrer: "1.1.1.1"
2017/10/23 17:45:36 [error] 744#0: *19 NAXSI_FMT: ip=192.168.141.232&server=192.168.182.141&uri=/sqli-labs/Less-11/&learning=0&vers=0.55.3&total_processed=4&total_blocked=4&block=1&cscore0=$SQL&score0=22&cscore1=$XSS&score1=40&zone0=BODY&id0=1009&var_name0=passwd&zone1=BODY&id1=1013&var_name1=passwd, client: 192.168.141.232, server: _, request: "POST /sqli-labs/Less-11/ HTTP/1.1", host: "192.168.182.141:8000", referrer: "1.1.1.1"

这是同一个请求产生的结果,因为每次请求都会有一个id值在里面,这个是19:

未分类

问题:如何取出NAXSI_EXLOG里面的content,跟NAXSI_FMT里面的结果合并到一起?

我写的logstash和正则如下:

DA1 d{4}/d{2}/d{2}
TM1 d{2}:d{2}:d{2}
LEVEL (w+)
NUM1 d+(?:#0: *)
NUM2 d+
EXLOG NAXSI_EXLOG
FMT NAXSI_FMT
ID1 (d+)
ZONE w+
VAR1  (.*)
CONTENT (.*)
T3 w+
T4 HTTP/1.1", host: "(.*)", referrer: "
HOST (.*)

NAXSI %{DA1:date1}s%{TM1:time}s[%{LEVEL:level}]s%{NUM1:num1}%{NUM2:num2}s(?<logtype>NAXSI_EXLOG):sw+=%{HOST:client_ip}&server=%{HOST:hostname}&uri=%{PROG:filepath}&id=%{ID1:id}&zone=%{ZONE:zone}&var_name=%{VAR1:var}&content=%{CONTENT:content},sclient:s%{HOST:ip3},sserver:s(.*)srequest:s"%{T3:method}s%{HOST:uri}sHTTP/1.1",shost:s"%{HOST:host22}"

NAXSI2 %{DA1:date1}s%{TM1:time}s[%{LEVEL:level}]s%{NUM1:num1}%{NUM2:num2}s(?<logtype>NAXSI_EXLOG):sw+=%{HOST:client_ip}&server=%{HOST:hostname}&uri=%{PROG:filepath}&id=%{ID1:id}&zone=%{ZONE:zone}&var_name=%{VAR1:var}&content=%{CONTENT:content},sclient:s%{HOST:ip3},sserver:s(.*)srequest:s"%{T3:method}s%{HOST:uri}sHTTP/1.1",shost:s"%{HOST:host22}",sreferrer:s"(?<referrer>(.*))

FMT %{DA1:date1}s%{TM1:time}s[%{LEVEL:level}]s%{NUM1:num1}%{NUM2:num2}s(?<logtype>NAXSI_FMT):sip=%{HOST:ip}&server=%{HOST:server}&uri=%{UNIXPATH:uri}&learning=%{HOST:learing}&vers=%{HOST:vers}&total_processed=%{HOST:toal_processed}&total_blocked=%{HOST:blocked}&block=%{HOST:block}&cscore0=%{HOST:attack}&score0=%{HOST:score0}&cscore1=%{HOST:xss}&score1=%{HOST:score}&zone0=%{WORD:args}&id0=%{NUMBER:id}&var_name0=%{HOST:varname},sclient:s%{HOST:ip3},sserver:s(.*)srequest:s"%{T3:method}s%{HOST:uri}sHTTP/1.1",shost:s"%{HOST:host22}

logstash.conf:

input {
 file {
       path => "/usr/local/nginx/logs/naxsi.err"
       type => "naxsi-error"
       start_position => "beginning"
   }
   }
   filter {
    if [type] == "naxsi-error" {
    grok {
        patterns_dir => "/opt/logstash-5.5.1/pattern"
        match => [ "message" , "%{NAXSI2}",
               "message" , "%{NAXSI}",
               "message" , "%{FMT}"
            ]

    }
    # aggregate {
    #   task_id => "%{num2}"
    #       code => "map['sql_duration'] = 0"
    #   end_of_task => true
    #   }

}  }
output {
  if [type] == "naxsi-error" {
    elasticsearch {
       hosts => ["localhost"]
       index => "nxapi"
           document_id => "%{num2}"
        }
     }
}

Logstash的容器化与Ansible多环境下单配置文件发布

原因

为了发布、迁移方便,最近决定将公司项目中用的logstash容器化,最初原打算沿用原多进程方案,在容器内通过supervisor启动多个不同配置进程,但使用官方容器时并支持启动多个实例。

最终解决方式:将集群的配置文件组装在一起,不同的path对应不同的filter与output;由于不同主机上组件不同,日志也不同。因此在ansible发布时通过脚本检测生成log file 的path列表,适配集群上的不同组件。这样的好处是只需要维护一份配置,操作简单。

官方logstash镜像

在dockerhub上搜索了到一个即将被废弃的镜像:https://hub.docker.com/_/logstash/。
最新版镜像在es官方进行维护: https://www.elastic.co/guide/en/logstash/current/docker.html
由于直接使用原版镜像时映射到容器内部的pipeline.conf一直提示No permission,明明都已经设置成755,宿主机上并没有logstash这一用户,chown不方便。所以决定对官方镜像稍加修改,再推送到公司内的dockerhub私有docker registry上。

FROM docker.elastic.co/logstash/logstash:5.6.1
MAINTAINER CodingCrush
ENV LANG en_US.UTF-8
# Disable ES monitoring in the official image
ENV XPACK_MONITORING_ENABLED false
# TimeZone: Asia/Shanghai
ENV TZ Asia/Shanghai
ENV PIPELINE_WORKERS 5
# Default user is logstash
USER root

这个Dockerfile很简单,主要设置了User为root,解决conf文件的权限不足问题。另外关闭容器自带的es xpack检查功能,设置时区与pipeline的 worker数量。
然后打个tag,推到私有docker registry上。

docker build -t dockerhub.xxx.net/logstash:5.6.1
docker push dockerhub.xxx.net/logstash:5.6.1

容器便修改好了,在生产集群上可pull。

pipeline.conf

这个简化的配置文件中path应该是一个列表,但目前用了一个占位字符串,发布时动态检测通过sed进行更换,实际项目中的组件配置更多,这里写4个意思一下。
值得注意的是: 通过if else 对path进行匹配时如果出现/,需要进行转义,否则ruby进行正则匹配时会报错。

input {
    file {
        path => LOG_FILES_PLACEHOLDER
        type => "nginx"
        start_position => "beginning"
        sincedb_path => "/data/projects/logstash/data/logstash.db"
        codec=>plain{charset=>"UTF-8"}
    }
}
filter {
    if [path] =~ "component1/logs/access" {
        grok & mutate
    } else if [path] =~ "component1/logs/error" {
        grok & mutate
    } else if [path] =~ "component2/logs/access" {
        grok & mutate
    } else if [path] =~ "component2/logs/error" {
        grok & mutate
    }
}
output {
    if [path] =~ "component1/logs/access" {
        stdout{codec=>rubydebug}
    }  else if [path] =~ "component1/logs/error" {
        stdout{codec=>rubydebug}
    } else if [path] =~ "component2/logs/access.log" {
        stdout{codec=>rubydebug}
    } else if [path] =~ "component2/logs/error" {
        stdout{codec=>rubydebug}
    }
}

日志文件检测脚本 detect_logfiles.py

检测脚本同样很简单,用os.path.exists进行过滤。往标准输出打一个列表的string。
此处注意,需要用re.escape进行转义,因为后面还需要用sed进行替换。

import os
import sys
import re
paths = [
    "/data/projects/component1/logs/access.log*",
    "/data/projects/component1/logs/error.log",
    "/data/projects/component2/logs/access.log*",
    "/data/projects/component2/logs/error.log*",
    ......
]
available_paths = [path for path in paths if os.path.exists(path.replace("*", ""))]
sys.stdout.write(re.escape(str(available_paths)))

ansible剧本

在发布时通过sed替换占位串,之所以大费周章搞这么个方式,主要因为我们环境比较复杂,将来横向拓展时组件布局不定,多配置文件维护起来挺麻烦。
为什么用shell?而不用ansible的module? 因为我不会,还懒得去学ansible发明的DSL

- name: Modify lostash.conf
  args:
    chdir: "/data/projects/logstash/"
  shell: 'sed -i "s|LOG_FILES_PLACEHOLDER|$(python detect_logfiles.py)|" logstash.conf'
  become: true
  become_method: sudo
- name: Modify docker-compose.yml
  args:
    chdir: "/data/projects/logstash/"
  shell: 'sed -i "s|HOST_PLACEHOLDER|{{ host }}|" docker-compose.yml'
  become: true
  become_method: sudo

docker-compose

挂载整个文件卷,HOST_PLACEHOLDER占位符是因为我希望通过container名字直观看到组件所在的主机名,而避免生产服务器上误操作。

version: '2'
services:
  logstash:
    image: dockerhub.xxx.net/logstash:5.6.1
    volumes:
       - /etc/localtime:/etc/localtime:ro
       - /data/projects/logstash/hosts:/etc/hosts
       - /data/projects:/data/projects
    container_name: HOST_PLACEHOLDER.logstash
    command: logstash -f /data/projects/logstash/logstash.conf --path.data=/data/projects/logstash/data

启动的command时指定工作目录,将运行状态持久化到外部存储上,方便迁移。

Logstash匹配request_time 字段错误解决方法

区配错误原因

匹配规则:

%{IPORHOST:client_ip} (%{WORD:ident}|-) (%{WORD:auth}|-) [%{HTTPDATE:timestamp}] "%{WORD:method} %{URIPATHPARAM:request} HTTP/%{NUMBER:httpversion}" %{NUMBER:response_status} (?:%{NUMBER:response_bytes}|-) (?:"(?:%{URI:referrer}|-)"|%{QS:referrer}) %{QS:agent} %{QS:xforwardedfor} %{NUMBER:request_time} %{NUMBER:upstream_response_time}

当日志中 upstream_response_time 或 request_time 字段出现 – 字符,而不是数字时,以上匹配规则会出现匹配错误, 改进如下:

%{IPORHOST:client_ip} (%{WORD:ident}|-) (%{WORD:auth}|-) [%{HTTPDATE:timestamp}] "%{WORD:method} %{URIPATHPARAM:request} HTTP/%{NUMBER:httpversion}" %{NUMBER:response_status} (?:%{NUMBER:response_bytes}|-) (?:"(?:%{URI:referrer}|-)"|%{QS:referrer}) %{QS:agent} %{QS:xforwardedfor} (%{NUMBER:request_time}|-) (%{NUMBER:upstream_response_time}|-)

以上两条规则不同之处:

%{NUMBER:request_time} %{NUMBER:upstream_response_time}

更换为:

(%{NUMBER:request_time}|-) (%{NUMBER:upstream_response_time}|-)

但是,还是这样匹配到的字段会有 – 字符,而es中使用的字段类型是数字类型,所以需要以下解决办法:

解决方法

filter {
    if [upstream_response_time] == "-" {
            mutate {
                    replace => { "upstream_response_time" => 0 }
            }
        }
    if [request_time] == "-" {
            mutate {
                    replace => { "request_time" => 0 }
            }
    }
}

Logstash input模块详解

Logstash由三个组件构造成,分别是input、filter以及output。我们可以吧Logstash三个组件的工作流理解为:input收集数据,filter处理数据,output输出数据。至于怎么收集、去哪收集、怎么处理、处理什么、怎么发生以及发送到哪等等一些列的问题就是我们接下啦要讨论的一个重点。

我们今天先讨论input组件的功能和基本插件。前面我们意见介绍过了,input组件是Logstash的眼睛和鼻子,负责收集数据的,那么们就不得不思考两个问题,第一个问题要清楚的就是,元数据在哪,当然,这就包含了元数据是什么类型,属于什么业务;第二个问题要清楚怎么去拿到元数据。只要搞明白了这两个问题,那么Logstash的input组件就算是弄明白了。

对于第一个问题,元数据的类型有很多,比如说你的元数据可以是日志、报表、可以是数据库的内容等等。元数据是什么样子的我们不需要关心,我们要关系的是元数据是什么类型的,只要你知道元数据是什么类型的,你才能给他分类,或者说给他一个type,这很重要,type对于你后面的工作处理是非常有帮助的。所以第一个问题的重心元数据在吗,是什么,现在已经是清楚了。那么进行第二个问题。

第二个问题的核心是怎么拿到这些不同类型的原数据?这是一个真个input组件的核心内容了,我们分门别类的来看待这和解决个问题。
首先,我们肯定需要认同的,什么样的数据源,就需要使用什么样的方式去获取数据。

我们列举几种:

1、文件类型:文件类型,顾名思义,文件数据源,我们可以使用input组件的file插件来获取数据。file{}插件有很多的属性参数,我们可以张开讲解一下。具体内容在下面的代码中展示:

input{
    file{
        #path属性接受的参数是一个数组,其含义是标明需要读取的文件位置
        path => [‘pathA’,‘pathB’]
        #表示多就去path路径下查看是够有新的文件产生。默认是15秒检查一次。
        discover_interval => 15
        #排除那些文件,也就是不去读取那些文件
        exclude => [‘fileName1’,‘fileNmae2’]
        #被监听的文件多久没更新后断开连接不在监听,默认是一个小时。
        close_older => 3600
        #在每次检查文件列 表的时候, 如果一个文件的最后 修改时间 超过这个值, 就忽略这个文件。 默认一天。
        ignore_older => 86400
        #logstash 每隔多 久检查一次被监听文件状态( 是否有更新) , 默认是 1 秒。
        stat_interval => 1
        #sincedb记录数据上一次的读取位置的一个index
        sincedb_path => ’$HOME/. sincedb‘
        #logstash 从什么 位置开始读取文件数据, 默认是结束位置 也可以设置为:beginning 从头开始
        start_position => ‘beginning’
        #注意:这里需要提醒大家的是,如果你需要每次都从同开始读取文件的话,关设置start_position => beginning是没有用的,你可以选择sincedb_path 定义为 /dev/null
    }           

}

2、数据库类型:数据库类型的数据源,就意味着我们需要去和数据库打交道了是吗?是的!那是必须的啊,不然怎么获取数据呢。input组件如何获取数据库类的数据呢?没错,下面即将隆重登场的是input组件的JDBC插件jdbc{}。同样的,jdbc{}有很多的属性,我们在下面的代码中作出说明;

input{
    jdbc{
    #jdbc sql server 驱动,各个数据库都有对应的驱动,需自己下载
    jdbc_driver_library => "/etc/logstash/driver.d/sqljdbc_2.0/enu/sqljdbc4.jar"
    #jdbc class 不同数据库有不同的 class 配置
    jdbc_driver_class => "com.microsoft.sqlserver.jdbc.SQLServerDriver"
    #配置数据库连接 ip 和端口,以及数据库   
    jdbc_connection_string => "jdbc:sqlserver://200.200.0.18:1433;databaseName=test_db"
    #配置数据库用户名
    jdbc_user =>   
    #配置数据库密码
    jdbc_password =>
    #上面这些都不重要,要是这些都看不懂的话,你的老板估计要考虑换人了。重要的是接下来的内容。
    # 定时器 多久执行一次SQL,默认是一分钟
    # schedule => 分 时 天 月 年  
    # schedule => * 22  *  *  * 表示每天22点执行一次
    schedule => "* * * * *"
    #是否清除 last_run_metadata_path 的记录,如果为真那么每次都相当于从头开始查询所有的数据库记录
    clean_run => false
    #是否需要记录某个column 的值,如果 record_last_run 为真,可以自定义我们需要表的字段名称,
    #此时该参数就要为 true. 否则默认 track 的是 timestamp 的值.
    use_column_value => true
    #如果 use_column_value 为真,需配置此参数. 这个参数就是数据库给出的一个字段名称。当然该字段必须是递增的,可以是 数据库的数据时间这类的
    tracking_column => create_time
    #是否记录上次执行结果, 如果为真,将会把上次执行到的 tracking_column 字段的值记录下来,保存到 last_run_metadata_path 指定的文件中
    record_last_run => true
    #们只需要在 SQL 语句中 WHERE MY_ID > :last_sql_value 即可. 其中 :last_sql_value 取得就是该文件中的值
    last_run_metadata_path => "/etc/logstash/run_metadata.d/my_info"
    #是否将字段名称转小写。
    #这里有个小的提示,如果你这前就处理过一次数据,并且在Kibana中有对应的搜索需求的话,还是改为true,
    #因为默认是true,并且Kibana是大小写区分的。准确的说应该是ES大小写区分
    lowercase_column_names => false
    #你的SQL的位置,当然,你的SQL也可以直接写在这里。
    #statement => SELECT * FROM tabeName t WHERE  t.creat_time > :last_sql_value
    statement_filepath => "/etc/logstash/statement_file.d/my_info.sql"
    #数据类型,标明你属于那一方势力。单了ES哪里好给你安排不同的山头。
    type => "my_info"
    }
    #注意:外载的SQL文件就是一个文本文件就可以了,还有需要注意的是,一个jdbc{}插件就只能处理一个SQL语句,
    #如果你有多个SQL需要处理的话,只能在重新建立一个jdbc{}插件。
}

好了,废话不多说了,接着第三种情况:

input {
  beats {
    #接受数据端口
    port => 5044
    #数据类型
    type => "logs"
  }
  #这个插件需要和filebeat进行配很这里不做多讲,到时候结合起来一起介绍。
}

现在我们基本清楚的知道了input组件需要做的事情和如何去做,当然他还有很多的插件可以进行数据的收集,比如说TCP这类的,还有可以对数据进行encode,这些感兴趣的朋友可以自己去查看,我说的只是我自己使用的。一般情况下我说的三种插件已经足够了。

今天的ELK种的Logstash的input组件就到这。后面还会讲述Logstash的另外另个组件filter和output。