Skip to content

Commit

Permalink
feat: Add copy output plugin for fluentd #1017
Browse files Browse the repository at this point in the history
Signed-off-by: Anthony TREUILLIER <[email protected]>
  • Loading branch information
antrema committed Dec 13, 2023
1 parent f2505f6 commit 52a721b
Show file tree
Hide file tree
Showing 20 changed files with 662 additions and 1 deletion.
42 changes: 42 additions & 0 deletions apis/fluentd/v1alpha1/helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -292,6 +292,48 @@ func (r *CfgResources) filterForOutputs(
return nil
}

// IdentifyCopyAndPatchOutput patches up the controller with the Manager
func (pgr *PluginResources) IdentifyCopyAndPatchOutput(cfgResources *CfgResources) error {
// patched structure for OutputPlugins
patchedOutputPlugins := []params.PluginStore{}
// copyOutputs stores the id if the output is a `copy`
copyOutputs := map[string]int{}
// outputs stores the id if the output is not a `copy`
outputs := map[string][]int{}

// Iterate over cfgResources.OutputPlugins to identify Copy output
for id, ps := range cfgResources.OutputPlugins {
if ps.Store["@type"] == string(params.CopyOutputType) {
// We store last output when 2 output with the same tag
copyOutputs[ps.Store["tag"]] = id
} else {
outputs[ps.Store["tag"]] = append(outputs[ps.Store["tag"]], id)
}
}

// Patch the outputs
for k, output := range outputs {
// Does it exist a copy output for this tag ?
if c, ok := copyOutputs[k]; ok {
// Yes, so we patch
for _, id := range output {
o := cfgResources.OutputPlugins[id]
o.Name = "store"
cfgResources.OutputPlugins[c].InsertChilds(&o)
}
patchedOutputPlugins = append(patchedOutputPlugins, cfgResources.OutputPlugins[c])
} else {
// No, we don't patch
for _, id := range output {
o := cfgResources.OutputPlugins[id]
patchedOutputPlugins = append(patchedOutputPlugins, o)
}
}
}
cfgResources.OutputPlugins = patchedOutputPlugins
return nil
}

// convert the cfg plugins to a label plugin, appends to the global label plugins
func (pgr *PluginResources) WithCfgResources(cfgRouteLabel string, r *CfgResources) error {
if len(r.InputPlugins) == 0 && len(r.FilterPlugins) == 0 && len(r.OutputPlugins) == 0 {
Expand Down
8 changes: 8 additions & 0 deletions apis/fluentd/v1alpha1/plugins/output/copy.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
package output

// Copy defines the parameters for out_Copy plugin
type Copy struct {
// CopyMode defines how to pass the events to <store> plugins.
// +kubebuilder:validation:Enum:=no_copy;shallow;deep;marshal
CopyMode *string `json:"copyMode"`
}
15 changes: 15 additions & 0 deletions apis/fluentd/v1alpha1/plugins/output/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@ type Output struct {
CloudWatch *CloudWatch `json:"cloudWatch,omitempty"`
// datadog plugin
Datadog *Datadog `json:"datadog,omitempty"`
// copy plugin
Copy *Copy `json:"copy,omitempty"`
}

// DeepCopyInto implements the DeepCopyInto interface.
Expand Down Expand Up @@ -163,6 +165,12 @@ func (o *Output) Params(loader plugins.SecretLoader) (*params.PluginStore, error
ps.InsertType(string(params.DatadogOutputType))
return o.datadogPlugin(ps, loader), nil
}

if o.Copy != nil {
ps.InsertType(string(params.CopyOutputType))
return o.copyPlugin(ps, loader), nil
}

return o.customOutput(ps, loader), nil

}
Expand Down Expand Up @@ -907,4 +915,11 @@ func (o *Output) datadogPlugin(parent *params.PluginStore, sl plugins.SecretLoad
return parent
}

func (o *Output) copyPlugin(parent *params.PluginStore, sl plugins.SecretLoader) *params.PluginStore {
if o.Copy.CopyMode != nil {
parent.InsertPairs("copy_mode", fmt.Sprint(*o.Copy.CopyMode))
}
return parent
}

var _ plugins.Plugin = &Output{}
1 change: 1 addition & 0 deletions apis/fluentd/v1alpha1/plugins/params/const.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ const (
LokiOutputType OutputType = "loki"
CloudWatchOutputType OutputType = "cloudwatch_logs"
DatadogOutputType OutputType = "datadog"
CopyOutputType OutputType = "copy"
)

var (
Expand Down
5 changes: 4 additions & 1 deletion apis/fluentd/v1alpha1/plugins/params/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,9 +61,12 @@ func (ps *PluginStore) InsertChilds(childs ...*PluginStore) {
// The total hash string for this plugin store
func (ps *PluginStore) Hash() string {
c := NewPluginStore(ps.Name)
isNotCopyOutput := ps.Store["@type"] != "copy"

// We must consider the tag when the output is a Copy one
// as copy is a "flag" output: it can exist identical outputs with different tag
for k, v := range ps.Store {
if k == "@id" || k == "tag" {
if k == "@id" || (k == "tag" && isNotCopyOutput) {
continue
}
c.Store[k] = v
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
<source>
@type forward
bind 0.0.0.0
port 24224
</source>
<match **>
@id main
@type label_router
<route>
@label @a2170d34e9940ec56d328100e375c43e
<match>
namespaces default,kube-system
</match>
</route>
</match>
<label @a2170d34e9940ec56d328100e375c43e>
<filter **>
@id ClusterFluentdConfig-cluster-fluentd-config::cluster::clusterfilter::fluentd-filter-0
@type record_transformer
enable_ruby true
<record>
kubernetes_ns ${record["kubernetes"]["namespace_name"]}
</record>
</filter>
<match **>
@id ClusterFluentdConfig-cluster-fluentd-config::cluster::clusteroutput::fluentd-output-copy-stdout-and-loki-0
@type copy
copy_mode no_copy
<store>
@id ClusterFluentdConfig-cluster-fluentd-config::cluster::clusteroutput::fluentd-output-copy-stdout-and-loki-1
@type stdout
</store>
<store>
@id ClusterFluentdConfig-cluster-fluentd-config::cluster::clusteroutput::fluentd-output-copy-stdout-and-loki-2
@type loki
drop_single_key true
extra_labels {"key11":"value11","key12":"value12"}
extract_kubernetes_labels true
include_thread_label true
insecure_tls true
remove_keys key31,key32
url http://loki-logging-data.kubesphere-logging-system.svc:3100
<label>
key21 key21
key22 key22
</label>
</store>
</match>
</label>
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
<source>
@type forward
bind 0.0.0.0
port 24224
</source>
<match **>
@id main
@type label_router
<route>
@label @2d9e59757d3bfc66d93c3bc44b408922
<match>
namespaces fluent
</match>
</route>
</match>
<label @2d9e59757d3bfc66d93c3bc44b408922>
<match **>
@id FluentdConfig-fluent-fluentd-config::fluent::output::fluentd-mixed-copy-es-1-0
@type copy
copy_mode no_copy
<store>
@id FluentdConfig-fluent-fluentd-config::fluent::output::fluentd-mixed-copy-es-1-1
@type elasticsearch
host elasticsearch-logging-data.kubesphere-logging-system.svc
index_name fluentd-mixed-copy-es-1
port 9243
scheme https
</store>
<store>
@id FluentdConfig-fluent-fluentd-config::fluent::output::fluentd-mixed-copy-es-1-2
@type elasticsearch
host elasticsearch-logging-data.kubesphere-logging-system.svc
index_name fluentd-mixed-copy-es-2
port 9243
scheme https
</store>
<store>
@id FluentdConfig-fluent-fluentd-config::cluster::clusteroutput::fluentd-output-loki-0
@type loki
drop_single_key true
extra_labels {"key11":"value11","key12":"value12"}
extract_kubernetes_labels true
include_thread_label true
insecure_tls true
remove_keys key31,key32
url http://loki-logging-data.kubesphere-logging-system.svc:3100
<label>
key21 key21
key22 key22
</label>
</store>
</match>
</label>
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
<source>
@type forward
bind 0.0.0.0
port 24224
</source>
<match **>
@id main
@type label_router
<route>
@label @2d9e59757d3bfc66d93c3bc44b408922
<match>
namespaces fluent
</match>
</route>
</match>
<label @2d9e59757d3bfc66d93c3bc44b408922>
<match mixed2>
@id FluentdConfig-fluent-fluentd-config::fluent::output::fluentd-mixed-copy-es-2-0
@type copy
copy_mode no_copy
<store>
@id FluentdConfig-fluent-fluentd-config::fluent::output::fluentd-mixed-copy-es-2-1
@type elasticsearch
host elasticsearch-logging-data.kubesphere-logging-system.svc
index_name fluentd-mixed-copy-es-3
port 9243
scheme https
</store>
<store>
@id FluentdConfig-fluent-fluentd-config::fluent::output::fluentd-mixed-copy-es-2-2
@type elasticsearch
host elasticsearch-logging-data.kubesphere-logging-system.svc
index_name fluentd-mixed-copy-es-4
port 9243
scheme https
</store>
</match>
</label>
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
<source>
@type forward
bind 0.0.0.0
port 24224
</source>
<match **>
@id main
@type label_router
<route>
@label @2d9e59757d3bfc66d93c3bc44b408922
<match>
namespaces fluent
</match>
</route>
</match>
<label @2d9e59757d3bfc66d93c3bc44b408922>
<match mixed3>
@id FluentdConfig-fluent-fluentd-config::fluent::output::fluentd-mixed-copy-es-3-0
@type elasticsearch
host elasticsearch-logging-data.kubesphere-logging-system.svc
index_name fluentd-mixed-copy-es-5
port 9243
scheme https
</match>
<match mixed3>
@id FluentdConfig-fluent-fluentd-config::fluent::output::fluentd-mixed-copy-es-3-1
@type elasticsearch
host elasticsearch-logging-data.kubesphere-logging-system.svc
index_name fluentd-mixed-copy-es-6
port 9243
scheme https
</match>
</label>
Loading

0 comments on commit 52a721b

Please sign in to comment.