seldon-core 数据回流之 logstash、Elasticsearch、kibana、kafka

一、ELK介绍

ELK是三个开源软件的缩写,分别表示:Elasticsearch , Logstash, Kibana , 它们都是开源软件。新增了一个FileBeat,它是一个轻量级的日志收集处理工具(Agent),Filebeat占用资源少,适合于在各个服务器上搜集日志后传输给Logstash,官方也推荐此工具。

  • Elasticsearch是个开源分布式搜索引擎,提供搜集、分析、存储数据三大功能。它的特点有:分布式,零配置,自动发现,索引自动分片,索引副本机制,restful风格接口,多数据源,自动搜索负载等。

  • Logstash 主要是用来日志的搜集、分析、过滤日志的工具,支持大量的数据获取方式。一般工作方式为c/s架构,client端安装在需要收集日志的主机上,server端负责将收到的各节点日志进行过滤、修改等操作在一并发往elasticsearch上去。

  • Kibana 也是一个开源和免费的工具,Kibana可以为 Logstash 和 ElasticSearch 提供的日志分析友好的 Web 界面,可以帮助汇总、分析和搜索重要数据日志。

  • Filebeat是一个轻量型日志采集器,可以方便的同kibana集成,启动filebeat后,可以直接在kibana中观看对日志文件进行detail的过程。

Logstash介绍

Logstash是一个数据流引擎:

它是用于数据物流的开源流式ETL引擎,在几分钟内建立数据流管道,具有水平可扩展及韧性且具有自适应缓冲,不可知的数据源,具有200多个集成和处理器的插件生态系统,使用Elastic Stack监视和管理部署

Logstash包含3个主要部分: 输入(inputs),过滤器(filters)和输出(outputs)。

  • inputs主要用来提供接收数据的规则,比如使用采集文件内容;
  • filters主要是对传输的数据进行过滤,比如使用grok规则进行数据过滤;
  • outputs主要是将接收的数据根据定义的输出模式来进行输出数据,比如输出到ElasticSearch中.

示例图:
file

Logstash安装使用

一、环境选择

Logstash采用JRuby语言编写,运行在jvm中,因此安装Logstash前需要先安装JDK。如果是6.x的版本,jdk需要在8以上,如果是7.x的版本,则jdk版本在11以上。如果Elasticsearch集群是7.x的版本,可以使用Elasticsearch自身的jdk。
Logstash下载地址推荐使用清华大学或华为的开源镜像站。
下载地址:
mirrors.huaweicloud.com/logstash
mirrors.tuna.tsinghua.edu.cn/ELK

ELK7.3.2百度网盘地址:
链接:https://pan.baidu.com/s/1tq3Czywjx3GGrreOAgkiGg
提取码:cxng

二、JDK安装

注:JDK版本请以自身Elasticsearch集群的版本而定。

1,文件准备

解压下载下来的JDK

# 192.168.1.62
cd /home/hemei/apps
tar  -xvf   jdk-8u144-linux-x64.tar.gz

移动到opt/java文件夹中,没有就新建,然后将文件夹重命名为jdk1.8

mkdir -p /opt/java
# mv  jdk1.8.0_144 /opt/java
# mv  jdk1.8.0_144  jdk1.8
[root@node1 apps]# cp -r jdk1.8.0_301 /opt/java
[root@node1 apps]# cd /opt/java
[root@node1 apps]# mv jdk1.8.0_301 jdk1.8

2,环境配置

首先输入 java -version 查看是否安装了JDK,如果安装了,但版本不适合的话,就卸载

[root@node1 java]# java -version
-bash: java: command not found

然后编辑 profile 文件,添加如下配置 输入: vim /etc/profile

export JAVA_HOME=/opt/java/jdk1.8
export JRE_HOME=/opt/java/jdk1.8/jre
export CLASSPATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar:$JRE_HOME/lib
export PATH=.:${JAVA_HOME}/bin:$PATH

添加成功之后,输入:

source /etc/profile

使配置生效,然后查看版本信息输入:

[root@node1 java]# java -version
java version "1.8.0_301"
Java(TM) SE Runtime Environment (build 1.8.0_301-b09)
Java HotSpot(TM) 64-Bit Server VM (build 25.301-b09, mixed mode)

三、Logstash安装

1,文件准备

将下载下来的logstash-7.3.2.tar.gz的配置文件进行解压 在linux上输入:

tar -xvf logstash-7.3.2.tar.gz

然后移动到/opt/elk 里面,然后将文件夹重命名为 logstash-7.3.2 输入

[root@node1 java]# cd /home/hemei/apps/
[root@node1 apps]# mkdir -p /opt/elk
mv logstash-7.3.2.tar /opt/elk 
mv logstash-7.3.2.tar logstash-7.3.2

2,配置修改

这里简单介绍一下 inputs,filters、outputs三个主要配置。

inputs

inputs主要使用的几个配置项:

  • path:必选项,读取文件的路径,基于glob匹配语法。 exclude:可选项,数组类型,排除不想监听的文件规则,基于glob匹配语法。
  • sincedb_path:可选项,记录sinceddb文件路径以及文件读取信息位置的数据文件。
  • start_position:可选项,可以配置为beginning/end,是否从头读取文件。默认从尾部值为:end。
  • stat_interval:可选项,单位为秒,定时检查文件是否有更新,默认是1秒。
  • discover_interval:可选项,单位为秒,定时检查是否有新文件待读取,默认是15秒
  • ignore_older:可选项,单位为秒,扫描文件列表时,如果该文件上次更改时间超过设定的时长,则不做处理,但依然会监控是否有新内容,默认关闭。
  • close_older:可选项,单位为秒,如果监听的文件在超过该设定时间内没有新内容,会被关闭文件句柄,释放资源,但依然会监控是否有新内容,默认3600秒,即1小时。
  • tags :可选项,在数据处理过程中,由具体的插件来添加或者删除的标记。 type :可选项,自定义处理时间类型。比如nginxlog。
一个简单的input输入示例:
input {
  file {
    path => "/home/logs/mylog.log"
  }
}

上述这段配置表示采集 /home/logs/mylog.log 的日志,如果是采集整个目录的话,则可以通过 * 通配符来进行匹配,如

path => "/home/logs/*.log"

表示采集该目录下所有后缀名为.log的日志。

通过logstash-input-file插件导入了一些本地日志文件时,logstash会通过一个名为 sincedb的独立文件中来跟踪记录每个文件中的当前位置。这使得停止和重新启动Logstash成为可能,并让它在不丢失在停止 Logstashwas 时添加到文件中的行数的情况下继续运行。
调试的时候,我们可能希望取消sincedb的记录功能,使文件每次都能从头开始读取。此时,我们可以这样来做
示例:

input {
  file {
    path => "/home/logs/mylog.log"
    start_position => "beginning"
    sincedb_path => "/dev/null"
  }
}

如果想使用HTTP输入,可以将类型改成http,只不过里面的参数有不同而已,tcp、udp、syslog和beats等等同理。 示例:

input {
  http {
    port => 端口号
  }
}
filter

filter主要是实现过滤的功能,比如使用grok实现日志内容的切分等等。

比如对apache的日志进行grok过滤 样例数据:

127.0.0.1 - - [13/Apr/2015:17:22:03 +0800] "GET /router.php HTTP/1.1" 404 285 "-" "curl/7.19.7 (x86_64-redhat-linux-gnu) libcurl/7.19.7 NSS/3.15.3 zlib/1.2.3 libidn/1.18 libssh2/1.4.2"

grok:

%{COMBINEDAPACHELOG}

这里我们可以使用kibana的grok来进行分析,grok在开发工具中。当然也可以在grokdebug.herokuapp.com/网站进行匹配调试。

使用示例:

filter {
    grok {
      match => ["message", "%{COMBINEDAPACHELOG}"]
    }
}
output

output主要作用是将数据进行输出,比如输出到文件,或者elasticsearch中。
这里将数据输出到ElasticSearch中,如果是集群,通过逗号可以配置多个节点。

output {   
  elasticsearch {
    hosts => ["127.0.0.1:9200"]
  }
}

如果想在控制台进行日志输出的话,可以加上stdout配置。如果想自定义输出的index话,也可以加上对应的索引库名称,不存在则根据数据内容进行创建,也可以自动按天创建索引库。

output {

  stdout {
    codec => rubydebug
  }

  elasticsearch {
    hosts => ["127.0.0.1:9200"]
    index => "mylogs-%{+YYYY.MM.dd}"
  }
}

更多logstash配置:https://www.elastic.co/guide/en/logstash/current/index.html

3,使用

demo

/home/logs/ 目录下添加一个日志文件, 然后在logstash文件夹中创建一个 logstash-test.conf 文件,然后在该文件中添加如下配置:

input {
  file {
    path => "/home/logs/mylog.log"
    start_position => "beginning"
    sincedb_path => "/dev/null"
  }
}

filter {
}

output {
  stdout {
    codec => rubydebug
  }

  elasticsearch {
    hosts => ["192.168.1.62:9200"]
  }
}

然后在logstash 目录输入如下命令进行启动:

nohup ./bin/logstash -f logstash-test.conf

后台启动:

nohup ./bin/logstash -f logstash-test.conf  >/dev/null   2>&1 &

热配置加载启动:

nohup ./bin/logstash -f logstash-test.conf --config.reload.automatic  >/dev/null   2>&1 &

启动成功之后,如果是非后台启动,可以在控制台查看数据的传输,如果是后台启动,则可以在logstash的log目录中进行查看。

四、ES安装

Elasticsearch-基础及使用 Docker 安装

1、下载镜像文件

sudo docker pull elasticsearch:7.3.2   # 存储和检索数据
sudo docker pull kibana:7.3.2     # 可视化检索数据

在安装的时候看一下虚拟机内存是否够用,ES非常的吃内存。

free -m

2、创建实例

切换到root用户

su root

2.1、ElasticSearch

创建配置文件目录,将docker内部es的配置文件挂载到外部,方便修改。

mkdir -p  /mydata/elasticsearch/config
mkdir -p /mydata/elasticsearch/data

# 任何IP都可以访问,注意host和IP中间有空格
echo "http.host: 0.0.0.0" >>/mydata/elasticsearch/config/elasticsearch.yml

创建好目录之后,需要给该目录访问、执行的权限:

chmod -R 777 /mydata/elasticsearch/

运行镜像:

#运行ES
docker run --name elasticsearch -p 9200:9200 -p 9300:9300 \
-e "discovery.type=single-node" \
-e ES_JAVA_OPTS="-Xms256m -Xmx512m" \
-v /mydata/elasticsearch/config/elasticsearch.yml:/usr/share/elasticsearch/config/elasticsearch.yml \
-v /mydata/elasticsearch/data:/usr/share/elasticsearch/data \
-v /mydata/elasticsearch/plugins:/usr/share/elasticsearch/plugins \
-d elasticsearch:7.3.2

与ES交互的端口说明:

  • 9200端口

    ​ RESTful API通过HTTP通信

  • 9300端口

    ​ Java客户端与ES的原生传输协议和集群交互

ES_JAVA_OPTS指定java虚拟机相关参数

  • -Xms128m 初始堆内存大小为128m
  • -Xmx256m 最大堆内存大小为256m
  • discovery.type=single-node 设置为单点启动

ES默认占内存很大,默认占2G,所以,一般在启动时需要指定固定的内存大小,这里我们设置为 128M。

启动之后,可以查看docker 的ES进程:

[root@node1 elk]# docker ps | grep 'elastic'
872474770bfd   elasticsearch:7.3.2                                                           "/usr/local/bin/dock…"   39 seconds ago   Up 38 seconds   0.0.0.0:9200->9200/tcp, :::9200->9200/tcp, 0.0.0.0:9300->9300/tcp, :::9300->9300/tcp   elasticsearch
[root@node1 elk]# 

然后通过浏览器访问ES:http://192.168.1.62:9200/

file

2.2、Kibana

docker run --name kibana -e ELASTICSEARCH_URL=192.168.1.62:9200 -p 5601:5601 \
-d kibana:7.3.2

http://192.168.1.62:9200 一定要改为自己ES虚拟机的地址。

查看是否运行成功:

[root@node1 elk]# docker ps | grep 'kibana'
2a1ecc0f1c02   kibana:7.3.2                                                                  "/usr/local/bin/dumb…"   14 seconds ago   Up 14 seconds   0.0.0.0:5601->5601/tcp, :::5601->5601/tcp                                              kibana
[root@node1 elk]# 

进入容器内部:

[root@node1 elk]# docker exec -it 2a1e /bin/bash
bash-4.2$ pwd  
/usr/share/kibana

kibana 配置文件

cd /usr/share/kibana/config
bash-4.2$ vi kibana.yml 
#
# ** THIS IS AN AUTO-GENERATED FILE **
#

# Default Kibana configuration for docker target
server.name: kibana
server.port: 5601
server.host: "0.0.0.0"
elasticsearch.hosts: [ "http://192.168.1.62:9200" ]
xpack.monitoring.ui.container.elasticsearch.enabled: true

然后重启Kibana,可以看到,已经可以正常访问了哈~

[root@node1 ~]# docker restart 2a1e
2a1e

file

http://192.168.1.62:5601/app/kibana

Logstash+ES

日志文件写入数据:

[root@node1 logs]# cat /home/logs/mylog.log
Qatar World Football Cup
shi jie bei hhahha
ssssswo 世界杯
我爱我家
精彩世界杯
大家都理解了哈
[root@node1 logs]# 

修改logstash配置文件,设置es 的index 为 test

[root@node1 logs]# cd /opt/elk/logstash-7.3.2/

[root@node1 logstash-7.3.2]# cat  logstash-test.conf 
input {
  file {
    path => "/home/logs/mylog.log"
    start_position => "beginning"
    sincedb_path => "/dev/null"
  }
}

filter {
}

output {
  stdout {
    codec => rubydebug
  }

  elasticsearch {
    hosts => ["192.168.1.62:9200"]
    index => "test"
  }
}
[root@node1 logstash-7.3.2]# 

后台启动:

nohup ./bin/logstash -f logstash-test.conf  >/dev/null   2>&1 &

kibana查看:
file

Kafka

  1. pull镜像

    docker pull zookeeper
    docker pull wurstmeister/kafka
  2. 创建通信网络。zookeeper和kafka之间的通信

    docker network create kafka_zk_net
查看网络
docker network ls
docker network inspect kafka_zk_net
  1. 创建容器
    docker run --net=kafka_zk_net --name zookeeper -p 21810:2181 -d docker.io/zookeeper
docker run --net=kafka_zk_net --name kafka -p 9093:9092 \
--link zookeeper \
-e KAFKA_BROKER_ID=4 \
-e KAFKA_ZOOKEEPER_CONNECT=172.18.0.2:2181 \
-e KAFKA_ADVERTISED_HOST_NAME=192.168.1.62 \
-e KAFKA_ADVERTISED_PORT=9092 \
-d wurstmeister/kafka
  • KAFKA_ADVERTISED_HOST_NAME 参数需要设置为宿主机地址(或服务器外网地址)
  • KAFKA_ZOOKEEPER_CONNECT 参数设置zookeeper容器内部地址和端口,查看指令为 docker inspect kafka_zk_net
 "ConfigOnly": false,
        "Containers": {
            "0de6a3fbcaa12f3dd08e290d4f20dc28109bf242959595aa08e4bb0ab9e6ffaa": {
                "Name": "zookeeper",
                "EndpointID": "89e64a38df7612399817496b4420b8be6876f9dfb835330e87fe1e7179605013",
                "MacAddress": "02:42:ac:12:00:02",
                "IPv4Address": "172.18.0.2/16",
                "IPv6Address": ""
            }
  1. 启动生产者和消费者

    docker exec -it  kafka bash
    cd opt/kafka_2.13-2.8.1/bin/
    ./kafka-console-producer.sh --bootstrap-server localhost:9092 --topic topic_test  生产者
    ./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic topic_test --from-beginning  消费者
  2. bug

  3. 启动consumer发现: Error while fetching metadata with correlation id

修复步骤:

vi /opt/kafka/config/server.properties

添加:

# 只能内网访问
#listeners=PLAINTEXT://localhost:9092
#advertised.listeners=PLAINTEXT://localhost:9092

# 可外网访问
listeners=PLAINTEXT://192.168.1.62:9092
advertised.listeners=PLAINTEXT://192.168.1.62:9092
  • listeners 就是主要用来定义Kafka Broker的Listener的配置项。
  • advertised.listeners 参数的作用就是将Broker的Listener信息发布到Zookeeper中

发现:Docker:bash: vi: command not found

apt-get update
apt-get install vim

停止kafka容器和重启

docker stop kafka
docker restart kafka

相关文章:
ElasticSearch实战系列六: Logstash快速入门和实战
ElasticSearch实战系列七: Logstash实战使用-图文讲解
Elasticsearch-基础及使用 Docker 安装
Docker(单机Kafka安装)
Kafka Docker部署
seldon core 通过 kafka 实现数据回流

为者常成,行者常至