From 22be1707a1c1f47f2285db55a28d49bbf439b415 Mon Sep 17 00:00:00 2001 From: Sirish Bathina Date: Tue, 24 Sep 2024 08:19:41 -1000 Subject: [PATCH] Add method to set fluentbit hook (#3140) Co-authored-by: mergify[bot] <37929162+mergify[bot]@users.noreply.github.com> --- pkg/log/log.go | 23 ++++++++++++++++++++++ pkg/log/log_test.go | 47 +++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 70 insertions(+) diff --git a/pkg/log/log.go b/pkg/log/log.go index d40f22f4b4..fcd1932cb0 100644 --- a/pkg/log/log.go +++ b/pkg/log/log.go @@ -4,6 +4,7 @@ import ( "context" "encoding/json" "io" + "net/url" "os" "strings" "time" @@ -32,6 +33,12 @@ const ( LevelEnvName = "LOG_LEVEL" ) +var ( + ErrEndpointNotSet = errors.New("fluentbit endpoint not set") + ErrNonTCPEndpoint = errors.New("fluentbit endpoint scheme must be tcp") + ErrPathSet = errors.New("fluentbit endpoint path is set") +) + // OutputSink describes the current output sink. type OutputSink uint8 @@ -79,6 +86,22 @@ func SetOutput(sink OutputSink) error { } } +// SetFluentbitOutput sets the fluentbit output +func SetFluentbitOutput(url *url.URL) error { + if url == nil || url.Host == "" { + return ErrEndpointNotSet + } + if url.Scheme != "tcp" { + return ErrNonTCPEndpoint + } + if url.Path != "" { + return ErrPathSet + } + hook := NewFluentbitHook(url.Host) + log.AddHook(hook) + return nil +} + var envVarFields field.Fields // initEnvVarFields populates envVarFields with values from the host's environment. diff --git a/pkg/log/log_test.go b/pkg/log/log_test.go index b80b2bc0d7..838942d7eb 100644 --- a/pkg/log/log_test.go +++ b/pkg/log/log_test.go @@ -6,6 +6,7 @@ import ( "encoding/json" "errors" "fmt" + "net/url" "os" "sync" "testing" @@ -226,6 +227,52 @@ func (s *LogSuite) TestCloneGlobalLogger(c *check.C) { c.Assert(hook.capturedMessages[0].Message, check.Equals, "Test message") } +func (s *LogSuite) TestSetFluentbitOutput(c *check.C) { + for _, tc := range []struct { + desc string + url *url.URL + err error + }{ + { + desc: "valid_url", + url: &url.URL{ + Scheme: "tcp", + Host: "something", + }, + }, + { + desc: "path_is_set", + url: &url.URL{ + Scheme: "tcp", + Host: "something", + Path: "something", + }, + err: ErrPathSet, + }, + { + desc: "non_tcp_endpoint", + url: &url.URL{ + Scheme: "http", + Host: "something", + Path: "something", + }, + err: ErrNonTCPEndpoint, + }, + { + desc: "empty_endpoint", + url: &url.URL{}, + err: ErrEndpointNotSet, + }, + { + desc: "nil_endpoint", + err: ErrEndpointNotSet, + }, + } { + err := SetFluentbitOutput(tc.url) + c.Assert(err, check.Equals, tc.err) + } +} + type logHook struct { capturedMessages []*logrus.Entry }