Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions api/observability/v1/output_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -541,6 +541,12 @@ type Elasticsearch struct {
// +kubebuilder:validation:Required
// +operator-sdk:csv:customresourcedefinitions:type=spec,displayName="ElasticSearch Version",xDescriptors={"urn:alm:descriptor:com.tectonic.ui:number"}
Version int `json:"version"`

// Headers specify optional headers to be sent with the request
//
// +kubebuilder:validation:Optional
// +operator-sdk:csv:customresourcedefinitions:type=spec,displayName="Headers"
Headers map[string]string `json:"headers,omitempty"`
}

// GoogleCloudLoggingAuthentication contains configuration for authenticating requests to a GoogleCloudLogging output.
Expand Down Expand Up @@ -673,6 +679,12 @@ type HTTP struct {
// +kubebuilder:validation:Optional
// +kubebuilder:validation:XValidation:rule="self == '' || isURL(self)", message="invalid URL"
ProxyURL string `json:"proxyURL,omitempty"`

// LinePerEvent uses NDJSON instead of JSON to send data to remote destination.
//
// +kubebuilder:validation:Optional
// +operator-sdk:csv:customresourcedefinitions:type=spec,displayName="Line Per Event"
LinePerEvent bool `json:"line_per_event,omitempty"`
}

type KafkaTuningSpec struct {
Expand Down
7 changes: 7 additions & 0 deletions api/observability/v1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -2193,6 +2193,12 @@ spec:
- secretName
type: object
type: object
headers:
additionalProperties:
type: string
description: Headers specify optional headers to be sent
with the request
type: object
index:
description: |-
Index is the index for the logs. This supports template syntax to allow dynamic per-event values.
Expand Down Expand Up @@ -2459,6 +2465,10 @@ spec:
description: Headers specify optional headers to be sent
with the request
type: object
line_per_event:
description: LinePerEvent uses NDJSON instead of JSON to
send data to remote destination.
type: boolean
method:
description: Method specifies the HTTP method to be used
for sending logs. If not set, 'POST' is used.
Expand Down
8 changes: 8 additions & 0 deletions docs/reference/operator/api_observability_v1.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -2051,6 +2051,8 @@ The 'username@password' part of `url` is ignored.

|authentication|object| Authentication sets credentials for authenticating the requests.

|headers|object| Headers specify optional headers to be sent with the request

|index|string| Index is the index for the logs. This supports template syntax to allow dynamic per-event values.

The Index can be a combination of static and dynamic values consisting of field paths followed by `||` followed by another field path or a static value.
Expand Down Expand Up @@ -2160,6 +2162,10 @@ Type:: object

|======================

=== .spec.outputs[].elasticsearch.headers

Type:: object

=== .spec.outputs[].elasticsearch.tuning

Type:: object
Expand Down Expand Up @@ -2291,6 +2297,8 @@ The 'username@password' part of `url` is ignored.

|headers|object| Headers specify optional headers to be sent with the request

|line_per_event|bool| LinePerEvent uses NDJSON instead of JSON to send data to remote destination.

|method|string| Method specifies the HTTP method to be used for sending logs. If not set, 'POST' is used.

|proxyURL|string| ProxyURL URL of a HTTP or HTTPS proxy to be used instead of direct connection.
Expand Down
10 changes: 9 additions & 1 deletion internal/generator/vector/output/elasticsearch/elasticsearch.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ if exists(.kubernetes.event.metadata.uid) {
common.NewAcknowledgments(id, strategy),
common.NewBatch(id, strategy),
common.NewBuffer(id, strategy),
common.NewRequest(id, strategy),
Request(id, o.Elasticsearch, strategy),
tls.New(id, o.TLS, secrets, op, Option{Name: URL, Value: o.Elasticsearch.URL}),
)

Expand Down Expand Up @@ -110,3 +110,11 @@ func Output(id string, o obs.OutputSpec, inputs []string, index string, secrets
}
return &es
}

func Request(id string, o *obs.Elasticsearch, strategy common.ConfigStrategy) *common.Request {
req := common.NewRequest(id, strategy)
if len(o.Headers) != 0 {
req.SetHeaders(o.Headers)
}
return req
}
Original file line number Diff line number Diff line change
Expand Up @@ -145,5 +145,10 @@ var _ = Describe("Generate Vector config", func() {
BaseOutputTuningSpec: *baseTune,
}
}, true, framework.NoOptions, "es_with_tune.toml"),
Entry("with headers", func(spec *obs.OutputSpec) {
spec.Elasticsearch.Headers = map[string]string{
"Key": "Value",
}
}, true, framework.NoOptions, "es_with_headers.toml"),
)
})
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
# Elasticsearch Index
[transforms.es_1_index]
type = "remap"
inputs = ["application"]
source = '''
._internal.es_1_index = to_string!(._internal.log_type||"none")
'''

[sinks.es_1]
type = "elasticsearch"
inputs = ["es_1_index"]
endpoints = ["https://es.svc.infra.cluster:9200"]
bulk.index = "{{ _internal.es_1_index }}"
bulk.action = "create"
api_version = "v8"

[sinks.es_1.encoding]
except_fields = ["_internal"]

[sinks.es_1.request]
headers = {"Key"="Value"}

[sinks.es_1.auth]
strategy = "basic"
user = "SECRET[kubernetes_secret.es-1/username]"
password = "SECRET[kubernetes_secret.es-1/password]"
27 changes: 16 additions & 11 deletions internal/generator/vector/output/http/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,12 @@ import (
)

type Http struct {
ComponentID string
Inputs string
URI string
Method string
Proxy string
ComponentID string
Inputs string
URI string
Method string
Proxy string
LinePerEvent bool
common.RootMixin
}

Expand All @@ -33,6 +34,9 @@ type = "http"
inputs = {{.Inputs}}
uri = "{{.URI}}"
method = "{{.Method}}"
{{with .LinePerEvent}}
framing.method = "newline_delimited"
{{end}}
{{with .Proxy -}}
proxy.enabled = true
proxy.http = "{{.}}"
Expand Down Expand Up @@ -76,12 +80,13 @@ func New(id string, o obs.OutputSpec, inputs []string, secrets observability.Sec

func Output(id string, o obs.OutputSpec, inputs []string, secrets observability.Secrets, op Options) *Http {
return &Http{
ComponentID: id,
Inputs: vectorhelpers.MakeInputs(inputs...),
URI: o.HTTP.URL,
Method: Method(o.HTTP),
Proxy: o.HTTP.ProxyURL,
RootMixin: common.NewRootMixin(nil),
ComponentID: id,
Inputs: vectorhelpers.MakeInputs(inputs...),
URI: o.HTTP.URL,
Method: Method(o.HTTP),
Proxy: o.HTTP.ProxyURL,
LinePerEvent: o.HTTP.LinePerEvent,
RootMixin: common.NewRootMixin(nil),
}
}

Expand Down
3 changes: 3 additions & 0 deletions internal/generator/vector/output/http/http_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,9 @@ var _ = Describe("Generate vector config", func() {
BaseOutputTuningSpec: *baseTune,
}
}, secrets, true, framework.NoOptions, "http_with_tuning.toml"),
Entry("with ndjson", func(spec *obs.OutputSpec) {
spec.HTTP.LinePerEvent = true
}, secrets, true, framework.NoOptions, "http_with_ndjson.toml"),
Entry("with proxy", func(spec *obs.OutputSpec) {
spec.HTTP.ProxyURL = "http://somewhere.org/proxy"
spec.HTTP.Headers = nil
Expand Down
20 changes: 20 additions & 0 deletions internal/generator/vector/output/http/http_with_ndjson.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
[sinks.http_receiver]
type = "http"
inputs = ["application"]
uri = "https://my-logstore.com"
method = "post"
framing.method = "newline_delimited"

[sinks.http_receiver.encoding]
codec = "json"
except_fields = ["_internal"]

[sinks.http_receiver.request]
headers = {"h1"="v1","h2"="v2"}

[sinks.http_receiver.auth]
strategy = "basic"
user = "SECRET[kubernetes_secret.http-receiver/username]"
password = "SECRET[kubernetes_secret.http-receiver/password]"


2 changes: 2 additions & 0 deletions internal/validations/observability/outputs/validate.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ func Validate(context internalcontext.ForwarderContext) {
messages = append(messages, validateHttpContentTypeHeaders(out)...)
case obs.OutputTypeLokiStack, obs.OutputTypeOTLP:
messages = append(messages, ValidateTechPreviewAnnotation(out, context)...)
case obs.OutputTypeElasticsearch:
messages = append(messages, validateElasticsearchHeaders(out)...)
}
// Set condition
if len(messages) > 0 {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package outputs

import (
"fmt"
log "github.com/ViaQ/logerr/v2/log/static"
obs "github.com/openshift/cluster-logging-operator/api/observability/v1"
"strings"
)

// validateElasticsearchHeaders will validate Elasticsearch custom headers
// it's not allowed to pass "Authorization" and "Content-Type" headers
func validateElasticsearchHeaders(output obs.OutputSpec) (results []string) {
if output.Type == obs.OutputTypeElasticsearch && output.Elasticsearch != nil && len(output.Elasticsearch.Headers) > 0 {
var invalidHeaders []string
for headerName := range output.Elasticsearch.Headers {
if strings.ToLower(headerName) == "authorization" || strings.ToLower(headerName) == "content-type" {
invalidHeaders = append(invalidHeaders, headerName)
}
}
if len(invalidHeaders) > 0 {
log.V(3).Info("validateElasticsearchHeaders failed", "reason", "invalid headers found: ", strings.Join(invalidHeaders, ","))
results = append(results, fmt.Sprintf("invalid headers found: %s", strings.Join(invalidHeaders, ",")))
}
}
return results
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
package outputs

import (
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
"github.com/openshift/cluster-logging-operator/api/observability/v1"
)

var _ = Describe("[internal][validations] ClusterLogForwarder will validate headers in Elasticsearch Output", func() {
var (
es *v1.Elasticsearch
spec v1.OutputSpec
)
BeforeEach(func() {
es = &v1.Elasticsearch{}
spec = v1.OutputSpec{
Name: "esOutput",
Type: v1.OutputTypeElasticsearch,
Elasticsearch: es,
}
})

Context("#validateElasticsearchHeaders", func() {

It("should pass validation with empty headers", func() {
Expect(validateElasticsearchHeaders(spec)).To(BeEmpty())
})
It("should pass validation when no invalid headers set", func() {
spec.Elasticsearch.Headers = map[string]string{
"Accept": "application/json",
}
Expect(validateElasticsearchHeaders(spec)).To(BeEmpty())
})
It("should fail validation when the Content-Type header is set", func() {
spec.Elasticsearch.Headers = map[string]string{
"Content-Type": "application/json",
}
Expect(validateElasticsearchHeaders(spec)).ToNot(BeEmpty())
})
It("should fail validation when the Authorization header is set", func() {
spec.Elasticsearch.Headers = map[string]string{
"Authorization": "test",
}
Expect(validateElasticsearchHeaders(spec)).ToNot(BeEmpty())
})
It("should pass validation when no Elasticsearch Output", func() {
spec = v1.OutputSpec{
Name: "esOutput",
Type: v1.OutputTypeElasticsearch,
Elasticsearch: &v1.Elasticsearch{},
}
Expect(validateElasticsearchHeaders(spec)).To(BeEmpty())
})
})
})
20 changes: 16 additions & 4 deletions test/framework/functional/framework.go
Original file line number Diff line number Diff line change
Expand Up @@ -427,12 +427,24 @@ func (f *CollectorFunctionalFramework) addOutputContainers(b *runtime.PodBuilder
return err
}
case obs.OutputTypeElasticsearch:
if err := f.AddESOutput(ElasticsearchVersion(output.Elasticsearch.Version), b, output, nil); err != nil {
return err
if len(output.Elasticsearch.Headers) == 0 {
if err := f.AddESOutput(ElasticsearchVersion(output.Elasticsearch.Version), b, output, nil); err != nil {
return err
}
} else {
if err := f.AddVLOutput(b, output, nil); err != nil {
return err
}
}
case obs.OutputTypeHTTP:
if err := f.AddVectorHttpOutput(b, output); err != nil {
return err
if output.HTTP.LinePerEvent {
if err := f.AddVLOutput(b, output, nil); err != nil {
return err
}
} else {
if err := f.AddVectorHttpOutput(b, output); err != nil {
return err
}
}
case obs.OutputTypeSplunk:
if err := f.AddSplunkOutput(b, output); err != nil {
Expand Down
Loading