ansible任务的异步执行

ansible方便在于能批量下发,并返回结果和呈现。简单、高效。
但有的任务执行起来却不那么直接,可能会花比较长的时间,甚至可能会比ssh的超时时间还要长。这种情况任务是不是没法执行了?
ansible考虑到了这种情况,官方文档介绍了这个问题的解决方法,就是让下发的任务执行的连接变为异步:任务下发之后,长连接不再保持,而是每隔一段时间轮询结果,直到任务结束。

这是官网相关的介绍: http://docs.ansible.com/ansible/latest/playbooks_async.html

他们在playbook的任务中加入两个参数:async和poll。

  • async参数值代表了这个任务执行时间的上限值。即任务执行所用时间如果超出这个时间,则认为任务失败。此参数若未设置,则为同步执行。
  • poll参数值代表了任务异步执行时轮询的时间间隔。

官方给出例子:

  ----
    hosts: all
    remote_user: root
    tasks:
      - name: simulate long running op (15 sec), wait for up to 45 sec, poll every 5 sec
        command: /bin/sleep 15
        async: 45
        poll: 5

这时候已经不怕任务超时了。可以执行一个45s的任务,当然也可以根据需要自己设置。另外,如果poll为0,就相当于一个不关心结果的任务。

如果还想要更方便地看轮询结果,ansible还提供了这个模块async_status。

  ---
    # Requires ansible 1.8+
    - name: 'YUM - fire and forget task'
      yum: name=docker-io state=installed
      async: 1000
      poll: 0
      register: yum_sleeper

    - name: 'YUM - check on fire and forget task'
      async_status: jid={{ yum_sleeper.ansible_job_id }}
      register: job_result
      until: job_result.finished
      retries: 30

第一个job执行异步任务,并且注册了一个名字叫yum_sleeper,用于提供给第二个job作为轮询对象,并且poll设为0,它自己不再轮询。

第二个job使用async_status模块,进行轮询并返回轮询结果。准备检查30次。结果如下:

PLAY [all] *********************************************************************

TASK [setup] *******************************************************************
ok: [cloudlab001]

TASK [YUM - fire and forget task] **********************************************
ok: [cloudlab001]

TASK [YUM - check on fire and forget task] *************************************
FAILED - RETRYING: TASK: YUM - check on fire and forget task (29 retries left).
FAILED - RETRYING: TASK: YUM - check on fire and forget task (28 retries left).
FAILED - RETRYING: TASK: YUM - check on fire and forget task (27 retries left).
FAILED - RETRYING: TASK: YUM - check on fire and forget task (26 retries left).
FAILED - RETRYING: TASK: YUM - check on fire and forget task (25 retries left).
FAILED - RETRYING: TASK: YUM - check on fire and forget task (24 retries left).
changed: [cloudlab001]

PLAY RECAP *********************************************************************
cloudlab001                : ok=3    changed=1    unreachable=0    failed=0

ansible fetch 批量下载服务器文件

今天使用 ansible 进行批量巡检操作。

思路是写一个 Playbooks,将巡检脚本上传到所有服务器 /tmp 目录下,然后执行,并取回输出的文件。输出的文件路径为:/tmp/log/ip.txt 。ip 为本机 ip 。

Playbooks 内容如下:

---
- hosts:  test
  remote_user: toptea

  tasks:
  - name: transfer file to server
    copy: src=/root/xunjian.sh dest=/tmp/xunjian.sh mode=755

  - name: zhixing 
    become: yes
    become_method:  su
    shell:  /bin/bash -x /tmp/pswd.sh 

上传文件使用 copy 模块,执行文件用 shell 模块都没问题。

取回文件出了问题,每台服务器的文件名都是不一样的。

取回文件使用 fetch 模块。测试了如下语句,行不通:

ansible all -m fetch -a "src=/tmp/log/* dest=/tmp/"

疯子哥让我去看官方文档。

http://docs.ansible.com/ansible/latest/fetch_module.html#examples
 fetch:
      src: /tmp/{{ inventory_hostname }}.txt
      dest: /tmp/ss-{{ inventory_hostname }}
      flat: yes

使用这个就可以从所有服务器上下载文件。解释一下:

//fetch 是调用这个模块
 fetch:
 //src 是远程服务器的路径,这里的 inventory_hostname 就是填在 /etc/ansible/hosts 文件里面的内容。比如说 hosts 文件你填的是 192.168.1.3
// 那这里的 {{inventory_hostname}}.txt 就是 192.168.1.3.txt
      src: /tmp/{{ inventory_hostname }}.txt
      dest: /tmp/ss-{{ inventory_hostname }}
      flat: yes

发现问题了吗?对,这个脚本要求你的文件名必须包含 inventory_hostname ,

如果没有怎么办呢?使用下面的脚本:

  tasks:
    - name: fucking
      find:
        paths: /tmp/log/
        patterns: "*"
        recurse: no
      register: file_2_fetch

    - name: fuck your bitch
      fetch:
        src: "{{ item.path }}"
        dest: /tmp/
        flat: yes
      with_items: "{{ file_2_fetch.files }}"

解释一下:

首先调用 find,paths 即你存放文件的路径。 patterns 即你要跟的关键字,这里是 *,即通配符,匹配所有文件。你可以写为 *.txt ,匹配所有 txt 文件。
第二行调用 fetch ,ansible 的 Fetches a file from remote nodes ,
src 即上面的find 查到出来的结果。

执行结果如下:

[root@master ~]# ansible-playbook main.yaml 

PLAY [test] ************************************************************************************

TASK [Gathering Facts] *************************************************************************
ok: [192.168.153.22]

TASK [fucking] *********************************************************************************
ok: [192.168.153.22]

TASK [fuck your bitch] *************************************************************************
ok: [192.168.153.22] => (item={u'uid': 0, u'woth': False, u'mtime': 1516180038.2560008, u'inode': 34964981, u'isgid': False, u'size': 0, u'isuid': False, u'isreg': True, u'gid': 0, u'ischr': False, u'wusr': True, u'xoth': False, u'islnk': False, u'nlink': 1, u'issock': False, u'rgrp': True, u'path': u'/tmp/log/192.168.153.22.txt', u'xusr': False, u'atime': 1516181632.1700034, u'isdir': False, u'ctime': 1516181291.6150029, u'isblk': False, u'wgrp': False, u'xgrp': False, u'dev': 64768, u'roth': True, u'isfifo': False, u'mode': u'0644', u'rusr': True})
ok: [192.168.153.22] => (item={u'uid': 0, u'woth': False, u'mtime': 1516182493.8110049, u'inode': 1762530, u'isgid': False, u'size': 0, u'isuid': False, u'isreg': True, u'gid': 0, u'ischr': False, u'wusr': True, u'xoth': False, u'islnk': False, u'nlink': 1, u'issock': False, u'rgrp': True, u'path': u'/tmp/log/1.txt', u'xusr': False, u'atime': 1516182504.3540049, u'isdir': False, u'ctime': 1516182493.8110049, u'isblk': False, u'wgrp': False, u'xgrp': False, u'dev': 64768, u'roth': True, u'isfifo': False, u'mode': u'0644', u'rusr': True})
changed: [192.168.153.22] => (item={u'uid': 0, u'woth': False, u'mtime': 1516182519.4070048, u'inode': 1762531, u'isgid': False, u'size': 0, u'isuid': False, u'isreg': True, u'gid': 0, u'ischr': False, u'wusr': True, u'xoth': False, u'islnk': False, u'nlink': 1, u'issock': False, u'rgrp': True, u'path': u'/tmp/log/2.pdf', u'xusr': False, u'atime': 1516182519.4070048, u'isdir': False, u'ctime': 1516182519.4070048, u'isblk': False, u'wgrp': False, u'xgrp': False, u'dev': 64768, u'roth': True, u'isfifo': False, u'mode': u'0644', u'rusr': True})

PLAY RECAP *************************************************************************************
192.168.153.22             : ok=3    changed=1    unreachable=0    failed=0   

[root@master ~]# ls /tmp/
192.168.153.22.txt  1.txt  2.pdf

docker-compose中启动镜像失败的问题

正常的docker run启动

java:8u111-jdk是java官方镜像,如下命令可以成功启动一个该镜像的容器:

docker run --name test001 -idt java:8u111-jdk

以上命令创建的容器,可用docker exec -it test001 /bin/bash进入容器,执行我们所需的操作;

docker-compose启动失败

这里写个最简单的docker-compose.yml,然后用docker-compse,内容如下:

master:
  image: java:8u111-jdk

在此文件所在目录下执行docker-compose up -d启动容器,再执行docker ps -a查看容器状态,信息如下所示:

root@rabbitmq:/usr/local/work/test# docker-compose up -d
Creating test_master_1 ... done
root@rabbitmq:/usr/local/work/test# docker ps -a
CONTAINER ID        IMAGE               COMMAND             CREATED              STATUS                          PORTS               NAMES
bb433fe9984d        java:8u111-jdk      "/bin/bash"         About a minute ago   Exited (0) About a minute ago                       test_master_1

信息显示我们启动的容器状态为Exited (0) About a minute ago,也就是说虽然创建了容器,但是该容器并未正常运行;

控制终端缺失

启动失败是因为缺失了控制终端的配置,这里有两种方式修复;

使用tty参数(推荐使用)

修改docker-compose.yml,增加一个配置tty:true,如下:

master:
  image: java:8u111-jdk
  tty: true

先执行docker-compose down将之前的容器删除,再执行docker-compose up -d启动,可以发现启动成功,并且可以成功进入容器进行操作:

root@rabbitmq:/usr/local/work/test# docker-compose up -d
Creating test_master_1 ... done
root@rabbitmq:/usr/local/work/test# docker ps
CONTAINER ID        IMAGE               COMMAND             CREATED             STATUS              PORTS               NAMES
f51debaa26ec        java:8u111-jdk      "/bin/bash"         2 seconds ago       Up 2 seconds                            test_master_1
root@rabbitmq:/usr/local/work/test# docker exec -it test_master_1 /bin/bash
root@f51debaa26ec:/# java -version
openjdk version "1.8.0_111"
OpenJDK Runtime Environment (build 1.8.0_111-8u111-b14-2~bpo8+1-b14)
OpenJDK 64-Bit Server VM (build 25.111-b14, mixed mode)

使用exec重新创建容器(不推荐)

这种方式并不推荐,因为这样做虽然可以启动容器,但是只能重新创建一个容器,具体方法如下:

  1. 使用docker-compose up -d命令启动后,由于没有tty:true的配置,容器就退出了;

  2. 这时候执行命令docker-compose run master /bin/bash,会创建一个容器,并且进入这个容器;

  3. 在当前电脑再打开一个控制台,执行docker ps命令,发现新建了一个容器,状态正常;

ubuntu16.04安装最新版docker、docker-compose、docker-machine

安装前说明:

本文将介绍在ubuntu16.04系统下安装和升级docker、docker-compose、docker-machine。

docker:有两个版本:docker-ce(社区版)和docker-ee(企业版)。

    笔者这里介绍安装或升级的是最新版docker-ce(社区版)。

    参考官网地址:https://docs.docker.com/engine/installation/linux/docker-ce/ubuntu/#os-requirements

docker-compse:可运行和管理多个docker容器。

docker-machine:docker官方提供的docker管理工具。可管理多个docker主机,可搭建swarm集群。

一、docker安装

1、卸载旧版本docker

全新安装时,无需执行该步骤

$ sudo apt-get remove docker docker-engine docker.io

2、更新系统软件

$ sudo apt-get update

3、安装依赖包

$ sudo apt-get install 
    apt-transport-https 
    ca-certificates 
    curl 
    software-properties-common

4、添加官方密钥

执行该命令时,如遇到长时间没有响应说明网络连接不到docker网站,需要使用代-理进行。

$ curl -fsSL https://download.docker.com/linux/ubuntu/gpg | sudo apt-key add -

显示OK,表示添加成功.

5、添加仓库

$ sudo add-apt-repository 
   "deb [arch=amd64] https://download.docker.com/linux/ubuntu 
   $(lsb_release -cs) 
   stable"

6、再次更新软件

经实践,这一步不能够省略,我们需要再次把软件更新到最新,否则下一步有可能会报错。

$ sudo apt-get update

7、安装docker

如果想指定安装某一版本,可使用 sudo apt-get install docker-ce= 命令,把替换为具体版本即可。

以下命令没有指定版本,默认就会安装最新版

$ sudo apt-get install docker-ce

8、查看docker版本

$ docker -v

显示“Docker version 17.09.0-ce, build afdb6d4”字样,表示安装成功。

二、docker-compose安装

1、下载docker-compose

$ sudo curl -L https://github.com/docker/compose/releases/download/1.17.0/docker-compose-`uname -s`-`uname -m` -o /usr/local/bin/docker-compose

2、授权

$ sudo chmod +x /usr/local/bin/docker-compose

3、查看版本信息

$ docker-compose --version

显示出版本信息,即安装成功。

三、docker-machine安装

说明:docker-machine的使用是要基于virtualBox的。如果没有安装安装过,请先安装virtualBox。

1、安装virtualBox

登录virtualBox官网:https://www.virtualbox.org/wiki/Linux_Downloads

找到”Ubuntu 16.04 (“Xenial”) i386 | AMD64″字样,点击“AMD64”进行下载。

下载后,执行以下命令进行安装:

$ sudo dpkg -i virtualbox-5.2_5.2.0-118431_Ubuntu_xenial_amd64.deb

2、下载并安装docker-machine

$ curl -L https://github.com/docker/machine/releases/download/v0.13.0/docker-machine-`uname -s`-`uname -m` >/tmp/docker-machine &&
chmod +x /tmp/docker-machine &&
sudo cp /tmp/docker-machine /usr/local/bin/docker-machine

3、查看版本信息

$ docker-machine version

显示出版本信息,即安装成功。

CICD之logstash服务的Dockerfile使用Gitlab Runner打docker包

gitlab提交代码后,经gitlab Runner打docker包,推送到docker仓库,然后kubernetes选择版本更新

Dockerfile

FROM openjdk:8-jre-alpine

# ensure logstash user exists
RUN addgroup -S logstash && adduser -S -G logstash logstash

# install plugin dependencies
RUN apk add --no-cache 
# env: can't execute 'bash': No such file or directory
        bash 
        libc6-compat 
        libzmq

# grab su-exec for easy step-down from root
RUN apk add --no-cache 'su-exec>=0.2'

# https://www.elastic.co/guide/en/logstash/5.0/installing-logstash.html#_apt
# https://artifacts.elastic.co/GPG-KEY-elasticsearch
ENV LOGSTASH_PATH /usr/share/logstash/bin
ENV PATH $LOGSTASH_PATH:$PATH

# LOGSTASH_TARBALL="https://artifacts.elastic.co/downloads/logstash/logstash-5.5.0.tar.gz"

COPY logstash-5.5.0.tar.gz /logstash.tar.gz
RUN set -ex; 
    apk add --no-cache --virtual .fetch-deps 
        ca-certificates 
        gnupg 
        openssl 
        tar ; 
    dir="$(dirname "$LOGSTASH_PATH")"; 
    mkdir -p "$dir"; 
    tar -xf /logstash.tar.gz --strip-components=1 -C "$dir"; 
    rm logstash.tar.gz; 
    apk del .fetch-deps; 
    export LS_SETTINGS_DIR="$dir/config"; 
# if the "log4j2.properties" file exists (logstash 5.x), let's empty it out so we get the default: "logging only errors to the console"
    if [ -f "$LS_SETTINGS_DIR/log4j2.properties" ]; then 
        cp "$LS_SETTINGS_DIR/log4j2.properties" "$LS_SETTINGS_DIR/log4j2.properties.dist"; 
        truncate -s 0 "$LS_SETTINGS_DIR/log4j2.properties"; 
    fi; 
# set up some file permissions
    for userDir in 
        "$dir/config" 
        "$dir/data" 
    ; do 
        if [ -d "$userDir" ]; then 
            chown -R logstash:logstash "$userDir"; 
        fi; 
    done; 
    logstash --version

COPY docker-entrypoint.sh /
RUN chmod +x /docker-entrypoint.sh
COPY logstash-shipper.conf /
RUN mkdir -p /data/logs/sincedb
RUN chown logstash.logstash -R /data/logs/sincedb
WORKDIR /
ENTRYPOINT ["/docker-entrypoint.sh"]
CMD ["-f", "/logstash-shipper.conf"]

docker-entrypoint.sh

#!/bin/bash
set -e
mkdir -p /data/logs/sincedb
chown logstash.logstash -R /data/logs/sincedb

# first arg is `-f` or `--some-option`
if [ "${1#-}" != "$1" ]; then
    set -- logstash "$@"
fi

# Run as user "logstash" if the command is "logstash"
# allow the container to be started with `--user`
if [ "$1" = 'logstash' -a "$(id -u)" = '0' ]; then
    set -- su-exec logstash "$@"
fi

exec "$@"

logstash-5.5.0.tar.gz 从官方下载 https://www.elastic.co/cn/downloads/logstash

logstash-shipper.conf样例

input {
    file {
        path => [ "/data/logs/service/*/*.log"]
        type => "service"
        sincedb_path => "/data/logs/sincedb/service"
        codec => multiline {
            pattern => "^dddd-dd-dd dd:dd:dd.ddd .+"
            negate => true
            what => "previous"
            max_lines => 30
        }       
    }
    file {
        path => [ "/data/logs/web/*/access_log*.log"]
            codec => plain { format => "%{message}" }
        type => "web"
        sincedb_path => "/data/logs/sincedb/web"
    }
}
output {
    if [type] == 'service' {
        kafka {
            codec => plain { format => "%{message}" }
            bootstrap_servers => "139.219.*.*:9092"
        topic_id => "service"
        }
    }
    if [type] == 'web' {
        kafka {
                codec => plain { format => "%{message}" }
            bootstrap_servers => "139.219.*.*:9092"
        topic_id => "web"
        }
    }
}

service的日志开头是2017-12-01 12:01:01,所以pattern匹配时间,根据时间判断日志的起始点;web日志原封不动传过去,output到kafka集群,logstash-indexer从kafka获取日志后归入elasticsearch

logstash-indexer.conf示例

input {
        kafka {
                bootstrap_servers => "139.219.*.*:9092"
                topics => "service"
                type => "service"
        }
        kafka {
                bootstrap_servers =>"139.219.*.*:9092"
                topics => "web"
                type => "web"
        }
}
filter {
    if [type] != ['web'] {
        if "_grokparsefailure" in [tags] {
              drop { }
          }
        grok {
            match => {
                "message" => "%{TIMESTAMP_ISO8601:timestamp} %{GREEDYDATA}"
            }
        }
        date {
            match => ["timestamp","yyyy-MM-dd HH:mm:ss.SSS"]
            locale => "cn"
        }
    }
    if [type] == 'web' {
        if "_grokparsefailure" in [tags] {
              drop { }
          }
        grok {
                match => {
                    "message" => '%{IP} - - [%{HTTPDATE:time}] "%{WORD:methord} %{URIPATHPARAM:request} HTTP/%
{NUMBER:httpversion}" %{NUMBER:response} %{GREEDYDATA}'
                    }
            }
        date {
            match => ["time","dd/MMM/yyyy:HH:mm:ss +d+"]
            locale => "cn"
        }
    }
}
output {
        if [type] == 'service' {
                elasticsearch {
                        hosts => "172.16.1.1:9200"
                        index => "bbotte-service-%{+YYYY.MM.dd}"
                }
        }
        if [type] == 'web' {
                elasticsearch {
                        hosts => "172.16.1.1:9200"
                        index => "bbotte-web-%{+YYYY.MM.dd}"
                }
        }
}

最后就是gitlabci配置示例

# cat .gitlab-ci.yml
image: docker:latest

stages:
  - LogstashPubTest
  - LogstashPubProd

image-build-test:
  stage: LogstashPubTest
  script:
    - "current_date=`TZ='UTC-8' date +'%m%d%H%M'`"
    - "commit_sha=$CI_COMMIT_SHA"
    - "docker build -t bbotte.com:5000/logstash:$CI_COMMIT_REF_NAME-$current_date-${commit_sha:0:8} ."
    - "docker login -u admin -p 123456 bbotte.com:5000"
    - "docker push bbotte.com:5000/logstash:$CI_COMMIT_REF_NAME-$current_date-${commit_sha:0:8}"
  only:
    - test
image-build-master:
  stage: LogstashPubProd
  script:
    - "current_date=`TZ='UTC-8' date +'%m%d%H%M'`"
    - "commit_sha=$CI_COMMIT_SHA"
    - "docker build -t bbotte.com:5000/logstash:$CI_COMMIT_REF_NAME-$current_date-${commit_sha:0:8} ."
    - "docker login -u admin -p 123456 bbotte.com:5000"
    - "docker push bbotte.com:5000/logstash:$CI_COMMIT_REF_NAME-$current_date-${commit_sha:0:8}"
  only:
    - master

目录结构如下:

logstash$ ls -a
.   docker-entrypoint.sh  .git            logstash-5.5.0.tar.gz 
..  Dockerfile            .gitlab-ci.yml  logstash-shipper.conf

使用Filebeat和Logstash集中归档游戏日志

背景说明

由于游戏项目日志目前不够规范,不太容易根据字段结构化数据,开发又有实时查看生产和测试环境服务运行日志需求;如果写入ES通过Kibana查看,对于非分析类查看还是不太友好,当然也可以通过LogTrail插件

方案

  • Filebeat->Logstash->Files
  • Filebeat->Redis->Logstash->Files
  • Nxlog(Rsyslog、Logstash)->Kafka->Flink(Logstash->ES-Kibana)
  • 其他方案(可根据自己需求,选择合适的架构,作者选择了第二种方案)

注释: 由于Logstash无法处理输出到文件乱序的问题,可通过不同的文件使用不同的Logstash;或者直接写入ES(不存在乱序问题)、通过Flink输出到文件

部 署

系统环境

  • Debian8 x64
  • logstash-6.1.1
  • filebeat-6.1.1-amd64
  • Redis-3.2

Filebeat配置

/etc/filebeat/filebeat.yml
filebeat.prospectors:
- type: log
  paths:
    - /home/data/log/*
    - /home/data/*.log
  scan_frequency: 20s
  encoding: utf-8
  tail_files: true
  harvester_buffer_size: 5485760
fields:
  ip_address: 192.168.2.2
  env: qa
output.redis:
  hosts: ["192.168.1.1:6379"]
  password: "geekwolf"
  key: "filebeat"
  db: 0
  timeout: 5
  max_retires: 3
  worker: 2
  bulk_max_size: 4096

Logstash配置

input {
 #Filebeat
 # beats {
 #   port => 5044
 # }
 #Redis
  redis {
    batch_count => 4096
    data_type => "list"
    key => "filebeat"
    host => "127.0.0.1"
    port => 5044
    password => "geekwolf"
    db => 0
    threads => 2
   }
}
filter {
  ruby {
      code => 'event.set("filename",event.get("source").split("/")[-1])'
  }
}
output {
  if [filename] =~ "nohup" {
    file {
        path => "/data/logs/%{[fields][env]}/%{+YYYY-MM-dd}/%{[fields][ip_address]}/%{filename}"
        flush_interval => 3
        codec => line { format => "%{message}"}
    }
  } else {
    file {
         path => "/data/logs/%{[fields][env]}/%{+YYYY-MM-dd}/%{[fields][ip_address]}/logs/%{filename}"
        flush_interval => 3
        codec => line { format => "%{message}"}
   }
 }
 #stdout { codec => rubydebug }
}

生产日志目录

.
├── prod
│   └── 2018-01-13
│       └── 2.2.2.2
│           ├── logs
│           │   ├── rpg_slow_db_.27075
│           └── nohup_service.log
└── qa
    ├── 2018-01-12
    │   ├── 192.168.3.1
    └── 2018-01-13
        ├── 192.168.3.2

Kafka、Logstash、Nginx日志收集入门

Nginx作为网站的第一入口,其日志记录了除用户相关的信息之外,还记录了整个网站系统的性能,对其进行性能排查是优化网站性能的一大关键。
Logstash是一个接收,处理,转发日志的工具。支持系统日志,webserver日志,错误日志,应用日志,总之包括所有可以抛出来的日志类型。一般情景下,Logstash用来和ElasticSearch和Kibana搭配使用,简称ELK,本站http://www.wenzhihuai.com除了用作ELK,还配合了Kafka进行使用。
kafka是一个分布式的基于push-subscribe的消息系统,它具备快速、可扩展、可持久化的特点。它现在是Apache旗下的一个开源系统,作为hadoop生态系统的一部分,被各种商业公司广泛应用。它的最大的特性就是可以实时的处理大量数据以满足各种需求场景:比如基于hadoop的批处理系统、低延迟的实时系统、storm/spark流式处理引擎。

下面是本站日志系统的搭建

一、Nginx日志

为了配合ELK的使用,把日志变成json的格式,方便ElasticSearch对其检索。

    log_format main ''{"@timestamp":"$time_iso8601",''
      ''"host": "$server_addr",''
      ''"clientip": "$remote_addr",''
      ''"size": $body_bytes_sent,''
      ''"responsetime": $request_time,''
      ''"upstreamtime": "$upstream_response_time",''
      ''"upstreamhost": "$upstream_addr",''
      ''"http_host": "$host",''
      ''"url": "$uri",''
      ''"xff": "$http_x_forwarded_for",''
      ''"referer": "$http_referer",''
      ''"agent": "$http_user_agent",''
      ''"status": "$status"}'';
    access_log  logs/access.log  main;

然后执行nginx -t检验配置,nginx -s reload重启nginx即可。
注意:

  1. 这里的单引号用来标识不换行使用的,如果没有的话,Logstash会每一行都发送一次。

  2. 格式一定一定要规范。

二、Logstash

下载安装的具体请看Logstash官网,这里只讲讲如何配置,

输入

input {
    file {
        type => "nginx_access"
        path => "/usr/share/nginx/logs/access.log"
        codec => "json"
    }
}

过滤

filter,由于本站没有涉及到很复杂的手机,所以不填

输出

output {
stdout{
codec => rubydebug
}
kafka {
bootstrap_servers => “119.29.188.224:9092” #生产者
topic_id => “nginx-access-log” #设置写入kafka的topic
    # compression_type => "snappy"    #消息压缩模式,默认是none,可选gzip、snappy。
    codec => json       #一定要加上这段,不然传输错误,${message}
}
elasticsearch {
    hosts => "119.29.188.224:9200"    #Elasticsearch 地址,多个地址以逗号分隔。
    index => "logstash-%{type}-%{+YYYY.MM.dd}"    #索引命名方式,不支持大写字母(Logstash除外)
    document_type => "%{type}"    #文档类型
}
}

具体字段:

stdout:控制台输出,方便tail -f查看,可不要

kafka:输出到kafka,bootstrap_servers指的是kafka的地址和端口,topic_id是每条发布到kafka集群的消息属于的类别,其中codec一定要设置为json,要不然生产者出错,导致消费者是看到${message}。
elasticsearch:输出到elasticsearch,hosts指的是elasticsearch的地址和端口,index指的命名方式
然后启动Logstash:

nohup bin/logstash -f config/nginxlog2es.conf –path.data=tmp &

tail -f 查看nohup

未分类

三、kafka

目前的云服务器都用了NAT转换公网,如果不开启外网,kafka会默认使用内网私有地址访问,所以要开启外网访问
只需要在config/server.properties里加入:

advertised.host.name=119.29.188.224

改变默认端口:

advertised.host.port=9200

启动步骤:

(1)ZooKeeper启动

bin/zookeeper-server-start.sh config/zookeeper.properties

(2)启动Kafka

nohup bin/kafka-server-start.sh config/server.properties &

(3)创建一个topic

bin/kafka-topics.sh –create –zookeeper localhost:2181 –replication-factor 1 –partitions 1 –topic test

查看topic数量

bin/kafka-topics.sh –list –zookeeper localhost:2181

(4)生产者发送消息

bin/kafka-console-producer.sh –broker-list localhost:9092 –topic test

(5)消费者接收消息

bin/kafka-console-consumer.sh –bootstrap-server localhost:9092 –topic test –from-beginning

删除

删除kafka存储的日志,在kafka的config/server.properties的log.dirs=/tmp/kafka-logs查看

四、Spring Boot与Kafka

多模块的Spring Boot与Kafka

(1)在父pom.xml中添加:

<dependencyManagement>
    <dependencies>
        <dependency>
            <groupId>org.springframework.data</groupId>
            <artifactId>spring-data-releasetrain</artifactId>
            <version>Fowler-SR2</version>
            <scope>import</scope>
            <type>pom</type>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-dependencies</artifactId>
            <version>1.5.9.RELEASE</version>
            <type>pom</type>
            <scope>import</scope>
        </dependency>
    </dependencies>
</dependencyManagement>

(2)在消费者模块中添加:

    <parent>
        <artifactId>micro-service</artifactId>
        <groupId>micro-service</groupId>
        <version>1.0-SNAPSHOT</version>
    </parent>

配置文件:

# 本地运行端口
server.port=8082
# kafka地址和端口
spring.kafka.bootstrap-servers=119.29.188.224:9092
# 指定默认消费者group id
spring.kafka.consumer.group-id=myGroup
# 指定默认topic id
spring.kafka.template.default-topic=nginx-access-log
# 指定listener 容器中的线程数,用于提高并发量
spring.kafka.listener.concurrency=3
# 偏移量,最好使用latest,earily会从kafka运行起开始一直发送
spring.kafka.consumer.auto-offset-reset=latest
# 心跳检测
spring.kafka.consumer.heartbeat-interval=100

(5)接收消息

@Component
public class MsgConsumer {
    @KafkaListener(topics = {"nginx-access-log"})
    public void processMessage(String content) {
        System.out.println(content);
    }
}

(6)测试

运行之后点击网站http://www.wenzhihuai.com可看到:

未分类

完整代码可以到https://github.com/Zephery/micro-service查看

错误记录

(1)与Spring的包冲突:

Error starting ApplicationContext. To display the auto-configuration report re-run your application with ''debug'' enabled.
2018-01-05 11:10:47.947 ERROR 251848 --- [           main] o.s.boot.SpringApplication               : Application startup failed
org.springframework.context.ApplicationContextException: Unable to start embedded container; nested exception is org.springframework.boot.context.embedded.EmbeddedServletContainerException: Unable to start embedded Tomcat
    at org.springframework.boot.context.embedded.EmbeddedWebApplicationContext.onRefresh(EmbeddedWebApplicationContext.java:137) ~[spring-boot-1.5.9.RELEASE.jar:1.5.9.RELEASE]
    at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:537) ~[spring-context-4.3.11.RELEASE.jar:4.3.11.RELEASE]
    at org.springframework.boot.context.embedded.EmbeddedWebApplicationContext.refresh(EmbeddedWebApplicationContext.java:122) ~[spring-boot-1.5.9.RELEASE.jar:1.5.9.RELEASE]
    at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:693) [spring-boot-1.5.9.RELEASE.jar:1.5.9.RELEASE]
    at org.springframework.boot.SpringApplication.refreshContext(SpringApplication.java:360) [spring-boot-1.5.9.RELEASE.jar:1.5.9.RELEASE]
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:303) [spring-boot-1.5.9.RELEASE.jar:1.5.9.RELEASE]
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:1118) [spring-boot-1.5.9.RELEASE.jar:1.5.9.RELEASE]
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:1107) [spring-boot-1.5.9.RELEASE.jar:1.5.9.RELEASE]

解决办法:去掉父pom.xml文件里所有关于spring的包,只保留spring boot的即可

(2)消费者只接受到${message}消息

未分类

解决办法:

一定要在output的kafka中添加

codec => json

Linux下grep显示前后几行信息

摘要: 标准unix/linux下的grep通过下面參数控制上下文 grep -C 5 foo file 显示file文件里匹配foo字串那行以及上下5行grep -B 5 foo file 显示foo及前5行grep -A 5 foo file 显示foo及后5行 查看grep版本号的方法是grep -V 假设想升级,升级的方法:最新的源代码(google或者百度搜索主页),编译安装到某个地方,比方 /home/aaa/bin/ 那么以后用的时候就用 /home/aaa/bin/grep ,或者把 /home/aaa/bin 加到PATH环境变量就可以。

标准unix/linux下的grep通过下面參数控制上下文

grep -C 5 foo file 显示file文件里匹配foo字串那行以及上下5行

grep -B 5 foo file 显示foo及前5行

grep -A 5 foo file 显示foo及后5行

查看grep版本号的方法是

grep -V

假设想升级,升级的方法:最新的源代码(google或者百度搜索主页),编译安装到某个地方,比方 /home/aaa/bin/ 那么以后用的时候就用 /home/aaa/bin/grep ,或者把 /home/aaa/bin 加到PATH环境变量就可以。 假设你把最新编译好的grep覆盖到你如今grep所在文件夹,则升级自然就完成了。

linux统计文件夹中文件数目

第一种方法:

ls -l|grep "^-"|wc -l

未分类

grep ^- 这里将长列表输出信息过滤一部分,只保留一般文件,如果只保留目录就是 ^d

wc -l 统计输出信息的行数,因为已经过滤得只剩一般文件了,所以统计结果就是一般文件信息的行数,又由于一行信息对应一个文件,所以也就是文件的个数。

第二种方法:

find ./ -type f|wc -l

未分类

需要说明的是第二种方法会比第一种方法快很多,尤其是也统计子目录时。

redis counter demo – redis并发计数器

一、说明

利用redis操作的原子性,实现java 多线程并发的情况下实现计数器。

我本机测试多个线程操作之后,结果会出现一定的延迟,但是最终数字是ok的

应该是redis内部做了一个类似于队列的功能。

需要注意的是,得使用redis连接的线程池,不然会出现异常

这里有一个:JedisUtil 下面用到了

二、 代码实现

2.1 redis操作类

package com.hisen.thread.count_click_by_redis;
import com.hisen.utils.JedisUtil;
import redis.clients.jedis.JedisPool;
/**
 * @author hisenyuan
 * @description 操作redis的线程类
 */
public class ClickRedis {
  /**
   * 必须使用线程池,而且线程池要大于并发数,否则会出现redis超时
   */
  private static JedisPool jedis = JedisUtil.getPool();
  public static void click() {
    jedis.getResource().incrBy("hisen", 1);
  }
  public static int getCount() {
    return Integer.parseInt(jedis.getResource().get("hisen"));
  }
  public static void declare() {
    jedis.getResource().del("hisen");
    jedis.close();
  }
}

2.2 线程类,模拟点击

package com.hisen.thread.count_click_by_redis;
/**
 * @author hisenyuan
 * @description 执行点击的线程类
 */
public class CountClickByRedisThread extends Thread{
  private int id;
  public CountClickByRedisThread(int id) {
    this.id = id;
  }
  @Override
  public void run() {
    super.run();
    ClickRedis.click();
    int count = ClickRedis.getCount();
    System.out.println("task:" + id + "t 执行完毕t线程编号:" + this.getId() + "t当前值:" + count);
  }
}

2.3 主线程 – 启动类

package com.hisen.thread.count_click_by_redis;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
public class Main {
  public static void main(String[] args) throws InterruptedException {
    /**
     * 5 - corePoolSize:核心池的大小
     * 10 - maximumPoolSize:线程池最大线程数,它表示在线程池中最多能创建多少个线程
     * 200 - keepAliveTime:表示线程没有任务执行时最多保持多久时间会终止。
     * unit - unit:参数keepAliveTime的时间单位,有7种取值
     * workQueue:一个阻塞队列,用来存储等待执行的任务
     */
    ThreadPoolExecutor executor = new ThreadPoolExecutor(
        50,
        100,
        200,
        TimeUnit.MICROSECONDS,
        new ArrayBlockingQueue<Runnable>(50));
    // 开启50个线程
    for (int i = 0; i < 50; i++) {
      executor.execute(new CountClickByRedisThread(i));
    }
    System.out.println("已经开启所有的子线程");
    // 启动一次顺序关闭,执行以前提交的任务,但不接受新任务。
    executor.shutdown();
    // 判断所有线程是否已经执行完毕
    while (true) {
      if (executor.isTerminated()) {
        System.out.println("所有的子线程都结束了!");
        // 清除redis数据
        ClickRedis.declare();
        break;
      }
      Thread.sleep(100);
    }
  }
}