docker部署ELK(logstash、elasticsearch、kibana),监控日志

由于是首次部署,第一次想着是单独部署logstash、elasticsearch、kibana,然后通过配置实现日志的监控,以下为部署步骤,但是最终失败,只能采取docker-compose来部署,以下内容可以略过,仅作为参考。

一、每个单独部署

先部署elasticsearch,因为logstash要设置日志输出位置,而输出位置正是elasticsearch,所以需要先部署启动elasticsearch,logstash才能部署并启动成功。

1、elasticsearch部署

1.制作elasticsearch镜像

docker pull docker.elastic.co/elasticsearch/elasticsearch:6.5.4

2.创建并启动elasticsearch容器

docker run -p 9200:9200 -p 9300:9300 -e “discovery.type=single-node” -d docker.elastic.co/elasticsearch/elasticsearch:6.5.4

通过本地浏览器访问localhost:9200,返回如下内容,则证明elasticsearch部署成功

未分类

官方参考:https://www.elastic.co/guide/en/elasticsearch/reference/current/docker.html

2、logstash部署

1.制作logstash镜像

docker pull docker.elastic.co/logstash/logstash:6.5.4

2.创建并启动logstash容器

docker run --rm -it -p 4560:4560 -v /home/xijie/app/mylogstash/pipeline/:/usr/share/logstash/pipeline/ -d docker.elastic.co/logstash/logstash:6.5.4

注意文件pipeline的映射,在pipeline中需要创建logstash的配置文件logstash-springboot.conf,该文件内容如下:

input {
    tcp {
        port => 4560
        codec => json_lines
    }
}
output{
  elasticsearch { hosts => ["localhost:9200"] }
  stdout { codec => rubydebug }
}

注意output中设置了elasticsearch的连接地址:localhost:9200

官方参考文章:https://www.elastic.co/guide/en/logstash/current/docker-config.html

3、kibana部署

1.制作镜像

docker pull docker.elastic.co/kibana/kibana:6.5.4

2.创建并启动容器

docker run -p 5601:5601 --mount type=bind,src=/home/xijie/app/mykibana/kibana.yml,dst=/usr/share/kibana/config/kibana.yml -d mysql/mysql-server:5.7

二、docker-compose一起部署

参考文章:https://www.jianshu.com/p/c2f6e80b2756

1、第一步在docker上安装ELK

1.创建目录

mkdir /home/xijie/app/myelk

2.从github上拉取部署elk所需资料

$ git clone https://github.com/deviantony/docker-elk.git

下载完毕的资料目录如下:

未分类

3.进入刚下载的文件夹内

$ cd docker-elk

4.通过docker-compose创建并启动容器

$ docker-compose up -d

5.这个时候通过docker ps可以看到logstash、elasticsearch、kibana容器已经创建并且启动。

未分类

可以看该elk容器的默认端口为:

  • 5000: Logstash TCP input.
  • 9200: Elasticsearch HTTP
  • 9300: Elasticsearch TCP transport
  • 5601: Kibana

Kibana的web入口:http://localhost:5601

6.接下来进行参数配置,来实现springboot通过ELK查看日志信息。

修改logstash的配置,

进入logstash配置文件所在目录:

cd /home/xijie/app/myelk/dokcer-elk/logstash/pipeline

打开配置文件:

vim logstash.conf

修改内容如下:

input{
        tcp {
                mode => "server"
                port => 5000
                codec => json_lines
                tags => ["data-http"]
        }
}
filter{
    json{
        source => "message"
        remove_field => ["message"]
    }
}
output{
    if "data-http" in [tags]{
        elasticsearch{
                hosts=> ["elasticsearch:9200"]
                index => "data-http-%{+YYYY.MM.dd}"
                }
        stdout{codec => rubydebug}
    }
}

注:

input标签为logstash进数据接口,filter标签为数据过滤器,output为数据出去接口。

input标签使用的是tcp,说明springboot客户端需要将日志传递到该接口,该接口正是logstash服务器接口。

filter将message字段去掉,只是为了当展示springboot的http请求接口的数据更加规整,而不是全部展示在message字段中。

output标签将数据传递给了elasticsearch,这里使用了if,当判断所出数据为所指定tag,才进行下面的配置。特别要注意index的配置,该值在kibana中需要使用,这里指定的index值为:data-http,要注意该值与tag是没有关系的,要注意区分。

未分类

参考文章:https://www.elastic.co/guide/en/logstash/current/index.html

这个时候重启elk即可。

进入docker-elk目录

/home/xijie/app/myelk/docker-elk

然后重启

docker-compose restart

关于elasticsearch、logstash、kibana的配置都在对应目录下的config文件夹中的.yml文件中,只需要修改该文件即可。

2、springboot日志系统配置logstash

1.pom中配置logstash

<dependency>
        <groupId>net.logstash.logback</groupId>
         <artifactId>logstash-logback-encoder</artifactId>
         <version>5.2</version>
</dependency>

2.application.properties文件中配置如下:

#logstash日志收集地址,即logstash服务器地址
logstash.ip_port=172.168.0.165:5000
#日志保存级别
logging.all.level=info
#日志保存地址,该值与logstash没关系,是当日志存在本地File文件内的文件夹地址
logging.levelfile=/home/logs/data-center-service

3.logback.xml文件内容如下,该文件在resources目录

<?xml version="1.0" encoding="UTF-8"?>
<configuration debug="false">
    <property resource="application.properties"></property>
    <!--定义日志文件的存储地址 勿在 LogBack 的配置中使用相对路径-->
    <property name="LOG_HOME" value="${logging.levelfile}" />
    <property name="LOG_LEVEL" value="${logging.all.level}" />
    <!-- 控制台输出 -->
    <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
        <encoder class="ch.qos.logback.classic.encoder.PatternLayoutEncoder">
            <!--格式化输出:%d表示日期,%thread表示线程名,%-5level:级别从左显示5个字符宽度%msg:日志消息,%n是换行符-->
            <pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{50}:%L - %msg  %n</pattern>
            <charset>UTF-8</charset>
        </encoder>
    </appender>
    <!-- 按照每天生成日志文件 -->
    <appender name="FILE"  class="ch.qos.logback.core.rolling.RollingFileAppender">
        <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
            <!--日志文件输出的文件名-->
            <FileNamePattern>${LOG_HOME}/data-center-service.log.%d{yyyy-MM-dd}.log</FileNamePattern>
            <!--日志文件保留天数-->
            <MaxHistory>30</MaxHistory>
        </rollingPolicy>
        <encoder class="ch.qos.logback.classic.encoder.PatternLayoutEncoder">
            <!--格式化输出:%d表示日期,%thread表示线程名,%-5level:级别从左显示5个字符宽度%msg:日志消息,%n是换行符-->
            <pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{50} - %msg%n</pattern>
            <charset>UTF-8</charset>
        </encoder>
        <!--日志文件最大的大小-->
        <triggeringPolicy class="ch.qos.logback.core.rolling.SizeBasedTriggeringPolicy">
            <MaxFileSize>100MB</MaxFileSize>
        </triggeringPolicy>
    </appender>
    <appender name="logstash" class="net.logstash.logback.appender.LogstashTcpSocketAppender">
        <destination>${logstash.ip_port}</destination>
        <encoder charset="UTF-8" class="net.logstash.logback.encoder.LogstashEncoder" />
        <queueSize>1048576</queueSize>
        <keepAliveDuration>5 minutes</keepAliveDuration>
        <!--<customFields>{"application-name":"data-repo-interface"}</customFields>-->
        <!--<filter class="ch.qos.logback.classic.filter.ThresholdFilter">
            <level>INFO</level>
        </filter>
        <filter class="ch.qos.logback.core.filter.EvaluatorFilter">
            <evaluator> <!– 默认为 ch.qos.logback.classic.boolex.JaninoEventEvaluator –>
                <expression>return message.contains("billing");</expression>
            </evaluator>
            <OnMatch>ACCEPT</OnMatch>
            <OnMismatch>DENY</OnMismatch>
        </filter>-->
    </appender>

    <logger name="elk_logger" level="INFO" additivity="false">
        <appender-ref ref="logstash"/>
    </logger>

    <!--<logger name="com.zaxxer" level="${LOG_LEVEL}"/>-->
    <!--<logger name="org.apache.ibatis" level="${LOG_LEVEL}"/>-->
    <!--<logger name="org.mybatis.spring" level="${LOG_LEVEL}"/>-->
    <!--<logger name="org.springframework" level="${LOG_LEVEL}"/>-->
    <!--<logger name="java.sql.Connection" level="${LOG_LEVEL}"/>-->
    <!--<logger name="java.sql.Statement" level="${LOG_LEVEL}"/>-->
    <!--<logger name="java.sql.PreparedStatement" level="${LOG_LEVEL}"/>-->

    <!-- 日志输出级别 -->
    <root level="${LOG_LEVEL}">
        <appender-ref ref="STDOUT" />
        <appender-ref ref="FILE" />
        <!--<appender-ref ref="logstash" />-->
    </root>
</configuration>

在上述配置文件中

<appender name="logstash" class="net.logstash.logback.appender.LogstashTcpSocketAppender">
        <destination>${logstash.ip_port}</destination>
        <encoder charset="UTF-8" class="net.logstash.logback.encoder.LogstashEncoder" />
        <queueSize>1048576</queueSize>
        <keepAliveDuration>5 minutes</keepAliveDuration>
        <!--<customFields>{"application-name":"data-repo-interface"}</customFields>-->
        <!--<filter class="ch.qos.logback.classic.filter.ThresholdFilter">
            <level>INFO</level>
        </filter>
        <filter class="ch.qos.logback.core.filter.EvaluatorFilter">
            <evaluator> <!– 默认为 ch.qos.logback.classic.boolex.JaninoEventEvaluator –>
                <expression>return message.contains("billing");</expression>
            </evaluator>
            <OnMatch>ACCEPT</OnMatch>
            <OnMismatch>DENY</OnMismatch>
        </filter>-->
    </appender>

    <logger name="elk_logger" level="INFO" additivity="false">
        <appender-ref ref="logstash"/>
    </logger>

是logstash的主要配置。注意该配置中name=”elk_logger”,这样的指定,即

Logger elkLogger = LoggerFactory.getLogger("elk_logger");

当使用slf4j生成Logger时,只要指定Tag为“elk_logger”的日志输出或打印,都会上传到logstash服务器,并存储到elasticsearch,用kibana查看。

3、配置kibana,现在只要服务器通过指定的Tag打印日志,日志信息将会上传logstash解析,并且存储到elasticsearch,然后只需要kibana配置对应的elasticsearch的index即可看到所需的日志信息。

通过浏览器访问kibana,http://localhost:5601

由于本人是通过虚拟机部署的服务,而且虚拟机的ip为172.168.0.165,所以在宿主机中通过访问http://172.168.0.165:5601即可。

未分类

然后点击左侧的management模块。

未分类

接下来点击Index Patterns,创建index pattern

未分类

接下来填入信息,在index pattern框中填入上述创建ElK时logstash配置文件中的index信息“data-http-*”

未分类

填写的同时,下面会显示出elasticsearch中已经有该index得数据,(注意:在配置kibana时,应该先运行服务器,让服务区打印出对应的index日志,elasticsearch中也保存了该日志,这时才能配置kibana成功)。

然后点击右侧的next step。

未分类

在Time Filter field name列表中选择@timestamp,然后按下Create index pattern按钮及创建成功。

这时点击左侧的Discover模块,选中data-http-*标签即可。在右侧就显示出了日志信息。如下图:

未分类

这样整个ELK与日志系统就搭建完毕了。

4、接下讲一下我们工程中关于http接口日志的配置

由于我们提供的服务器接口是上游,是给其他部门服务的,这里会牵扯到大量的专业数据,而为了避免数据问题的纠纷与接口错误问题的排查,所以需要将特定的接口请求数据保存,这样可以通过resquest与response来排查到底是哪个部门的问题。

1.使用了AOP对每次请求进行日志拦截。

定义SysLogAspect类,该类内容如下:

package com;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.serializer.SerializerFeature;
import com.SpringContextUtil;
import com.LoggerEntity;
import com.MailService;
import lombok.extern.slf4j.Slf4j;
import org.aspectj.lang.JoinPoint;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.web.context.request.RequestAttributes;
import org.springframework.web.context.request.RequestContextHolder;
import org.springframework.web.context.request.ServletRequestAttributes;

import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.lang.reflect.Field;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;

@Aspect
@Component
@Slf4j
public class SysLogAspect {

    Logger elkLogger = LoggerFactory.getLogger("elk_logger");


    /**
     * 开始时间
     */
    private long startTime = 0L;
    /**
     * 结束时间
     */
    private long endTime = 0L;

//  @Autowired
//  private ILogService logService;

    public static Map<String, Object> getKeyAndValue(Object obj) {
        Map<String, Object> map = new HashMap<>();
        // 得到类对象
        Class userCla = (Class) obj.getClass();
        /* 得到类中的所有属性集合 */
        Field[] fs = userCla.getFields();
        for (int i = 0; i < fs.length; i++) {
            Field f = fs[i];
            f.setAccessible(true); // 设置些属性是可以访问的
            Object val = new Object();
            try {
                val = f.get(obj);
                // 得到此属性的值
                map.put(f.getName(), val);// 设置键值
            } catch (IllegalArgumentException e) {
                e.printStackTrace();
            } catch (IllegalAccessException e) {
                e.printStackTrace();
            }

        }
        return map;
    }

    //通过该注解来判断该接口请求是否进行cut
    @Pointcut("@annotation(com.hongdaoai.datastore.common.annotation.SysLog)")
    public void cutController() {
    }

    @Before("cutController()")
    public void doBeforeInServiceLayer(JoinPoint joinPoint) {
        log.debug("doBeforeInServiceLayer");
        startTime = System.currentTimeMillis();
    }

    @After("cutController()")
    public void doAfterInServiceLayer(JoinPoint joinPoint) {
        log.debug("doAfterInServiceLayer");
    }

    @Around("cutController()")
    public Object recordSysLog(ProceedingJoinPoint joinPoint) throws Throwable {

        RequestAttributes ra = RequestContextHolder.getRequestAttributes();

        ServletRequestAttributes sra = (ServletRequestAttributes) ra;

        HttpServletRequest request = sra.getRequest();

        HttpServletResponse response = sra.getResponse();

        //ELK日志实体类
        LoggerEntity elkLog = new LoggerEntity();

        //应用程序名称
        elkLog.setApplicationName(SpringContextUtil.getApplicationName());

        //profile.active
        elkLog.setProfileActive(SpringContextUtil.getActiveProfile());

        // 请求的类名
        elkLog.setClassName(joinPoint.getTarget().getClass().getName());

        // 请求的方法名
        elkLog.setMethodName(joinPoint.getSignature().getName());

        //请求完整地址
        elkLog.setUrl(request.getRequestURL().toString());

        //请求URI
        elkLog.setUri(request.getRequestURI());

        //请求类型
        elkLog.setRequestMethod(request.getMethod());

        String queryString = request.getQueryString();

        Object[] args = joinPoint.getArgs();

        String params = "";

        //获取请求参数集合并进行遍历拼接
        if (args.length > 0) {

            if ("POST".equals(request.getMethod())) {

                //param
                Object object = args[0];

                Map<String, Object> map = new HashMap<>();

                Map paramMap = getKeyAndValue(object);

                map.put("param", paramMap);

                if (args.length > 1) {

                    object = args[1];

                    Map bodyMap = getKeyAndValue(object);

                    map.put("body", bodyMap);
                }

                params = JSON.toJSONStringWithDateFormat(map, "yyyy-MM-dd HH:mm:ss", SerializerFeature.UseSingleQuotes);

            } else if ("GET".equals(request.getMethod())) {

                params = queryString;

            }
        }

        //请求参数内容(param)
        elkLog.setRequestParamData(params);

        //客户端IP
        elkLog.setClientIp(request.getRemoteAddr());

        //终端请求方式,普通请求,ajax请求
        elkLog.setRequestType(request.getHeader("X-Requested-With"));

        //sessionId
        elkLog.setSessionId(request.getRequestedSessionId());

        //请求时间
//        elkLog.setRequestDateTime(new Date(startTime));
        elkLog.setRequestDateTime(new Date());

        Object result = null;

        try {

            // 环绕通知 ProceedingJoinPoint执行proceed方法的作用是让目标方法执行,这也是环绕通知和前置、后置通知方法的一个最大区别。
            result = joinPoint.proceed();

        } catch (Exception e) {

            endTime = System.currentTimeMillis();

            //请求耗时(单位:毫秒)
            elkLog.setSpentTime(endTime - startTime);

            //接口返回时间
            elkLog.setResponseDateTime(new Date(endTime));

            //请求时httpStatusCode代码
            elkLog.setHttpStatusCode(String.valueOf(response.getStatus()));

            String elkLogData = JSON.toJSONStringWithDateFormat(elkLog, "yyyy-MM-dd HH:mm:ss.SSS");

            elkLogger.error(elkLogData);

//            log.error(e.getMessage(), e);

            throw e;

//            return result;

        }

        endTime = System.currentTimeMillis();

        //请求耗时(单位:毫秒)
        elkLog.setSpentTime(endTime - startTime);

        if (!elkLog.getMethodName().equals("getBookOriginalContent")) {

            //接口返回数据
            elkLog.setResponseData(JSON.toJSONStringWithDateFormat(result, "yyyy-MM-dd HH:mm:ss", SerializerFeature.UseSingleQuotes));

        }

        //接口返回时间
        elkLog.setResponseDateTime(new Date(endTime));

        //请求时httpStatusCode代码
        elkLog.setHttpStatusCode(String.valueOf(response.getStatus()));

//        if (SpringContextUtil.getActiveProfile().equals("prod")) {
        String s = JSON.toJSONStringWithDateFormat(elkLog, "yyyy-MM-dd HH:mm:ss.SSS");
        elkLogger.info(s);

        return result;
    }


}

SpringContextUtil类内容如下:

package com;

import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.stereotype.Component;

import java.util.Locale;

@Component
public class SpringContextUtil implements ApplicationContextAware {

    private static ApplicationContext context = null;

    // 传入线程中
    public static <T> T getBean(String beanName) {
        return (T) context.getBean(beanName);
    }

    // 国际化使用
    public static String getMessage(String key) {
        return context.getMessage(key, null, Locale.getDefault());
    }

    /// 获取应用程序名称
    public static String getApplicationName() {
        return context.getEnvironment().getProperty("spring.application.name");
    }

    /// 获取当前环境
    public static String getActiveProfile() {
        return context.getEnvironment().getActiveProfiles()[0];
    }

    /* (non Javadoc)
     * @Title: setApplicationContext
     * @Description: spring获取bean工具类
     * @param applicationContext
     * @throws BeansException
     * @see org.springframework.context.ApplicationContextAware#setApplicationContext(org.springframework.context.ApplicationContext)
     */
    @Override
    public void setApplicationContext(ApplicationContext applicationContext)
            throws BeansException {
        this.context = applicationContext;
    }
}

LoggerEntity类内容如下:

package com;

import lombok.Data;

import java.io.Serializable;
import java.util.Date;

//@Entity
//@Table(name = "t_logger_infos")
@Data
public class LoggerEntity implements Serializable {

    //应用程序名
    private String applicationName;

    //spring.profiles.active
    private String profileActive;

    //客户端请求ip
    private String clientIp;

    //客户端请求路径
    private String uri;

    //客户端请求完整路径
    private String url;

    //请求方法名
    private String methodName;

    //请求类名
    private String className;

    //终端请求方式,普通请求,ajax请求
    private String requestType;

    //请求方式method,post,get等
    private String requestMethod;

    //请求参数内容,json
    private String requestParamData;

    //请求body参数内容,json
    private String requestBodyData;

    //请求接口唯一session标识
    private String sessionId;

    //请求时间
    private Date requestDateTime;

    //接口返回时间
    private Date responseDateTime;

    //接口返回数据json
    private String responseData;

    //请求时httpStatusCode代码,如:200,400,404等
    private String httpStatusCode;

    //请求耗时秒单位
    private long spentTime;

}

SysLog注解类如下:

package com;

import java.lang.annotation.*;

@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface SysLog {

    boolean isLog() default true;

}

在SysLogAspect切面类中使用@SysLog注解来判断该接口是否进行日志传递。

所以在我们写的接口中只要在方法上加入该注解,即可控制日志是否上传。

如该Controller中所定义接口:

@SysLog(isLog = true)
@RequestMapping(value = "/getDataList", method = RequestMethod.POST)
public ApiResult getInputDataList(@RequestBody VersionVo vo) {
    return success();
}

ELK搭建环境之配置logstash监听文件并利用grok插件进行切割利于统计

一、配置ELK环境在logstash和elasticsearch之间建立通信,并能查看对应的索引、日志信息

1.本地下载logstash6.4.2运行包,不会装请移步 (ElasticSearch+LogStash+Kibana)ELK搭建在Mac/Linux系统上6.4.2版本

2.解压并进行基础配置,配置如下:

默认配置文件在安装目录 logstash6.4.2/config/logstash-sample.conf

# 监听本地目录下的test.log日志文件
input {
  file {
      path => ["/Users/chenhailong/daily-test/test.log"]
  }
}
# 配置日志发送的目标elasticsearch服务
output {
  elasticsearch {
    hosts => ["http://xxxx.xx.xxx.xx:9200"]
    index => "grok-test"
    user => "username"
    password => "password"
  }
}

3.确保elasticsearch服务是可以访问的,(开启代理、翻墙、防火墙、IP不通可能造成无法通信)

在浏览器键入对应地址:http://xxxx.xx.xxx.xx:9200

显示如下:

{
  "name" : "wqbVDEq",
  "cluster_name" : "elasticsearch",
  "cluster_uuid" : "lq8YkTH4Q7eslqvd0pn9Kg",
  "version" : {
    "number" : "6.4.2",
    "build_flavor" : "default",
    "build_type" : "tar",
    "build_hash" : "04711c2",
    "build_date" : "2018-09-26T13:34:09.098244Z",
    "build_snapshot" : false,
    "lucene_version" : "7.4.0",
    "minimum_wire_compatibility_version" : "5.6.0",
    "minimum_index_compatibility_version" : "5.0.0"
  },
  "tagline" : "You Know, for Search"
}

4.在客户端保证有效的日志文件,日志记录有序的生成。

客户端的

日志文件为:/Users/chenhailong/daily-test/test.log

日志格式为:2018-12-31,16:12:1546245470 INFO sdklog &event_id=10003&uuid=90b7786d-a905-48aa-93e9-bc1cf64ffce2&event_type=2&device_id=37037&session_id=1545899195566_37037&uid=2D9CBA4701227157E050840AF2422B52&date=2018-12-27&time=16:26:36&ts=1545899196409&

在需要测试时,执行脚本追加日志记录务必保证日志文件中有新的追加记录,确保监听到了新的数据。否则,没有新数据可能看不到效果

5.启动logstash

#指定配置文件启动

./bin/logstash -f ../conf/logstash-sample.conf

6.如果一切顺利的话,可以在kibana控制台看到相关信息

a. kibana 》系统管理 》 Index Mangement 》 可以搜索查看到对应的 “grok-test” 索引名称
b. kibana 》系统管理 》 Index Patterns 》可以创建索引前缀
c. kibana 》发现 》 可以搜索到 “grok-test”索引文件下的所有日志记录,如下

未分类

二:对日志进行切割

1.前期了解:

https://github.com/logstash-plugins/logstash-patterns-core/blob/master/patterns/grok-patterns

2.在kibana后台 》开发工具 》 Grok Debugger进行测试,如下图

不会用,请移步 使用kibana的开发工具console做一些elasticsearch的基本查询和设置

未分类

3.日志切割请参考,注意格式

%{TIMESTAMP_ISO8601:date} %{LOGLEVEL:level} %{WORD:type} &event_id=%{NUMBER:event_id}&uuid=%{USERNAME:uuid}&event_type=%{INT:event_type}&device_id=%{INT:device_id}&session_id=%{USERNAME:session_id}&uid=%{WORD:uid}&date=%{USERNAME:ymd}&time=%{TIME:time}&ts=%{NUMBER:ts}&url=%{URI:url}

4.修改配置文件,增加grok插件

# Sample Logstash configuration for creating a simple
# Beats -> Logstash -> Elasticsearch pipeline.

input {
  file {
      path => ["/Users/chenhailong/daily-test/test.log"]
  }
}

filter {
  #定义数据的格式
  grok {
    match => { "message" => "%{TIMESTAMP_ISO8601:date} %{LOGLEVEL:level} %{WORD:type} &event_id=%{NUMBER:event_id}&uuid=%{USERNAME:uuid}&event_type=%{INT:event_type}&device_id=%{INT:device_id}&session_id=%{USERNAME:session_id}&uid=%{WORD:uid}&date=%{USERNAME:ymd}&time=%{TIME:time}&ts=%{NUMBER:ts}&url=%{URI:url}"}
  }
}

output {
  elasticsearch {
    hosts => ["http://xxx.xx.xxx.xx:9200"]
    index => "grok-test"
    user => "username"
    password => "password"
  }
}

5.打开kibana 》 发现 》 搜索grok-test索引

会发现日志已经按照设定的格式进行切切割。这样就可以进行数据的报表分析,更加直观化。更有效的利用日志数据。

不局限于日常日志,也可以针对性的记录相关业务数据。

未分类

6.配置文件中还可以增加多种插件配合处理

具体可以移步查看官方文件,更加详细,更加专业

https://www.elastic.co/guide/en/logstash/current/filter-plugins.html

至此:ELK的学习使用就告一段落,有需要再去细致研究。

Linux搭建ELK日志收集系统:FIlebeat+Redis+Logstash+Elasticse

Centos7部署ELK日志收集系统

一、ELK概述

ELK是一组开源软件的简称,其包括Elasticsearch、Logstash 和 Kibana。ELK最近几年发展迅速,已经成为目前最流行的集中式日志解决方案。

  • Elasticsearch: 能对大容量的数据进行接近实时的存储,搜索和分析操作。 本项目中主要通过Elasticsearch存储所有获取的日志。
  • Logstash: 数据收集引擎,它支持动态的的从各种数据源获取数据,并对数据进行过滤,分析,丰富,统一格式等操作,然后存储到用户指定的位置。
  • Kibana: 数据分析与可视化平台,对Elasticsearch存储的数据进行可视化分析,通过表格的形式展现出来。
  • Filebeat: 轻量级的开源日志文件数据搜集器。通常在需要采集数据的客户端安装Filebeat,并指定目录与日志格式,Filebeat就能快速收集数据,并发送给logstash进行解析,或是直接发给Elasticsearch存储。
  • Redis:NoSQL数据库(key-value),也数据轻型消息队列,不仅可以对高并发日志进行削峰还可以对整个架构进行解耦

传统ELK的经典框架

未分类

单一的架构,logstash作为日志搜集器,从数据源采集数据,并对数据进行过滤,格式化处理,然后交由Elasticsearch存储,kibana对日志进行可视化处理。

新型ELK框架

未分类

Filebeats是一种轻量级的日志搜集器,其不占用系统资源,自出现之后,迅速更新了原有的elk架构。Filebeats将收集到的数据发送给Logstash解析过滤,在Filebeats与Logstash传输数据的过程中,为了安全性,可以通过ssl认证来加强安全性。之后将其发送到Elasticsearch存储,并由kibana可视化分析。

二、新型ELK搭建详细过程

实验环境:

未分类

下面是搭建过程中所需程序安装包:
https://pan.baidu.com/s/1w02WtUAqh9yX4TChyMLa5Q 密码:g0p9

1.客户端部署filebeat:

yum -y install filebeat
#查看配置文件所在位置
rpm -qc filebeat

2.修改配置文件使filebeat获取的日志进入redis:

注:此处演示获取spring cloud框架中eureka日志,其他程序日志都可相同方法获取

vim /etc/filebeat/filebeat.yml
#修改的内容有一家几个字段
enabled:true
paths:程序日志路径
output.redis:日志输出地方
                    hosts:redis所在服务器IP
                    port:redis端口
                    key:redis中的key

未分类

3.源码安装redis:

解压redis程序包:

tar zxf redis-3.2.9.tar.gz –C /usr/local/src

编译redis:

cd /usr/local/src/redis-3.2.9
make && make install
ln –s /usr/local/src/redis-3.2.9 /usr/local/redis

注:redis安装时有的缺少语言环境会出错,有的会出现奇奇怪怪的问题,只要复制Error到往上搜索下就能轻易解决,在此不多做解释

修改redis配置文件:

vim /usr/local/redis/redis.conf
#修改内容如下:
daemonize yes                           #开启后台运行
timeout 120                                #超时时间
bind 0.0.0.0                                #任何地址IP都可以登录redis
protected-mode no                     #关闭redis保护机制否则在没有密码校验情况下redis远程登录失败

注:此处是做演示,如果是线上部署elk建议开启持久化机制,保证数据不丢失

4.登录测试redis是否可以正常写入数据:

未分类

5.启动filebeat看看redis是否能接收到数据:

启动filebeat:

systemctl start filebeat

6.进入redis查看是否有数据:

#执行命令:
keys *                          #查看所有key,此操作为慢查询,若redis跑了大量线上业务请不要进行此操做
lrange eureka-log 0 -1 #查询key所有数据,若filebeat启动时间过长请勿进行此操作

未分类

7.安装jdk1.8:

解压jdk安装包并创建软连接:

tar zxf /usr/local/src/jdk-8u131-linux-x64.tar.gz –C /usr/local/
ln -s /usr/local/jdk1.8.0_91/ /usr/local/jdk

配置环境变量:

vim /etc/profile
#修改内容如下:
JAVA_HOME=/usr/local/jdk
export JRE_HOME=/usr/local/jdk/jre
export CLASSPATH=.:$JAVA_HOME/lib:$JRE_HOME/lib:$CLASSPATH
export PATH=$PATH:$JAVA_HOME/bin:$JRE_HOME/bin

重新载入环境变量:

source /etc/profile

查看jdk是否安装成功:

java -version

未分类

8.安装Elasticsearch:

解压安装包并改名:

unzip elasticsearch-5.6.3.zip -d /usr/local/
mv /usr/local/ elasticsearch-5.6.3 /usr/local/elasticsearh

修改ES配置文件:

vim /usr/local/elasticsearch/config/elasticsearch.yml

#这里指定的是集群名称,需要修改为对应的,开启了自发现功能后,ES会按照此集群名称进行集群发现
cluster.name: my-application
node.name: node-1

#目录需要手动创建
path.data: /opt/elk/data
path.logs: /opt/elk/logs

#ES监听地址任意IP都可访问
network.host: 0.0.0.0
http.port: 9200

#若是集群,可在里面引号中添加,逗号隔开
discovery.zen.ping.unicast.hosts: [“192.168.3.205”]

# enable cors,保证_site类的插件可以访问es    
http.cors.enabled: true             #手动添加
http.cors.allow-origin: “*”         #手动添加

# Centos6不支持SecComp,而ES5.2.0默认bootstrap.system_call_filter为true进行检测,所以导致检测失败,失败后直接导致ES不能启动
bootstrap.memory_lock: false        #手动添加
bootstrap.system_call_filter: false     #手动添加

注:ES启动的时候回占用特别大的资源所以需要修改下系统参数,若不修改资源启动会异常退出

9.修改系统参数:

vim /etc/sysctl.conf
#添加参数
vm.max_map_count=655360

重新载入配置:

sysctl –p

10.修改资源参数:

vim /etc/security/limits.conf
#修改

*   soft    nofile  65536
*   hard        nofile  131072  
*   soft        nproc   65536
*   hard        nproc   131072 

如:

未分类

11.设置用户资源参数:

vim /etc/security/limits.d/20-nproc.conf
#添加
elk     soft    nproc       65536

12.创建用户并赋权:

useradd elk
groupadd elk
useradd elk -g elk

13.创建数据和日志目录并修改目录权限:

mkdir –pv /opt/elk/{data,logs}
chown –R elk:elk /opt/elk
chown –R elk:elk /usr/local/elasticsearch

14.切换用户并后台启动ES:(elk用户修改了资源参数,如不切位elk用户启动会暴毙)

su elk
nohup /opt/app/elasticsearch-5.6.3/bin/elasticsearch >> /dev/null 2>&1 &

15.查看ES状况:

方法一、
curl 'http://[ES IP]:9200/_search?pretty'

方法二、
#网页访问:
http://[ES IP]:9200/_search?pretty

16.安装logstash:

解压并创建软连接:

tar /usr/local/src/logstash-5.3.1.tar.gz –C /usr/local/
ln –s /usr/local/logstash-5.3.1 /usr/local/logstash

测试logstash是否可用:

/usr/local/logstash/bin/logstash -e 'input { stdin { } } output { stdout {} }'

未分类

在此创建主配文件进行测试:

vim /usr/local/logstash/config/logstash-simple.conf
#内容如下:
input { stdin { } }
output {
    stdout { codec=> rubydebug }
}

使用logstash参数-f读取配置文件进行测试:

/usr/local/logstash/bin/logstash -f /usr/local/logstash/config/logstash-simple.conf

未分类

此时说明我们的logstash是完全没有问题了,可以进行日志收集了

17.创建配置文件获取redis日志的数据:

配置文件如下:

vim /usr/local/logstash/config/redis-spring.conf 
input {
  redis {
    port => "6379"
    host => "192.168.3.205"
    data_type => "list"
    type => "log"
    key => "eureka-log"
  }
}
output {
  elasticsearch {
     hosts => "192.168.3.205:9200"
     index => "logstash1-%{+YYYY.MM.dd}"
  }
}

通过配置文件启动服务查看效果:

/usr/local/logstash/bin/logstash -f /usr/local/logstash/config/redis-spring.conf

结果如下:

未分类

此时我们再去查看reids中key:(此时已经没有数据了,数据已经被logstash取完)

未分类

18.使用curl 查看ES是否接受到数据

curl http://192.168.3.205:9200/_search?pretty

结果如下:

未分类

此时说明我们logstash从redis中取数据,在把数据推到ES中是ok的!

19.安装ES插件:(elasticsearch-head)

注:head安装需要从国外网站拉去东西,可能网速过慢导致安装失败(可以多试几次),下面有几种方法安装:

方法一、
导入node-v8.2.1.tar.gz phantomjs-2.1.1-linux-x86_64.tar.bz2 安装包
安装node:
tar zxvf node-v8.2.1.tar.gz
cd node-v8.2.1/
./configure && make && make install 

安装phantomjs:
tar jxvf phantomjs-2.1.1-linux-x86_64.tar.bz2
cd phantomjs-2.1.1-linux-x86_64/bin/
cp phantomjs /usr/local/bin/

导入es-head程序包并解压:
unzip master.zip –d /usr/local/
cd elasticsearch-head/
npm install
npm run start &

查看端口状态:(端口默认9100)
netstat –anpt | grep 9100

方法二、
git clone git://github.com/mobz/elasticsearch-head.git
cd elasticsearch-head
npm install
npm run start
netstat –anpt | grep 9100

方法三、
拉镜像:
docker push mobz/elasticsearch-head:5
启动镜像:
docker run -p 9100:9100 mobz/elasticsearch-head:5
web访问测试:
http://IP:9100

20.Elasticsearch-head安装成功Web访问结果如下:

未分类

查看刚刚从logstash推到ES中的数据:

未分类

21.安装kibana

解压并安装kibana:

tar -zxvf /usr/local/src/kibana-5.3.1-linux-x86_64.tar.gz -C /usr/local/

修改kibana配置文件:

vim /usr/local/kibana-5.3.1-linux-x86_64/config/kibana.yml

修改内容如下:

server.port: 5601                                                            #开启默认端口5601
server.host: “192.168.3.205”                                    #kibana站点IP
elasticsearch.url: http://192.168.3.205:9200        #只想ES服务所在IP Port
kibana.index: “.kibana”

后台启动kibana:

nohup /usr/local/kibana-5.3.1-linux-x86_64/bin/kibana >> /dev/null 2>&1 &

查看端口监听:

netstat –anot | grep 5601

结果如:(此结果表示kibana启动成功)

未分类

使用Web访问kibana:

http://[Kibana IP]:5601

初次访问结果如:(刚访问的时候没有创建索引所以没有看不到数据)

未分类

根据logstash配置文件中index设置索引:
首先查看logstash中的index:

未分类

Kibana中创建index:

未分类

下面按照1,2,3,4顺序进行设置:

未分类

此时我们在返回Discover在里面我们就可以看到数据了:

未分类

至此我们的ELK就安装OK了。

docker部署logstash

logstash

使用ElasticSearch,需要将MySQL内的数据同步到ElasticSearch中去。根据网上文章,觉得logstash属于比较好的同步工具。

不想被logstash环境的搭建与配置困扰。使用docker制作一个镜像,然后可以做到到处运行

Dockerfile

基础镜像:

选择的是dockerhub的logstash。文档地址logstash

镜像文件:

FROM logstash:5

#安装input插件
RUN logstash-plugin install logstash-input-jdbc
#安装output插件
RUN logstash-plugin install logstash-output-elasticsearch
#容器启动时执行的命令.(CMD 能够被 docker run 后面跟的命令行参数替换)
CMD ["-f", "/some/config-dir/logstash-mysql-es.conf"]

build镜像:

docker build -t my-logstash .

条件准备:

创建config目录,创建配置文件logstash-mysql-es.conf
同步MySQL需要MySQL驱动。为了挂载目录的时候,只挂载一个目录。我们把mysql驱动与配置文件放到同一个目录下。

未分类

启动容器:

docker run -d --name logstashmysql -v /root/logstash/config:/some/config-dir/ bc8551a7b495

查看日志:

docker logs -f --tail=30 logstashmysql

Logstash采集网站的访问日志

最近又重新接触了一下elastisearch、logstash、kibana,蛮好用的一个日志框架。

同时好久没有更新网站内容、也没怎么关注,虽然有cnzz(umeng)的日志统计功能,但是毕竟是很小一段时间的。要是能够把日志都导出来,就可以用ELK来分析一下自己网站一年来文章的访问情况。

嗯,前阵子买了阿里云的一个VPN服务器,正好可以利用利用。把访问的日志情况通过http发送给logstash,然后存储下来,等过一段时间我们再回来分析分析这些日志。^^

启动Logstash收集服务

  • https://www.elastic.co/blog/introducing-logstash-input-http-plugin
  • https://discuss.elastic.co/t/post-data-to-logstash-using-http-input/69166/8
  • https://www.elastic.co/guide/en/logstash/current/plugins-filters-json.html
~/logstash-6.1.2/bin/logstash -e '
input { 
  http { 
    port => 20000 
    response_headers => {
      "Access-Control-Allow-Origin" => "*"
      "Content-Type" => "application/json"
      "Access-Control-Allow-Headers" => "Origin, X-Requested-With, Content-Type, Accept"
    }
  } 
} 
filter {
  if [message] =~ /^s*$/ {
    drop { }
  }

  json {
    source => "message"
  }
  json {
    source => "location"
    target => "location"
  }
  mutate {
    remove_field => [ "headers" ]
  }
}
output { 
  file { 
    path => "winse-accesslog-%{+YYYY-MM-dd}.log"
    codec => json_lines 
  } 
} 
'

页面发送访问日志记录

$.ajax({
  type: "POST",
  url: "http://SERVER:PORT",
  data: JSON.stringify({
    title: document.title,
    location: JSON.stringify(location),
    referrer: document.referrer,
    userAgent: navigator.userAgent
  }),
  contentType: "application/json; charset=utf-8",
  dataType: "json"
});

–END

CICD之logstash服务的Dockerfile使用Gitlab Runner打docker包

gitlab提交代码后,经gitlab Runner打docker包,推送到docker仓库,然后kubernetes选择版本更新

Dockerfile

FROM openjdk:8-jre-alpine

# ensure logstash user exists
RUN addgroup -S logstash && adduser -S -G logstash logstash

# install plugin dependencies
RUN apk add --no-cache 
# env: can't execute 'bash': No such file or directory
        bash 
        libc6-compat 
        libzmq

# grab su-exec for easy step-down from root
RUN apk add --no-cache 'su-exec>=0.2'

# https://www.elastic.co/guide/en/logstash/5.0/installing-logstash.html#_apt
# https://artifacts.elastic.co/GPG-KEY-elasticsearch
ENV LOGSTASH_PATH /usr/share/logstash/bin
ENV PATH $LOGSTASH_PATH:$PATH

# LOGSTASH_TARBALL="https://artifacts.elastic.co/downloads/logstash/logstash-5.5.0.tar.gz"

COPY logstash-5.5.0.tar.gz /logstash.tar.gz
RUN set -ex; 
    apk add --no-cache --virtual .fetch-deps 
        ca-certificates 
        gnupg 
        openssl 
        tar ; 
    dir="$(dirname "$LOGSTASH_PATH")"; 
    mkdir -p "$dir"; 
    tar -xf /logstash.tar.gz --strip-components=1 -C "$dir"; 
    rm logstash.tar.gz; 
    apk del .fetch-deps; 
    export LS_SETTINGS_DIR="$dir/config"; 
# if the "log4j2.properties" file exists (logstash 5.x), let's empty it out so we get the default: "logging only errors to the console"
    if [ -f "$LS_SETTINGS_DIR/log4j2.properties" ]; then 
        cp "$LS_SETTINGS_DIR/log4j2.properties" "$LS_SETTINGS_DIR/log4j2.properties.dist"; 
        truncate -s 0 "$LS_SETTINGS_DIR/log4j2.properties"; 
    fi; 
# set up some file permissions
    for userDir in 
        "$dir/config" 
        "$dir/data" 
    ; do 
        if [ -d "$userDir" ]; then 
            chown -R logstash:logstash "$userDir"; 
        fi; 
    done; 
    logstash --version

COPY docker-entrypoint.sh /
RUN chmod +x /docker-entrypoint.sh
COPY logstash-shipper.conf /
RUN mkdir -p /data/logs/sincedb
RUN chown logstash.logstash -R /data/logs/sincedb
WORKDIR /
ENTRYPOINT ["/docker-entrypoint.sh"]
CMD ["-f", "/logstash-shipper.conf"]

docker-entrypoint.sh

#!/bin/bash
set -e
mkdir -p /data/logs/sincedb
chown logstash.logstash -R /data/logs/sincedb

# first arg is `-f` or `--some-option`
if [ "${1#-}" != "$1" ]; then
    set -- logstash "$@"
fi

# Run as user "logstash" if the command is "logstash"
# allow the container to be started with `--user`
if [ "$1" = 'logstash' -a "$(id -u)" = '0' ]; then
    set -- su-exec logstash "$@"
fi

exec "$@"

logstash-5.5.0.tar.gz 从官方下载 https://www.elastic.co/cn/downloads/logstash

logstash-shipper.conf样例

input {
    file {
        path => [ "/data/logs/service/*/*.log"]
        type => "service"
        sincedb_path => "/data/logs/sincedb/service"
        codec => multiline {
            pattern => "^dddd-dd-dd dd:dd:dd.ddd .+"
            negate => true
            what => "previous"
            max_lines => 30
        }       
    }
    file {
        path => [ "/data/logs/web/*/access_log*.log"]
            codec => plain { format => "%{message}" }
        type => "web"
        sincedb_path => "/data/logs/sincedb/web"
    }
}
output {
    if [type] == 'service' {
        kafka {
            codec => plain { format => "%{message}" }
            bootstrap_servers => "139.219.*.*:9092"
        topic_id => "service"
        }
    }
    if [type] == 'web' {
        kafka {
                codec => plain { format => "%{message}" }
            bootstrap_servers => "139.219.*.*:9092"
        topic_id => "web"
        }
    }
}

service的日志开头是2017-12-01 12:01:01,所以pattern匹配时间,根据时间判断日志的起始点;web日志原封不动传过去,output到kafka集群,logstash-indexer从kafka获取日志后归入elasticsearch

logstash-indexer.conf示例

input {
        kafka {
                bootstrap_servers => "139.219.*.*:9092"
                topics => "service"
                type => "service"
        }
        kafka {
                bootstrap_servers =>"139.219.*.*:9092"
                topics => "web"
                type => "web"
        }
}
filter {
    if [type] != ['web'] {
        if "_grokparsefailure" in [tags] {
              drop { }
          }
        grok {
            match => {
                "message" => "%{TIMESTAMP_ISO8601:timestamp} %{GREEDYDATA}"
            }
        }
        date {
            match => ["timestamp","yyyy-MM-dd HH:mm:ss.SSS"]
            locale => "cn"
        }
    }
    if [type] == 'web' {
        if "_grokparsefailure" in [tags] {
              drop { }
          }
        grok {
                match => {
                    "message" => '%{IP} - - [%{HTTPDATE:time}] "%{WORD:methord} %{URIPATHPARAM:request} HTTP/%
{NUMBER:httpversion}" %{NUMBER:response} %{GREEDYDATA}'
                    }
            }
        date {
            match => ["time","dd/MMM/yyyy:HH:mm:ss +d+"]
            locale => "cn"
        }
    }
}
output {
        if [type] == 'service' {
                elasticsearch {
                        hosts => "172.16.1.1:9200"
                        index => "bbotte-service-%{+YYYY.MM.dd}"
                }
        }
        if [type] == 'web' {
                elasticsearch {
                        hosts => "172.16.1.1:9200"
                        index => "bbotte-web-%{+YYYY.MM.dd}"
                }
        }
}

最后就是gitlabci配置示例

# cat .gitlab-ci.yml
image: docker:latest

stages:
  - LogstashPubTest
  - LogstashPubProd

image-build-test:
  stage: LogstashPubTest
  script:
    - "current_date=`TZ='UTC-8' date +'%m%d%H%M'`"
    - "commit_sha=$CI_COMMIT_SHA"
    - "docker build -t bbotte.com:5000/logstash:$CI_COMMIT_REF_NAME-$current_date-${commit_sha:0:8} ."
    - "docker login -u admin -p 123456 bbotte.com:5000"
    - "docker push bbotte.com:5000/logstash:$CI_COMMIT_REF_NAME-$current_date-${commit_sha:0:8}"
  only:
    - test
image-build-master:
  stage: LogstashPubProd
  script:
    - "current_date=`TZ='UTC-8' date +'%m%d%H%M'`"
    - "commit_sha=$CI_COMMIT_SHA"
    - "docker build -t bbotte.com:5000/logstash:$CI_COMMIT_REF_NAME-$current_date-${commit_sha:0:8} ."
    - "docker login -u admin -p 123456 bbotte.com:5000"
    - "docker push bbotte.com:5000/logstash:$CI_COMMIT_REF_NAME-$current_date-${commit_sha:0:8}"
  only:
    - master

目录结构如下:

logstash$ ls -a
.   docker-entrypoint.sh  .git            logstash-5.5.0.tar.gz 
..  Dockerfile            .gitlab-ci.yml  logstash-shipper.conf

使用Filebeat和Logstash集中归档游戏日志

背景说明

由于游戏项目日志目前不够规范,不太容易根据字段结构化数据,开发又有实时查看生产和测试环境服务运行日志需求;如果写入ES通过Kibana查看,对于非分析类查看还是不太友好,当然也可以通过LogTrail插件

方案

  • Filebeat->Logstash->Files
  • Filebeat->Redis->Logstash->Files
  • Nxlog(Rsyslog、Logstash)->Kafka->Flink(Logstash->ES-Kibana)
  • 其他方案(可根据自己需求,选择合适的架构,作者选择了第二种方案)

注释: 由于Logstash无法处理输出到文件乱序的问题,可通过不同的文件使用不同的Logstash;或者直接写入ES(不存在乱序问题)、通过Flink输出到文件

部 署

系统环境

  • Debian8 x64
  • logstash-6.1.1
  • filebeat-6.1.1-amd64
  • Redis-3.2

Filebeat配置

/etc/filebeat/filebeat.yml
filebeat.prospectors:
- type: log
  paths:
    - /home/data/log/*
    - /home/data/*.log
  scan_frequency: 20s
  encoding: utf-8
  tail_files: true
  harvester_buffer_size: 5485760
fields:
  ip_address: 192.168.2.2
  env: qa
output.redis:
  hosts: ["192.168.1.1:6379"]
  password: "geekwolf"
  key: "filebeat"
  db: 0
  timeout: 5
  max_retires: 3
  worker: 2
  bulk_max_size: 4096

Logstash配置

input {
 #Filebeat
 # beats {
 #   port => 5044
 # }
 #Redis
  redis {
    batch_count => 4096
    data_type => "list"
    key => "filebeat"
    host => "127.0.0.1"
    port => 5044
    password => "geekwolf"
    db => 0
    threads => 2
   }
}
filter {
  ruby {
      code => 'event.set("filename",event.get("source").split("/")[-1])'
  }
}
output {
  if [filename] =~ "nohup" {
    file {
        path => "/data/logs/%{[fields][env]}/%{+YYYY-MM-dd}/%{[fields][ip_address]}/%{filename}"
        flush_interval => 3
        codec => line { format => "%{message}"}
    }
  } else {
    file {
         path => "/data/logs/%{[fields][env]}/%{+YYYY-MM-dd}/%{[fields][ip_address]}/logs/%{filename}"
        flush_interval => 3
        codec => line { format => "%{message}"}
   }
 }
 #stdout { codec => rubydebug }
}

生产日志目录

.
├── prod
│   └── 2018-01-13
│       └── 2.2.2.2
│           ├── logs
│           │   ├── rpg_slow_db_.27075
│           └── nohup_service.log
└── qa
    ├── 2018-01-12
    │   ├── 192.168.3.1
    └── 2018-01-13
        ├── 192.168.3.2

Kafka、Logstash、Nginx日志收集入门

Nginx作为网站的第一入口,其日志记录了除用户相关的信息之外,还记录了整个网站系统的性能,对其进行性能排查是优化网站性能的一大关键。
Logstash是一个接收,处理,转发日志的工具。支持系统日志,webserver日志,错误日志,应用日志,总之包括所有可以抛出来的日志类型。一般情景下,Logstash用来和ElasticSearch和Kibana搭配使用,简称ELK,本站http://www.wenzhihuai.com除了用作ELK,还配合了Kafka进行使用。
kafka是一个分布式的基于push-subscribe的消息系统,它具备快速、可扩展、可持久化的特点。它现在是Apache旗下的一个开源系统,作为hadoop生态系统的一部分,被各种商业公司广泛应用。它的最大的特性就是可以实时的处理大量数据以满足各种需求场景:比如基于hadoop的批处理系统、低延迟的实时系统、storm/spark流式处理引擎。

下面是本站日志系统的搭建

一、Nginx日志

为了配合ELK的使用,把日志变成json的格式,方便ElasticSearch对其检索。

    log_format main ''{"@timestamp":"$time_iso8601",''
      ''"host": "$server_addr",''
      ''"clientip": "$remote_addr",''
      ''"size": $body_bytes_sent,''
      ''"responsetime": $request_time,''
      ''"upstreamtime": "$upstream_response_time",''
      ''"upstreamhost": "$upstream_addr",''
      ''"http_host": "$host",''
      ''"url": "$uri",''
      ''"xff": "$http_x_forwarded_for",''
      ''"referer": "$http_referer",''
      ''"agent": "$http_user_agent",''
      ''"status": "$status"}'';
    access_log  logs/access.log  main;

然后执行nginx -t检验配置,nginx -s reload重启nginx即可。
注意:

  1. 这里的单引号用来标识不换行使用的,如果没有的话,Logstash会每一行都发送一次。

  2. 格式一定一定要规范。

二、Logstash

下载安装的具体请看Logstash官网,这里只讲讲如何配置,

输入

input {
    file {
        type => "nginx_access"
        path => "/usr/share/nginx/logs/access.log"
        codec => "json"
    }
}

过滤

filter,由于本站没有涉及到很复杂的手机,所以不填

输出

output {
stdout{
codec => rubydebug
}
kafka {
bootstrap_servers => “119.29.188.224:9092” #生产者
topic_id => “nginx-access-log” #设置写入kafka的topic
    # compression_type => "snappy"    #消息压缩模式,默认是none,可选gzip、snappy。
    codec => json       #一定要加上这段,不然传输错误,${message}
}
elasticsearch {
    hosts => "119.29.188.224:9200"    #Elasticsearch 地址,多个地址以逗号分隔。
    index => "logstash-%{type}-%{+YYYY.MM.dd}"    #索引命名方式,不支持大写字母(Logstash除外)
    document_type => "%{type}"    #文档类型
}
}

具体字段:

stdout:控制台输出,方便tail -f查看,可不要

kafka:输出到kafka,bootstrap_servers指的是kafka的地址和端口,topic_id是每条发布到kafka集群的消息属于的类别,其中codec一定要设置为json,要不然生产者出错,导致消费者是看到${message}。
elasticsearch:输出到elasticsearch,hosts指的是elasticsearch的地址和端口,index指的命名方式
然后启动Logstash:

nohup bin/logstash -f config/nginxlog2es.conf –path.data=tmp &

tail -f 查看nohup

未分类

三、kafka

目前的云服务器都用了NAT转换公网,如果不开启外网,kafka会默认使用内网私有地址访问,所以要开启外网访问
只需要在config/server.properties里加入:

advertised.host.name=119.29.188.224

改变默认端口:

advertised.host.port=9200

启动步骤:

(1)ZooKeeper启动

bin/zookeeper-server-start.sh config/zookeeper.properties

(2)启动Kafka

nohup bin/kafka-server-start.sh config/server.properties &

(3)创建一个topic

bin/kafka-topics.sh –create –zookeeper localhost:2181 –replication-factor 1 –partitions 1 –topic test

查看topic数量

bin/kafka-topics.sh –list –zookeeper localhost:2181

(4)生产者发送消息

bin/kafka-console-producer.sh –broker-list localhost:9092 –topic test

(5)消费者接收消息

bin/kafka-console-consumer.sh –bootstrap-server localhost:9092 –topic test –from-beginning

删除

删除kafka存储的日志,在kafka的config/server.properties的log.dirs=/tmp/kafka-logs查看

四、Spring Boot与Kafka

多模块的Spring Boot与Kafka

(1)在父pom.xml中添加:

<dependencyManagement>
    <dependencies>
        <dependency>
            <groupId>org.springframework.data</groupId>
            <artifactId>spring-data-releasetrain</artifactId>
            <version>Fowler-SR2</version>
            <scope>import</scope>
            <type>pom</type>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-dependencies</artifactId>
            <version>1.5.9.RELEASE</version>
            <type>pom</type>
            <scope>import</scope>
        </dependency>
    </dependencies>
</dependencyManagement>

(2)在消费者模块中添加:

    <parent>
        <artifactId>micro-service</artifactId>
        <groupId>micro-service</groupId>
        <version>1.0-SNAPSHOT</version>
    </parent>

配置文件:

# 本地运行端口
server.port=8082
# kafka地址和端口
spring.kafka.bootstrap-servers=119.29.188.224:9092
# 指定默认消费者group id
spring.kafka.consumer.group-id=myGroup
# 指定默认topic id
spring.kafka.template.default-topic=nginx-access-log
# 指定listener 容器中的线程数,用于提高并发量
spring.kafka.listener.concurrency=3
# 偏移量,最好使用latest,earily会从kafka运行起开始一直发送
spring.kafka.consumer.auto-offset-reset=latest
# 心跳检测
spring.kafka.consumer.heartbeat-interval=100

(5)接收消息

@Component
public class MsgConsumer {
    @KafkaListener(topics = {"nginx-access-log"})
    public void processMessage(String content) {
        System.out.println(content);
    }
}

(6)测试

运行之后点击网站http://www.wenzhihuai.com可看到:

未分类

完整代码可以到https://github.com/Zephery/micro-service查看

错误记录

(1)与Spring的包冲突:

Error starting ApplicationContext. To display the auto-configuration report re-run your application with ''debug'' enabled.
2018-01-05 11:10:47.947 ERROR 251848 --- [           main] o.s.boot.SpringApplication               : Application startup failed
org.springframework.context.ApplicationContextException: Unable to start embedded container; nested exception is org.springframework.boot.context.embedded.EmbeddedServletContainerException: Unable to start embedded Tomcat
    at org.springframework.boot.context.embedded.EmbeddedWebApplicationContext.onRefresh(EmbeddedWebApplicationContext.java:137) ~[spring-boot-1.5.9.RELEASE.jar:1.5.9.RELEASE]
    at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:537) ~[spring-context-4.3.11.RELEASE.jar:4.3.11.RELEASE]
    at org.springframework.boot.context.embedded.EmbeddedWebApplicationContext.refresh(EmbeddedWebApplicationContext.java:122) ~[spring-boot-1.5.9.RELEASE.jar:1.5.9.RELEASE]
    at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:693) [spring-boot-1.5.9.RELEASE.jar:1.5.9.RELEASE]
    at org.springframework.boot.SpringApplication.refreshContext(SpringApplication.java:360) [spring-boot-1.5.9.RELEASE.jar:1.5.9.RELEASE]
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:303) [spring-boot-1.5.9.RELEASE.jar:1.5.9.RELEASE]
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:1118) [spring-boot-1.5.9.RELEASE.jar:1.5.9.RELEASE]
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:1107) [spring-boot-1.5.9.RELEASE.jar:1.5.9.RELEASE]

解决办法:去掉父pom.xml文件里所有关于spring的包,只保留spring boot的即可

(2)消费者只接受到${message}消息

未分类

解决办法:

一定要在output的kafka中添加

codec => json

使用 logstash + kafka + elasticsearch 实现日志监控

在本文中,将介绍使用 logstash + kafka + elasticsearch 实现微服务日志监控与查询。

服务配置

添加 maven 依赖:

<dependency>
   <groupId>org.apache.kafka</groupId>
   <artifactId>kafka-clients</artifactId>
   <version>1.0.0</version>
</dependency>

添加 log4j2 配置:

<?xml version="1.0" encoding="UTF-8"?>
<Configuration status="WARN">
  <Appenders>
    <Console name="Console" target="SYSTEM_OUT">
      <PatternLayout pattern="%d{HH:mm:ss.SSS} [%t] %-5level %logger{36} - %msg%n"/>
    </Console>
    <Kafka name="Kafka" topic="mcloud-log">
      <PatternLayout pattern="%date %message"/>
      <Property name="bootstrap.servers">localhost:9092</Property>
    </Kafka>
  </Appenders>
  <Loggers>
    <Root level="debug">
      <AppenderRef ref="Console"/>
      <AppenderRef ref="Kafka"/>
    </Root>
    <Logger name="org.apache.kafka" level="INFO" />
  </Loggers>
</Configuration>

系统配置

Zookeeper-3.4.10

官网: http://zookeeper.apache.org/doc/current/zookeeperStarted.html#sc_InstallingSingleMode

添加配置

在 conf 目录下创建配置文件 zoo.cfg , 并在其中添加以下内容:

tickTime=2000
dataDir=/var/lib/zookeeper
clientPort=2181

启动 ZooKeeper

windows:

bin/zkServer.bat start

Kafka_2.11-1.0.0

官网: http://kafka.apache.org/quickstart

修改日志存储位置

config/server.properties

log.dirs=D:/kafka-logs

启动 Kafka

windows:

bin/windows/kafka-server-start.bat config/server.properties

注:

如果在启动的时候出现以下错误:

错误: 找不到或无法加载主类

需要手动修改 bin/windows/kafka-run-class.bat ,找到以下的代码:

set COMMAND=%JAVA% %KAFKA_HEAP_OPTS% %KAFKA_JVM_PERFORMANCE_OPTS% %KAFKA_JMX_OPTS% %KAFKA_LOG4J_OPTS% -cp %CLASSPATH% %KAFKA_OPTS% %*

将其中的 %CLASSPATH% 添上双引号 => “%CLASSPATH%” 。

Elasticsearch-6.1.1

官网: https://www.elastic.co/downloads/elasticsearch

安装 x-pack

bin/elasticsearch-plugin install x-pack

新增用户:

bin/users useradd mcloud-user

修改角色:

bin/users roles -a logstash_admin mcloud-log-user

注:

系统内置角色:

Known roles: [kibana_dashboard_only_user, watcher_admin, logstash_system, kibana_user, machine_learning_user, remote_monitoring_agent, machine_learning_admin, watcher_user, monitoring_user, reporting_user, kibana_system, logstash_admin, transport_client, superuser, ingest_admin]

启动服务

bin/elasticsearch.bat

Kibana-6.1.1

官网: https://www.elastic.co/downloads/kibana

安装 x-pack

bin/kibana-plugin.bat install x-pack

启动服务

bin/kibana.bat

Logstash-6.1.1

官网: https://www.elastic.co/downloads/logstash

创建配置文件

文档: https://www.elastic.co/guide/en/logstash/current/input-plugins.html

config/logstash.conf

input {     
    logstash-input-kafka {
        topics => ["mcloud-log"]
    } 
}
output {
  elasticsearch { 
    hosts => ["localhost:9200"] 
    user => "mcloud-user"
    password => 123456
  }
}

最终效果

相关服务启动完成后, 登陆 kibana 管理界面,可以看到以下的效果:

未分类

源码

源码: https://github.com/heyuxian/mcloud

使用 Filebeat 收集日志并提交到 logstash 中

安装 Filebeat

此处只介绍 Windows 下面的安装,至于其他系统, 请参考: https://link.jianshu.com/?t=https%3A%2F%2Fwww.elastic.co%2Fguide%2Fen%2Fbeats%2Ffilebeat%2Fcurrent%2Ffilebeat-installation.html

下载并解压后,有两种方式运行,一种是注册为 Windows 服务,另一种是直接通过命令行运行;下面分别介绍两种方式。

注册为 Windows 服务

前提:系统必须有 PowerShell,因为官方安装包中提供的脚本只能在 PowerShell 中运行,若是 win10 系统,可忽略,因为它已经自带了 PowerShell, 否则请下载 PowerShell 并安装。

  1. 下载安装包 点我下载.

  2. 解压到以下目录: C:Program Files 。

  3. 重命名 filebeat–windows 为 Filebeat 。

  4. 以 管理员 身份运行 PowerShell 。

  5. 在 PowerShell 中运行以下命令:

cd 'C:Program FilesFilebeat'
C:Program FilesFilebeat> .install-service-filebeat.ps1

注:

如果此处提示你没有权限,请运行以下的命令注册 Filebeat 服务 :

PowerShell.exe -ExecutionPolicy UnRestricted -File .install-service-filebeat.ps1

到这,已经将 Filebeat 成功注册为系统服务,当下一次开机时它会自动启动,当然你也可以手动通过服务控制面板启动它。

通过命令行运行 Filebeat

通过命令行运行 Filebeat 非常简单,只需将 Filebeat 文件解压到某个目录后,通过以下命令运行:

filebeat -e -c filebeat.yml

配置 Filebeat

日志输入配置

Filebeat 使用了安装目录下的 filebeat.yml 文件进行相关配置。此处我们主要会用到以下的配置:

filebeat.prospectors:
- type: log
# 此处需特别注意,官方默认配置为 false,需要修改为 true
  enabled: true
  paths:
  # 此处配置的是需要收集的日志所在的位置,可使用通配符进行配置
    - D:/logs/*.log

日志输出配置

因为我们使用的是 logstash 收集日志,所以得注释掉默认的 elasticsearch 配置,并取消 logstash 的注释,最终的效果为:

#output.elasticsearch:
  # Array of hosts to connect to.
  #hosts: ["localhost:9200"]

  # Optional protocol and basic auth credentials.
  #protocol: "https"
  #username: "elastic"
  #password: "changeme"
output.logstash:
  # The Logstash hosts
  hosts: ["localhost:5044"]

  # Optional SSL. By default is off.
  # List of root certificates for HTTPS server verifications
  #ssl.certificate_authorities: ["/etc/pki/root/ca.pem"]

  # Certificate for SSL client authentication
  #ssl.certificate: "/etc/pki/client/cert.pem"

  # Client Certificate Key
  #ssl.key: "/etc/pki/client/cert.key"

此处仅介绍了最基础的配置,如需查看更多高级配置,请查看:官方文档

关于 Filebeat 的配置已经介绍完毕,下面我介绍 log4j2 的配置。

配置 Log4j2

因为我们使用的是 Filebeat 进行日志收集,所以我们只需要简单的将日志输出到本地文件中即可,这里我将使用 RollingFile 进行相关配置:

log4j-spring.xml

<?xml version="1.0" encoding="UTF-8"?>
<Configuration status="WARN">
  <Properties>
    <Property name="pattern" value="%d{HH:mm:ss.SSS} [%t] %-5level %logger{36} - %msg%n"/>
  </Properties>
  <Appenders>
    <Console name="Console" target="SYSTEM_OUT">
      <PatternLayout pattern="${pattern}"/>
    </Console>
    <RollingFile name="RollingFile" fileName="D:/logs/app.log"
      filePattern="logs/$${date:yyyy-MM}/app-%d{MM-dd-yyyy}-%i.log.gz">
      <PatternLayout pattern="${pattern}"/>
      <Policies>
        <TimeBasedTriggeringPolicy />
        <SizeBasedTriggeringPolicy size="250 MB"/>
      </Policies>
    </RollingFile>
    <Async name="AsyncRollingFile">
      <AppenderRef ref="RollingFile"/>
    </Async>
  </Appenders>
  <Loggers>
    <Root level="DEBUG">
      <AppenderRef ref="Console"/>
      <AppenderRef ref="AsyncRollingFile"/>
    </Root>
  </Loggers>
</Configuration>

配置 logstash

这里我们只需要在加入以下的配置即可:

input {
  beats {
    port => 5044
  }
}

此处的端口需要和 Filebeat 中配置的端口一致。

好了,所有的配置都已经完成,这里就不再重复 kibana 和 elasticsearch 的配置了,如有需要,请查看: https://www.jianshu.com/p/78c5159aace8

运行效果

我们启动其它服务并登陆 kibana 后,就可以看到以下的结果了:

未分类

查看源码

关于 Filebeat 的介绍就到此结束了,关于 logstash 的更多高级功能将在后续文章中一一介绍。

查看源码: https://link.jianshu.com/?t=https%3A%2F%2Fgithub.com%2Fheyuxian%2Fmcloud