diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 72c81ebe23d..1e4c3410e97 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -21,6 +21,8 @@ jobs: steps: - name: Checkout Repo uses: actions/checkout@v4 + with: + fetch-depth: 0 ## Needed for "Set internal/tools/go.mod timestamp" step. - name: Install Go uses: actions/setup-go@v5 with: @@ -34,6 +36,14 @@ jobs: with: path: .tools key: ${{ runner.os }}-${{ env.cache-name }}-${{ hashFiles('./internal/tools/**') }} + # The step below is needed to not rebuild all the build tools. + - name: Set internal/tools/go.mod timestamp + run: | + filename="internal/tools/go.mod" + unixtime=$(git log -1 --format="%at" -- "${filename}") + touchtime=$(date -d @$unixtime +'%Y%m%d%H%M.%S') + touch -t ${touchtime} "${filename}" + ls -la --time-style=full-iso "${filename}" - name: Generate run: make generate - name: Run linters diff --git a/CHANGELOG.md b/CHANGELOG.md index 97920080592..ea5dda271b4 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -18,6 +18,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm ### Changed - Update `go.opentelemetry.io/proto/otlp` from v1.1.0 to v1.2.0. (#5177) +- Improve performance of baggage member character validation in `go.opentelemetry.io/otel/baggage`. (#5214) - `Shutdown` method of `Exporter` in `go.opentelemetry.io/otel/exporters/stdout/stdouttrace` ignores the context cancellation and always returns `nil`. (#5189) - `ForceFlush` and `Shutdown` methods of the exporter returned by `New` in `go.opentelemetry.io/otel/exporters/stdout/stdoutmetric` ignore the context cancellation and always return `nil`. 
(#5189) diff --git a/Makefile b/Makefile index 8b3d8816ea0..ca2f0ad037c 100644 --- a/Makefile +++ b/Makefile @@ -23,7 +23,7 @@ TOOLS = $(CURDIR)/.tools $(TOOLS): @mkdir -p $@ -$(TOOLS)/%: | $(TOOLS) +$(TOOLS)/%: $(TOOLS_MOD_DIR)/go.mod | $(TOOLS) cd $(TOOLS_MOD_DIR) && \ $(GO) build -o $@ $(PACKAGE) @@ -99,7 +99,7 @@ $(PYTOOLS): @$(DOCKERPY) bash -c "python3 -m venv $(VENVDIR) && $(PIP) install --upgrade pip" # Install python packages into the virtual environment. -$(PYTOOLS)/%: | $(PYTOOLS) +$(PYTOOLS)/%: $(PYTOOLS) @$(DOCKERPY) $(PIP) install -r requirements.txt CODESPELL = $(PYTOOLS)/codespell @@ -113,18 +113,18 @@ generate: go-generate vanity-import-fix .PHONY: go-generate go-generate: $(OTEL_GO_MOD_DIRS:%=go-generate/%) go-generate/%: DIR=$* -go-generate/%: | $(STRINGER) $(GOTMPL) +go-generate/%: $(STRINGER) $(GOTMPL) @echo "$(GO) generate $(DIR)/..." \ && cd $(DIR) \ && PATH="$(TOOLS):$${PATH}" $(GO) generate ./... .PHONY: vanity-import-fix -vanity-import-fix: | $(PORTO) +vanity-import-fix: $(PORTO) @$(PORTO) --include-internal -w . # Generate go.work file for local development. 
.PHONY: go-work -go-work: | $(CROSSLINK) +go-work: $(CROSSLINK) $(CROSSLINK) work --root=$(shell pwd) # Build @@ -167,7 +167,7 @@ test/%: COVERAGE_MODE = atomic COVERAGE_PROFILE = coverage.out .PHONY: test-coverage -test-coverage: | $(GOCOVMERGE) +test-coverage: $(GOCOVMERGE) @set -e; \ printf "" > coverage.txt; \ for dir in $(ALL_COVERAGE_MOD_DIRS); do \ @@ -198,20 +198,20 @@ golangci-lint-fix: ARGS=--fix golangci-lint-fix: golangci-lint golangci-lint: $(OTEL_GO_MOD_DIRS:%=golangci-lint/%) golangci-lint/%: DIR=$* -golangci-lint/%: | $(GOLANGCI_LINT) +golangci-lint/%: $(GOLANGCI_LINT) @echo 'golangci-lint $(if $(ARGS),$(ARGS) ,)$(DIR)' \ && cd $(DIR) \ && $(GOLANGCI_LINT) run --allow-serial-runners $(ARGS) .PHONY: crosslink -crosslink: | $(CROSSLINK) +crosslink: $(CROSSLINK) @echo "Updating intra-repository dependencies in all go modules" \ && $(CROSSLINK) --root=$(shell pwd) --prune .PHONY: go-mod-tidy go-mod-tidy: $(ALL_GO_MOD_DIRS:%=go-mod-tidy/%) go-mod-tidy/%: DIR=$* -go-mod-tidy/%: | crosslink +go-mod-tidy/%: crosslink @echo "$(GO) mod tidy in $(DIR)" \ && cd $(DIR) \ && $(GO) mod tidy -compat=1.21 @@ -223,23 +223,23 @@ lint-modules: go-mod-tidy lint: misspell lint-modules golangci-lint govulncheck .PHONY: vanity-import-check -vanity-import-check: | $(PORTO) +vanity-import-check: $(PORTO) @$(PORTO) --include-internal -l . || ( echo "(run: make vanity-import-fix)"; exit 1 ) .PHONY: misspell -misspell: | $(MISSPELL) +misspell: $(MISSPELL) @$(MISSPELL) -w $(ALL_DOCS) .PHONY: govulncheck govulncheck: $(OTEL_GO_MOD_DIRS:%=govulncheck/%) govulncheck/%: DIR=$* -govulncheck/%: | $(GOVULNCHECK) +govulncheck/%: $(GOVULNCHECK) @echo "govulncheck ./... in $(DIR)" \ && cd $(DIR) \ && $(GOVULNCHECK) ./... 
.PHONY: codespell -codespell: | $(CODESPELL) +codespell: $(CODESPELL) @$(DOCKERPY) $(CODESPELL) .PHONY: license-check @@ -254,11 +254,11 @@ license-check: DEPENDABOT_CONFIG = .github/dependabot.yml .PHONY: dependabot-check -dependabot-check: | $(DBOTCONF) +dependabot-check: $(DBOTCONF) @$(DBOTCONF) verify $(DEPENDABOT_CONFIG) || ( echo "(run: make dependabot-generate)"; exit 1 ) .PHONY: dependabot-generate -dependabot-generate: | $(DBOTCONF) +dependabot-generate: $(DBOTCONF) @$(DBOTCONF) generate > $(DEPENDABOT_CONFIG) .PHONY: check-clean-work-tree @@ -273,7 +273,7 @@ check-clean-work-tree: SEMCONVPKG ?= "semconv/" .PHONY: semconv-generate -semconv-generate: | $(SEMCONVGEN) $(SEMCONVKIT) +semconv-generate: $(SEMCONVGEN) $(SEMCONVKIT) [ "$(TAG)" ] || ( echo "TAG unset: missing opentelemetry semantic-conventions tag"; exit 1 ) [ "$(OTEL_SEMCONV_REPO)" ] || ( echo "OTEL_SEMCONV_REPO unset: missing path to opentelemetry semantic-conventions repo"; exit 1 ) $(SEMCONVGEN) -i "$(OTEL_SEMCONV_REPO)/model/." --only=span -p conventionType=trace -f trace.go -t "$(SEMCONVPKG)/template.j2" -s "$(TAG)" @@ -293,13 +293,13 @@ gorelease/%:| $(GORELEASE) || echo "" .PHONY: prerelease -prerelease: | $(MULTIMOD) +prerelease: $(MULTIMOD) @[ "${MODSET}" ] || ( echo ">> env var MODSET is not set"; exit 1 ) $(MULTIMOD) verify && $(MULTIMOD) prerelease -m ${MODSET} COMMIT ?= "HEAD" .PHONY: add-tags -add-tags: | $(MULTIMOD) +add-tags: $(MULTIMOD) @[ "${MODSET}" ] || ( echo ">> env var MODSET is not set"; exit 1 ) $(MULTIMOD) verify && $(MULTIMOD) tag -m ${MODSET} -c ${COMMIT} diff --git a/baggage/baggage.go b/baggage/baggage.go index 94285d95935..75773bc1ce9 100644 --- a/baggage/baggage.go +++ b/baggage/baggage.go @@ -8,6 +8,7 @@ import ( "fmt" "net/url" "strings" + "unicode/utf8" "go.opentelemetry.io/otel/internal/baggage" ) @@ -221,7 +222,7 @@ type Member struct { hasData bool } -// NewMemberRaw returns a new Member from the passed arguments. 
+// NewMember returns a new Member from the passed arguments. // // The passed key must be compliant with W3C Baggage specification. // The passed value must be percent-encoded as defined in W3C Baggage specification. @@ -630,6 +631,95 @@ func skipSpace(s string, offset int) int { return i } +var safeKeyCharset = [utf8.RuneSelf]bool{ + // 0x23 to 0x27 + '#': true, + '$': true, + '%': true, + '&': true, + '\'': true, + + // 0x30 to 0x39 + '0': true, + '1': true, + '2': true, + '3': true, + '4': true, + '5': true, + '6': true, + '7': true, + '8': true, + '9': true, + + // 0x41 to 0x5a + 'A': true, + 'B': true, + 'C': true, + 'D': true, + 'E': true, + 'F': true, + 'G': true, + 'H': true, + 'I': true, + 'J': true, + 'K': true, + 'L': true, + 'M': true, + 'N': true, + 'O': true, + 'P': true, + 'Q': true, + 'R': true, + 'S': true, + 'T': true, + 'U': true, + 'V': true, + 'W': true, + 'X': true, + 'Y': true, + 'Z': true, + + // 0x5e to 0x7a + '^': true, + '_': true, + '`': true, + 'a': true, + 'b': true, + 'c': true, + 'd': true, + 'e': true, + 'f': true, + 'g': true, + 'h': true, + 'i': true, + 'j': true, + 'k': true, + 'l': true, + 'm': true, + 'n': true, + 'o': true, + 'p': true, + 'q': true, + 'r': true, + 's': true, + 't': true, + 'u': true, + 'v': true, + 'w': true, + 'x': true, + 'y': true, + 'z': true, + + // remainder + '!': true, + '*': true, + '+': true, + '-': true, + '.': true, + '|': true, + '~': true, +} + func validateKey(s string) bool { if len(s) == 0 { return false @@ -645,17 +735,7 @@ func validateKey(s string) bool { } func validateKeyChar(c int32) bool { - return (c >= 0x23 && c <= 0x27) || - (c >= 0x30 && c <= 0x39) || - (c >= 0x41 && c <= 0x5a) || - (c >= 0x5e && c <= 0x7a) || - c == 0x21 || - c == 0x2a || - c == 0x2b || - c == 0x2d || - c == 0x2e || - c == 0x7c || - c == 0x7e + // The bound is strict: safeKeyCharset has utf8.RuneSelf entries, so + // index utf8.RuneSelf (0x80) itself is out of range and would panic. + return c >= 0 && c < int32(utf8.RuneSelf) && safeKeyCharset[c] } func validateValue(s string) bool { @@ -668,12 +748,109 @@ func validateValue(s string) bool { return true
} +var safeValueCharset = [utf8.RuneSelf]bool{ + '!': true, // 0x21 + + // 0x23 to 0x2b + '#': true, + '$': true, + '%': true, + '&': true, + '\'': true, + '(': true, + ')': true, + '*': true, + '+': true, + + // 0x2d to 0x3a + '-': true, + '.': true, + '/': true, + '0': true, + '1': true, + '2': true, + '3': true, + '4': true, + '5': true, + '6': true, + '7': true, + '8': true, + '9': true, + ':': true, + + // 0x3c to 0x5b + '<': true, // 0x3C + '=': true, // 0x3D + '>': true, // 0x3E + '?': true, // 0x3F + '@': true, // 0x40 + 'A': true, // 0x41 + 'B': true, // 0x42 + 'C': true, // 0x43 + 'D': true, // 0x44 + 'E': true, // 0x45 + 'F': true, // 0x46 + 'G': true, // 0x47 + 'H': true, // 0x48 + 'I': true, // 0x49 + 'J': true, // 0x4A + 'K': true, // 0x4B + 'L': true, // 0x4C + 'M': true, // 0x4D + 'N': true, // 0x4E + 'O': true, // 0x4F + 'P': true, // 0x50 + 'Q': true, // 0x51 + 'R': true, // 0x52 + 'S': true, // 0x53 + 'T': true, // 0x54 + 'U': true, // 0x55 + 'V': true, // 0x56 + 'W': true, // 0x57 + 'X': true, // 0x58 + 'Y': true, // 0x59 + 'Z': true, // 0x5A + '[': true, // 0x5B + + // 0x5d to 0x7e + ']': true, // 0x5D + '^': true, // 0x5E + '_': true, // 0x5F + '`': true, // 0x60 + 'a': true, // 0x61 + 'b': true, // 0x62 + 'c': true, // 0x63 + 'd': true, // 0x64 + 'e': true, // 0x65 + 'f': true, // 0x66 + 'g': true, // 0x67 + 'h': true, // 0x68 + 'i': true, // 0x69 + 'j': true, // 0x6A + 'k': true, // 0x6B + 'l': true, // 0x6C + 'm': true, // 0x6D + 'n': true, // 0x6E + 'o': true, // 0x6F + 'p': true, // 0x70 + 'q': true, // 0x71 + 'r': true, // 0x72 + 's': true, // 0x73 + 't': true, // 0x74 + 'u': true, // 0x75 + 'v': true, // 0x76 + 'w': true, // 0x77 + 'x': true, // 0x78 + 'y': true, // 0x79 + 'z': true, // 0x7A + '{': true, // 0x7B + '|': true, // 0x7C + '}': true, // 0x7D + '~': true, // 0x7E +} + func validateValueChar(c int32) bool { - return c == 0x21 || - (c >= 0x23 && c <= 0x2b) || - (c >= 0x2d && c <= 0x3a) || - (c >= 0x3c && c <= 0x5b) || - (c >= 
0x5d && c <= 0x7e) + // The bound is strict: safeValueCharset has utf8.RuneSelf entries, so + // index utf8.RuneSelf (0x80) itself is out of range and would panic. + return c >= 0 && c < int32(utf8.RuneSelf) && safeValueCharset[c] } // valueEscape escapes the string so it can be safely placed inside a baggage value, diff --git a/baggage/baggage_test.go b/baggage/baggage_test.go index 4ef5b334297..e8f67761f0b 100644 --- a/baggage/baggage_test.go +++ b/baggage/baggage_test.go @@ -31,7 +31,7 @@ func TestValidateKeyChar(t *testing.T) { '\x10', '\x11', '\x12', '\x13', '\x14', '\x15', '\x16', '\x17', '\x18', '\x19', '\x1A', '\x1B', '\x1C', '\x1D', '\x1E', '\x1F', ' ', '(', ')', '<', '>', '@', ',', ';', ':', '\\', '"', '/', '[', ']', '?', - '=', '{', '}', '\x7F', + '=', '{', '}', '\x7F', 2 >> 20, } for _, ch := range invalidKeyRune { diff --git a/exporters/otlp/otlplog/otlploghttp/client.go b/exporters/otlp/otlplog/otlploghttp/client.go index fc5d911f7b5..1ff61d0d662 100644 --- a/exporters/otlp/otlplog/otlploghttp/client.go +++ b/exporters/otlp/otlplog/otlploghttp/client.go @@ -4,9 +4,26 @@ package otlploghttp // import "go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploghttp" import ( + "bytes" + "compress/gzip" "context" + "errors" + "fmt" + "io" + "net" + "net/http" + "net/url" + "strconv" + "sync" + "time" + "google.golang.org/protobuf/proto" + + "go.opentelemetry.io/otel" + collogpb "go.opentelemetry.io/proto/slim/otlp/collector/logs/v1" logpb "go.opentelemetry.io/proto/slim/otlp/logs/v1" + + "go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploghttp/internal/retry" ) type client struct { @@ -26,6 +43,258 @@ func newNoopClient() *client { // newHTTPClient creates a new HTTP log client. func newHTTPClient(cfg config) (*client, error) { - // TODO: implement.
- return &client{}, nil + hc := &http.Client{ + Transport: ourTransport, + Timeout: cfg.timeout.Value, + } + + if cfg.tlsCfg.Value != nil || cfg.proxy.Value != nil { + clonedTransport := ourTransport.Clone() + hc.Transport = clonedTransport + + if cfg.tlsCfg.Value != nil { + clonedTransport.TLSClientConfig = cfg.tlsCfg.Value + } + if cfg.proxy.Value != nil { + clonedTransport.Proxy = cfg.proxy.Value + } + } + + u := &url.URL{ + Scheme: "https", + Host: cfg.endpoint.Value, + Path: cfg.path.Value, + } + if cfg.insecure.Value { + u.Scheme = "http" + } + // Body is set when this is cloned during upload. + req, err := http.NewRequest(http.MethodPost, u.String(), http.NoBody) + if err != nil { + return nil, err + } + + userAgent := "OTel Go OTLP over HTTP/protobuf logs exporter/" + Version() + req.Header.Set("User-Agent", userAgent) + + if n := len(cfg.headers.Value); n > 0 { + for k, v := range cfg.headers.Value { + req.Header.Set(k, v) + } + } + req.Header.Set("Content-Type", "application/x-protobuf") + + c := &httpClient{ + compression: cfg.compression.Value, + req: req, + requestFunc: cfg.retryCfg.Value.RequestFunc(evaluate), + client: hc, + } + return &client{uploadLogs: c.uploadLogs}, nil +} + +type httpClient struct { + // req is cloned for every upload the client makes. + req *http.Request + compression Compression + requestFunc retry.RequestFunc + client *http.Client +} + +// Keep it in sync with golang's DefaultTransport from net/http! We +// have our own copy to avoid handling a situation where the +// DefaultTransport is overwritten with some different implementation +// of http.RoundTripper or it's modified by another package. 
+var ourTransport = &http.Transport{ + Proxy: http.ProxyFromEnvironment, + DialContext: (&net.Dialer{ + Timeout: 30 * time.Second, + KeepAlive: 30 * time.Second, + }).DialContext, + ForceAttemptHTTP2: true, + MaxIdleConns: 100, + IdleConnTimeout: 90 * time.Second, + TLSHandshakeTimeout: 10 * time.Second, + ExpectContinueTimeout: 1 * time.Second, +} + +func (c *httpClient) uploadLogs(ctx context.Context, data []*logpb.ResourceLogs) error { + // The Exporter synchronizes access to client methods. This is not called + // after the Exporter is shutdown. Only thing to do here is send data. + + pbRequest := &collogpb.ExportLogsServiceRequest{ResourceLogs: data} + body, err := proto.Marshal(pbRequest) + if err != nil { + return err + } + request, err := c.newRequest(ctx, body) + if err != nil { + return err + } + + return c.requestFunc(ctx, func(iCtx context.Context) error { + select { + case <-iCtx.Done(): + return iCtx.Err() + default: + } + + request.reset(iCtx) + resp, err := c.client.Do(request.Request) + var urlErr *url.Error + if errors.As(err, &urlErr) && urlErr.Temporary() { + return newResponseError(http.Header{}) + } + if err != nil { + return err + } + + var rErr error + switch sc := resp.StatusCode; { + case sc >= 200 && sc <= 299: + // Success, do not retry. + + // Every branch of this case returns before the Close call at the end + // of this closure, so release the body (and its connection) here. + defer func() { + if err := resp.Body.Close(); err != nil { + otel.Handle(err) + } + }() + + // Read the partial success message, if any.
+ var respData bytes.Buffer + if _, err := io.Copy(&respData, resp.Body); err != nil { + return err + } + if respData.Len() == 0 { + return nil + } + + if resp.Header.Get("Content-Type") == "application/x-protobuf" { + var respProto collogpb.ExportLogsServiceResponse + if err := proto.Unmarshal(respData.Bytes(), &respProto); err != nil { + return err + } + + if respProto.PartialSuccess != nil { + msg := respProto.PartialSuccess.GetErrorMessage() + n := respProto.PartialSuccess.GetRejectedLogRecords() + if n != 0 || msg != "" { + err := fmt.Errorf("OTLP partial success: %s (%d log records rejected)", msg, n) + otel.Handle(err) + } + } + } + return nil + case sc == http.StatusTooManyRequests, + sc == http.StatusBadGateway, + sc == http.StatusServiceUnavailable, + sc == http.StatusGatewayTimeout: + // Retry-able failure. + rErr = newResponseError(resp.Header) + + // Going to retry, drain the body to reuse the connection. + if _, err := io.Copy(io.Discard, resp.Body); err != nil { + _ = resp.Body.Close() + return err + } + default: + rErr = fmt.Errorf("failed to send logs to %s: %s", request.URL, resp.Status) + } + + if err := resp.Body.Close(); err != nil { + return err + } + return rErr + }) +} + +var gzPool = sync.Pool{ + New: func() interface{} { + w := gzip.NewWriter(io.Discard) + return w + }, +} + +func (c *httpClient) newRequest(ctx context.Context, body []byte) (request, error) { + r := c.req.Clone(ctx) + req := request{Request: r} + + switch c.compression { + case NoCompression: + r.ContentLength = (int64)(len(body)) + req.bodyReader = bodyReader(body) + case GzipCompression: + // Ensure the content length is not used. + r.ContentLength = -1 + r.Header.Set("Content-Encoding", "gzip") + + gz := gzPool.Get().(*gzip.Writer) + defer gzPool.Put(gz) + + var b bytes.Buffer + gz.Reset(&b) + + if _, err := gz.Write(body); err != nil { + return req, err + } + // Close needs to be called to ensure body is fully written. 
+ if err := gz.Close(); err != nil { + return req, err + } + + req.bodyReader = bodyReader(b.Bytes()) + } + + return req, nil +} + +// bodyReader returns a closure returning a new reader for buf. +func bodyReader(buf []byte) func() io.ReadCloser { + return func() io.ReadCloser { + return io.NopCloser(bytes.NewReader(buf)) + } +} + +// request wraps an http.Request with a resettable body reader. +type request struct { + *http.Request + + // bodyReader allows the same body to be used for multiple requests. + bodyReader func() io.ReadCloser +} + +// reset reinitializes the request Body and uses ctx for the request. +func (r *request) reset(ctx context.Context) { + r.Body = r.bodyReader() + r.Request = r.WithContext(ctx) +} + +// retryableError represents a request failure that can be retried. +type retryableError struct { + throttle int64 +} + +// newResponseError returns a retryableError and will extract any explicit +// throttle delay contained in headers. +func newResponseError(header http.Header) error { + var rErr retryableError + if v := header.Get("Retry-After"); v != "" { + if t, err := strconv.ParseInt(v, 10, 64); err == nil { + rErr.throttle = t + } + } + return rErr +} + +func (e retryableError) Error() string { + return "retry-able request failure" +} + +// evaluate returns if err is retry-able. If it is and it includes an explicit +// throttling delay, that delay is also returned. 
+func evaluate(err error) (bool, time.Duration) { + if err == nil { + return false, 0 + } + + rErr, ok := err.(retryableError) + if !ok { + return false, 0 + } + + // throttle holds the integer parsed from the Retry-After header, which + // HTTP defines in seconds; scale it so it is not read as nanoseconds. + return true, time.Duration(rErr.throttle) * time.Second } diff --git a/exporters/otlp/otlplog/otlploghttp/client_test.go b/exporters/otlp/otlplog/otlploghttp/client_test.go new file mode 100644 index 00000000000..799d4022d58 --- /dev/null +++ b/exporters/otlp/otlplog/otlploghttp/client_test.go @@ -0,0 +1,757 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package otlploghttp + +import ( + "bytes" + "compress/gzip" + "context" + "crypto/ecdsa" + "crypto/elliptic" + "crypto/rand" + "crypto/tls" + "crypto/x509" + "crypto/x509/pkix" + "encoding/pem" + "errors" + "fmt" + "io" + "math/big" + "net" + "net/http" + "net/url" + "strings" + "sync" + "testing" + "time" + + "github.com/google/go-cmp/cmp" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "google.golang.org/protobuf/proto" + + "go.opentelemetry.io/otel" + collogpb "go.opentelemetry.io/proto/slim/otlp/collector/logs/v1" + cpb "go.opentelemetry.io/proto/slim/otlp/common/v1" + lpb "go.opentelemetry.io/proto/slim/otlp/logs/v1" + rpb "go.opentelemetry.io/proto/slim/otlp/resource/v1" + + "go.opentelemetry.io/otel/sdk/log" + semconv "go.opentelemetry.io/otel/semconv/v1.24.0" +) + +var ( + // Sat Jan 01 2000 00:00:00 GMT+0000.
+ ts = time.Date(2000, time.January, 0o1, 0, 0, 0, 0, time.FixedZone("GMT", 0)) + obs = ts.Add(30 * time.Second) + + kvAlice = &cpb.KeyValue{Key: "user", Value: &cpb.AnyValue{ + Value: &cpb.AnyValue_StringValue{StringValue: "alice"}, + }} + kvBob = &cpb.KeyValue{Key: "user", Value: &cpb.AnyValue{ + Value: &cpb.AnyValue_StringValue{StringValue: "bob"}, + }} + kvSrvName = &cpb.KeyValue{Key: "service.name", Value: &cpb.AnyValue{ + Value: &cpb.AnyValue_StringValue{StringValue: "test server"}, + }} + kvSrvVer = &cpb.KeyValue{Key: "service.version", Value: &cpb.AnyValue{ + Value: &cpb.AnyValue_StringValue{StringValue: "v0.1.0"}, + }} + + pbSevA = lpb.SeverityNumber_SEVERITY_NUMBER_INFO + pbSevB = lpb.SeverityNumber_SEVERITY_NUMBER_ERROR + + pbBodyA = &cpb.AnyValue{ + Value: &cpb.AnyValue_StringValue{ + StringValue: "a", + }, + } + pbBodyB = &cpb.AnyValue{ + Value: &cpb.AnyValue_StringValue{ + StringValue: "b", + }, + } + + spanIDA = []byte{0, 0, 0, 0, 0, 0, 0, 1} + spanIDB = []byte{0, 0, 0, 0, 0, 0, 0, 2} + traceIDA = []byte{0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1} + traceIDB = []byte{0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 2} + flagsA = byte(1) + flagsB = byte(0) + + logRecords = []*lpb.LogRecord{ + { + TimeUnixNano: uint64(ts.UnixNano()), + ObservedTimeUnixNano: uint64(obs.UnixNano()), + SeverityNumber: pbSevA, + SeverityText: "A", + Body: pbBodyA, + Attributes: []*cpb.KeyValue{kvAlice}, + Flags: uint32(flagsA), + TraceId: traceIDA, + SpanId: spanIDA, + }, + { + TimeUnixNano: uint64(ts.UnixNano()), + ObservedTimeUnixNano: uint64(obs.UnixNano()), + SeverityNumber: pbSevA, + SeverityText: "A", + Body: pbBodyA, + Attributes: []*cpb.KeyValue{kvBob}, + Flags: uint32(flagsA), + TraceId: traceIDA, + SpanId: spanIDA, + }, + { + TimeUnixNano: uint64(ts.UnixNano()), + ObservedTimeUnixNano: uint64(obs.UnixNano()), + SeverityNumber: pbSevB, + SeverityText: "B", + Body: pbBodyB, + Attributes: []*cpb.KeyValue{kvAlice}, + Flags: uint32(flagsB), + TraceId: traceIDB, + 
SpanId: spanIDB, + }, + { + TimeUnixNano: uint64(ts.UnixNano()), + ObservedTimeUnixNano: uint64(obs.UnixNano()), + SeverityNumber: pbSevB, + SeverityText: "B", + Body: pbBodyB, + Attributes: []*cpb.KeyValue{kvBob}, + Flags: uint32(flagsB), + TraceId: traceIDB, + SpanId: spanIDB, + }, + } + + scope = &cpb.InstrumentationScope{ + Name: "test/code/path", + Version: "v0.1.0", + } + scopeLogs = []*lpb.ScopeLogs{ + { + Scope: scope, + LogRecords: logRecords, + SchemaUrl: semconv.SchemaURL, + }, + } + + res = &rpb.Resource{ + Attributes: []*cpb.KeyValue{kvSrvName, kvSrvVer}, + } + resourceLogs = []*lpb.ResourceLogs{{ + Resource: res, + ScopeLogs: scopeLogs, + SchemaUrl: semconv.SchemaURL, + }} +) + +type exportResult struct { + Response *collogpb.ExportLogsServiceResponse + Err error +} + +// storage stores uploaded OTLP log data in their proto form. +type storage struct { + dataMu sync.Mutex + data []*lpb.ResourceLogs +} + +// newStorage returns a configure storage ready to store received requests. +func newStorage() *storage { + return &storage{} +} + +// Add adds the request to the Storage. +func (s *storage) Add(request *collogpb.ExportLogsServiceRequest) { + s.dataMu.Lock() + defer s.dataMu.Unlock() + s.data = append(s.data, request.ResourceLogs...) +} + +// Dump returns all added ResourceLogs and clears the storage. 
+func (s *storage) Dump() []*lpb.ResourceLogs { + s.dataMu.Lock() + defer s.dataMu.Unlock() + + var data []*lpb.ResourceLogs + data, s.data = s.data, []*lpb.ResourceLogs{} + return data +} + +var emptyExportLogsServiceResponse = func() []byte { + body := collogpb.ExportLogsServiceResponse{} + r, err := proto.Marshal(&body) + if err != nil { + panic(err) + } + return r +}() + +type httpResponseError struct { + Err error + Status int + Header http.Header +} + +func (e *httpResponseError) Error() string { + return fmt.Sprintf("%d: %s", e.Status, e.Err) +} + +func (e *httpResponseError) Unwrap() error { return e.Err } + +// httpCollector is an OTLP HTTP server that collects all requests it receives. +type httpCollector struct { + plainTextResponse bool + + headersMu sync.Mutex + headers http.Header + storage *storage + + resultCh <-chan exportResult + listener net.Listener + srv *http.Server +} + +// newHTTPCollector returns an *httpCollector that is listening at the provided +// endpoint. +// +// If endpoint is an empty string, the returned collector will be listening on +// the localhost interface at an OS chosen port, not use TLS, and listen at the +// default OTLP log endpoint path ("/v1/logs"). If the endpoint contains a +// prefix of "https" the server will generate weak self-signed TLS certificates +// and use them to serve data. If the endpoint contains a path, that path will +// be used instead of the default OTLP log endpoint path. +// +// If resultCh is not nil, the collector will respond to HTTP requests with the +// results sent on that channel. This means that if resultCh is not nil Export +// calls will block until a result is received.
+func newHTTPCollector(endpoint string, resultCh <-chan exportResult, opts ...func(*httpCollector)) (*httpCollector, error) { + u, err := url.Parse(endpoint) + if err != nil { + return nil, err + } + if u.Host == "" { + u.Host = "localhost:0" + } + if u.Path == "" { + u.Path = defaultPath + } + + c := &httpCollector{ + headers: http.Header{}, + storage: newStorage(), + resultCh: resultCh, + } + for _, opt := range opts { + opt(c) + } + + c.listener, err = net.Listen("tcp", u.Host) + if err != nil { + return nil, err + } + + mux := http.NewServeMux() + mux.Handle(u.Path, http.HandlerFunc(c.handler)) + c.srv = &http.Server{ + Handler: mux, + ReadTimeout: 10 * time.Second, + WriteTimeout: 10 * time.Second, + } + if u.Scheme == "https" { + cert, err := newWeakCertificate() + if err != nil { + return nil, err + } + c.srv.TLSConfig = &tls.Config{ + Certificates: []tls.Certificate{cert}, + } + go func() { _ = c.srv.ServeTLS(c.listener, "", "") }() + } else { + go func() { _ = c.srv.Serve(c.listener) }() + } + return c, nil +} + +// withHTTPCollectorRespondingPlainText makes the HTTPCollector return +// a plaintext, instead of protobuf, response. +func withHTTPCollectorRespondingPlainText() func(*httpCollector) { + return func(s *httpCollector) { + s.plainTextResponse = true + } +} + +// Shutdown shuts down the HTTP server closing all open connections and +// listeners. +func (c *httpCollector) Shutdown(ctx context.Context) error { + return c.srv.Shutdown(ctx) +} + +// Addr returns the net.Addr c is listening at. +func (c *httpCollector) Addr() net.Addr { + return c.listener.Addr() +} + +// Collect returns the Storage holding all collected requests. +func (c *httpCollector) Collect() *storage { + return c.storage +} + +// Headers returns the headers received for all requests. +func (c *httpCollector) Headers() map[string][]string { + // Makes a copy. 
+ c.headersMu.Lock() + defer c.headersMu.Unlock() + return c.headers.Clone() +} + +func (c *httpCollector) handler(w http.ResponseWriter, r *http.Request) { + c.respond(w, c.record(r)) +} + +func (c *httpCollector) record(r *http.Request) exportResult { + // Currently only supports protobuf. + if v := r.Header.Get("Content-Type"); v != "application/x-protobuf" { + err := fmt.Errorf("content-type not supported: %s", v) + return exportResult{Err: err} + } + + body, err := c.readBody(r) + if err != nil { + return exportResult{Err: err} + } + pbRequest := &collogpb.ExportLogsServiceRequest{} + err = proto.Unmarshal(body, pbRequest) + if err != nil { + return exportResult{ + Err: &httpResponseError{ + Err: err, + Status: http.StatusInternalServerError, + }, + } + } + c.storage.Add(pbRequest) + + c.headersMu.Lock() + for k, vals := range r.Header { + for _, v := range vals { + c.headers.Add(k, v) + } + } + c.headersMu.Unlock() + + if c.resultCh != nil { + return <-c.resultCh + } + return exportResult{Err: err} +} + +func (c *httpCollector) readBody(r *http.Request) (body []byte, err error) { + var reader io.ReadCloser + switch r.Header.Get("Content-Encoding") { + case "gzip": + reader, err = gzip.NewReader(r.Body) + if err != nil { + // gzip.NewReader returns a nil *gzip.Reader on error, so calling + // reader.Close() here would dereference nil; close the raw body. + _ = r.Body.Close() + return nil, &httpResponseError{ + Err: err, + Status: http.StatusInternalServerError, + } + } + default: + reader = r.Body + } + + defer func() { + cErr := reader.Close() + if err == nil && cErr != nil { + err = &httpResponseError{ + Err: cErr, + Status: http.StatusInternalServerError, + } + } + }() + body, err = io.ReadAll(reader) + if err != nil { + err = &httpResponseError{ + Err: err, + Status: http.StatusInternalServerError, + } + } + return body, err +} + +func (c *httpCollector) respond(w http.ResponseWriter, resp exportResult) { + if resp.Err != nil { + w.Header().Set("Content-Type", "text/plain; charset=utf-8") + w.Header().Set("X-Content-Type-Options", "nosniff") + var e *httpResponseError + if
errors.As(resp.Err, &e) { + for k, vals := range e.Header { + for _, v := range vals { + w.Header().Add(k, v) + } + } + w.WriteHeader(e.Status) + fmt.Fprintln(w, e.Error()) + } else { + w.WriteHeader(http.StatusBadRequest) + fmt.Fprintln(w, resp.Err.Error()) + } + return + } + + if c.plainTextResponse { + w.Header().Set("Content-Type", "text/plain; charset=utf-8") + w.WriteHeader(http.StatusOK) + _, _ = w.Write([]byte("OK")) + return + } + + w.Header().Set("Content-Type", "application/x-protobuf") + w.WriteHeader(http.StatusOK) + if resp.Response == nil { + _, _ = w.Write(emptyExportLogsServiceResponse) + } else { + r, err := proto.Marshal(resp.Response) + if err != nil { + panic(err) + } + _, _ = w.Write(r) + } +} + +// Based on https://golang.org/src/crypto/tls/generate_cert.go, +// simplified and weakened. +func newWeakCertificate() (tls.Certificate, error) { + priv, err := ecdsa.GenerateKey(elliptic.P256(), rand.Reader) + if err != nil { + return tls.Certificate{}, err + } + notBefore := time.Now() + notAfter := notBefore.Add(time.Hour) + max := new(big.Int).Lsh(big.NewInt(1), 128) + sn, err := rand.Int(rand.Reader, max) + if err != nil { + return tls.Certificate{}, err + } + tmpl := x509.Certificate{ + SerialNumber: sn, + Subject: pkix.Name{Organization: []string{"otel-go"}}, + NotBefore: notBefore, + NotAfter: notAfter, + KeyUsage: x509.KeyUsageDigitalSignature, + ExtKeyUsage: []x509.ExtKeyUsage{x509.ExtKeyUsageServerAuth}, + BasicConstraintsValid: true, + DNSNames: []string{"localhost"}, + IPAddresses: []net.IP{net.IPv6loopback, net.IPv4(127, 0, 0, 1)}, + } + derBytes, err := x509.CreateCertificate(rand.Reader, &tmpl, &tmpl, &priv.PublicKey, priv) + if err != nil { + return tls.Certificate{}, err + } + var certBuf bytes.Buffer + err = pem.Encode(&certBuf, &pem.Block{Type: "CERTIFICATE", Bytes: derBytes}) + if err != nil { + return tls.Certificate{}, err + } + privBytes, err := x509.MarshalPKCS8PrivateKey(priv) + if err != nil { + return tls.Certificate{}, 
err + } + var privBuf bytes.Buffer + err = pem.Encode(&privBuf, &pem.Block{Type: "PRIVATE KEY", Bytes: privBytes}) + if err != nil { + return tls.Certificate{}, err + } + return tls.X509KeyPair(certBuf.Bytes(), privBuf.Bytes()) +} + +func TestClient(t *testing.T) { + factory := func(rCh <-chan exportResult) (*client, *httpCollector) { + coll, err := newHTTPCollector("", rCh) + require.NoError(t, err) + + addr := coll.Addr().String() + opts := []Option{WithEndpoint(addr), WithInsecure()} + cfg := newConfig(opts) + client, err := newHTTPClient(cfg) + require.NoError(t, err) + return client, coll + } + + t.Run("ClientHonorsContextErrors", func(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + t.Cleanup(cancel) + + t.Run("DeadlineExceeded", func(t *testing.T) { + innerCtx, innerCancel := context.WithTimeout(ctx, time.Nanosecond) + t.Cleanup(innerCancel) + <-innerCtx.Done() + + c, _ := factory(nil) + assert.ErrorIs(t, c.uploadLogs(innerCtx, nil), context.DeadlineExceeded) + }) + + t.Run("Canceled", func(t *testing.T) { + innerCtx, innerCancel := context.WithCancel(ctx) + innerCancel() + + c, _ := factory(nil) + assert.ErrorIs(t, c.uploadLogs(innerCtx, nil), context.Canceled) + }) + }) + + t.Run("uploadLogs", func(t *testing.T) { + ctx := context.Background() + client, coll := factory(nil) + + require.NoError(t, client.uploadLogs(ctx, resourceLogs)) + got := coll.Collect().Dump() + require.Len(t, got, 1, "upload of one ResourceLogs") + diff := cmp.Diff(got[0], resourceLogs[0], cmp.Comparer(proto.Equal)) + if diff != "" { + t.Fatalf("unexpected ResourceLogs:\n%s", diff) + } + }) + + t.Run("PartialSuccess", func(t *testing.T) { + const n, msg = 2, "bad data" + rCh := make(chan exportResult, 3) + rCh <- exportResult{ + Response: &collogpb.ExportLogsServiceResponse{ + PartialSuccess: &collogpb.ExportLogsPartialSuccess{ + RejectedLogRecords: n, + ErrorMessage: msg, + }, + }, + } + rCh <- exportResult{ + Response: &collogpb.ExportLogsServiceResponse{ 
+ PartialSuccess: &collogpb.ExportLogsPartialSuccess{ + // Should not be logged. + RejectedLogRecords: 0, + ErrorMessage: "", + }, + }, + } + rCh <- exportResult{ + Response: &collogpb.ExportLogsServiceResponse{}, + } + + ctx := context.Background() + client, _ := factory(rCh) + + defer func(orig otel.ErrorHandler) { + otel.SetErrorHandler(orig) + }(otel.GetErrorHandler()) + + errs := []error{} + eh := otel.ErrorHandlerFunc(func(e error) { errs = append(errs, e) }) + otel.SetErrorHandler(eh) + + require.NoError(t, client.UploadLogs(ctx, resourceLogs)) + require.NoError(t, client.UploadLogs(ctx, resourceLogs)) + require.NoError(t, client.UploadLogs(ctx, resourceLogs)) + + require.Equal(t, 1, len(errs)) + want := fmt.Sprintf("%s (%d log records rejected)", msg, n) + assert.ErrorContains(t, errs[0], want) + }) +} + +func TestClientWithHTTPCollectorRespondingPlainText(t *testing.T) { + ctx := context.Background() + coll, err := newHTTPCollector("", nil, withHTTPCollectorRespondingPlainText()) + require.NoError(t, err) + + addr := coll.Addr().String() + opts := []Option{WithEndpoint(addr), WithInsecure()} + cfg := newConfig(opts) + client, err := newHTTPClient(cfg) + require.NoError(t, err) + + require.NoError(t, client.uploadLogs(ctx, make([]*lpb.ResourceLogs, 1))) + got := coll.Collect().Dump() + require.Len(t, got, 1, "upload of one ResourceLogs") +} + +func TestNewWithInvalidEndpoint(t *testing.T) { + ctx := context.Background() + exp, err := New(ctx, WithEndpoint("host:invalid-port")) + assert.Error(t, err) + assert.Nil(t, exp) +} + +func TestConfig(t *testing.T) { + factoryFunc := func(ePt string, rCh <-chan exportResult, o ...Option) (log.Exporter, *httpCollector) { + coll, err := newHTTPCollector(ePt, rCh) + require.NoError(t, err) + + opts := []Option{WithEndpoint(coll.Addr().String())} + if !strings.HasPrefix(strings.ToLower(ePt), "https") { + opts = append(opts, WithInsecure()) + } + opts = append(opts, o...) 
+ + ctx := context.Background() + exp, err := New(ctx, opts...) + require.NoError(t, err) + return exp, coll + } + + t.Run("WithEndpointURL", func(t *testing.T) { + coll, err := newHTTPCollector("", nil) + require.NoError(t, err) + ctx := context.Background() + + target := "http://" + coll.Addr().String() + defaultPath + exp, err := New(ctx, WithEndpointURL(target)) + require.NoError(t, err) + t.Cleanup(func() { require.NoError(t, coll.Shutdown(ctx)) }) + t.Cleanup(func() { require.NoError(t, exp.Shutdown(ctx)) }) + + assert.NoError(t, exp.Export(ctx, make([]log.Record, 1))) + assert.Len(t, coll.Collect().Dump(), 1) + }) + + t.Run("WithHeaders", func(t *testing.T) { + key := http.CanonicalHeaderKey("my-custom-header") + headers := map[string]string{key: "custom-value"} + exp, coll := factoryFunc("", nil, WithHeaders(headers)) + ctx := context.Background() + t.Cleanup(func() { require.NoError(t, coll.Shutdown(ctx)) }) + require.NoError(t, exp.Export(ctx, make([]log.Record, 1))) + // Ensure everything is flushed. + require.NoError(t, exp.Shutdown(ctx)) + + got := coll.Headers() + require.Regexp(t, "OTel Go OTLP over HTTP/protobuf logs exporter/[01]\\..*", got) + require.Contains(t, got, key) + assert.Equal(t, got[key], []string{headers[key]}) + }) + + t.Run("WithTimeout", func(t *testing.T) { + // Do not send on rCh so the Collector never responds to the client. + rCh := make(chan exportResult) + exp, coll := factoryFunc( + "", + rCh, + WithTimeout(time.Millisecond), + WithRetry(RetryConfig{Enabled: false}), + ) + ctx := context.Background() + t.Cleanup(func() { require.NoError(t, coll.Shutdown(ctx)) }) + // Push this after Shutdown so the HTTP server doesn't hang. 
+ t.Cleanup(func() { close(rCh) }) + t.Cleanup(func() { require.NoError(t, exp.Shutdown(ctx)) }) + err := exp.Export(ctx, make([]log.Record, 1)) + assert.ErrorAs(t, err, new(retryableError)) + }) + + t.Run("WithCompressionGZip", func(t *testing.T) { + exp, coll := factoryFunc("", nil, WithCompression(GzipCompression)) + ctx := context.Background() + t.Cleanup(func() { require.NoError(t, coll.Shutdown(ctx)) }) + t.Cleanup(func() { require.NoError(t, exp.Shutdown(ctx)) }) + assert.NoError(t, exp.Export(ctx, make([]log.Record, 1))) + assert.Len(t, coll.Collect().Dump(), 1) + }) + + t.Run("WithRetry", func(t *testing.T) { + emptyErr := errors.New("") + rCh := make(chan exportResult, 5) + header := http.Header{http.CanonicalHeaderKey("Retry-After"): {"10"}} + // All retryable errors. + rCh <- exportResult{Err: &httpResponseError{ + Status: http.StatusServiceUnavailable, + Err: emptyErr, + Header: header, + }} + rCh <- exportResult{Err: &httpResponseError{ + Status: http.StatusTooManyRequests, + Err: emptyErr, + }} + rCh <- exportResult{Err: &httpResponseError{ + Status: http.StatusGatewayTimeout, + Err: emptyErr, + }} + rCh <- exportResult{Err: &httpResponseError{ + Status: http.StatusBadGateway, + Err: emptyErr, + }} + rCh <- exportResult{} + exp, coll := factoryFunc("", rCh, WithRetry(RetryConfig{ + Enabled: true, + InitialInterval: time.Nanosecond, + MaxInterval: time.Millisecond, + MaxElapsedTime: time.Minute, + })) + ctx := context.Background() + t.Cleanup(func() { require.NoError(t, coll.Shutdown(ctx)) }) + // Push this after Shutdown so the HTTP server doesn't hang. 
+ t.Cleanup(func() { close(rCh) }) + t.Cleanup(func() { require.NoError(t, exp.Shutdown(ctx)) }) + assert.NoError(t, exp.Export(ctx, make([]log.Record, 1)), "failed retry") + assert.Len(t, rCh, 0, "failed HTTP responses did not occur") + }) + + t.Run("WithURLPath", func(t *testing.T) { + path := "/prefix/v2/logs" + ePt := fmt.Sprintf("http://localhost:0%s", path) + exp, coll := factoryFunc(ePt, nil, WithURLPath(path)) + ctx := context.Background() + t.Cleanup(func() { require.NoError(t, coll.Shutdown(ctx)) }) + t.Cleanup(func() { require.NoError(t, exp.Shutdown(ctx)) }) + assert.NoError(t, exp.Export(ctx, make([]log.Record, 1))) + assert.Len(t, coll.Collect().Dump(), 1) + }) + + t.Run("WithTLSClientConfig", func(t *testing.T) { + ePt := "https://localhost:0" + tlsCfg := &tls.Config{InsecureSkipVerify: true} + exp, coll := factoryFunc(ePt, nil, WithTLSClientConfig(tlsCfg)) + ctx := context.Background() + t.Cleanup(func() { require.NoError(t, coll.Shutdown(ctx)) }) + t.Cleanup(func() { require.NoError(t, exp.Shutdown(ctx)) }) + assert.NoError(t, exp.Export(ctx, make([]log.Record, 1))) + assert.Len(t, coll.Collect().Dump(), 1) + }) + + t.Run("WithCustomUserAgent", func(t *testing.T) { + key := http.CanonicalHeaderKey("user-agent") + headers := map[string]string{key: "custom-user-agent"} + exp, coll := factoryFunc("", nil, WithHeaders(headers)) + ctx := context.Background() + t.Cleanup(func() { require.NoError(t, coll.Shutdown(ctx)) }) + require.NoError(t, exp.Export(ctx, make([]log.Record, 1))) + // Ensure everything is flushed. 
+ require.NoError(t, exp.Shutdown(ctx)) + + got := coll.Headers() + require.Contains(t, got, key) + assert.Equal(t, got[key], []string{headers[key]}) + }) + + t.Run("WithProxy", func(t *testing.T) { + headerKeySetInProxy := http.CanonicalHeaderKey("X-Using-Proxy") + headerValueSetInProxy := "true" + exp, coll := factoryFunc("", nil, WithProxy(func(r *http.Request) (*url.URL, error) { + r.Header.Set(headerKeySetInProxy, headerValueSetInProxy) + return r.URL, nil + })) + ctx := context.Background() + t.Cleanup(func() { require.NoError(t, coll.Shutdown(ctx)) }) + require.NoError(t, exp.Export(ctx, make([]log.Record, 1))) + // Ensure everything is flushed. + require.NoError(t, exp.Shutdown(ctx)) + + got := coll.Headers() + require.Contains(t, got, headerKeySetInProxy) + assert.Equal(t, got[headerKeySetInProxy], []string{headerValueSetInProxy}) + }) +} diff --git a/exporters/otlp/otlplog/otlploghttp/config.go b/exporters/otlp/otlplog/otlploghttp/config.go index 6a414678c5b..3564e9caffd 100644 --- a/exporters/otlp/otlplog/otlploghttp/config.go +++ b/exporters/otlp/otlplog/otlploghttp/config.go @@ -5,10 +5,17 @@ package otlploghttp // import "go.opentelemetry.io/otel/exporters/otlp/otlplog/o import ( "crypto/tls" + "crypto/x509" + "errors" + "fmt" "net/http" "net/url" + "os" + "strconv" + "strings" "time" + "go.opentelemetry.io/otel" "go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploghttp/internal/retry" "go.opentelemetry.io/otel/internal/global" ) @@ -19,7 +26,53 @@ var ( defaultPath = "/v1/logs" defaultTimeout = 10 * time.Second defaultProxy HTTPTransportProxyFunc = http.ProxyFromEnvironment - defaultRetryCfg = RetryConfig(retry.DefaultConfig) + defaultRetryCfg = retry.DefaultConfig +) + +// Environment variable keys. +var ( + envEndpoint = []string{ + "OTEL_EXPORTER_OTLP_LOGS_ENDPOINT", + "OTEL_EXPORTER_OTLP_ENDPOINT", + } + envInsecure = envEndpoint + + // Split because these are parsed differently. 
+ envPathSignal = []string{"OTEL_EXPORTER_OTLP_LOGS_ENDPOINT"} + envPathOTLP = []string{"OTEL_EXPORTER_OTLP_ENDPOINT"} + + envHeaders = []string{ + "OTEL_EXPORTER_OTLP_LOGS_HEADERS", + "OTEL_EXPORTER_OTLP_HEADERS", + } + + envCompression = []string{ + "OTEL_EXPORTER_OTLP_LOGS_COMPRESSION", + "OTEL_EXPORTER_OTLP_COMPRESSION", + } + + envTimeout = []string{ + "OTEL_EXPORTER_OTLP_LOGS_TIMEOUT", + "OTEL_EXPORTER_OTLP_TIMEOUT", + } + + envTLSCert = []string{ + "OTEL_EXPORTER_OTLP_LOGS_CERTIFICATE", + "OTEL_EXPORTER_OTLP_CERTIFICATE", + } + envTLSClient = []struct { + Certificate string + Key string + }{ + { + "OTEL_EXPORTER_OTLP_LOGS_CLIENT_CERTIFICATE", + "OTEL_EXPORTER_OTLP_LOGS_CLIENT_KEY", + }, + { + "OTEL_EXPORTER_OTLP_CLIENT_CERTIFICATE", + "OTEL_EXPORTER_OTLP_CLIENT_KEY", + }, + } ) // Option applies an option to the Exporter. @@ -40,7 +93,7 @@ type config struct { compression setting[Compression] timeout setting[time.Duration] proxy setting[HTTPTransportProxyFunc] - retryCfg setting[RetryConfig] + retryCfg setting[retry.Config] } func newConfig(options []Option) config { @@ -50,19 +103,35 @@ func newConfig(options []Option) config { } c.endpoint = c.endpoint.Resolve( + getenv[string](envEndpoint, convEndpoint), fallback[string](defaultEndpoint), ) c.path = c.path.Resolve( + getenv[string](envPathSignal, convPathExact), + getenv[string](envPathOTLP, convPath), fallback[string](defaultPath), ) + c.insecure = c.insecure.Resolve( + getenv[bool](envInsecure, convInsecure), + ) + c.tlsCfg = c.tlsCfg.Resolve( + loadEnvTLS[*tls.Config](), + ) + c.headers = c.headers.Resolve( + getenv[map[string]string](envHeaders, convHeaders), + ) + c.compression = c.compression.Resolve( + getenv[Compression](envCompression, convCompression), + ) c.timeout = c.timeout.Resolve( + getenv[time.Duration](envTimeout, convDuration), fallback[time.Duration](defaultTimeout), ) c.proxy = c.proxy.Resolve( fallback[HTTPTransportProxyFunc](defaultProxy), ) c.retryCfg = c.retryCfg.Resolve( - 
fallback[RetryConfig](defaultRetryCfg), + fallback[retry.Config](defaultRetryCfg), ) return c @@ -253,7 +322,7 @@ type RetryConfig retry.Config // after each error for no more than a total time of 1 minute. func WithRetry(rc RetryConfig) Option { return fnOpt(func(c config) config { - c.retryCfg = newSetting(rc) + c.retryCfg = newSetting(retry.Config(rc)) return c }) } @@ -303,6 +372,219 @@ func (s setting[T]) Resolve(fn ...resolver[T]) setting[T] { return s } +// loadEnvTLS returns a resolver that loads a *tls.Config from files defeind by +// the OTLP TLS environment variables. This will load both the rootCAs and +// certificates used for mTLS. +// +// If the filepath defined is invalid or does not contain valid TLS files, an +// error is passed to the OTel ErrorHandler and no TLS configuration is +// provided. +func loadEnvTLS[T *tls.Config]() resolver[T] { + return func(s setting[T]) setting[T] { + if s.Set { + // Passed, valid, options have precedence. + return s + } + + var rootCAs *x509.CertPool + var err error + for _, key := range envTLSCert { + if v := os.Getenv(key); v != "" { + rootCAs, err = loadCertPool(v) + break + } + } + + var certs []tls.Certificate + for _, pair := range envTLSClient { + cert := os.Getenv(pair.Certificate) + key := os.Getenv(pair.Key) + if cert != "" && key != "" { + var e error + certs, e = loadCertificates(cert, key) + err = errors.Join(err, e) + break + } + } + + if err != nil { + err = fmt.Errorf("failed to load TLS: %w", err) + otel.Handle(err) + } else if rootCAs != nil || certs != nil { + s.Set = true + s.Value = &tls.Config{RootCAs: rootCAs, Certificates: certs} + } + return s + } +} + +// readFile is used for testing. +var readFile = os.ReadFile + +// loadCertPool loads and returns the *x509.CertPool found at path if it exists +// and is valid. Otherwise, nil and an error is returned. 
+func loadCertPool(path string) (*x509.CertPool, error) { + b, err := readFile(path) + if err != nil { + return nil, err + } + cp := x509.NewCertPool() + if ok := cp.AppendCertsFromPEM(b); !ok { + return nil, errors.New("certificate not added") + } + return cp, nil +} + +// loadCertPool loads and returns the tls.Certificate found at path if it +// exists and is valid. Otherwise, nil and an error is returned. +func loadCertificates(certPath, keyPath string) ([]tls.Certificate, error) { + cert, err := readFile(certPath) + if err != nil { + return nil, err + } + key, err := readFile(keyPath) + if err != nil { + return nil, err + } + crt, err := tls.X509KeyPair(cert, key) + if err != nil { + return nil, err + } + return []tls.Certificate{crt}, nil +} + +// getenv returns a resolver that will apply an environment variable value +// associated with the first set key to a setting value. The conv function is +// used to convert between the environment variable value and the setting type. +// +// If the input setting to the resolver is set, the environment variable will +// not be applied. +// +// Any error returned from conv is sent to the OTel ErrorHandler and the +// setting will not be updated. +func getenv[T any](keys []string, conv func(string) (T, error)) resolver[T] { + return func(s setting[T]) setting[T] { + if s.Set { + // Passed, valid, options have precedence. + return s + } + + for _, key := range keys { + if vStr := os.Getenv(key); vStr != "" { + v, err := conv(vStr) + if err == nil { + s.Value = v + s.Set = true + break + } + otel.Handle(fmt.Errorf("invalid %s value %s: %w", key, vStr, err)) + } + } + return s + } +} + +// convEndpoint converts s from a URL string to an endpoint if s is a valid +// URL. Otherwise, "" and an error are returned. 
func convEndpoint(s string) (string, error) {
	u, err := url.Parse(s)
	if err != nil {
		return "", err
	}
	// Only the host[:port] portion of the URL is used as the endpoint.
	return u.Host, nil
}

// convPathExact converts s from a URL string to the exact path if s is a valid
// URL. Otherwise, "" and an error are returned.
//
// If the path contained in s is empty, "/" is returned.
func convPathExact(s string) (string, error) {
	u, err := url.Parse(s)
	if err != nil {
		return "", err
	}
	if u.Path == "" {
		return "/", nil
	}
	return u.Path, nil
}

// convPath converts s from a base URL string to the OTLP logs endpoint path
// if s is a valid URL. Otherwise, "" and an error are returned.
//
// The logs signal path "/v1/logs" is appended to the path contained in s.
// Trailing slashes of the base path are dropped first so that a base URL such
// as "http://host/prefix/" yields "/prefix/v1/logs" and not
// "/prefix//v1/logs".
func convPath(s string) (string, error) {
	u, err := url.Parse(s)
	if err != nil {
		return "", err
	}
	return strings.TrimRight(u.Path, "/") + "/v1/logs", nil
}

// convInsecure parses s as a URL string and reports whether the connection
// should be made without client transport security: true for every scheme
// other than "https". If s is an invalid URL, false and an error are
// returned.
func convInsecure(s string) (bool, error) {
	u, err := url.Parse(s)
	if err != nil {
		return false, err
	}
	return u.Scheme != "https", nil
}

// convHeaders converts the OTel environment variable header value s into a
// mapping of header key to value. If s is invalid a partial result and error
// are returned.
+func convHeaders(s string) (map[string]string, error) { + out := make(map[string]string) + var err error + for _, header := range strings.Split(s, ",") { + rawKey, rawVal, found := strings.Cut(header, "=") + if !found { + err = errors.Join(err, fmt.Errorf("invalid header: %s", header)) + continue + } + + escKey, e := url.PathUnescape(rawKey) + if e != nil { + err = errors.Join(err, fmt.Errorf("invalid header key: %s", rawKey)) + continue + } + key := strings.TrimSpace(escKey) + + escVal, e := url.PathUnescape(rawVal) + if e != nil { + err = errors.Join(err, fmt.Errorf("invalid header value: %s", rawVal)) + continue + } + val := strings.TrimSpace(escVal) + + out[key] = val + } + return out, err +} + +// convCompression returns the parsed compression encoded in s. NoCompression +// and an errors are returned if s is unknown. +func convCompression(s string) (Compression, error) { + switch s { + case "gzip": + return GzipCompression, nil + case "none", "": + return NoCompression, nil + } + return NoCompression, fmt.Errorf("unknown compression: %s", s) +} + +// convDuration converts s into a duration of milliseconds. If s does not +// contain an integer, 0 and an error are returned. +func convDuration(s string) (time.Duration, error) { + d, err := strconv.Atoi(s) + if err != nil { + return 0, err + } + // OTel durations are defined in milliseconds. + return time.Duration(d) * time.Millisecond, nil +} + // fallback returns a resolve that will set a setting value to val if it is not // already set. 
// diff --git a/exporters/otlp/otlplog/otlploghttp/config_test.go b/exporters/otlp/otlplog/otlploghttp/config_test.go index e28422e7469..bf95c43002e 100644 --- a/exporters/otlp/otlplog/otlploghttp/config_test.go +++ b/exporters/otlp/otlplog/otlploghttp/config_test.go @@ -5,24 +5,89 @@ package otlploghttp import ( "crypto/tls" + "crypto/x509" + "errors" + "fmt" "net/http" "net/url" "testing" "time" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploghttp/internal/retry" +) + +const ( + weakCertificate = ` +-----BEGIN CERTIFICATE----- +MIIBhzCCASygAwIBAgIRANHpHgAWeTnLZpTSxCKs0ggwCgYIKoZIzj0EAwIwEjEQ +MA4GA1UEChMHb3RlbC1nbzAeFw0yMTA0MDExMzU5MDNaFw0yMTA0MDExNDU5MDNa +MBIxEDAOBgNVBAoTB290ZWwtZ28wWTATBgcqhkjOPQIBBggqhkjOPQMBBwNCAAS9 +nWSkmPCxShxnp43F+PrOtbGV7sNfkbQ/kxzi9Ego0ZJdiXxkmv/C05QFddCW7Y0Z +sJCLHGogQsYnWJBXUZOVo2MwYTAOBgNVHQ8BAf8EBAMCB4AwEwYDVR0lBAwwCgYI +KwYBBQUHAwEwDAYDVR0TAQH/BAIwADAsBgNVHREEJTAjgglsb2NhbGhvc3SHEAAA +AAAAAAAAAAAAAAAAAAGHBH8AAAEwCgYIKoZIzj0EAwIDSQAwRgIhANwZVVKvfvQ/ +1HXsTvgH+xTQswOwSSKYJ1cVHQhqK7ZbAiEAus8NxpTRnp5DiTMuyVmhVNPB+bVH +Lhnm4N/QDk5rek0= +-----END CERTIFICATE----- +` + weakPrivateKey = ` +-----BEGIN PRIVATE KEY----- +MIGHAgEAMBMGByqGSM49AgEGCCqGSM49AwEHBG0wawIBAQQgN8HEXiXhvByrJ1zK +SFT6Y2l2KqDWwWzKf+t4CyWrNKehRANCAAS9nWSkmPCxShxnp43F+PrOtbGV7sNf +kbQ/kxzi9Ego0ZJdiXxkmv/C05QFddCW7Y0ZsJCLHGogQsYnWJBXUZOV +-----END PRIVATE KEY----- +` ) +func newTLSConf(cert, key []byte) (*tls.Config, error) { + cp := x509.NewCertPool() + if ok := cp.AppendCertsFromPEM(cert); !ok { + return nil, errors.New("failed to append certificate to the cert pool") + } + crt, err := tls.X509KeyPair(cert, key) + if err != nil { + return nil, err + } + crts := []tls.Certificate{crt} + return &tls.Config{RootCAs: cp, Certificates: crts}, nil +} + func TestNewConfig(t *testing.T) { - tlsCfg := &tls.Config{} + orig := readFile + readFile = func() func(name 
string) ([]byte, error) { + index := map[string][]byte{ + "cert_path": []byte(weakCertificate), + "key_path": []byte(weakPrivateKey), + "invalid_cert": []byte("invalid certificate file."), + "invalid_key": []byte("invalid key file."), + } + return func(name string) ([]byte, error) { + b, ok := index[name] + if !ok { + err := fmt.Errorf("file does not exist: %s", name) + return nil, err + } + return b, nil + } + }() + t.Cleanup(func() { readFile = orig }) + + tlsCfg, err := newTLSConf([]byte(weakCertificate), []byte(weakPrivateKey)) + require.NoError(t, err, "testing TLS config") + headers := map[string]string{"a": "A"} - rc := RetryConfig{} + rc := retry.Config{} testcases := []struct { name string options []Option envars map[string]string want config + errs []string }{ { name: "Defaults", @@ -43,7 +108,7 @@ func TestNewConfig(t *testing.T) { WithCompression(GzipCompression), WithHeaders(headers), WithTimeout(time.Second), - WithRetry(rc), + WithRetry(RetryConfig(rc)), // Do not test WithProxy. Requires func comparison. 
}, want: config{ @@ -102,6 +167,176 @@ func TestNewConfig(t *testing.T) { retryCfg: newSetting(defaultRetryCfg), }, }, + { + name: "LogEnvironmentVariables", + envars: map[string]string{ + "OTEL_EXPORTER_OTLP_LOGS_ENDPOINT": "https://env.endpoint:8080/prefix", + "OTEL_EXPORTER_OTLP_LOGS_HEADERS": "a=A", + "OTEL_EXPORTER_OTLP_LOGS_COMPRESSION": "gzip", + "OTEL_EXPORTER_OTLP_LOGS_TIMEOUT": "15000", + "OTEL_EXPORTER_OTLP_LOGS_CERTIFICATE": "cert_path", + "OTEL_EXPORTER_OTLP_LOGS_CLIENT_CERTIFICATE": "cert_path", + "OTEL_EXPORTER_OTLP_LOGS_CLIENT_KEY": "key_path", + }, + want: config{ + endpoint: newSetting("env.endpoint:8080"), + path: newSetting("/prefix"), + insecure: newSetting(false), + tlsCfg: newSetting(tlsCfg), + headers: newSetting(headers), + compression: newSetting(GzipCompression), + timeout: newSetting(15 * time.Second), + retryCfg: newSetting(defaultRetryCfg), + }, + }, + { + name: "LogEnpointEnvironmentVariablesDefaultPath", + envars: map[string]string{ + "OTEL_EXPORTER_OTLP_LOGS_ENDPOINT": "http://env.endpoint", + }, + want: config{ + endpoint: newSetting("env.endpoint"), + path: newSetting("/"), + insecure: newSetting(true), + timeout: newSetting(defaultTimeout), + retryCfg: newSetting(defaultRetryCfg), + }, + }, + { + name: "OTLPEnvironmentVariables", + envars: map[string]string{ + "OTEL_EXPORTER_OTLP_ENDPOINT": "http://env.endpoint:8080/prefix", + "OTEL_EXPORTER_OTLP_HEADERS": "a=A", + "OTEL_EXPORTER_OTLP_COMPRESSION": "none", + "OTEL_EXPORTER_OTLP_TIMEOUT": "15000", + "OTEL_EXPORTER_OTLP_CERTIFICATE": "cert_path", + "OTEL_EXPORTER_OTLP_CLIENT_CERTIFICATE": "cert_path", + "OTEL_EXPORTER_OTLP_CLIENT_KEY": "key_path", + }, + want: config{ + endpoint: newSetting("env.endpoint:8080"), + path: newSetting("/prefix/v1/logs"), + insecure: newSetting(true), + tlsCfg: newSetting(tlsCfg), + headers: newSetting(headers), + compression: newSetting(NoCompression), + timeout: newSetting(15 * time.Second), + retryCfg: newSetting(defaultRetryCfg), + }, + }, + { + 
name: "OTLPEnpointEnvironmentVariablesDefaultPath", + envars: map[string]string{ + "OTEL_EXPORTER_OTLP_ENDPOINT": "http://env.endpoint", + }, + want: config{ + endpoint: newSetting("env.endpoint"), + path: newSetting(defaultPath), + insecure: newSetting(true), + timeout: newSetting(defaultTimeout), + retryCfg: newSetting(defaultRetryCfg), + }, + }, + { + name: "EnvironmentVariablesPrecedence", + envars: map[string]string{ + "OTEL_EXPORTER_OTLP_ENDPOINT": "http://ignored:9090/alt", + "OTEL_EXPORTER_OTLP_HEADERS": "b=B", + "OTEL_EXPORTER_OTLP_COMPRESSION": "none", + "OTEL_EXPORTER_OTLP_TIMEOUT": "30000", + "OTEL_EXPORTER_OTLP_CERTIFICATE": "invalid_cert", + "OTEL_EXPORTER_OTLP_CLIENT_CERTIFICATE": "invalid_cert", + "OTEL_EXPORTER_OTLP_CLIENT_KEY": "invalid_key", + + "OTEL_EXPORTER_OTLP_LOGS_ENDPOINT": "https://env.endpoint:8080/path", + "OTEL_EXPORTER_OTLP_LOGS_HEADERS": "a=A", + "OTEL_EXPORTER_OTLP_LOGS_COMPRESSION": "gzip", + "OTEL_EXPORTER_OTLP_LOGS_TIMEOUT": "15000", + "OTEL_EXPORTER_OTLP_LOGS_CERTIFICATE": "cert_path", + "OTEL_EXPORTER_OTLP_LOGS_CLIENT_CERTIFICATE": "cert_path", + "OTEL_EXPORTER_OTLP_LOGS_CLIENT_KEY": "key_path", + }, + want: config{ + endpoint: newSetting("env.endpoint:8080"), + path: newSetting("/path"), + insecure: newSetting(false), + tlsCfg: newSetting(tlsCfg), + headers: newSetting(headers), + compression: newSetting(GzipCompression), + timeout: newSetting(15 * time.Second), + retryCfg: newSetting(defaultRetryCfg), + }, + }, + { + name: "OptionsPrecedence", + envars: map[string]string{ + "OTEL_EXPORTER_OTLP_ENDPOINT": "http://ignored:9090/alt", + "OTEL_EXPORTER_OTLP_HEADERS": "b=B", + "OTEL_EXPORTER_OTLP_COMPRESSION": "none", + "OTEL_EXPORTER_OTLP_TIMEOUT": "30000", + "OTEL_EXPORTER_OTLP_CERTIFICATE": "invalid_cert", + "OTEL_EXPORTER_OTLP_CLIENT_CERTIFICATE": "invalid_cert", + "OTEL_EXPORTER_OTLP_CLIENT_KEY": "invalid_key", + + "OTEL_EXPORTER_OTLP_LOGS_ENDPOINT": "https://env.endpoint:8080/prefix", + "OTEL_EXPORTER_OTLP_LOGS_HEADERS": 
"a=A", + "OTEL_EXPORTER_OTLP_LOGS_COMPRESSION": "gzip", + "OTEL_EXPORTER_OTLP_LOGS_TIMEOUT": "15000", + "OTEL_EXPORTER_OTLP_LOGS_CERTIFICATE": "cert_path", + "OTEL_EXPORTER_OTLP_LOGS_CLIENT_CERTIFICATE": "cert_path", + "OTEL_EXPORTER_OTLP_LOGS_CLIENT_KEY": "key_path", + }, + options: []Option{ + WithEndpoint("test"), + WithURLPath("/path"), + WithInsecure(), + WithTLSClientConfig(tlsCfg), + WithCompression(GzipCompression), + WithHeaders(headers), + WithTimeout(time.Second), + WithRetry(RetryConfig(rc)), + }, + want: config{ + endpoint: newSetting("test"), + path: newSetting("/path"), + insecure: newSetting(true), + tlsCfg: newSetting(tlsCfg), + headers: newSetting(headers), + compression: newSetting(GzipCompression), + timeout: newSetting(time.Second), + retryCfg: newSetting(rc), + }, + }, + { + name: "InvalidEnvironmentVariables", + envars: map[string]string{ + "OTEL_EXPORTER_OTLP_LOGS_ENDPOINT": "%invalid", + "OTEL_EXPORTER_OTLP_LOGS_HEADERS": "a,%ZZ=valid,key=%ZZ", + "OTEL_EXPORTER_OTLP_LOGS_COMPRESSION": "xz", + "OTEL_EXPORTER_OTLP_LOGS_TIMEOUT": "100 seconds", + "OTEL_EXPORTER_OTLP_LOGS_CERTIFICATE": "invalid_cert", + "OTEL_EXPORTER_OTLP_LOGS_CLIENT_CERTIFICATE": "invalid_cert", + "OTEL_EXPORTER_OTLP_LOGS_CLIENT_KEY": "invalid_key", + }, + want: config{ + endpoint: newSetting(defaultEndpoint), + path: newSetting(defaultPath), + timeout: newSetting(defaultTimeout), + retryCfg: newSetting(defaultRetryCfg), + }, + errs: []string{ + `invalid OTEL_EXPORTER_OTLP_LOGS_ENDPOINT value %invalid: parse "%invalid": invalid URL escape "%in"`, + `failed to load TLS:`, + `certificate not added`, + `tls: failed to find any PEM data in certificate input`, + `invalid OTEL_EXPORTER_OTLP_LOGS_HEADERS value a,%ZZ=valid,key=%ZZ:`, + `invalid header: a`, + `invalid header key: %ZZ`, + `invalid header value: %ZZ`, + `invalid OTEL_EXPORTER_OTLP_LOGS_COMPRESSION value xz: unknown compression: xz`, + `invalid OTEL_EXPORTER_OTLP_LOGS_TIMEOUT value 100 seconds: strconv.Atoi: parsing "100 
seconds": invalid syntax`, + }, + }, } for _, tc := range testcases { @@ -109,14 +344,57 @@ func TestNewConfig(t *testing.T) { for key, value := range tc.envars { t.Setenv(key, value) } + + var err error + t.Cleanup(func(orig otel.ErrorHandler) func() { + otel.SetErrorHandler(otel.ErrorHandlerFunc(func(e error) { + err = errors.Join(err, e) + })) + return func() { otel.SetErrorHandler(orig) } + }(otel.GetErrorHandler())) c := newConfig(tc.options) - // Cannot compare funcs + + // Do not compare pointer values. + assertTLSConfig(t, tc.want.tlsCfg, c.tlsCfg) + var emptyTLS setting[*tls.Config] + c.tlsCfg, tc.want.tlsCfg = emptyTLS, emptyTLS + + // Cannot compare funcs, see TestWithProxy. c.proxy = setting[HTTPTransportProxyFunc]{} + assert.Equal(t, tc.want, c) + + for _, errMsg := range tc.errs { + assert.ErrorContains(t, err, errMsg) + } }) } } +func assertTLSConfig(t *testing.T, want, got setting[*tls.Config]) { + t.Helper() + + assert.Equal(t, want.Set, got.Set, "setting Set") + if !want.Set { + return + } + + if want.Value == nil { + assert.Nil(t, got.Value, "*tls.Config") + return + } + require.NotNil(t, got.Value, "*tls.Config") + + if want.Value.RootCAs == nil { + assert.Nil(t, got.Value.RootCAs, "*tls.Config.RootCAs") + } else { + if assert.NotNil(t, got.Value.RootCAs, "RootCAs") { + assert.True(t, want.Value.RootCAs.Equal(got.Value.RootCAs), "RootCAs equal") + } + } + assert.Equal(t, want.Value.Certificates, got.Value.Certificates, "Certificates") +} + func TestWithProxy(t *testing.T) { proxy := func(*http.Request) (*url.URL, error) { return nil, nil } opts := []Option{WithProxy(HTTPTransportProxyFunc(proxy))} diff --git a/exporters/otlp/otlplog/otlploghttp/go.mod b/exporters/otlp/otlplog/otlploghttp/go.mod index bb7321e0b13..c629bc4406e 100644 --- a/exporters/otlp/otlplog/otlploghttp/go.mod +++ b/exporters/otlp/otlplog/otlploghttp/go.mod @@ -4,6 +4,7 @@ go 1.21 require ( github.com/cenkalti/backoff/v4 v4.3.0 + github.com/google/go-cmp v0.6.0 
github.com/stretchr/testify v1.9.0 go.opentelemetry.io/otel v1.25.0 go.opentelemetry.io/otel/log v0.0.1-alpha @@ -11,6 +12,7 @@ require ( go.opentelemetry.io/otel/sdk/log v0.0.0-20240403115316-6c6e1e7416e9 go.opentelemetry.io/otel/trace v1.25.0 go.opentelemetry.io/proto/slim/otlp v1.2.0 + google.golang.org/protobuf v1.33.0 ) require ( @@ -20,7 +22,6 @@ require ( github.com/pmezard/go-difflib v1.0.0 // indirect go.opentelemetry.io/otel/metric v1.25.0 // indirect golang.org/x/sys v0.19.0 // indirect - google.golang.org/protobuf v1.33.0 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect ) diff --git a/sdk/log/DESIGN.md b/sdk/log/DESIGN.md index 41f1cdadf0f..b1a69aae9ba 100644 --- a/sdk/log/DESIGN.md +++ b/sdk/log/DESIGN.md @@ -55,10 +55,10 @@ The user can configure custom processors and decorate built-in processors. The [Simple processor](https://opentelemetry.io/docs/specs/otel/logs/sdk/#simple-processor) is implemented as `SimpleProcessor` struct in [simple.go](simple.go). -### BatchingProcessor +### BatchProcessor The [Batching processor](https://opentelemetry.io/docs/specs/otel/logs/sdk/#batching-processor) -is implemented as `BatchingProcessor` struct in [batch.go](batch.go). +is implemented as `BatchProcessor` struct in [batch.go](batch.go). The `Batcher` can be also configured using the `OTEL_BLRP_*` environment variables as [defined by the specification](https://opentelemetry.io/docs/specs/otel/configuration/sdk-environment-variables/#batch-logrecord-processor). diff --git a/sdk/log/batch.go b/sdk/log/batch.go index 1089f75e69e..5cb9f5956aa 100644 --- a/sdk/log/batch.go +++ b/sdk/log/batch.go @@ -6,7 +6,10 @@ package log // import "go.opentelemetry.io/otel/sdk/log" import ( "container/ring" "context" + "errors" + "slices" "sync" + "sync/atomic" "time" ) @@ -22,60 +25,224 @@ const ( envarExpMaxBatchSize = "OTEL_BLRP_MAX_EXPORT_BATCH_SIZE" ) -// Compile-time check BatchingProcessor implements Processor. 
-var _ Processor = (*BatchingProcessor)(nil) - -// BatchingProcessor is a processor that exports batches of log records. -type BatchingProcessor struct { - exporter Exporter - - maxQueueSize int - exportInterval time.Duration - exportTimeout time.Duration - exportMaxBatchSize int +// Compile-time check BatchProcessor implements Processor. +var _ Processor = (*BatchProcessor)(nil) + +// BatchProcessor is a processor that exports batches of log records. +// A BatchProcessor must be created with [NewBatchProcessor]. +type BatchProcessor struct { + // The BatchProcessor is designed to provide the highest throughput of + // log records possible while being compatible with OpenTelemetry. The + // entry point of log records is the OnEmit method. This method is designed + // to receive records as fast as possible while still honoring shutdown + // commands. All records received are enqueued to queue. + // + // In order to block OnEmit as little as possible, a separate "poll" + // goroutine is spawned at the creation of a BatchProcessor. This + // goroutine is responsible for batching the queue at regular polled + // intervals, or when it is directly signaled to. + // + // To keep the polling goroutine from backing up, all batches it makes are + // exported with a bufferedExporter. This exporter allows the poll + // goroutine to enqueue an export payload that will be handled in a + // separate goroutine dedicated to the export. This asynchronous behavior + // allows the poll goroutine to maintain accurate interval polling. 
+ // + // ___BatchProcessor____ __Poll Goroutine__ __Export Goroutine__ + // || || || || || || + // || ********** || || || || ********** || + // || Records=>* OnEmit * || || | - ticker || || * export * || + // || ********** || || | - trigger || || ********** || + // || || || || | || || || || + // || || || || | || || || || + // || __________\/___ || || |*********** || || ______/\_______ || + // || (____queue______)>=||=||===|* batch *===||=||=>[_export_buffer_] || + // || || || |*********** || || || + // ||_____________________|| ||__________________|| ||____________________|| + // + // + // The "release valve" in this processing is the record queue. This queue + // is a ring buffer. It will overwrite the oldest records first when writes + // to OnEmit are made faster than the queue can be flushed. If batches + // cannot be flushed to the export buffer, the records will remain in the + // queue. + + // exporter is the bufferedExporter all batches are exported with. + exporter *bufferExporter + + // q is the active queue of records that have not yet been exported. + q *queue + // batchSize is the minimum number of records needed before an export is + // triggered (unless the interval expires). + batchSize int + + // pollTrigger triggers the poll goroutine to flush a batch from the queue. + // This is sent to when it is known that the queue contains at least one + // complete batch. + // + // When a send is made to the channel, the poll loop will be reset after + // the flush. If there is still enough records in the queue for another + // batch the reset of the poll loop will automatically re-trigger itself. + // There is no need for the original sender to monitor and resend. + pollTrigger chan struct{} + // pollKill kills the poll goroutine. This is only expected to be closed + // once by the Shutdown method. + pollKill chan struct{} + // pollDone signals the poll goroutine has completed. 
+ pollDone chan struct{} + + // stopped holds the stopped state of the BatchProcessor. + stopped atomic.Bool } -// NewBatchingProcessor decorates the provided exporter +// NewBatchProcessor decorates the provided exporter // so that the log records are batched before exporting. // // All of the exporter's methods are called synchronously. -func NewBatchingProcessor(exporter Exporter, opts ...BatchProcessorOption) *BatchingProcessor { +func NewBatchProcessor(exporter Exporter, opts ...BatchProcessorOption) *BatchProcessor { + cfg := newBatchConfig(opts) if exporter == nil { // Do not panic on nil export. exporter = defaultNoopExporter } - cfg := newBatchingConfig(opts) - return &BatchingProcessor{ - exporter: exporter, - - maxQueueSize: cfg.maxQSize.Value, - exportInterval: cfg.expInterval.Value, - exportTimeout: cfg.expTimeout.Value, - exportMaxBatchSize: cfg.expMaxBatchSize.Value, + // Order is important here. Wrap the timeoutExporter with the chunkExporter + // to ensure each export completes in timeout (instead of all chunked + // exports). + exporter = newTimeoutExporter(exporter, cfg.expTimeout.Value) + // Use a chunkExporter to ensure ForceFlush and Shutdown calls are batched + // appropriately on export. + exporter = newChunkExporter(exporter, cfg.expMaxBatchSize.Value) + + b := &BatchProcessor{ + // TODO: explore making the size of this configurable. + exporter: newBufferExporter(exporter, 1), + + q: newQueue(cfg.maxQSize.Value), + batchSize: cfg.expMaxBatchSize.Value, + pollTrigger: make(chan struct{}, 1), + pollKill: make(chan struct{}), } + b.pollDone = b.poll(cfg.expInterval.Value) + return b +} + +// poll spawns a goroutine to handle interval polling and batch exporting. The +// returned done chan is closed when the spawned goroutine completes. +func (b *BatchProcessor) poll(interval time.Duration) (done chan struct{}) { + done = make(chan struct{}) + + ticker := time.NewTicker(interval) + // TODO: investigate using a sync.Pool instead of cloning.
+ buf := make([]Record, b.batchSize) + go func() { + defer close(done) + defer ticker.Stop() + + for { + select { + case <-ticker.C: + case <-b.pollTrigger: + ticker.Reset(interval) + case <-b.pollKill: + return + } + + qLen := b.q.TryDequeue(buf, func(r []Record) bool { + ok := b.exporter.EnqueueExport(r) + if ok { + buf = slices.Clone(buf) + } + return ok + }) + if qLen >= b.batchSize { + // There is another full batch ready. Immediately trigger + // another export attempt. + select { + case b.pollTrigger <- struct{}{}: + default: + // Another flush signal already received. + } + } + } + }() + return done } // OnEmit batches provided log record. -func (b *BatchingProcessor) OnEmit(ctx context.Context, r Record) error { - // TODO (#5063): Implement. +func (b *BatchProcessor) OnEmit(_ context.Context, r Record) error { + if b.stopped.Load() { + return nil + } + if n := b.q.Enqueue(r); n >= b.batchSize { + select { + case b.pollTrigger <- struct{}{}: + default: + // Flush chan full. The poll goroutine will handle this by + // re-sending any trigger until the queue has less than batchSize + // records. + } + } return nil } -// Enabled returns true. -func (b *BatchingProcessor) Enabled(context.Context, Record) bool { - return true +// Enabled returns if b is enabled. +func (b *BatchProcessor) Enabled(context.Context, Record) bool { + return !b.stopped.Load() } // Shutdown flushes queued log records and shuts down the decorated exporter. -func (b *BatchingProcessor) Shutdown(ctx context.Context) error { - // TODO (#5063): Implement. - return nil +func (b *BatchProcessor) Shutdown(ctx context.Context) error { + if b.stopped.Swap(true) { + return nil + } + + // Stop the poll goroutine. + close(b.pollKill) + select { + case <-b.pollDone: + case <-ctx.Done(): + // Out of time. + return errors.Join(ctx.Err(), b.exporter.Shutdown(ctx)) + } + + // Flush remaining queued before exporter shutdown. 
+ err := b.exporter.Export(ctx, b.q.Flush()) + return errors.Join(err, b.exporter.Shutdown(ctx)) +} + +var errPartialFlush = errors.New("partial flush: export buffer full") + +// Used for testing. +var ctxErr = func(ctx context.Context) error { + return ctx.Err() } // ForceFlush flushes queued log records and flushes the decorated exporter. -func (b *BatchingProcessor) ForceFlush(ctx context.Context) error { - // TODO (#5063): Implement. - return nil +func (b *BatchProcessor) ForceFlush(ctx context.Context) error { + if b.stopped.Load() { + return nil + } + + buf := make([]Record, b.q.cap) + notFlushed := func() bool { + var flushed bool + _ = b.q.TryDequeue(buf, func(r []Record) bool { + flushed = b.exporter.EnqueueExport(r) + return flushed + }) + return !flushed + } + var err error + // For as long as ctx allows, try to make a single flush of the queue. + for notFlushed() { + // Use ctxErr instead of calling ctx.Err directly so we can test + // the partial error return. + if e := ctxErr(ctx); e != nil { + err = errors.Join(e, errPartialFlush) + break + } + } + return errors.Join(err, b.exporter.ForceFlush(ctx)) } // queue holds a queue of logging records. @@ -163,15 +330,15 @@ func (q *queue) Flush() []Record { return out } -type batchingConfig struct { +type batchConfig struct { maxQSize setting[int] expInterval setting[time.Duration] expTimeout setting[time.Duration] expMaxBatchSize setting[int] } -func newBatchingConfig(options []BatchProcessorOption) batchingConfig { - var c batchingConfig +func newBatchConfig(options []BatchProcessorOption) batchConfig { + var c batchConfig for _, o := range options { c = o.apply(c) } @@ -205,14 +372,14 @@ func newBatchingConfig(options []BatchProcessorOption) batchingConfig { return c } -// BatchProcessorOption applies a configuration to a [BatchingProcessor]. +// BatchProcessorOption applies a configuration to a [BatchProcessor]. 
type BatchProcessorOption interface { - apply(batchingConfig) batchingConfig + apply(batchConfig) batchConfig } -type batchingOptionFunc func(batchingConfig) batchingConfig +type batchOptionFunc func(batchConfig) batchConfig -func (fn batchingOptionFunc) apply(c batchingConfig) batchingConfig { +func (fn batchOptionFunc) apply(c batchConfig) batchConfig { return fn(c) } @@ -226,7 +393,7 @@ func (fn batchingOptionFunc) apply(c batchingConfig) batchingConfig { // passed, 2048 will be used. // The default value is also used when the provided value is less than one. func WithMaxQueueSize(size int) BatchProcessorOption { - return batchingOptionFunc(func(cfg batchingConfig) batchingConfig { + return batchOptionFunc(func(cfg batchConfig) batchConfig { cfg.maxQSize = newSetting(size) return cfg }) @@ -241,7 +408,7 @@ func WithMaxQueueSize(size int) BatchProcessorOption { // passed, 1s will be used. // The default value is also used when the provided value is less than one. func WithExportInterval(d time.Duration) BatchProcessorOption { - return batchingOptionFunc(func(cfg batchingConfig) batchingConfig { + return batchOptionFunc(func(cfg batchConfig) batchConfig { cfg.expInterval = newSetting(d) return cfg }) @@ -256,7 +423,7 @@ func WithExportInterval(d time.Duration) BatchProcessorOption { // passed, 30s will be used. // The default value is also used when the provided value is less than one. func WithExportTimeout(d time.Duration) BatchProcessorOption { - return batchingOptionFunc(func(cfg batchingConfig) batchingConfig { + return batchOptionFunc(func(cfg batchConfig) batchConfig { cfg.expTimeout = newSetting(d) return cfg }) @@ -272,7 +439,7 @@ func WithExportTimeout(d time.Duration) BatchProcessorOption { // passed, 512 will be used. // The default value is also used when the provided value is less than one. 
func WithExportMaxBatchSize(size int) BatchProcessorOption { - return batchingOptionFunc(func(cfg batchingConfig) batchingConfig { + return batchOptionFunc(func(cfg batchConfig) batchConfig { cfg.expMaxBatchSize = newSetting(size) return cfg }) diff --git a/sdk/log/batch_test.go b/sdk/log/batch_test.go index 7e44b1903f3..539f04d6070 100644 --- a/sdk/log/batch_test.go +++ b/sdk/log/batch_test.go @@ -4,6 +4,7 @@ package log // import "go.opentelemetry.io/otel/sdk/log" import ( + "context" "slices" "strconv" "sync" @@ -17,7 +18,7 @@ import ( "go.opentelemetry.io/otel/log" ) -func TestNewBatchingConfig(t *testing.T) { +func TestNewBatchConfig(t *testing.T) { otel.SetErrorHandler(otel.ErrorHandlerFunc(func(err error) { t.Log(err) })) @@ -26,11 +27,11 @@ func TestNewBatchingConfig(t *testing.T) { name string envars map[string]string options []BatchProcessorOption - want batchingConfig + want batchConfig }{ { name: "Defaults", - want: batchingConfig{ + want: batchConfig{ maxQSize: newSetting(dfltMaxQSize), expInterval: newSetting(dfltExpInterval), expTimeout: newSetting(dfltExpTimeout), @@ -45,7 +46,7 @@ func TestNewBatchingConfig(t *testing.T) { WithExportTimeout(time.Hour), WithExportMaxBatchSize(2), }, - want: batchingConfig{ + want: batchConfig{ maxQSize: newSetting(10), expInterval: newSetting(time.Microsecond), expTimeout: newSetting(time.Hour), @@ -60,7 +61,7 @@ func TestNewBatchingConfig(t *testing.T) { envarExpTimeout: strconv.Itoa(1000), envarExpMaxBatchSize: strconv.Itoa(1), }, - want: batchingConfig{ + want: batchConfig{ maxQSize: newSetting(10), expInterval: newSetting(100 * time.Millisecond), expTimeout: newSetting(1000 * time.Millisecond), @@ -75,7 +76,7 @@ func TestNewBatchingConfig(t *testing.T) { WithExportTimeout(-1 * time.Hour), WithExportMaxBatchSize(-2), }, - want: batchingConfig{ + want: batchConfig{ maxQSize: newSetting(dfltMaxQSize), expInterval: newSetting(dfltExpInterval), expTimeout: newSetting(dfltExpTimeout), @@ -90,7 +91,7 @@ func 
TestNewBatchingConfig(t *testing.T) { envarExpTimeout: "-1", envarExpMaxBatchSize: "-1", }, - want: batchingConfig{ + want: batchConfig{ maxQSize: newSetting(dfltMaxQSize), expInterval: newSetting(dfltExpInterval), expTimeout: newSetting(dfltExpTimeout), @@ -112,7 +113,7 @@ func TestNewBatchingConfig(t *testing.T) { WithExportTimeout(time.Hour), WithExportMaxBatchSize(2), }, - want: batchingConfig{ + want: batchConfig{ maxQSize: newSetting(3), expInterval: newSetting(time.Microsecond), expTimeout: newSetting(time.Hour), @@ -125,7 +126,7 @@ func TestNewBatchingConfig(t *testing.T) { WithMaxQueueSize(1), WithExportMaxBatchSize(10), }, - want: batchingConfig{ + want: batchConfig{ maxQSize: newSetting(1), expInterval: newSetting(dfltExpInterval), expTimeout: newSetting(dfltExpTimeout), @@ -139,11 +140,306 @@ func TestNewBatchingConfig(t *testing.T) { for key, value := range tc.envars { t.Setenv(key, value) } - assert.Equal(t, tc.want, newBatchingConfig(tc.options)) + assert.Equal(t, tc.want, newBatchConfig(tc.options)) }) } } +func TestBatchProcessor(t *testing.T) { + ctx := context.Background() + + t.Run("NilExporter", func(t *testing.T) { + assert.NotPanics(t, func() { NewBatchProcessor(nil) }) + }) + + t.Run("Polling", func(t *testing.T) { + e := newTestExporter(nil) + const size = 15 + b := NewBatchProcessor( + e, + WithMaxQueueSize(2*size), + WithExportMaxBatchSize(2*size), + WithExportInterval(time.Nanosecond), + WithExportTimeout(time.Hour), + ) + for _, r := range make([]Record, size) { + assert.NoError(t, b.OnEmit(ctx, r)) + } + var got []Record + assert.Eventually(t, func() bool { + for _, r := range e.Records() { + got = append(got, r...) 
+ } + return len(got) == size + }, 2*time.Second, time.Microsecond) + _ = b.Shutdown(ctx) + }) + + t.Run("OnEmit", func(t *testing.T) { + const batch = 10 + e := newTestExporter(nil) + b := NewBatchProcessor( + e, + WithMaxQueueSize(10*batch), + WithExportMaxBatchSize(batch), + WithExportInterval(time.Hour), + WithExportTimeout(time.Hour), + ) + for _, r := range make([]Record, 10*batch) { + assert.NoError(t, b.OnEmit(ctx, r)) + } + assert.Eventually(t, func() bool { + return e.ExportN() > 1 + }, 2*time.Second, time.Microsecond, "multi-batch flush") + + assert.NoError(t, b.Shutdown(ctx)) + assert.GreaterOrEqual(t, e.ExportN(), 10) + }) + + t.Run("RetriggerFlushNonBlocking", func(t *testing.T) { + e := newTestExporter(nil) + e.ExportTrigger = make(chan struct{}) + + const batch = 10 + b := NewBatchProcessor( + e, + WithMaxQueueSize(3*batch), + WithExportMaxBatchSize(batch), + WithExportInterval(time.Hour), + WithExportTimeout(time.Hour), + ) + for _, r := range make([]Record, 2*batch) { + assert.NoError(t, b.OnEmit(ctx, r)) + } + + var n int + require.Eventually(t, func() bool { + n = e.ExportN() + return n > 0 + }, 2*time.Second, time.Microsecond, "blocked export not attempted") + + var err error + require.Eventually(t, func() bool { + err = b.OnEmit(ctx, Record{}) + return true + }, time.Second, time.Microsecond, "OnEmit blocked") + assert.NoError(t, err) + + e.ExportTrigger <- struct{}{} + assert.Eventually(t, func() bool { + return e.ExportN() > n + }, 2*time.Second, time.Microsecond, "flush not retriggered") + + close(e.ExportTrigger) + assert.NoError(t, b.Shutdown(ctx)) + assert.Equal(t, 3, e.ExportN()) + }) + + t.Run("Enabled", func(t *testing.T) { + b := NewBatchProcessor(defaultNoopExporter) + assert.True(t, b.Enabled(ctx, Record{})) + + _ = b.Shutdown(ctx) + assert.False(t, b.Enabled(ctx, Record{})) + }) + + t.Run("Shutdown", func(t *testing.T) { + t.Run("Error", func(t *testing.T) { + e := newTestExporter(assert.AnError) + b := NewBatchProcessor(e) + 
assert.ErrorIs(t, b.Shutdown(ctx), assert.AnError, "exporter error not returned") + assert.NoError(t, b.Shutdown(ctx)) + }) + + t.Run("Multiple", func(t *testing.T) { + e := newTestExporter(nil) + b := NewBatchProcessor(e) + + const shutdowns = 3 + for i := 0; i < shutdowns; i++ { + assert.NoError(t, b.Shutdown(ctx)) + } + assert.Equal(t, 1, e.ShutdownN(), "exporter Shutdown calls") + }) + + t.Run("OnEmit", func(t *testing.T) { + e := newTestExporter(nil) + b := NewBatchProcessor(e) + assert.NoError(t, b.Shutdown(ctx)) + + want := e.ExportN() + assert.NoError(t, b.OnEmit(ctx, Record{})) + assert.Equal(t, want, e.ExportN(), "Export called after shutdown") + }) + + t.Run("ForceFlush", func(t *testing.T) { + e := newTestExporter(nil) + b := NewBatchProcessor(e) + + assert.NoError(t, b.OnEmit(ctx, Record{})) + assert.NoError(t, b.Shutdown(ctx)) + + assert.NoError(t, b.ForceFlush(ctx)) + assert.Equal(t, 0, e.ForceFlushN(), "ForceFlush called after shutdown") + }) + + t.Run("CanceledContext", func(t *testing.T) { + e := newTestExporter(nil) + e.ExportTrigger = make(chan struct{}) + t.Cleanup(func() { close(e.ExportTrigger) }) + b := NewBatchProcessor(e) + + ctx := context.Background() + c, cancel := context.WithCancel(ctx) + cancel() + + assert.ErrorIs(t, b.Shutdown(c), context.Canceled) + }) + }) + + t.Run("ForceFlush", func(t *testing.T) { + t.Run("Flush", func(t *testing.T) { + e := newTestExporter(assert.AnError) + b := NewBatchProcessor( + e, + WithMaxQueueSize(100), + WithExportMaxBatchSize(10), + WithExportInterval(time.Hour), + WithExportTimeout(time.Hour), + ) + t.Cleanup(func() { _ = b.Shutdown(ctx) }) + + var r Record + r.SetBody(log.BoolValue(true)) + require.NoError(t, b.OnEmit(ctx, r)) + + assert.ErrorIs(t, b.ForceFlush(ctx), assert.AnError, "exporter error not returned") + assert.Equal(t, 1, e.ForceFlushN(), "exporter ForceFlush calls") + if assert.Equal(t, 1, e.ExportN(), "exporter Export calls") { + got := e.Records() + if assert.Len(t, got[0], 1, 
"records received") { + assert.Equal(t, r, got[0][0]) + } + } + }) + + t.Run("ErrorPartialFlush", func(t *testing.T) { + e := newTestExporter(nil) + e.ExportTrigger = make(chan struct{}) + + ctxErrCalled := make(chan struct{}) + orig := ctxErr + ctxErr = func(ctx context.Context) error { + close(ctxErrCalled) + return orig(ctx) + } + t.Cleanup(func() { ctxErr = orig }) + + const batch = 1 + b := NewBatchProcessor( + e, + WithMaxQueueSize(10*batch), + WithExportMaxBatchSize(batch), + WithExportInterval(time.Hour), + WithExportTimeout(time.Hour), + ) + + // Enqueue 10 x "batch size" amount of records. + for i := 0; i < 10*batch; i++ { + require.NoError(t, b.OnEmit(ctx, Record{})) + } + assert.Eventually(t, func() bool { + return e.ExportN() > 0 + }, 2*time.Second, time.Microsecond) + // 1 export being performed, 1 export in buffer chan, >1 batch + // still in queue that an attempt to flush will be made on. + // + // Stop the poll routine to prevent contention with the queue lock. + // This is outside of "normal" operations, but we are testing if + // ForceFlush will return the correct error when an EnqueueExport + // fails and not if ForceFlush will ever get the queue lock in high + // throughput situations. + close(b.pollDone) + <-b.pollDone + + // Cancel the flush ctx from the start so errPartialFlush is + // returned right away. + fCtx, cancel := context.WithCancel(ctx) + cancel() + + errCh := make(chan error, 1) + go func() { + errCh <- b.ForceFlush(fCtx) + close(errCh) + }() + // Wait for ctxErrCalled to close before closing ExportTrigger so + // we know the errPartialFlush will be returned in ForceFlush. 
+ <-ctxErrCalled + close(e.ExportTrigger) + + err := <-errCh + assert.ErrorIs(t, err, errPartialFlush, "partial flush error") + assert.ErrorIs(t, err, context.Canceled, "ctx canceled error") + }) + + t.Run("CanceledContext", func(t *testing.T) { + e := newTestExporter(nil) + e.ExportTrigger = make(chan struct{}) + b := NewBatchProcessor(e) + t.Cleanup(func() { _ = b.Shutdown(ctx) }) + + var r Record + r.SetBody(log.BoolValue(true)) + _ = b.OnEmit(ctx, r) + t.Cleanup(func() { _ = b.Shutdown(ctx) }) + t.Cleanup(func() { close(e.ExportTrigger) }) + + c, cancel := context.WithCancel(ctx) + cancel() + assert.ErrorIs(t, b.ForceFlush(c), context.Canceled) + }) + }) + + t.Run("ConcurrentSafe", func(t *testing.T) { + const goRoutines = 10 + + e := newTestExporter(nil) + b := NewBatchProcessor(e) + + ctx, cancel := context.WithCancel(ctx) + var wg sync.WaitGroup + for i := 0; i < goRoutines-1; i++ { + wg.Add(1) + go func() { + defer wg.Done() + for { + select { + case <-ctx.Done(): + return + default: + assert.NoError(t, b.OnEmit(ctx, Record{})) + // Ignore partial flush errors. + _ = b.ForceFlush(ctx) + } + } + }() + } + + require.Eventually(t, func() bool { + return e.ExportN() > 0 + }, 2*time.Second, time.Microsecond, "export before shutdown") + + wg.Add(1) + go func() { + defer wg.Done() + assert.NoError(t, b.Shutdown(ctx)) + cancel() + }() + + wg.Wait() + }) +} + func TestQueue(t *testing.T) { var r Record r.SetBody(log.BoolValue(true)) diff --git a/sdk/log/exporter.go b/sdk/log/exporter.go index 6ebf061db48..e9f2140e6cf 100644 --- a/sdk/log/exporter.go +++ b/sdk/log/exporter.go @@ -219,14 +219,33 @@ func (e *bufferExporter) enqueue(ctx context.Context, records []Record, rCh chan } // EnqueueExport enqueues an export of records in the context of ctx to be -// performed asynchronously. This will return true if the exported is -// successfully enqueued, false otherwise. +// performed asynchronously. 
This will return true if the records are +// successfully enqueued (or the bufferExporter is shut down), false otherwise. +// +// The passed records are held after this call returns. func (e *bufferExporter) EnqueueExport(records []Record) bool { if len(records) == 0 { // Nothing to enqueue, do not waste input space. return true } - return e.enqueue(context.Background(), records, nil) == nil + + data := exportData{ctx: context.Background(), records: records} + + e.inputMu.Lock() + defer e.inputMu.Unlock() + + // Check stopped before enqueueing now that e.inputMu is held. This + // prevents sends on a closed chan when Shutdown is called concurrently. + if e.stopped.Load() { + return true + } + + select { + case e.input <- data: + return true + default: + return false + } } // Export synchronously exports records in the context of ctx. This will not diff --git a/sdk/log/exporter_test.go b/sdk/log/exporter_test.go index 85c12860409..264abc3a513 100644 --- a/sdk/log/exporter_test.go +++ b/sdk/log/exporter_test.go @@ -583,7 +583,7 @@ func TestBufferExporter(t *testing.T) { e := newBufferExporter(exp, 1) _ = e.Shutdown(context.Background()) - assert.False(t, e.EnqueueExport(make([]Record, 1))) + assert.True(t, e.EnqueueExport(make([]Record, 1))) }) }) } diff --git a/sdk/log/provider.go b/sdk/log/provider.go index a51994d5a76..a7b53500e4b 100644 --- a/sdk/log/provider.go +++ b/sdk/log/provider.go @@ -192,7 +192,7 @@ func WithResource(res *resource.Resource) LoggerProviderOption { // Each WithProcessor creates a separate pipeline. Use custom decorators // for advanced scenarios such as enriching with attributes. // -// For production, use [NewBatchingProcessor] to batch log records before they are exported. +// For production, use [NewBatchProcessor] to batch log records before they are exported. // For testing and debugging, use [NewSimpleProcessor] to synchronously export log records. 
func WithProcessor(processor Processor) LoggerProviderOption { return loggerProviderOptionFunc(func(cfg providerConfig) providerConfig { diff --git a/sdk/log/simple.go b/sdk/log/simple.go index 7db02906d60..c7aa14b8706 100644 --- a/sdk/log/simple.go +++ b/sdk/log/simple.go @@ -20,7 +20,7 @@ type SimpleProcessor struct { // This Processor is not recommended for production use. The synchronous // nature of this Processor make it good for testing, debugging, or // showing examples of other features, but it can be slow and have a high -// computation resource usage overhead. [NewBatchingProcessor] is recommended +// computation resource usage overhead. [NewBatchProcessor] is recommended // for production use instead. func NewSimpleProcessor(exporter Exporter, _ ...SimpleProcessorOption) *SimpleProcessor { if exporter == nil {