seldon-core 数据回流之 logstash、Elasticsearch、kibana、kafka
一、ELK介绍
ELK是三个开源软件的缩写,分别表示:Elasticsearch , Logstash, Kibana , 它们都是开源软件。新增了一个FileBeat,它是一个轻量级的日志收集处理工具(Agent),Filebeat占用资源少,适合于在各个服务器上搜集日志后传输给Logstash,官方也推荐此工具。
-
Elasticsearch是个开源分布式搜索引擎,提供搜集、分析、存储数据三大功能。它的特点有:分布式,零配置,自动发现,索引自动分片,索引副本机制,restful风格接口,多数据源,自动搜索负载等。
-
Logstash 主要是用来日志的搜集、分析、过滤日志的工具,支持大量的数据获取方式。一般工作方式为c/s架构,client端安装在需要收集日志的主机上,server端负责将收到的各节点日志进行过滤、修改等操作在一并发往elasticsearch上去。
-
Kibana 也是一个开源和免费的工具,Kibana可以为 Logstash 和 ElasticSearch 提供的日志分析友好的 Web 界面,可以帮助汇总、分析和搜索重要数据日志。
-
Filebeat是一个轻量型日志采集器,可以方便的同kibana集成,启动filebeat后,可以直接在kibana中观看对日志文件进行detail的过程。
Logstash介绍
Logstash是一个数据流引擎:
它是用于数据物流的开源流式ETL引擎,在几分钟内建立数据流管道,具有水平可扩展及韧性且具有自适应缓冲,不可知的数据源,具有200多个集成和处理器的插件生态系统,使用Elastic Stack监视和管理部署
Logstash包含3个主要部分: 输入(inputs),过滤器(filters)和输出(outputs)。
- inputs主要用来提供接收数据的规则,比如使用采集文件内容;
- filters主要是对传输的数据进行过滤,比如使用grok规则进行数据过滤;
- outputs主要是将接收的数据根据定义的输出模式来进行输出数据,比如输出到ElasticSearch中.
示例图:
Logstash安装使用
一、环境选择
Logstash采用JRuby语言编写,运行在jvm中,因此安装Logstash前需要先安装JDK。如果是6.x的版本,jdk需要在8以上,如果是7.x的版本,则jdk版本在11以上。如果Elasticsearch集群是7.x的版本,可以使用Elasticsearch自身的jdk。
Logstash下载地址推荐使用清华大学或华为的开源镜像站。
下载地址:
mirrors.huaweicloud.com/logstash
mirrors.tuna.tsinghua.edu.cn/ELK
ELK7.3.2百度网盘地址:
链接:https://pan.baidu.com/s/1tq3Czywjx3GGrreOAgkiGg
提取码:cxng
二、JDK安装
注:JDK版本请以自身Elasticsearch集群的版本而定。
1,文件准备
解压下载下来的JDK
# 192.168.1.62
cd /home/hemei/apps
tar -xvf jdk-8u144-linux-x64.tar.gz
移动到opt/java文件夹中,没有就新建,然后将文件夹重命名为jdk1.8
mkdir -p /opt/java
# mv jdk1.8.0_144 /opt/java
# mv jdk1.8.0_144 jdk1.8
[root@node1 apps]# cp -r jdk1.8.0_301 /opt/java
[root@node1 apps]# cd /opt/java
[root@node1 apps]# mv jdk1.8.0_301 jdk1.8
2,环境配置
首先输入 java -version 查看是否安装了JDK,如果安装了,但版本不适合的话,就卸载
[root@node1 java]# java -version
-bash: java: command not found
然后编辑 profile 文件,添加如下配置 输入: vim /etc/profile
export JAVA_HOME=/opt/java/jdk1.8
export JRE_HOME=/opt/java/jdk1.8/jre
export CLASSPATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar:$JRE_HOME/lib
export PATH=.:${JAVA_HOME}/bin:$PATH
添加成功之后,输入:
source /etc/profile
使配置生效,然后查看版本信息输入:
[root@node1 java]# java -version
java version "1.8.0_301"
Java(TM) SE Runtime Environment (build 1.8.0_301-b09)
Java HotSpot(TM) 64-Bit Server VM (build 25.301-b09, mixed mode)
三、Logstash安装
1,文件准备
将下载下来的logstash-7.3.2.tar.gz的配置文件进行解压 在linux上输入:
tar -xvf logstash-7.3.2.tar.gz
然后移动到/opt/elk 里面,然后将文件夹重命名为 logstash-7.3.2 输入
[root@node1 java]# cd /home/hemei/apps/
[root@node1 apps]# mkdir -p /opt/elk
mv logstash-7.3.2.tar /opt/elk
mv logstash-7.3.2.tar logstash-7.3.2
2,配置修改
这里简单介绍一下 inputs,filters、outputs三个主要配置。
inputs
inputs主要使用的几个配置项:
- path:必选项,读取文件的路径,基于glob匹配语法。 exclude:可选项,数组类型,排除不想监听的文件规则,基于glob匹配语法。
- sincedb_path:可选项,记录sinceddb文件路径以及文件读取信息位置的数据文件。
- start_position:可选项,可以配置为beginning/end,是否从头读取文件。默认从尾部值为:end。
- stat_interval:可选项,单位为秒,定时检查文件是否有更新,默认是1秒。
- discover_interval:可选项,单位为秒,定时检查是否有新文件待读取,默认是15秒
- ignore_older:可选项,单位为秒,扫描文件列表时,如果该文件上次更改时间超过设定的时长,则不做处理,但依然会监控是否有新内容,默认关闭。
- close_older:可选项,单位为秒,如果监听的文件在超过该设定时间内没有新内容,会被关闭文件句柄,释放资源,但依然会监控是否有新内容,默认3600秒,即1小时。
- tags :可选项,在数据处理过程中,由具体的插件来添加或者删除的标记。 type :可选项,自定义处理时间类型。比如nginxlog。
一个简单的input输入示例:
input {
file {
path => "/home/logs/mylog.log"
}
}
上述这段配置表示采集 /home/logs/mylog.log
的日志,如果是采集整个目录的话,则可以通过 * 通配符来进行匹配,如
path => "/home/logs/*.log"
表示采集该目录下所有后缀名为.log的日志。
通过logstash-input-file插件导入了一些本地日志文件时,logstash会通过一个名为 sincedb
的独立文件中来跟踪记录每个文件中的当前位置。这使得停止和重新启动Logstash成为可能,并让它在不丢失在停止 Logstashwas
时添加到文件中的行数的情况下继续运行。
在调试的时候,我们可能希望取消sincedb的记录功能,使文件每次都能从头开始读取。此时,我们可以这样来做
示例:
input {
file {
path => "/home/logs/mylog.log"
start_position => "beginning"
sincedb_path => "/dev/null"
}
}
如果想使用HTTP输入,可以将类型改成http,只不过里面的参数有不同而已,tcp、udp、syslog和beats等等同理。 示例:
input {
http {
port => 端口号
}
}
filter
filter主要是实现过滤的功能,比如使用grok实现日志内容的切分等等。
比如对apache的日志进行grok过滤 样例数据:
127.0.0.1 - - [13/Apr/2015:17:22:03 +0800] "GET /router.php HTTP/1.1" 404 285 "-" "curl/7.19.7 (x86_64-redhat-linux-gnu) libcurl/7.19.7 NSS/3.15.3 zlib/1.2.3 libidn/1.18 libssh2/1.4.2"
grok:
%{COMBINEDAPACHELOG}
这里我们可以使用kibana的grok来进行分析,grok在开发工具中。当然也可以在grokdebug.herokuapp.com/网站进行匹配调试。
使用示例:
filter {
grok {
match => ["message", "%{COMBINEDAPACHELOG}"]
}
}
output
output主要作用是将数据进行输出,比如输出到文件,或者elasticsearch中。
这里将数据输出到ElasticSearch中,如果是集群,通过逗号可以配置多个节点。
output {
elasticsearch {
hosts => ["127.0.0.1:9200"]
}
}
如果想在控制台进行日志输出的话,可以加上stdout配置。如果想自定义输出的index话,也可以加上对应的索引库名称,不存在则根据数据内容进行创建,也可以自动按天创建索引库。
output {
stdout {
codec => rubydebug
}
elasticsearch {
hosts => ["127.0.0.1:9200"]
index => "mylogs-%{+YYYY.MM.dd}"
}
}
更多logstash配置:https://www.elastic.co/guide/en/logstash/current/index.html
3,使用
demo
在 /home/logs/
目录下添加一个日志文件, 然后在logstash文件夹中创建一个 logstash-test.conf
文件,然后在该文件中添加如下配置:
input {
file {
path => "/home/logs/mylog.log"
start_position => "beginning"
sincedb_path => "/dev/null"
}
}
filter {
}
output {
stdout {
codec => rubydebug
}
elasticsearch {
hosts => ["192.168.1.62:9200"]
}
}
然后在logstash 目录输入如下命令进行启动:
nohup ./bin/logstash -f logstash-test.conf
后台启动:
nohup ./bin/logstash -f logstash-test.conf >/dev/null 2>&1 &
热配置加载启动:
nohup ./bin/logstash -f logstash-test.conf --config.reload.automatic >/dev/null 2>&1 &
启动成功之后,如果是非后台启动,可以在控制台查看数据的传输,如果是后台启动,则可以在logstash的log目录中进行查看。
四、ES安装
1、下载镜像文件
sudo docker pull elasticsearch:7.3.2 # 存储和检索数据
sudo docker pull kibana:7.3.2 # 可视化检索数据
在安装的时候看一下虚拟机内存是否够用,ES非常的吃内存。
free -m
2、创建实例
切换到root用户
su root
2.1、ElasticSearch
创建配置文件目录,将docker内部es的配置文件挂载到外部,方便修改。
mkdir -p /mydata/elasticsearch/config
mkdir -p /mydata/elasticsearch/data
# 任何IP都可以访问,注意host和IP中间有空格
echo "http.host: 0.0.0.0" >>/mydata/elasticsearch/config/elasticsearch.yml
创建好目录之后,需要给该目录访问、执行的权限:
chmod -R 777 /mydata/elasticsearch/
运行镜像:
#运行ES
docker run --name elasticsearch -p 9200:9200 -p 9300:9300 \
-e "discovery.type=single-node" \
-e ES_JAVA_OPTS="-Xms256m -Xmx512m" \
-v /mydata/elasticsearch/config/elasticsearch.yml:/usr/share/elasticsearch/config/elasticsearch.yml \
-v /mydata/elasticsearch/data:/usr/share/elasticsearch/data \
-v /mydata/elasticsearch/plugins:/usr/share/elasticsearch/plugins \
-d elasticsearch:7.3.2
与ES交互的端口说明:
-
9200端口
RESTful API通过HTTP通信
-
9300端口
Java客户端与ES的原生传输协议和集群交互
ES_JAVA_OPTS指定java虚拟机相关参数
- -Xms128m 初始堆内存大小为128m
- -Xmx256m 最大堆内存大小为256m
- discovery.type=single-node 设置为单点启动
ES默认占内存很大,默认占2G,所以,一般在启动时需要指定固定的内存大小,这里我们设置为 128M。
启动之后,可以查看docker 的ES进程:
[root@node1 elk]# docker ps | grep 'elastic'
872474770bfd elasticsearch:7.3.2 "/usr/local/bin/dock…" 39 seconds ago Up 38 seconds 0.0.0.0:9200->9200/tcp, :::9200->9200/tcp, 0.0.0.0:9300->9300/tcp, :::9300->9300/tcp elasticsearch
[root@node1 elk]#
然后通过浏览器访问ES:http://192.168.1.62:9200/
2.2、Kibana
docker run --name kibana -e ELASTICSEARCH_URL=192.168.1.62:9200 -p 5601:5601 \
-d kibana:7.3.2
http://192.168.1.62:9200 一定要改为自己ES虚拟机的地址。
查看是否运行成功:
[root@node1 elk]# docker ps | grep 'kibana'
2a1ecc0f1c02 kibana:7.3.2 "/usr/local/bin/dumb…" 14 seconds ago Up 14 seconds 0.0.0.0:5601->5601/tcp, :::5601->5601/tcp kibana
[root@node1 elk]#
进入容器内部:
[root@node1 elk]# docker exec -it 2a1e /bin/bash
bash-4.2$ pwd
/usr/share/kibana
kibana 配置文件
cd /usr/share/kibana/config
bash-4.2$ vi kibana.yml
#
# ** THIS IS AN AUTO-GENERATED FILE **
#
# Default Kibana configuration for docker target
server.name: kibana
server.port: 5601
server.host: "0.0.0.0"
elasticsearch.hosts: [ "http://192.168.1.62:9200" ]
xpack.monitoring.ui.container.elasticsearch.enabled: true
然后重启Kibana,可以看到,已经可以正常访问了哈~
[root@node1 ~]# docker restart 2a1e
2a1e
http://192.168.1.62:5601/app/kibana
Logstash+ES
日志文件写入数据:
[root@node1 logs]# cat /home/logs/mylog.log
Qatar World Football Cup
shi jie bei hhahha
ssssswo 世界杯
我爱我家
精彩世界杯
大家都理解了哈
[root@node1 logs]#
修改logstash配置文件,设置es 的index 为 test
[root@node1 logs]# cd /opt/elk/logstash-7.3.2/
[root@node1 logstash-7.3.2]# cat logstash-test.conf
input {
file {
path => "/home/logs/mylog.log"
start_position => "beginning"
sincedb_path => "/dev/null"
}
}
filter {
}
output {
stdout {
codec => rubydebug
}
elasticsearch {
hosts => ["192.168.1.62:9200"]
index => "test"
}
}
[root@node1 logstash-7.3.2]#
后台启动:
nohup ./bin/logstash -f logstash-test.conf >/dev/null 2>&1 &
kibana查看:
Kafka
-
pull镜像
docker pull zookeeper docker pull wurstmeister/kafka
-
创建通信网络。zookeeper和kafka之间的通信
docker network create kafka_zk_net
查看网络
docker network ls
docker network inspect kafka_zk_net
- 创建容器
docker run --net=kafka_zk_net --name zookeeper -p 21810:2181 -d docker.io/zookeeper
docker run --net=kafka_zk_net --name kafka -p 9093:9092 \
--link zookeeper \
-e KAFKA_BROKER_ID=4 \
-e KAFKA_ZOOKEEPER_CONNECT=172.18.0.2:2181 \
-e KAFKA_ADVERTISED_HOST_NAME=192.168.1.62 \
-e KAFKA_ADVERTISED_PORT=9092 \
-d wurstmeister/kafka
- KAFKA_ADVERTISED_HOST_NAME 参数需要设置为宿主机地址(或服务器外网地址)
- KAFKA_ZOOKEEPER_CONNECT 参数设置zookeeper容器内部地址和端口,查看指令为 docker inspect kafka_zk_net
"ConfigOnly": false,
"Containers": {
"0de6a3fbcaa12f3dd08e290d4f20dc28109bf242959595aa08e4bb0ab9e6ffaa": {
"Name": "zookeeper",
"EndpointID": "89e64a38df7612399817496b4420b8be6876f9dfb835330e87fe1e7179605013",
"MacAddress": "02:42:ac:12:00:02",
"IPv4Address": "172.18.0.2/16",
"IPv6Address": ""
}
-
启动生产者和消费者
docker exec -it kafka bash cd opt/kafka_2.13-2.8.1/bin/ ./kafka-console-producer.sh --bootstrap-server localhost:9092 --topic topic_test 生产者 ./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic topic_test --from-beginning 消费者
-
bug
-
启动consumer发现:
Error while fetching metadata with correlation id
修复步骤:
vi /opt/kafka/config/server.properties
添加:
# 只能内网访问
#listeners=PLAINTEXT://localhost:9092
#advertised.listeners=PLAINTEXT://localhost:9092
# 可外网访问
listeners=PLAINTEXT://192.168.1.62:9092
advertised.listeners=PLAINTEXT://192.168.1.62:9092
- listeners 就是主要用来定义Kafka Broker的Listener的配置项。
- advertised.listeners 参数的作用就是将Broker的Listener信息发布到Zookeeper中
发现:Docker:bash: vi: command not found
apt-get update
apt-get install vim
停止kafka容器和重启
docker stop kafka
docker restart kafka
相关文章:
ElasticSearch实战系列六: Logstash快速入门和实战
ElasticSearch实战系列七: Logstash实战使用-图文讲解
Elasticsearch-基础及使用 Docker 安装
Docker(单机Kafka安装)
Kafka Docker部署
seldon core 通过 kafka 实现数据回流
为者常成,行者常至
自由转载-非商用-非衍生-保持署名(创意共享3.0许可证)