Fluentd 安装、配置、使用介绍

本篇博文将对fluentd的安装、配置、使用等各方面做一个简单的介绍。

基础

fluentd 既可以作为日志收集器安装到每一个结点上, 也可以作为一个服务端收集各个结点上报的日志流。 你甚至也可以在各个结点上都部署 fluentd 收集日志,然后上报到一个 fluentd 集群做统一处理, 然后再转发到最终的日志存储服务器。

所以在一个完整的日志收集、处理系统里,你可以构建一个这样的日志处理流:

Apps (with fluentd/fluent-bit) -> broker (kafka) -> fluentd cluster -> elasticsearch -> kibana

其中提到的 fluent-bit 是一个极简版的 fluentd,专门用作日志的收集和转发, 可以在应用结点上取代 fluentd 收集日志,满足极端的资源要求。

命令

配置文件的核心是各种命令块(directives),每一种命令都是为了完成某种处理,命令与命令之间还可以组成串联关系,以 pipline 的形式流式的处理和分发日志。

命令的主要组成部分有:

  • source
  • filter
  • match
  • label
  • error

最常见的方式就是 source 收集日志,然后由串联的 filter 做流式的处理,最后交给 match 进行分发。match 是日志流程的终点,一旦匹配了某一个 match,就不会再继续往下匹配了

同时你还可以用 label 将任务分组,用 error 处理异常,用 system 修改运行参数。

不同的命令中,都可以通过 @type 指定想要使用的插件名字,而且还可以传入各式各样的插件参数, 由丰富的插件提供强大的功能,下面是详细一些的说明。

source 是 fluentd 的一切数据的来源,每一个 source 内都包含一个输入模块,比如原生集成的包含 http 和 forward 两个模块,分别用来接收 HTTP 请求和 TCP 请求:

# Receive events from 24224/tcp
# This is used by log forwarding and the fluent-cat command
<source>
  @type forward
  port 24224
</source>

# http://this.host:9880/myapp.access?json={"event":"data"}
<source>
  @type http
  port 9880
</source>

当然,除了这两个外,fluentd 还有大量的支持各种协议或方式的 source 插件,比如最常用的 tail 就可以帮你追踪文件。

每一个具体的插件都包含其特有的参数,比如上例中 port 就是一个参数,当你要使用一个 source 插件的时候,注意看看有哪些参数是需要配置的,然后将其写到 source directive 内。

每一个具体的插件都包含其特有的参数,比如上例中 port 就是一个参数,当你要使用一个 source 插件的时候,注意看看有哪些参数是需要配置的,然后将其写到 source directive 内。

source dirctive 在获取到输入后,会向 fluent 的路由抛出一个事件,这个事件包含三个要素:

  • tag
  • time
  • record

那上例代码中的第二个 source 举例,当我们发起一个 http://this.host:9880/myapp.access?json={"event":"data"}的请求时,这个 source 会抛出:

# generated by http://this.host:9880/myapp.access?json={"event":"data"}
tag: myapp.access
time: (current time)
record: {"event":"data"}

4.2 match

match 用来指定动作,通过 tag 匹配 source,然后执行指定的命令来分发日志,最常见的用法就是将 source 收集的日志转存到数据库。

# http://this.host:9880/myapp.access?json={"event":"data"}
<source>
  @type http
  port 9880
</source>

# 将标记为 myapp.access 的日志转存到文件
<match myapp.access>
  @type file
  path /var/log/fluent/access
</match>

上例中的 myapp.access 就是 tag,tag 有好几种匹配模式:

*:匹配任意一个 tag;
**:匹配任意数量个 tag;
a b:匹配 a 或 b;
{X,Y,Z}:匹配 X, Y, Z 中的一个;

比如可以写成这样:

<match a.*>
<match **>
<match a.{b,c}>
<match a.* b.*>

match 是从上往下依次匹配的,一旦一个日志流被匹配上,就不会再继续匹配剩下的 match 了。 所以如果有 <match **> 这样的全匹配,一定要放到配置文件的最后。

用法和 source 几乎一模一样,不过 source 是抛出事件,match 是接收并处理事件。

而且 match 不仅仅用来处理输出,还可以对日志事件进行一些处理后重新抛出,当成一个新的事件从新走一遍流程,比如可以用rewrite_tag_filter 插件为日志流重新打上 tag,实现通过正则来对日志进行分流的需求:

实战

配置

fluentd-configmap.yaml

kind: ConfigMap
apiVersion: v1
metadata:
  name: fluentd-config
  namespace: logging
  labels:
    addonmanager.kubernetes.io/mode: Reconcile
data:
  system.conf: |-
    <system>
      root_dir /tmp/fluentd-buffers/
    </system>
  containers.input.conf: |-
    <source>
      @id fluentd-containers.log
      @type tail
      path /var/log/containers/*.log
      pos_file /var/log/es-containers.log.pos
      time_format %Y-%m-%dT%H:%M:%S.%NZ
      localtime
      tag raw.kubernetes.*
      format json
      read_from_head true
    </source>
    # Detect exceptions in the log output and forward them as one log entry.
    <match raw.kubernetes.**>
      @id raw.kubernetes
      @type detect_exceptions
      remove_tag_prefix raw
      message log
      stream stream
      multiline_flush_interval 5
      max_bytes 500000
      max_lines 1000
    </match>
  system.input.conf: |-
    # Logs from systemd-journal for interesting services.
    <source>
      @id journald-docker
      @type systemd
      filters [{ "_SYSTEMD_UNIT": "docker.service" }]
      <storage>
        @type local
        persistent true
      </storage>
      read_from_head true
      tag docker
    </source>
    <source>
      @id journald-kubelet
      @type systemd
      filters [{ "_SYSTEMD_UNIT": "kubelet.service" }]
      <storage>
        @type local
        persistent true
      </storage>
      read_from_head true
      tag kubelet
    </source>
  forward.input.conf: |-
    # Takes the messages sent over TCP
    <source>
      @type forward
    </source>
  output.conf: |-
    # Enriches records with Kubernetes metadata
    <filter kubernetes.**>
      @type kubernetes_metadata
    </filter>
    <match **>
      @id elasticsearch
      @type elasticsearch
      @log_level info
      include_tag_key true
      host es-svc
      port 9200
      logstash_format true
      request_timeout    30s
      <buffer>
        @type file
        path /var/log/fluentd-buffers/kubernetes.system.buffer
        flush_mode interval
        retry_type exponential_backoff
        flush_thread_count 2
        flush_interval 5s
        retry_forever
        retry_max_interval 30
        chunk_limit_size 2M
        queue_limit_length 8
        overflow_action block
      </buffer>
    </match>

部署

fluentd-daemonset.yaml

apiVersion: v1
kind: ServiceAccount
metadata:
  name: fluentd-es
  namespace: logging
  labels:
    k8s-app: fluentd-es
    kubernetes.io/cluster-service: "true"
    addonmanager.kubernetes.io/mode: Reconcile
---
kind: ClusterRole
apiVersion: rbac.authorization.k8s.io/v1
metadata:
  name: fluentd-es
  labels:
    k8s-app: fluentd-es
    kubernetes.io/cluster-service: "true"
    addonmanager.kubernetes.io/mode: Reconcile
rules:
- apiGroups:
  - ""
  resources:
  - "namespaces"
  - "pods"
  verbs:
  - "get"
  - "watch"
  - "list"
---
kind: ClusterRoleBinding
apiVersion: rbac.authorization.k8s.io/v1
metadata:
  name: fluentd-es
  labels:
    k8s-app: fluentd-es
    kubernetes.io/cluster-service: "true"
    addonmanager.kubernetes.io/mode: Reconcile
subjects:
- kind: ServiceAccount
  name: fluentd-es
  namespace: logging
  apiGroup: ""
roleRef:
  kind: ClusterRole
  name: fluentd-es
  apiGroup: ""
---
apiVersion: apps/v1
kind: DaemonSet
metadata:
  name: fluentd-es
  namespace: logging
  labels:
    k8s-app: fluentd-es
    kubernetes.io/cluster-service: "true"
    addonmanager.kubernetes.io/mode: Reconcile
spec:
  selector:
    matchLabels:
      k8s-app: fluentd-es
  template:
    metadata:
      labels:
        k8s-app: fluentd-es
        kubernetes.io/cluster-service: "true"
      # 此注释确保如果节点被驱逐,fluentd不会被驱逐,支持关键的基于 pod 注释的优先级方案。
      annotations:
        scheduler.alpha.kubernetes.io/critical-pod: ''
    spec:
      serviceAccountName: fluentd-es
      containers:
      - name: fluentd-es
        #image: arm64v8/fluentd:v1.14-1
        image: mesondev/fluentd_elasticsearch:latest
        env:
        - name: FLUENTD_ARGS
          value: --no-supervisor -q
        resources:
          limits:
            memory: 500Mi
          requests:
            cpu: 100m
            memory: 200Mi
        volumeMounts:
        - name: varlog
          mountPath: /var/log
        - name: varlibdockercontainers
          mountPath: /data/docker_data/containers
          readOnly: true
        - name: config-volume
          mountPath: /etc/fluent/config.d
      nodeSelector:
        beta.kubernetes.io/fluentd-ds-ready: "true"
      tolerations:
      - operator: Exists
      terminationGracePeriodSeconds: 30
      volumes:
      - name: varlog
        hostPath:
          path: /var/log
      - name: varlibdockercontainers
        hostPath:
          path: /data/docker_data/containers
      - name: config-volume
        configMap:
          name: fluentd-config

启动之后收集的日志信息:

[root@master-165 k8sYaml]# kubectl logs -f fluentd-es-kxhlj -n logging
2023-08-06 18:16:34 +0000 [warn]: [elasticsearch] Detected ES 7.x: `_doc` will be used as the document `_type`.

相关文章:
fluentd 安装、配置、使用介绍
fluentd 官方文档配置
【官方文档】Fluentd 输出插件(elasticsearch)
基于 EFK 的 Kubernetes 日志采集方案

为者常成,行者常至