
help request: Multiple Buffers in "cluster-wide and namespaced FluentdConfig multi-tenant scenarios" #1461

Open
Crazyigor1987 opened this issue Feb 4, 2025 · 0 comments

Comments


Describe the issue

Hi everyone,
has anybody gotten fluent-operator fully working on Kubernetes with a mix of ClusterInput, ClusterFilter, and ClusterOutput resources written by CloudOps and a namespaced Filter written by DevOps?

I want to achieve something like this in Fluentd:

  1. The source is defined as a ClusterInput by CloudOps.
  2. A ClusterFilter adds company-related meta information to the incoming logs. In my example, I want to append the OpenSearch index name (zzz-namespace-index).
  3. CloudOps sets up an OpenSearch ClusterOutput with a buffer section for log persistence.
  4. The developer creates their own Filter, which refers to the ClusterOutput.

These are the manifests I have managed to get running so far:

apiVersion: fluentd.fluent.io/v1alpha1
kind: ClusterFluentdConfig
metadata:
  labels:
    fluentd.default.config: "true"
  name: fluentd-config
  namespace: fluent
spec:
  clusterInputSelector:
    matchLabels:
      fluentd.default.input: "true"
  clusterFilterSelector:
    matchLabels:
      fluentd.default.filter: "true"
  clusterOutputSelector:
    matchLabels:
      fluentd.default.output: "true"
---
apiVersion: fluentd.fluent.io/v1alpha1
kind: Fluentd
metadata:
  labels:
    app.kubernetes.io/name: fluentd
  name: fluentd
  namespace: fluent
spec:
  envVars:
  - name: FLUENTD_OUTPUT_LOGLEVEL
    value: debug
  - name: OPENSEARCH_USERNAME
    valueFrom:
      secretKeyRef:
        name: fluentbit-credentials
        key: username
  - name: OPENSEARCH_PASSWORD
    valueFrom:
      secretKeyRef:
        name: fluentbit-credentials
        key: password
  - name: OPENSEARCH_HOST
    value: opensearch-ingest.my-domain.org
  fluentdCfgSelector:
    matchLabels:
      fluentd.default.config: "true"
  buffer:
    pvc:
      apiVersion: v1
      kind: PersistentVolumeClaim
      spec:
        accessModes:
        - ReadWriteOnce
        resources:
          requests:
            storage: 16Gi
        storageClassName: default
  defaultFilterSelector:
    matchLabels:
      fluentd.default.filter: "true"
  defaultOutputSelector:
    matchLabels:
      fluentd.default.output: "true"
  globalInputs:
  - forward:
      bind: 0.0.0.0
      port: 24224
  image: ghcr.io/fluent/fluent-operator/fluentd:v1.17.0
  logLevel: info
  mode: collector
  positionDB: {}
  replicas: 3
  resources:
    limits:
      cpu: 1000m
      memory: 2Gi
    requests:
      cpu: 500m
      memory: 1Gi
  service: {}
  livenessProbe:
    exec:
      command: 
      - /bin/sh
      - -c
      - "[ $(du -sb /buffers/opensearch-buffer | cut -f1) -gt 94371840 ] && exit 1 || true"
  volumeMounts:
  - name: cm-opensearch-client-certs
    mountPath: /etc/opensearch-tls
  volumes:
  - name: cm-opensearch-client-certs
    secret:
      defaultMode: 420
      secretName: cm-opensearch-client-certs
---
apiVersion: fluentd.fluent.io/v1alpha1
kind: ClusterFilter
metadata:
  name: 001-istio-proxy-0-parsing
  namespace: fluent
  labels:
    fluentd.default.filter: "true"
spec:
  filters:
    - parser:
        keyName: log
        parse:
          type: json
        reserveData: true
        removeKeyNameField: false
        hashValueField: "istio-access-log"
        emitInvalidRecordToError: false
      tag: "istio-proxy.**"
---
apiVersion: fluentd.fluent.io/v1alpha1
kind: ClusterFilter
metadata:
  name: 001-istio-proxy-1-transforming
  namespace: fluent
  labels:
    fluentd.default.filter: "true"
spec:
  filters:
    - recordTransformer:
        enableRuby: true
        records:
          - key: log
            value: "${record['kubernetes'] && record['kubernetes']['container_name'] == 'istio-proxy' && record['istio-access-log'] && !record['istio-access-log'].empty? ? nil : record['log']}"
      tag: "istio-proxy.**"
---
apiVersion: fluentd.fluent.io/v1alpha1
kind: ClusterFilter
metadata:
  name: zzz-namespace-index
  namespace: fluent
  labels:
    fluentd.default.filter: "true"
spec:
  filters:
    - recordTransformer:
        enableRuby: true
        records:
          - key: logstash_prefix
            value: "${record['namespaceindex'] or 'adm-unknown'}"
---
apiVersion: fluentd.fluent.io/v1alpha1
kind: ClusterOutput
metadata:
  labels:
    fluentd.default.output: "true"
  name: fluentd-output-opensearch
  namespace: fluent
spec:
  outputs:
  - customPlugin:
      config: |
        <match **>
          @type opensearch
          enable_ruby true

          host "#{ENV['OPENSEARCH_HOST']}"
          port 9200
          scheme http
          user "#{ENV['OPENSEARCH_USERNAME']}"
          password "#{ENV['OPENSEARCH_PASSWORD']}"

          logstash_format true
          logstash_prefix ${$.logstash_prefix}
          
          #connection settings
          request_timeout 15s
          reload_connections true
          reload_on_failure true
          resurrect_after 5s
          log_os_400_reason true

          <buffer tag, $.logstash_prefix>
            @type file
            path /buffers/opensearch-buffer
            flush_thread_count 16
            flush_interval 1s
            chunk_limit_size 90M
            overflow_action block
            queue_limit_length 16
            flush_mode interval
            retry_max_interval 30
            retry_forever true
          </buffer>
        </match>

This renders the following config:


<source>
  @type  forward
  bind  0.0.0.0
  port  24224
</source>
<match **>
  @id  main
  @type  label_router
  <route>
    @label  @db681e4cb763ca5b7cdbf9ab76f67bbe
    <match>
    </match>
  </route>
</match>
<label @db681e4cb763ca5b7cdbf9ab76f67bbe>
  <filter istio-proxy.**>
    @id  ClusterFluentdConfig-cluster-fluentd-config::cluster::clusterfilter::001-istio-proxy-0-parsing-0
    @type  parser
    emit_invalid_record_to_error  false
    hash_value_field  istio-access-log
    key_name  log
    remove_key_name_field  false
    reserve_data  true
    <parse>
      @type  json
    </parse>
  </filter>
  <filter istio-proxy.**>
    @id  ClusterFluentdConfig-cluster-fluentd-config::cluster::clusterfilter::001-istio-proxy-1-transforming-0
    @type  record_transformer
    enable_ruby  true
    <record>
      log  ${record['kubernetes'] && record['kubernetes']['container_name'] == 'istio-proxy' && record['istio-access-log'] && !record['istio-access-log'].empty? ? nil : record['log']}
    </record>
  </filter>
  <filter **>
    @id  ClusterFluentdConfig-cluster-fluentd-config::cluster::clusterfilter::zzz-namespace-index-0
    @type  record_transformer
    enable_ruby  true
    <record>
      logstash_prefix  ${record['namespaceindex'] or 'adm-unknown'}
    </record>
  </filter>
  <match **>
    @type opensearch
    enable_ruby true
    host "#{ENV['OPENSEARCH_HOST']}"
    port 9200
    scheme http
    user "#{ENV['OPENSEARCH_USERNAME']}"
    password "#{ENV['OPENSEARCH_PASSWORD']}"
    logstash_format true
    logstash_prefix ${$.logstash_prefix}
    
    #connection settings
    request_timeout 15s
    reload_connections true
    reload_on_failure true
    resurrect_after 5s
    log_os_400_reason true
    <buffer tag, $.logstash_prefix>
      @type file
      path /buffers/opensearch-buffer
      flush_thread_count 16
      flush_interval 1s
      chunk_limit_size 90M
      overflow_action block
      queue_limit_length 16
      flush_mode interval
      retry_max_interval 30
      retry_forever true
    </buffer>
  </match>
</label>

So far, so good. Now I am struggling with the developer part: the developer should be able to run their own Filter before our "zzz-namespace-index" ClusterFilter appends the index name. These are my test manifests for the developer part in their namespace:

apiVersion: fluentd.fluent.io/v1alpha1
kind: FluentdConfig
metadata:
  name: fluentd-clusterconfig-referrer
  namespace: development
  labels:
    fluentd.default.config: "true"
spec:
  clusterOutputSelector:
    matchLabels:
      fluentd.default.output: "true"
  filterSelector:
    matchLabels:
      my-own-namespaced-filter: "true"
---
apiVersion: fluentd.fluent.io/v1alpha1
kind: Filter
metadata:
  labels:
    my-own-namespaced-filter: "true"
  name: development-fulllog-parsing
  namespace: development
spec:
  filters:
  - grep:
      regexp:
        - key: "log"
          pattern: '\[fulllog\]'
  - parser:
      keyName: log
      reserveData: true
      removeKeyNameField: false
      injectKeyPrefix: "processed."
      emitInvalidRecordToError: false
      parse:
        type: regexp
        expression: '^(?<date>\S+)\s+(?<time>\S+)\s+(?<pid>\S+)\s+(?<logger>\S+)\s+\[(?<level>\S+)\]:\s+\[fulllog\]\s+(?<logmessage>.*)$'
  - parser:
      keyName: processed.logmessage
      parse:
        type: json
      reserveData: true
      removeKeyNameField: false
      injectKeyPrefix: "fulllog."
      emitInvalidRecordToError: false
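As a sanity check on the regexp parser above, the expression can be exercised outside Fluentd. This is only a sketch: the sample line below is a hypothetical log entry I made up, not one taken from the cluster, and Fluentd's `(?<name>...)` groups are written in Python's `(?P<name>...)` form.

```python
import re

# Same expression as in the development-fulllog-parsing Filter,
# with named groups converted to Python syntax.
EXPR = (
    r'^(?P<date>\S+)\s+(?P<time>\S+)\s+(?P<pid>\S+)\s+(?P<logger>\S+)'
    r'\s+\[(?P<level>\S+)\]:\s+\[fulllog\]\s+(?P<logmessage>.*)$'
)

# Hypothetical sample line (assumption: the app logs follow this shape).
line = '2025-02-03 20:58:42 1234 my.app.Logger [INFO]: [fulllog] {"user": "alice"}'

m = re.match(EXPR, line)
if m:
    # The third parser stage would then JSON-decode m.group('logmessage')
    # under the "fulllog." key prefix.
    print(m.group('level'), m.group('logmessage'))
```

If the match fails for a real line, the grep stage still passes it through (because `emitInvalidRecordToError` is false), which makes mismatches easy to miss.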

As soon as I deploy these manifests, Fluentd stops working with the following error:

2025-02-03 20:58:42 +0000 [error]: config error file="/fluentd/etc/fluent.conf" error_class=Fluent::ConfigError error="Other 'opensearch' plugin already use same buffer path: type = opensearch, buffer path = /buffers/opensearch-buffer"
level=error msg="Fluentd exited" error="exit status 1"

The created config looks like this:

<source>
  @type  forward
  bind  0.0.0.0
  port  24224
</source>
  <source>
    @type prometheus
    @id in_prometheus
    bind "0.0.0.0"
    port 2021
    metrics_path "/metrics"
  </source>
  <source>
    @type prometheus_output_monitor
    interval 10
    <labels>
      hostname ${hostname}
    </labels>
  </source>
<match **>
  @id  main
  @type  label_router
  <route>
    @label  @b7d2982af63a444d6468354108b8c5f1
    <match>
      namespaces  development
    </match>
  </route>
  <route>
    @label  @db681e4cb763ca5b7cdbf9ab76f67bbe
    <match>
    </match>
  </route>
</match>
<label @b7d2982af63a444d6468354108b8c5f1>
  <filter **>
    @id  FluentdConfig-development-fluentd-clusterconfig-referrer::development::filter::development-fulllog-parsing-0
    @type  grep
    <regexp>
      key  log
      pattern  \[fulllog\]
    </regexp>
  </filter>
  <filter **>
    @id  FluentdConfig-development-fluentd-clusterconfig-referrer::development::filter::development-fulllog-parsing-1
    @type  parser
    emit_invalid_record_to_error  false
    inject_key_prefix  processed.
    key_name  log
    remove_key_name_field  false
    reserve_data  true
    <parse>
      @type  regexp
      expression  ^(?<date>\S+)\s+(?<time>\S+)\s+(?<pid>\S+)\s+(?<logger>\S+)\s+\[(?<level>\S+)\]:\s+\[fulllog\]\s+(?<logmessage>.*)$
    </parse>
  </filter>
  <filter **>
    @id  FluentdConfig-development-fluentd-clusterconfig-referrer::development::filter::development-fulllog-parsing-2
    @type  parser
    emit_invalid_record_to_error  false
    inject_key_prefix  fulllog.
    key_name  processed.logmessage
    remove_key_name_field  false
    reserve_data  true
    <parse>
      @type  json
    </parse>
  </filter>
  <match **>
    @type opensearch
    enable_ruby true
    host "#{ENV['OPENSEARCH_HOST']}"
    port 9200
    scheme http
    user "#{ENV['OPENSEARCH_USERNAME']}"
    password "#{ENV['OPENSEARCH_PASSWORD']}"
    logstash_format true
    logstash_prefix ${$.logstash_prefix}
    
    #connection settings
    request_timeout 15s
    reload_connections true
    reload_on_failure true
    resurrect_after 5s
    log_os_400_reason true
    <buffer tag, $.logstash_prefix>
      @type file
      path /buffers/opensearch-buffer
      flush_thread_count 16
      flush_interval 1s
      chunk_limit_size 90M
      overflow_action block
      queue_limit_length 16
      flush_mode interval
      retry_max_interval 30
      retry_forever true
    </buffer>
  </match>
</label>
<label @db681e4cb763ca5b7cdbf9ab76f67bbe>
  <filter istio-proxy.**>
    @id  ClusterFluentdConfig-cluster-fluentd-config::cluster::clusterfilter::001-istio-proxy-0-parsing-0
    @type  parser
    emit_invalid_record_to_error  false
    hash_value_field  istio-access-log
    key_name  log
    remove_key_name_field  false
    reserve_data  true
    <parse>
      @type  json
    </parse>
  </filter>
  <filter istio-proxy.**>
    @id  ClusterFluentdConfig-cluster-fluentd-config::cluster::clusterfilter::001-istio-proxy-1-transforming-0
    @type  record_transformer
    enable_ruby  true
    <record>
      log  ${record['kubernetes'] && record['kubernetes']['container_name'] == 'istio-proxy' && record['istio-access-log'] && !record['istio-access-log'].empty? ? nil : record['log']}
    </record>
  </filter>
  <filter **>
    @id  ClusterFluentdConfig-cluster-fluentd-config::cluster::clusterfilter::zzz-namespace-index-0
    @type  record_transformer
    enable_ruby  true
    <record>
      logstash_prefix  ${record['namespaceindex'] or 'adm-unknown'}
    </record>
  </filter>
  <match **>
    @type opensearch
    enable_ruby true
    host "#{ENV['OPENSEARCH_HOST']}"
    port 9200
    scheme http
    user "#{ENV['OPENSEARCH_USERNAME']}"
    password "#{ENV['OPENSEARCH_PASSWORD']}"
    logstash_format true
    logstash_prefix ${$.logstash_prefix}
    
    #connection settings
    request_timeout 15s
    reload_connections true
    reload_on_failure true
    resurrect_after 5s
    log_os_400_reason true
    <buffer tag, $.logstash_prefix>
      @type file
      path /buffers/opensearch-buffer
      flush_thread_count 16
      flush_interval 1s
      chunk_limit_size 90M
      overflow_action block
      queue_limit_length 16
      flush_mode interval
      retry_max_interval 30
      retry_forever true
    </buffer>
  </match>
</label>

Contrary to what I expected, the developer route does not reuse the ClusterOutput I defined as CloudOp at the end of the pipeline; instead, the operator renders a second copy of it inside the namespace's own label section. This leads to the buffer path collision.
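Because the operator renders a second opensearch <match> block, two buf_file instances point at /buffers/opensearch-buffer and Fluentd refuses to start. One workaround, sketched below under the assumption that you are willing to maintain a separate Output resource per tenant, is to give each rendered output its own buffer directory under the shared PVC mount (the "-development" suffix here is made up for illustration):

```
# Hypothetical per-tenant output variant: only the buffer path differs,
# so the two opensearch plugin instances no longer collide.
<buffer tag, $.logstash_prefix>
  @type file
  # unique subdirectory per output instance under the /buffers PVC mount
  path /buffers/opensearch-buffer-development
  flush_interval 1s
  overflow_action block
  retry_forever true
</buffer>
```

Note that the liveness probe's du check would then need to cover the new path as well.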

The questions are:

  • Is my approach correct? If not, what is the best practice here?

  • How do I manage the file buffer per namespace?

  • How can I set up ONE global ClusterOutput for all logs?
    

How did you install fluent operator?

No response

Additional context

No response
