EFK Online Log Collection, Analysis, and Management Platform: Local Hands-On Practice

The previous article covered deploying EFK log collection on Kubernetes, but during testing the setup still could not collect logs from the host machine, so this post continues the debugging.

Hands-on

First, we define the Fluentd configuration through a ConfigMap object. Create a file named fluentd-configmap.yaml. This configuration collects only log files under a host path; for example, if your application (business) pod logs are mounted under /data/nfs_data/log/ on the host, the following configuration collects them:

kind: ConfigMap
apiVersion: v1
metadata:
  name: fluentd-config
  namespace: logging
  labels:
    addonmanager.kubernetes.io/mode: Reconcile
data:
  system.conf: |-
    <system>
      root_dir /tmp/fluentd-buffers/
    </system>
  business.input.conf: |-
    <source>
      @id business.log
      @type tail
      @log_level debug # use debug while troubleshooting for verbose collection output; set to info in production
      path /data/nfs_data/log/*.log
      pos_file /data/nfs_data/log/bs.log.pos
      time_format %Y-%m-%dT%H:%M:%S.%NZ
      localtime
      tag business.log
      format none
      read_from_head true
    </source>
  output.conf: |-
    # Enriches records with Kubernetes metadata
    <filter business.log>
      @type record_transformer
      <record>
        source_path /data/nfs_data/log
      </record>
    </filter>
    <match business.log>  # match the business.log tag
      @id elasticsearch
      @type elasticsearch
      @log_level debug  # use debug while troubleshooting for verbose collection output; set to info in production
      include_tag_key true
      host es-svc
      port 9200
      logstash_format true
      request_timeout    30s
      #index_name logstash-${tag}.%Y%m%d  # set the index name
      <buffer>
        @type file
        path /var/log/fluentd-buffers/kubernetes.system.buffer
        flush_mode interval
        retry_type exponential_backoff
        flush_thread_count 2
        flush_interval 5s
        retry_forever
        retry_max_interval 30
        chunk_limit_size 2M
        queue_limit_length 8
        overflow_action block
      </buffer>
    </match>
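
Before applying it, the Fluentd syntax can be sanity-checked locally; a rough sketch, assuming the fluentd gem is installed and the three data keys above have been concatenated into a local file named business.conf (a name chosen just for this check):

fluentd --dry-run -c ./business.conf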

Next, create a file named fluentd-daemonset.yaml with the following content:

apiVersion: v1
kind: ServiceAccount
metadata:
  name: fluentd-es
  namespace: logging
  labels:
    k8s-app: fluentd-es
    kubernetes.io/cluster-service: "true"
    addonmanager.kubernetes.io/mode: Reconcile
---
kind: ClusterRole
apiVersion: rbac.authorization.k8s.io/v1
metadata:
  name: fluentd-es
  labels:
    k8s-app: fluentd-es
    kubernetes.io/cluster-service: "true"
    addonmanager.kubernetes.io/mode: Reconcile
rules:
- apiGroups:
  - ""
  resources:
  - "namespaces"
  - "pods"
  verbs:
  - "get"
  - "watch"
  - "list"
---
kind: ClusterRoleBinding
apiVersion: rbac.authorization.k8s.io/v1
metadata:
  name: fluentd-es
  labels:
    k8s-app: fluentd-es
    kubernetes.io/cluster-service: "true"
    addonmanager.kubernetes.io/mode: Reconcile
subjects:
- kind: ServiceAccount
  name: fluentd-es
  namespace: logging
  apiGroup: ""
roleRef:
  kind: ClusterRole
  name: fluentd-es
  apiGroup: ""
---
apiVersion: apps/v1
kind: DaemonSet
metadata:
  name: fluentd-es
  namespace: logging
  labels:
    k8s-app: fluentd-es
    kubernetes.io/cluster-service: "true"
    addonmanager.kubernetes.io/mode: Reconcile
spec:
  selector:
    matchLabels:
      k8s-app: fluentd-es
  template:
    metadata:
      labels:
        k8s-app: fluentd-es
        kubernetes.io/cluster-service: "true"
      # This annotation ensures that fluentd is not evicted under node pressure, supporting the critical pod-annotation-based priority scheme.
      annotations:
        scheduler.alpha.kubernetes.io/critical-pod: ''
    spec:
      serviceAccountName: fluentd-es
      containers:
      - name: fluentd-es
        #image: arm64v8/fluentd:v1.14-1
        image: mesondev/fluentd_elasticsearch:latest
        env:
        - name: FLUENTD_ARGS
          value: --no-supervisor -q
        resources:
          limits:
            memory: 500Mi
          requests:
            cpu: 100m
            memory: 200Mi
        volumeMounts:
        - name: varlog
          mountPath: /var/log
        - name: bizlog # note: fluentd runs as a container; to collect host-directory logs, the directory must be mounted into the pod
          mountPath: /data/nfs_data/log
        - name: varlibdockercontainers
          mountPath: /data/docker_data/containers
          readOnly: true
        - name: config-volume
          mountPath: /etc/fluent/config.d
      nodeSelector:
        beta.kubernetes.io/fluentd-ds-ready: "true"
      tolerations:
      - operator: Exists
      terminationGracePeriodSeconds: 30
      volumes:
      - name: varlog
        hostPath:
          path: /var/log
      - name: bizlog   # note: fluentd runs as a container; to collect host-directory logs, the directory must be mounted into the pod
        hostPath:
          path: /data/nfs_data/log
      - name: varlibdockercontainers
        hostPath:
          path: /data/docker_data/containers
      - name: config-volume
        configMap:
          name: fluentd-config
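
Note the nodeSelector: the DaemonSet only schedules onto nodes labeled beta.kubernetes.io/fluentd-ds-ready=true. If no fluentd pods appear after deploying, label the target nodes first (replace <node-name> with your own node name):

kubectl label node <node-name> beta.kubernetes.io/fluentd-ds-ready=true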

Apply the two manifests:

kubectl apply -f fluentd-configmap.yaml
kubectl apply -f fluentd-daemonset.yaml
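
Optionally, wait for the DaemonSet rollout to finish:

kubectl rollout status daemonset/fluentd-es -n logging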

Check the deployed pods:

[root@master log]# kubectl get pods -n logging
NAME                             READY   STATUS    RESTARTS   AGE
counter                          1/1     Running   0          4d21h
elasticsearch-5867d7b46f-vxhwn   1/1     Running   0          39h
fluentd-es-47s6t                 1/1     Running   0          19m
fluentd-es-kqx2n                 1/1     Running   0          19m
fluentd-es-nfpxw                 1/1     Running   0          19m
kibana-588697489c-b42jk          1/1     Running   0          47h

The collected logs can now be viewed in Kibana.

Check the collection output in the fluentd pod's logs:

[root@master-165 k8sYaml]# kubectl logs -f fluentd-es-47s6t -n logging
2023-08-08 07:41:30 +0000 [debug]: [elasticsearch] 'host es-svc' is tested built-in placeholder(s) but there is no valid placeholder(s). error: Parameter 'host: es-svc' doesn't have tag placeholder
2023-08-08 07:41:30 +0000 [debug]: [elasticsearch] 'index_name logstash-${tag}.%Y%m%d' is tested built-in placeholder(s) but there is no valid placeholder(s). error: Parameter 'index_name: logstash-${tag}.%Y%m%d' has timestamp placeholders, but chunk key 'time' is not configured
2023-08-08 07:41:30 +0000 [debug]: [elasticsearch] Need substitution: true
2023-08-08 07:41:30 +0000 [debug]: [elasticsearch] 'host_placeholder es-svc' is tested built-in placeholder(s) but there is no valid placeholder(s). error: Parameter 'host_placeholder: es-svc' doesn't have tag placeholder
2023-08-08 07:41:30 +0000 [warn]: [elasticsearch] Detected ES 7.x: `_doc` will be used as the document `_type`.
2023-08-08 07:41:30 +0000 [debug]: [elasticsearch] buffer started instance=1780 stage_size=0 queue_size=0
2023-08-08 07:41:30 +0000 [debug]: [elasticsearch] flush_thread actually running
2023-08-08 07:41:30 +0000 [debug]: [elasticsearch] flush_thread actually running
2023-08-08 07:41:30 +0000 [debug]: [elasticsearch] enqueue_thread actually running
2023-08-08 07:41:30 +0000 [info]: [business.log] following tail of /data/nfs_data/log/ms.runtime.log
2023-08-08 07:41:30 +0000 [debug]: [elasticsearch] Created new chunk chunk_id="602647dc96215448fbea6b759cbec52c" metadata=#<struct Fluent::Plugin::Buffer::Metadata timekey=nil, tag="business.log", variables=nil, seq=0>
2023-08-08 07:41:30 +0000 [info]: [business.log] following tail of /data/nfs_data/log/demo.err.log
2023-08-08 07:41:30 +0000 [info]: [business.log] following tail of /data/nfs_data/log/demo.out.log
2023-08-08 07:41:30 +0000 [info]: [business.log] following tail of /data/nfs_data/log/490_2023-05-03.log
2023-08-08 07:41:30.588459564 +0000 fluent.debug: {"instance":1780,"stage_size":0,"queue_size":0,"message":"[elasticsearch] buffer started instance=1780 stage_size=0 queue_size=0"}
2023-08-08 07:41:30.588950709 +0000 fluent.debug: {"message":"[elasticsearch] flush_thread actually running"}
2023-08-08 07:41:30.589040686 +0000 fluent.debug: {"message":"[elasticsearch] flush_thread actually running"}
2023-08-08 07:41:30.589167082 +0000 fluent.debug: {"message":"[elasticsearch] enqueue_thread actually running"}
2023-08-08 07:41:30.591408791 +0000 fluent.info: {"message":"[business.log] following tail of /data/nfs_data/log/demo.runtime.log"}
2023-08-08 07:41:30.599119627 +0000 fluent.debug: {"chunk_id":"602647dc96215448fbea6b759cbec52c","metadata":"#<struct Fluent::Plugin::Buffer::Metadata timekey=nil, tag=\"business.log\", variables=nil, seq=0>","message":"[elasticsearch] Created new chunk chunk_id=\"602647dc96215448fbea6b759cbec52c\" metadata=#<struct Fluent::Plugin::Buffer::Metadata timekey=nil, tag=\"business.log\", variables=nil, seq=0>"}
2023-08-08 07:41:30.600458144 +0000 fluent.info: {"message":"[business.log] following tail of /data/nfs_data/log/demo.err.log"}
2023-08-08 07:41:30.615603225 +0000 fluent.info: {"message":"[business.log] following tail of /data/nfs_data/log/demo.out.log"}
2023-08-08 07:41:30.642635108 +0000 fluent.info: {"message":"[business.log] following tail of /data/nfs_data/log/490_2023-05-03.log"}
2023-08-08 07:41:36 +0000 [warn]: dump an error event: error_class=Fluent::Plugin::ElasticsearchErrorHandler::ElasticsearchError error="400 - Rejected by Elasticsearch [error type]: mapper_parsing_exception [reason]: 'failed to parse field [message] of type [text] in document with id '4CMY1IkBmaOpFwGmFh_I'. Preview of field's value: '''" location=nil tag="business.log" time=2023-08-08 07:41:30.643011016 +0000 record={"message"=>"2023-05-03 01:51:13 - monitor - info - \xBC\xE0\xBF\xD8\xC8\xCE\xCE\xF1\xD5\xFD\xD4\xDA\xD7\xBC\xB1\xB8\xD4\xCB\xD0\xD0\xD6\xD0monitor_id\xA3\xBA 490", "source_path"=>"/data/nfs_data/log"}
2023-08-08 07:41:36.289388031 +0000 fluent.warn: {"error":"#<Fluent::Plugin::ElasticsearchErrorHandler::ElasticsearchError: 400 - Rejected by Elasticsearch [error type]: mapper_parsing_exception [reason]: 'failed to parse field [message] of type [text] in document with id '4CMY1IkBmaOpFwGmFh_I'. Preview of field's value: '''>","location":null,"tag":"business.log","time":1691480490,"record":{"message":"2023-05-03 01:51:13 - monitor - info - 490","source_path":"/data/nfs_data/log"},"message":"dump an error event: error_class=Fluent::Plugin::ElasticsearchErrorHandler::ElasticsearchError error=\"400 - Rejected by Elasticsearch [error type]: mapper_parsing_exception [reason]: 'failed to parse field [message] of type [text] in document with id '4CMY1IkBmaOpFwGmFh_I'. Preview of field's value: '''\" location=nil tag=\"business.log\" time=2023-08-08 07:41:30.643011016 +0000 record={\"message\"=>\"2023-05-03 01:51:13 - monitor - info - \\xBC\\xE0\\xBF\\xD8\\xC8\\xCE\\xCE\\xF1\\xD5\\xFD\\xD4\\xDA\\xD7\\xBC\\xB1\\xB8\\xD4\\xCB\\xD0\\xD0\\xD6\\xD0monitor_id\\xA3\\xBA 490\", \"source_path\"=>\"/data/nfs_data/log\"}"}
2023-08-08 07:41:36 +0000 [warn]: dump an error event: error_class=Fluent::Plugin::ElasticsearchErrorHandler::ElasticsearchError error="400 - Rejected by Elasticsearch [error type]: mapper_parsing_exception [reason]: 'failed to parse field [message] of type [text] in document with id '4SMY1IkBmaOpFwGmFh_I'. Preview of field's value: '''" location=nil tag="business.log" time=2023-08-08 07:41:30.643013116 +0000 record={"message"=>"2023-05-03 01:51:13 - monitor - info - \xBC\xE0\xBF\xD8\xC8\xCE\xCE\xF1\xCA\xB1\xBC\xE4\xB2\xCE\xCA\xFD:start_time, end_time, next_start_time, max_date_type, data_day 2023-04-01 00:00:00, 2023-05-01 00:00:00 2023-06-01 00:00:00, 3, None", "source_path"=>"/data/nfs_data/log"}
2023-08-08 07:41:36.289677402 +0000 fluent.warn: {"error":"#<Fluent::Plugin::ElasticsearchErrorHandler::ElasticsearchError: 400 - Rejected by Elasticsearch [error type]: mapper_parsing_exception [reason]: 'failed to parse field [message] of type [text] in document with id '4SMY1IkBmaOpFwGmFh_I'. Preview of field's value: '''>","location":null,"tag":"business.log","time":1691480490,"record":{"message":"2023-05-03 01:51:13 - monitor - info -:start_time, end_time, next_start_time, max_date_type, data_day 2023-04-01 00:00:00, 2023-05-01 00:00:00 2023-06-01 00:00:00, 3, None","source_path":"/data/nfs_data/log"},"message":"dump an error event: error_class=Fluent::Plugin::ElasticsearchErrorHandler::ElasticsearchError error=\"400 - Rejected by Elasticsearch [error type]: mapper_parsing_exception [reason]: 'failed to parse field [message] of type [text] in document with id '4SMY1IkBmaOpFwGmFh_I'. Preview of field's value: '''\" location=nil tag=\"business.log\" time=2023-08-08 07:41:30.643013116 +0000 record={\"message\"=>\"2023-05-03 01:51:13 - monitor - info - \\xBC\\xE0\\xBF\\xD8\\xC8\\xCE\\xCE\\xF1\\xCA\\xB1\\xBC\\xE4\\xB2\\xCE\\xCA\\xFD:start_time, end_time, next_start_time, max_date_type, data_day 2023-04-01 00:00:00, 2023-05-01 00:00:00 2023-06-01 00:00:00, 3, None\", \"source_path\"=>\"/data/nfs_data/log\"}"}
2023-08-08 07:41:36 +0000 [warn]: dump an error event: error_class=Fluent::Plugin::ElasticsearchErrorHandler::ElasticsearchError error="400 - Rejected by Elasticsearch [error type]: mapper_parsing_exception [reason]: 'failed to parse field [message] of type [text] in document with id '4iMY1IkBmaOpFwGmFh_I'. Preview of field's value: '''" location=nil tag="business.log" time=2023-08-08 07:41:30.643013956 +0000 record={"message"=>"2023-05-03 01:51:13 - monitor - info - \xB1\xBE\xB4\xCE\xBC\xE0\xBF\xD8\xC8\xCE\xCE\xF1\xD4\xCB\xD0\xD0\xCD\xEA\xB3\xC9\xA3\xAC \xCF\xC2\xB8\xF6\xBC\xE0\xBF\xD8\xC8\xCE\xCE\xF1\xD5\xFD\xD4\xDA\xD7\xBC\xB1\xB8. ", "source_path"=>"/data/nfs_data/log"}
2023-08-08 07:41:36.289798358 +0000 fluent.warn: {"error":"#<Fluent::Plugin::ElasticsearchErrorHandler::ElasticsearchError: 400 - Rejected by Elasticsearch [error type]: mapper_parsing_exception [reason]: 'failed to parse field [message] of type [text] in document with id '4iMY1IkBmaOpFwGmFh_I'. Preview of field's value: '''>","location":null,"tag":"business.log","time":1691480490,"record":{"message":"2023-05-03 01:51:13 - monitor - info - . ","source_path":"/data/nfs_data/log"},"message":"dump an error event: error_class=Fluent::Plugin::ElasticsearchErrorHandler::ElasticsearchError error=\"400 - Rejected by Elasticsearch [error type]: mapper_parsing_exception [reason]: 'failed to parse field [message] of type [text] in document with id '4iMY1IkBmaOpFwGmFh_I'. Preview of field's value: '''\" location=nil tag=\"business.log\" time=2023-08-08 07:41:30.643013956 +0000 record={\"message\"=>\"2023-05-03 01:51:13 - monitor - info - \\xB1\\xBE\\xB4\\xCE\\xBC\\xE0\\xBF\\xD8\\xC8\\xCE\\xCE\\xF1\\xD4\\xCB\\xD0\\xD0\\xCD\\xEA\\xB3\\xC9\\xA3\\xAC \\xCF\\xC2\\xB8\\xF6\\xBC\\xE0\\xBF\\xD8\\xC8\\xCE\\xCE\\xF1\\xD5\\xFD\\xD4\\xDA\\xD7\\xBC\\xB1\\xB8. \", \"source_path\"=>\"/data/nfs_data/log\"}"}
2023-08-08 07:41:36 +0000 [warn]: dump an error event: error_class=Fluent::Plugin::ElasticsearchErrorHandler::ElasticsearchError error="400 - Rejected by Elasticsearch [error type]: mapper_parsing_exception [reason]: 'failed to parse field [message] of type [text] in document with id '4yMY1IkBmaOpFwGmFh_I'. Preview of field's value: '''" location=nil tag="business.log" time=2023-08-08 07:41:30.643014736 +0000 record={"message"=>"2023-05-03 01:51:15 - monitor - info - \xBC\xE0\xBF\xD8\xD4\xA4\xBE\xAF\xB4\xED\xCE\xF3.Traceback (most recent call last):", "source_path"=>"/data/nfs_data/log"}
2023-08-08 07:41:36.289877856 +0000 fluent.warn: {"error":"#<Fluent::Plugin::ElasticsearchErrorHandler::ElasticsearchError: 400 - Rejected by Elasticsearch [error type]: mapper_parsing_exception [reason]: 'failed to parse field [message] of type [text] in document with id '4yMY1IkBmaOpFwGmFh_I'. Preview of field's value: '''>","location":null,"tag":"business.log","time":1691480490,"record":{"message":"2023-05-03 01:51:15 - monitor - info - Traceback (most recent call last):","source_path":"/data/nfs_data/log"},"message":"dump an error event: error_class=Fluent::Plugin::ElasticsearchErrorHandler::ElasticsearchError error=\"400 - Rejected by Elasticsearch [error type]: mapper_parsing_exception [reason]: 'failed to parse field [message] of type [text] in document with id '4yMY1IkBmaOpFwGmFh_I'. Preview of field's value: '''\" location=nil tag=\"business.log\" time=2023-08-08 07:41:30.643014736 +0000 record={\"message\"=>\"2023-05-03 01:51:15 - monitor - info - \\xBC\\xE0\\xBF\\xD8\\xD4\\xA4\\xBE\\xAF\\xB4\\xED\\xCE\\xF3.Traceback (most recent call last):\", \"source_path\"=>\"/data/nfs_data/log\"}"}
2023-08-08 07:41:36 +0000 [debug]: [elasticsearch] Indexed (op = index), 2628 successes, 4 bad_argument
2023-08-08 07:41:36.290930542 +0000 fluent.debug: {"message":"[elasticsearch] Indexed (op = index), 2628 successes, 4 bad_argument"}
2023-08-08 07:44:24 +0000 [debug]: [elasticsearch] Created new chunk chunk_id="602648828ad97f36fc9632f38ba1b196" metadata=#<struct Fluent::Plugin::Buffer::Metadata timekey=nil, tag="business.log", variables=nil, seq=0>
2023-08-08 07:44:24.616529234 +0000 fluent.debug: {"chunk_id":"602648828ad97f36fc9632f38ba1b196","metadata":"#<struct Fluent::Plugin::Buffer::Metadata timekey=nil, tag=\"business.log\", variables=nil, seq=0>","message":"[elasticsearch] Created new chunk chunk_id=\"602648828ad97f36fc9632f38ba1b196\" metadata=#<struct Fluent::Plugin::Buffer::Metadata timekey=nil, tag=\"business.log\", variables=nil, seq=0>"}
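
The mapper_parsing_exception warnings above are triggered by log lines whose Chinese text is not valid UTF-8 (the escaped \xBC\xE0... byte sequences look like GBK), which Elasticsearch rejects. If the source files really are GBK-encoded (an assumption worth verifying against the producing applications), in_tail can transcode them via its from_encoding/encoding parameters; a minimal sketch:

<source>
  @id business.log
  @type tail
  path /data/nfs_data/log/*.log
  pos_file /data/nfs_data/log/bs.log.pos
  tag business.log
  format none
  read_from_head true
  # assumption: the files on disk are GBK; convert to UTF-8 before emitting
  from_encoding GBK
  encoding utf-8
</source>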

Merging multi-line logs

For the multi-line entries in the logs above, you can use Fluentd's multiline plugin. Judging from the samples, each log event begins with a timestamp, log level, thread name, process ID, file name, and line number. A regular expression can match this header and merge the continuation lines into one complete log event.

Here is an example Fluentd configuration for this multi-line format:

kind: ConfigMap
apiVersion: v1
metadata:
  name: fluentd-config
  namespace: logging
  labels:
    addonmanager.kubernetes.io/mode: Reconcile
data:
  # other config entries...
  business.input.conf: |-
    <source>
      @id business.log
      @type tail
      path /data/nfs_data/log/*.log
      pos_file /data/nfs_data/log/bs.log.pos
      time_format %Y-%m-%d %H:%M:%S.%L
      localtime
      tag business.log
      read_from_head true

      # merge multi-line logs with the multiline plugin
      <parse>
        @type multiline
        format_firstline /^\d{4}-\d{2}-\d{2}\s\d{2}:\d{2}:\d{2}\.\d{3}/  # regex identifying the first line of a multi-line event
        format1 /^(?<log>\d{4}-\d{2}-\d{2}\s\d{2}:\d{2}:\d{2}\.\d{3}\s\w+\s[\w.-]+:\d+-[\w.-]+:\d+-[\w.-]+:\d+-[\w.-]+:\d+-.*)/   # regex matching the entire multi-line event
      </parse>
    </source>
  # other config entries...

In the example above, we added a `<parse>` sub-section to the `<source>` block of `business.input.conf` and configured the `multiline` plugin. The `format_firstline` parameter specifies the regular expression that identifies the first line of a multi-line event; here `\d{4}-\d{2}-\d{2}\s\d{2}:\d{2}:\d{2}\.\d{3}` recognizes the leading timestamp as the start of an event. The `format1` parameter then specifies the regular expression for the event itself; here `(?<log>\d{4}-\d{2}-\d{2}\s\d{2}:\d{2}:\d{2}\.\d{3}\s\w+\s[\w.-]+:\d+-[\w.-]+:\d+-[\w.-]+:\d+-[\w.-]+:\d+-.*)` matches the entire multi-line log event.

With the multiline plugin configured, Fluentd merges multi-line logs into a single event according to these regular expressions before sending them to Elasticsearch. Adjust the expressions as needed to fit your actual multi-line log format.
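
As a concrete illustration, consider a hypothetical three-line input (the sample lines are invented for this sketch):

2023-05-03 01:51:15.123 ERROR monitor task failed
Traceback (most recent call last):
  File "monitor.py", line 42, in run

Only the first line matches format_firstline, so the parser buffers the traceback lines that follow and emits all three as a single merged event once the next timestamped line arrives (or the flush interval elapses).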

Summary

1. In the fluentd config file, set `@log_level` to debug to print detailed source and match logs, which helps track down errors:

 @log_level debug # use debug while troubleshooting for verbose collection output; set to info in production

2. Fluentd is deployed as a containerized pod. To collect logs from a host (node) directory, the host log path must be mounted into the container; the fluentd process inside the pod cannot read paths outside the container. During earlier tests no logs were collected at all, and it took a long time to trace the problem to this missing mount.

Note: the /data/nfs_data/log directory lives on the host machine, so it has to be mounted into the container before it can be collected; a quick verification command follows the snippet below.

...

    spec:
      serviceAccountName: fluentd-es
      containers:
      - name: fluentd-es
        #image: arm64v8/fluentd:v1.14-1
        image: mesondev/fluentd_elasticsearch:latest
        env:
        - name: FLUENTD_ARGS
          value: --no-supervisor -q
        resources:
          limits:
            memory: 500Mi
          requests:
            cpu: 100m
            memory: 200Mi
        volumeMounts:
        - name: varlog
          mountPath: /var/log
        - name: bizlog # note: fluentd runs as a container; to collect host-directory logs, the directory must be mounted into the pod
          mountPath: /data/nfs_data/log
        - name: varlibdockercontainers
          mountPath: /data/docker_data/containers
          readOnly: true
        - name: config-volume
          mountPath: /etc/fluent/config.d
      nodeSelector:
        beta.kubernetes.io/fluentd-ds-ready: "true"
      tolerations:
      - operator: Exists
      terminationGracePeriodSeconds: 30
      volumes:
      - name: varlog
        hostPath:
          path: /var/log
      - name: bizlog   # note: fluentd runs as a container; to collect host-directory logs, the directory must be mounted into the pod
        hostPath:
          path: /data/nfs_data/log
      - name: varlibdockercontainers
        hostPath:
          path: /data/docker_data/containers
      - name: config-volume
        configMap:
          name: fluentd-config
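
To confirm the mount from inside a running collector pod, list the mounted directory there (substitute one of the fluentd pod names shown earlier):

kubectl exec fluentd-es-47s6t -n logging -- ls /data/nfs_data/log

If the application log files show up in the output, the hostPath volume is wired correctly.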

Full configuration

fluentd-conf-cm.yaml

kind: ConfigMap
apiVersion: v1
metadata:
  name: fluentd-config
  namespace: logging
  labels:
    addonmanager.kubernetes.io/mode: Reconcile
data:
  system.conf: |-
    <system>
      root_dir /tmp/fluentd-buffers/
    </system>
  containers.input.conf: |-
    <source>
      @id fluentd-containers.log
      @type tail
      path /var/log/containers/*.log
      pos_file /var/log/es-containers.log.pos
      time_format %Y-%m-%dT%H:%M:%S.%NZ
      localtime
      tag raw.kubernetes.*
      format json
      read_from_head true
    </source>
    # Detect exceptions in the log output and forward them as one log entry.
    <match raw.kubernetes.**>
      @id raw.kubernetes
      @type detect_exceptions
      remove_tag_prefix raw
      message log
      stream stream
      multiline_flush_interval 5
      max_bytes 500000
      max_lines 1000
    </match>
  business.input.conf: |-
    <source>
      @id business.log
      @type tail
      @log_level debug # use debug while troubleshooting for verbose collection output; set to info in production
      path /data/nfs_data/log/*.log
      pos_file /data/nfs_data/log/bs.log.pos
      time_format %Y-%m-%dT%H:%M:%S.%NZ
      localtime
      tag business.log
      read_from_head true

      <parse>
        @type multiline   # merge multi-line logs with the multiline plugin
        format_firstline /\d{4}-\d{1,2}-\d{1,2}\s+\d{2}:\d{2}:\d{2}\.\d{3}/  # regex identifying the first line of a multi-line event (test it at https://c.runoob.com/front-end/854/?optionGlobl=global)
        format1 /(?<time>\d{4}-\d{1,2}-\d{1,2}\s+\d{2}:\d{2}:\d{2}\.\d{3})\s+(?<level>\w+)(?<message>.*(\n*.*)*)/   # regex matching the entire multi-line event
      </parse>
    </source>
  forward.input.conf: |-
    # Takes the messages sent over TCP
    <source>
      @type forward
    </source>
  output.conf: |-
    # Enriches records with Kubernetes metadata
    <filter kubernetes.**>
      @type kubernetes_metadata
    </filter>

    <match **>  # match all tags, including business.log
      @id elasticsearch
      @type elasticsearch
      @log_level debug  # use debug while troubleshooting for verbose collection output; set to info in production
      include_tag_key true
      host es-svc
      port 9200
      logstash_format true
      request_timeout    30s
      #index_name logstash-${tag}.%Y%m%d  # set the index name
      <buffer>
        @type file
        path /var/log/fluentd-buffers/kubernetes.system.buffer
        flush_mode interval
        retry_type exponential_backoff
        flush_thread_count 2
        flush_interval 5s
        retry_forever
        retry_max_interval 30
        chunk_limit_size 2M
        queue_limit_length 8
        overflow_action block
      </buffer>
    </match>

fluentd-daemon.yaml

apiVersion: v1
kind: ServiceAccount
metadata:
  name: fluentd-es
  namespace: logging
  labels:
    k8s-app: fluentd-es
    kubernetes.io/cluster-service: "true"
    addonmanager.kubernetes.io/mode: Reconcile
---
kind: ClusterRole
apiVersion: rbac.authorization.k8s.io/v1
metadata:
  name: fluentd-es
  labels:
    k8s-app: fluentd-es
    kubernetes.io/cluster-service: "true"
    addonmanager.kubernetes.io/mode: Reconcile
rules:
- apiGroups:
  - ""
  resources:
  - "namespaces"
  - "pods"
  verbs:
  - "get"
  - "watch"
  - "list"
---
kind: ClusterRoleBinding
apiVersion: rbac.authorization.k8s.io/v1
metadata:
  name: fluentd-es
  labels:
    k8s-app: fluentd-es
    kubernetes.io/cluster-service: "true"
    addonmanager.kubernetes.io/mode: Reconcile
subjects:
- kind: ServiceAccount
  name: fluentd-es
  namespace: logging
  apiGroup: ""
roleRef:
  kind: ClusterRole
  name: fluentd-es
  apiGroup: ""
---
apiVersion: apps/v1
kind: DaemonSet
metadata:
  name: fluentd-es
  namespace: logging
  labels:
    k8s-app: fluentd-es
    kubernetes.io/cluster-service: "true"
    addonmanager.kubernetes.io/mode: Reconcile
spec:
  selector:
    matchLabels:
      k8s-app: fluentd-es
  template:
    metadata:
      labels:
        k8s-app: fluentd-es
        kubernetes.io/cluster-service: "true"
      # This annotation ensures that fluentd is not evicted under node pressure, supporting the critical pod-annotation-based priority scheme.
      annotations:
        scheduler.alpha.kubernetes.io/critical-pod: ''
    spec:
      serviceAccountName: fluentd-es
      containers:
      - name: fluentd-es
        #image: arm64v8/fluentd:v1.14-1
        image: mesondev/fluentd_elasticsearch:latest
        env:
        - name: FLUENTD_ARGS
          value: --no-supervisor -q
        resources:
          limits:
            memory: 500Mi
          requests:
            cpu: 100m
            memory: 200Mi
        volumeMounts:
        - name: varlog
          mountPath: /var/log
        - name: bizlog # note: fluentd runs as a container; to collect host-directory logs, the directory must be mounted into the pod
          mountPath: /data/nfs_data/log
        - name: varlibdockercontainers
          mountPath: /data/docker_data/containers
          readOnly: true
        - name: config-volume
          mountPath: /etc/fluent/config.d
      nodeSelector:
        beta.kubernetes.io/fluentd-ds-ready: "true"
      tolerations:
      - operator: Exists
      terminationGracePeriodSeconds: 30
      volumes:
      - name: varlog
        hostPath:
          path: /var/log
      - name: bizlog   # note: fluentd runs as a container; to collect host-directory logs, the directory must be mounted into the pod
        hostPath:
          path: /data/nfs_data/log
      - name: varlibdockercontainers
        hostPath:
          path: /data/docker_data/containers
      - name: config-volume
        configMap:
          name: fluentd-config
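
With both files applied, you can confirm that documents are arriving in Elasticsearch. Since logstash_format is enabled, fluent-plugin-elasticsearch writes to daily logstash-YYYY.MM.DD indices; a quick check, assuming the es-svc Service is reachable from a pod inside the cluster:

curl -s 'http://es-svc:9200/_cat/indices?v'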

Related articles:
Hands-on EFK log collection on k8s
Fluentd installation, configuration, and usage
Fluentd configuration file explained | official docs

Those who act often succeed; those who journey often arrive.