From ba5d1268082fcc4ccb8fa967eb5133fa23e4f2ba Mon Sep 17 00:00:00 2001 From: Damien Mathieu Date: Thu, 28 Mar 2024 09:50:27 +0100 Subject: [PATCH 01/11] Fix benchmarks action (#5110) --- .github/workflows/benchmark.yml | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/.github/workflows/benchmark.yml b/.github/workflows/benchmark.yml index c392312ada0..ca9a079d0c0 100644 --- a/.github/workflows/benchmark.yml +++ b/.github/workflows/benchmark.yml @@ -35,6 +35,5 @@ jobs: external-data-json-path: ./benchmarks/data.json github-token: ${{ secrets.GITHUB_TOKEN }} gh-pages-branch: benchmarks - auto-push: true - fail-on-alert: false + fail-on-alert: true alert-threshold: "400%" From 9bffaf911830925779866c29d4e6cf0ae208ee3e Mon Sep 17 00:00:00 2001 From: OpenTelemetry Bot <107717825+opentelemetrybot@users.noreply.github.com> Date: Sun, 31 Mar 2024 13:06:54 +0200 Subject: [PATCH 02/11] dependabot updates Sun Mar 31 00:41:36 UTC 2024 (#5123) build(deps): bump github.com/cenkalti/backoff/v4 from 4.2.1 to 4.3.0 in /exporters/otlp/otlpmetric/otlpmetricgrpc build(deps): bump github.com/cenkalti/backoff/v4 from 4.2.1 to 4.3.0 in /exporters/otlp/otlptrace/otlptracehttp --- example/otel-collector/go.mod | 2 +- example/otel-collector/go.sum | 4 ++-- exporters/otlp/otlpmetric/otlpmetricgrpc/go.mod | 2 +- exporters/otlp/otlpmetric/otlpmetricgrpc/go.sum | 4 ++-- exporters/otlp/otlpmetric/otlpmetrichttp/go.mod | 2 +- exporters/otlp/otlpmetric/otlpmetrichttp/go.sum | 4 ++-- exporters/otlp/otlptrace/otlptracegrpc/go.mod | 2 +- exporters/otlp/otlptrace/otlptracegrpc/go.sum | 4 ++-- exporters/otlp/otlptrace/otlptracehttp/go.mod | 2 +- exporters/otlp/otlptrace/otlptracehttp/go.sum | 4 ++-- 10 files changed, 15 insertions(+), 15 deletions(-) diff --git a/example/otel-collector/go.mod b/example/otel-collector/go.mod index 734bf3feb04..5939e543251 100644 --- a/example/otel-collector/go.mod +++ b/example/otel-collector/go.mod @@ -16,7 +16,7 @@ require ( ) require ( - github.com/cenkalti/backoff/v4 v4.2.1 // indirect + github.com/cenkalti/backoff/v4 v4.3.0 // indirect github.com/go-logr/logr v1.4.1 // indirect github.com/go-logr/stdr v1.2.2 // indirect github.com/golang/protobuf v1.5.3 // indirect diff --git a/example/otel-collector/go.sum b/example/otel-collector/go.sum index d55df377732..e311b64812b 100644 --- a/example/otel-collector/go.sum +++ b/example/otel-collector/go.sum @@ -1,5 +1,5 @@ -github.com/cenkalti/backoff/v4 v4.2.1 h1:y4OZtCnogmCPw98Zjyt5a6+QwPLGkiQsYW5oUqylYbM= -github.com/cenkalti/backoff/v4 v4.2.1/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyYozVcomhLiZE= +github.com/cenkalti/backoff/v4 v4.3.0 h1:MyRJ/UdXutAwSAT+s3wNd7MfTIcy71VQueUuFK343L8= +github.com/cenkalti/backoff/v4 v4.3.0/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyYozVcomhLiZE= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= diff --git a/exporters/otlp/otlpmetric/otlpmetricgrpc/go.mod b/exporters/otlp/otlpmetric/otlpmetricgrpc/go.mod index 47d752d9355..344b5e5008d 100644 --- a/exporters/otlp/otlpmetric/otlpmetricgrpc/go.mod +++ b/exporters/otlp/otlpmetric/otlpmetricgrpc/go.mod @@ -5,7 +5,7 @@ go 1.21 retract v0.32.2 // Contains unresolvable dependencies. require ( - github.com/cenkalti/backoff/v4 v4.2.1 + github.com/cenkalti/backoff/v4 v4.3.0 github.com/google/go-cmp v0.6.0 github.com/stretchr/testify v1.9.0 go.opentelemetry.io/otel v1.24.0 diff --git a/exporters/otlp/otlpmetric/otlpmetricgrpc/go.sum b/exporters/otlp/otlpmetric/otlpmetricgrpc/go.sum index 3282f6f0ccf..66bea4ce5ad 100644 --- a/exporters/otlp/otlpmetric/otlpmetricgrpc/go.sum +++ b/exporters/otlp/otlpmetric/otlpmetricgrpc/go.sum @@ -1,5 +1,5 @@ -github.com/cenkalti/backoff/v4 v4.2.1 h1:y4OZtCnogmCPw98Zjyt5a6+QwPLGkiQsYW5oUqylYbM= -github.com/cenkalti/backoff/v4 v4.2.1/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyYozVcomhLiZE= +github.com/cenkalti/backoff/v4 v4.3.0 h1:MyRJ/UdXutAwSAT+s3wNd7MfTIcy71VQueUuFK343L8= +github.com/cenkalti/backoff/v4 v4.3.0/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyYozVcomhLiZE= github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= diff --git a/exporters/otlp/otlpmetric/otlpmetrichttp/go.mod b/exporters/otlp/otlpmetric/otlpmetrichttp/go.mod index 1d35f3b54b7..b2fdae8974c 100644 --- a/exporters/otlp/otlpmetric/otlpmetrichttp/go.mod +++ b/exporters/otlp/otlpmetric/otlpmetrichttp/go.mod @@ -5,7 +5,7 @@ go 1.21 retract v0.32.2 // Contains unresolvable dependencies. require ( - github.com/cenkalti/backoff/v4 v4.2.1 + github.com/cenkalti/backoff/v4 v4.3.0 github.com/google/go-cmp v0.6.0 github.com/stretchr/testify v1.9.0 go.opentelemetry.io/otel v1.24.0 diff --git a/exporters/otlp/otlpmetric/otlpmetrichttp/go.sum b/exporters/otlp/otlpmetric/otlpmetrichttp/go.sum index 3282f6f0ccf..66bea4ce5ad 100644 --- a/exporters/otlp/otlpmetric/otlpmetrichttp/go.sum +++ b/exporters/otlp/otlpmetric/otlpmetrichttp/go.sum @@ -1,5 +1,5 @@ -github.com/cenkalti/backoff/v4 v4.2.1 h1:y4OZtCnogmCPw98Zjyt5a6+QwPLGkiQsYW5oUqylYbM= -github.com/cenkalti/backoff/v4 v4.2.1/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyYozVcomhLiZE= +github.com/cenkalti/backoff/v4 v4.3.0 h1:MyRJ/UdXutAwSAT+s3wNd7MfTIcy71VQueUuFK343L8= +github.com/cenkalti/backoff/v4 v4.3.0/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyYozVcomhLiZE= github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= diff --git a/exporters/otlp/otlptrace/otlptracegrpc/go.mod b/exporters/otlp/otlptrace/otlptracegrpc/go.mod index 3ee7bae8e60..c087652c45b 100644 --- a/exporters/otlp/otlptrace/otlptracegrpc/go.mod +++ b/exporters/otlp/otlptrace/otlptracegrpc/go.mod @@ -3,7 +3,7 @@ module go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc go 1.21 require ( - github.com/cenkalti/backoff/v4 v4.2.1 + github.com/cenkalti/backoff/v4 v4.3.0 github.com/stretchr/testify v1.9.0 go.opentelemetry.io/otel v1.24.0 go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.24.0 diff --git a/exporters/otlp/otlptrace/otlptracegrpc/go.sum b/exporters/otlp/otlptrace/otlptracegrpc/go.sum index 0a51e7e9a60..9aecdf4f038 100644 --- a/exporters/otlp/otlptrace/otlptracegrpc/go.sum +++ b/exporters/otlp/otlptrace/otlptracegrpc/go.sum @@ -1,5 +1,5 @@ -github.com/cenkalti/backoff/v4 v4.2.1 h1:y4OZtCnogmCPw98Zjyt5a6+QwPLGkiQsYW5oUqylYbM= -github.com/cenkalti/backoff/v4 v4.2.1/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyYozVcomhLiZE= +github.com/cenkalti/backoff/v4 v4.3.0 h1:MyRJ/UdXutAwSAT+s3wNd7MfTIcy71VQueUuFK343L8= +github.com/cenkalti/backoff/v4 v4.3.0/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyYozVcomhLiZE= github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= diff --git a/exporters/otlp/otlptrace/otlptracehttp/go.mod b/exporters/otlp/otlptrace/otlptracehttp/go.mod index a3d6a6084d0..4e2e875ad69 100644 --- a/exporters/otlp/otlptrace/otlptracehttp/go.mod +++ b/exporters/otlp/otlptrace/otlptracehttp/go.mod @@ -3,7 +3,7 @@ module go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp go 1.21 require ( - github.com/cenkalti/backoff/v4 v4.2.1 + github.com/cenkalti/backoff/v4 v4.3.0 github.com/stretchr/testify v1.9.0 go.opentelemetry.io/otel v1.24.0 go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.24.0 diff --git a/exporters/otlp/otlptrace/otlptracehttp/go.sum b/exporters/otlp/otlptrace/otlptracehttp/go.sum index 04a357bd0a9..7113a3c58be 100644 --- a/exporters/otlp/otlptrace/otlptracehttp/go.sum +++ b/exporters/otlp/otlptrace/otlptracehttp/go.sum @@ -1,5 +1,5 @@ -github.com/cenkalti/backoff/v4 v4.2.1 h1:y4OZtCnogmCPw98Zjyt5a6+QwPLGkiQsYW5oUqylYbM= -github.com/cenkalti/backoff/v4 v4.2.1/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyYozVcomhLiZE= +github.com/cenkalti/backoff/v4 v4.3.0 h1:MyRJ/UdXutAwSAT+s3wNd7MfTIcy71VQueUuFK343L8= +github.com/cenkalti/backoff/v4 v4.3.0/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyYozVcomhLiZE= github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= From 7667f7ba2580f7b822d6652c09685f9fac5046b5 Mon Sep 17 00:00:00 2001 From: David Ashpole Date: Mon, 1 Apr 2024 10:30:34 -0400 Subject: [PATCH 03/11] Add support for AddLink to the OpenCensus bridge (#5116) * add support for AddLink to the OpenCensus bridge * Update CHANGELOG.md Co-authored-by: Tyler Yahn --------- Co-authored-by: Tyler Yahn --- CHANGELOG.md | 1 + bridge/opencensus/doc.go | 2 - .../opencensus/internal/oc2otel/attributes.go | 11 +++++ .../internal/oc2otel/attributes_test.go | 23 +++++++++ bridge/opencensus/internal/span.go | 14 +++++- bridge/opencensus/internal/span_test.go | 47 +++++++++++++++++-- 6 files changed, 90 insertions(+), 8 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 59d54dcca22..c6a9a9c1aad 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -23,6 +23,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm At which point, users will be required to migrage their code, and this package will be deprecated then removed. (#5085) - Add support for `Summary` metrics in the `go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp` and `go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc` exporters. (#5100) - Add `otel.scope.name` and `otel.scope.version` tags to spans exported by `go.opentelemetry.io/otel/exporters/zipkin`. (#5108) +- Add support for `AddLink` to `go.opentelemetry.io/otel/bridge/opencensus`. (#5116) ### Changed diff --git a/bridge/opencensus/doc.go b/bridge/opencensus/doc.go index 8d363ce4dbc..0f5d4abb8cf 100644 --- a/bridge/opencensus/doc.go +++ b/bridge/opencensus/doc.go @@ -37,8 +37,6 @@ // // There are known limitations to the trace bridge: // -// - The AddLink method for OpenCensus Spans is ignored, and an error is sent -// to the OpenTelemetry ErrorHandler. // - The NewContext method of the OpenCensus Tracer cannot embed an OpenCensus // Span in a context unless that Span was created by that Tracer. // - Conversion of custom OpenCensus Samplers to OpenTelemetry is not diff --git a/bridge/opencensus/internal/oc2otel/attributes.go b/bridge/opencensus/internal/oc2otel/attributes.go index 1b9e931e073..7c6ae45d583 100644 --- a/bridge/opencensus/internal/oc2otel/attributes.go +++ b/bridge/opencensus/internal/oc2otel/attributes.go @@ -20,6 +20,17 @@ func Attributes(attr []octrace.Attribute) []attribute.KeyValue { return otelAttr } +func AttributesFromMap(attr map[string]interface{}) []attribute.KeyValue { + otelAttr := make([]attribute.KeyValue, 0, len(attr)) + for k, v := range attr { + otelAttr = append(otelAttr, attribute.KeyValue{ + Key: attribute.Key(k), + Value: AttributeValue(v), + }) + } + return otelAttr +} + func AttributeValue(ocval interface{}) attribute.Value { switch v := ocval.(type) { case bool: diff --git a/bridge/opencensus/internal/oc2otel/attributes_test.go b/bridge/opencensus/internal/oc2otel/attributes_test.go index 8c1447341fd..7e3efeed723 100644 --- a/bridge/opencensus/internal/oc2otel/attributes_test.go +++ b/bridge/opencensus/internal/oc2otel/attributes_test.go @@ -37,6 +37,29 @@ func TestAttributes(t *testing.T) { } } +func TestAttributesFromMap(t *testing.T) { + in := map[string]interface{}{ + "bool": true, + "int64": int64(49), + "float64": float64(1.618), + "key": "val", + } + + want := []attribute.KeyValue{ + attribute.Bool("bool", true), + attribute.Int64("int64", 49), + attribute.Float64("float64", 1.618), + attribute.String("key", "val"), + } + got := AttributesFromMap(in) + + gotAttributeSet := attribute.NewSet(got...) + wantAttributeSet := attribute.NewSet(want...) + if !gotAttributeSet.Equals(&wantAttributeSet) { + t.Errorf("Attributes conversion want %v, got %v", wantAttributeSet.Encoded(attribute.DefaultEncoder()), gotAttributeSet.Encoded(attribute.DefaultEncoder())) + } +} + func TestAttributeValueUnknown(t *testing.T) { got := AttributeValue([]byte{}) if got != attribute.StringValue("unknown") { diff --git a/bridge/opencensus/internal/span.go b/bridge/opencensus/internal/span.go index e3b76064bfc..9e7ee39fb5b 100644 --- a/bridge/opencensus/internal/span.go +++ b/bridge/opencensus/internal/span.go @@ -110,8 +110,20 @@ func (s *Span) AddMessageReceiveEvent(messageID, uncompressedByteSize, compresse } // AddLink adds a link to this span. +// This drops the OpenCensus LinkType because there is no such concept in OpenTelemetry. func (s *Span) AddLink(l octrace.Link) { - Handle(fmt.Errorf("ignoring OpenCensus link %+v for span %q because OpenTelemetry doesn't support setting links after creation", l, s.String())) + s.otelSpan.AddLink(trace.Link{ + SpanContext: trace.NewSpanContext(trace.SpanContextConfig{ + TraceID: trace.TraceID(l.TraceID), + SpanID: trace.SpanID(l.SpanID), + // We don't know if this was sampled or not. + // Mark it as sampled, since sampled means + // "the caller may have recorded trace data": + // https://www.w3.org/TR/trace-context/#sampled-flag + TraceFlags: trace.FlagsSampled, + }), + Attributes: oc2otel.AttributesFromMap(l.Attributes), + }) } // String prints a string representation of this span. diff --git a/bridge/opencensus/internal/span_test.go b/bridge/opencensus/internal/span_test.go index e11632e0252..949018b8f0c 100644 --- a/bridge/opencensus/internal/span_test.go +++ b/bridge/opencensus/internal/span_test.go @@ -28,6 +28,7 @@ type span struct { attrs []attribute.KeyValue eName string eOpts []trace.EventOption + links []trace.Link } func (s *span) IsRecording() bool { return s.recording } @@ -37,6 +38,7 @@ func (s *span) SetName(n string) { s.name = n } func (s *span) SetStatus(c codes.Code, d string) { s.sCode, s.sMsg = c, d } func (s *span) SetAttributes(a ...attribute.KeyValue) { s.attrs = a } func (s *span) AddEvent(n string, o ...trace.EventOption) { s.eName, s.eOpts = n, o } +func (s *span) AddLink(l trace.Link) { s.links = append(s.links, l) } func TestSpanIsRecordingEvents(t *testing.T) { s := &span{recording: true} @@ -230,16 +232,51 @@ func TestSpanAddMessageReceiveEvent(t *testing.T) { } func TestSpanAddLinkFails(t *testing.T) { - h, restore := withHandler() - defer restore() - // OpenCensus does not try to set links if not recording. s := &span{recording: true} ocS := internal.NewSpan(s) ocS.AddLink(octrace.Link{}) + ocS.AddLink(octrace.Link{ + TraceID: octrace.TraceID([16]byte{1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}), + SpanID: octrace.SpanID([8]byte{2, 0, 0, 0, 0, 0, 0, 0}), + Attributes: map[string]interface{}{ + "foo": "bar", + "number": int64(3), + }, + }) + + wantLinks := []trace.Link{ + { + SpanContext: trace.NewSpanContext(trace.SpanContextConfig{ + TraceFlags: trace.FlagsSampled, + }), + }, + { + SpanContext: trace.NewSpanContext(trace.SpanContextConfig{ + TraceID: trace.TraceID([]byte{1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}), + SpanID: trace.SpanID([]byte{2, 0, 0, 0, 0, 0, 0, 0}), + TraceFlags: trace.FlagsSampled, + }), + Attributes: []attribute.KeyValue{ + attribute.String("foo", "bar"), + attribute.Int64("number", 3), + }, + }, + } + + if len(s.links) != len(wantLinks) { + t.Fatalf("got wrong number of links; want %v, got %v", len(wantLinks), len(s.links)) + } - if h.err == nil { - t.Error("span.AddLink failed to raise an error") + for i, l := range s.links { + if !l.SpanContext.Equal(wantLinks[i].SpanContext) { + t.Errorf("link[%v] has the wrong span context; want %+v, got %+v", i, wantLinks[i].SpanContext, l.SpanContext) + } + gotAttributeSet := attribute.NewSet(l.Attributes...) + wantAttributeSet := attribute.NewSet(wantLinks[i].Attributes...) + if !gotAttributeSet.Equals(&wantAttributeSet) { + t.Errorf("link[%v] has the wrong attributes; want %v, got %v", i, wantAttributeSet.Encoded(attribute.DefaultEncoder()), gotAttributeSet.Encoded(attribute.DefaultEncoder())) + } } } From bddfbc68caa3101a11e02aa4de6f49fc31880136 Mon Sep 17 00:00:00 2001 From: Tyler Yahn Date: Mon, 1 Apr 2024 09:17:07 -0700 Subject: [PATCH 04/11] Add timeoutExporter (#5118) --- sdk/log/exporter.go | 27 +++++++++++++++++++++++ sdk/log/exporter_test.go | 47 ++++++++++++++++++++++++++++++++++++++++ 2 files changed, 74 insertions(+) diff --git a/sdk/log/exporter.go b/sdk/log/exporter.go index dff4dc9c28d..9f85f8a1fd9 100644 --- a/sdk/log/exporter.go +++ b/sdk/log/exporter.go @@ -5,6 +5,7 @@ package log // import "go.opentelemetry.io/otel/sdk/log" import ( "context" + "time" "go.opentelemetry.io/otel" ) @@ -53,6 +54,32 @@ func (noopExporter) Shutdown(context.Context) error { return nil } func (noopExporter) ForceFlush(context.Context) error { return nil } +// timeoutExporter wraps an Exporter and ensures any call to Export will have a +// timeout for the context. +type timeoutExporter struct { + Exporter + + // timeout is the maximum time an export is attempted. + timeout time.Duration +} + +// newTimeoutExporter wraps exporter with an Exporter that limits the context +// lifetime passed to Export to be timeout. If timeout is less than or equal to +// zero, exporter will be returned directly. +func newTimeoutExporter(exp Exporter, timeout time.Duration) Exporter { + if timeout <= 0 { + return exp + } + return &timeoutExporter{Exporter: exp, timeout: timeout} +} + +// Export sets the timeout of ctx before calling the Exporter e wraps. +func (e *timeoutExporter) Export(ctx context.Context, records []Record) error { + ctx, cancel := context.WithTimeout(ctx, e.timeout) + defer cancel() + return e.Exporter.Export(ctx, records) +} + // exportSync exports all data from input using exporter in a spawned // goroutine. The returned chan will be closed when the spawned goroutine // completes. diff --git a/sdk/log/exporter_test.go b/sdk/log/exporter_test.go index 4eb2056d1b2..3c37b83ad38 100644 --- a/sdk/log/exporter_test.go +++ b/sdk/log/exporter_test.go @@ -25,6 +25,9 @@ type instruction struct { type testExporter struct { // Err is the error returned by all methods of the testExporter. Err error + // ExportTrigger is read from prior to returning from the Export method if + // non-nil. + ExportTrigger chan struct{} // Counts of method calls. exportN, shutdownN, forceFlushN *int32 @@ -74,6 +77,13 @@ func (e *testExporter) Records() [][]Record { func (e *testExporter) Export(ctx context.Context, r []Record) error { atomic.AddInt32(e.exportN, 1) + if e.ExportTrigger != nil { + select { + case <-e.ExportTrigger: + case <-ctx.Done(): + return ctx.Err() + } + } e.input <- instruction{Record: &r} return e.Err } @@ -196,3 +206,40 @@ func TestExportSync(t *testing.T) { assert.ElementsMatch(t, want, got, "record bodies") }) } + +func TestTimeoutExporter(t *testing.T) { + t.Run("ZeroTimeout", func(t *testing.T) { + exp := newTestExporter(nil) + t.Cleanup(exp.Stop) + e := newTimeoutExporter(exp, 0) + assert.Same(t, exp, e) + }) + + t.Run("Timeout", func(t *testing.T) { + trigger := make(chan struct{}) + t.Cleanup(func() { close(trigger) }) + + exp := newTestExporter(nil) + t.Cleanup(exp.Stop) + exp.ExportTrigger = trigger + e := newTimeoutExporter(exp, time.Nanosecond) + + out := make(chan error, 1) + go func() { + out <- e.Export(context.Background(), make([]Record, 1)) + }() + + var err error + assert.Eventually(t, func() bool { + select { + case err = <-out: + return true + default: + return false + } + }, 2*time.Second, time.Microsecond) + + assert.ErrorIs(t, err, context.DeadlineExceeded) + close(out) + }) +} From 2f73208044a4c709fd7b9feaeff3d68f40f2b681 Mon Sep 17 00:00:00 2001 From: Sam Xie Date: Mon, 1 Apr 2024 09:35:55 -0700 Subject: [PATCH 05/11] Fix spelling errors in baggage.go (#5120) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Robert PajÄ…k Co-authored-by: Tyler Yahn --- baggage/baggage.go | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/baggage/baggage.go b/baggage/baggage.go index f3e870f8a8e..94285d95935 100644 --- a/baggage/baggage.go +++ b/baggage/baggage.go @@ -56,10 +56,10 @@ func NewKeyProperty(key string) (Property, error) { // NewKeyValueProperty returns a new Property for key with value. // // The passed key must be compliant with W3C Baggage specification. -// The passed value must be precent-encoded as defined in W3C Baggage specification. +// The passed value must be percent-encoded as defined in W3C Baggage specification. // // Notice: Consider using [NewKeyValuePropertyRaw] instead -// that does not require precent-encoding of the value. +// that does not require percent-encoding of the value. func NewKeyValueProperty(key, value string) (Property, error) { if !validateValue(value) { return newInvalidProperty(), fmt.Errorf("%w: %q", errInvalidValue, value) @@ -224,10 +224,10 @@ type Member struct { // NewMemberRaw returns a new Member from the passed arguments. // // The passed key must be compliant with W3C Baggage specification. -// The passed value must be precent-encoded as defined in W3C Baggage specification. +// The passed value must be percent-encoded as defined in W3C Baggage specification. // // Notice: Consider using [NewMemberRaw] instead -// that does not require precent-encoding of the value. +// that does not require percent-encoding of the value. func NewMember(key, value string, props ...Property) (Member, error) { if !validateValue(value) { return newInvalidMember(), fmt.Errorf("%w: %q", errInvalidValue, value) @@ -298,7 +298,7 @@ func parseMember(member string) (Member, error) { return newInvalidMember(), fmt.Errorf("%w: %q", errInvalidValue, v) } - // Decode a precent-encoded value. + // Decode a percent-encoded value. value, err := url.PathUnescape(val) if err != nil { return newInvalidMember(), fmt.Errorf("%w: %v", errInvalidValue, err) @@ -605,7 +605,7 @@ func parsePropertyInternal(s string) (p Property, ok bool) { return } - // Decode a precent-encoded value. + // Decode a percent-encoded value. value, err := url.PathUnescape(s[valueStart:valueEnd]) if err != nil { return From a8e4263232f0b549c246bc98c114a73f1d1b98ee Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Mon, 1 Apr 2024 21:11:32 +0200 Subject: [PATCH 06/11] build(deps): bump codecov/codecov-action from 4.1.0 to 4.1.1 (#5125) Bumps [codecov/codecov-action](https://github.com/codecov/codecov-action) from 4.1.0 to 4.1.1. - [Release notes](https://github.com/codecov/codecov-action/releases) - [Changelog](https://github.com/codecov/codecov-action/blob/main/CHANGELOG.md) - [Commits](https://github.com/codecov/codecov-action/compare/v4.1.0...v4.1.1) --- updated-dependencies: - dependency-name: codecov/codecov-action dependency-type: direct:production update-type: version-update:semver-patch ... Signed-off-by: dependabot[bot] Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com> Co-authored-by: Tyler Yahn --- .github/workflows/ci.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 839bcdb2939..6f98628fb5b 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -92,7 +92,7 @@ jobs: cp coverage.txt $TEST_RESULTS cp coverage.html $TEST_RESULTS - name: Upload coverage report - uses: codecov/codecov-action@v4.1.0 + uses: codecov/codecov-action@v4.1.1 with: file: ./coverage.txt fail_ci_if_error: true From b7fdeb9f3ab912e2000411473ad555ea99c526bf Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Mon, 1 Apr 2024 14:05:35 -0700 Subject: [PATCH 07/11] build(deps): bump github.com/golangci/golangci-lint in /internal/tools (#5126) Bumps [github.com/golangci/golangci-lint](https://github.com/golangci/golangci-lint) from 1.57.1 to 1.57.2. - [Release notes](https://github.com/golangci/golangci-lint/releases) - [Changelog](https://github.com/golangci/golangci-lint/blob/master/CHANGELOG.md) - [Commits](https://github.com/golangci/golangci-lint/compare/v1.57.1...v1.57.2) --- updated-dependencies: - dependency-name: github.com/golangci/golangci-lint dependency-type: direct:production update-type: version-update:semver-patch ... Signed-off-by: dependabot[bot] Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com> Co-authored-by: Tyler Yahn --- internal/tools/go.mod | 14 +++++++------- internal/tools/go.sum | 28 ++++++++++++++-------------- 2 files changed, 21 insertions(+), 21 deletions(-) diff --git a/internal/tools/go.mod b/internal/tools/go.mod index 40a8f340594..a99df180132 100644 --- a/internal/tools/go.mod +++ b/internal/tools/go.mod @@ -5,7 +5,7 @@ go 1.21 require ( github.com/client9/misspell v0.3.4 github.com/gogo/protobuf v1.3.2 - github.com/golangci/golangci-lint v1.57.1 + github.com/golangci/golangci-lint v1.57.2 github.com/itchyny/gojq v0.12.14 github.com/jcchavezs/porto v0.6.0 github.com/wadey/gocovmerge v0.0.0-20160331181800-b5bfa59ec0ad @@ -54,7 +54,7 @@ require ( github.com/cespare/xxhash/v2 v2.2.0 // indirect github.com/charithe/durationcheck v0.0.10 // indirect github.com/chavacava/garif v0.1.0 // indirect - github.com/ckaznocha/intrange v0.1.0 // indirect + github.com/ckaznocha/intrange v0.1.1 // indirect github.com/cloudflare/circl v1.3.7 // indirect github.com/curioswitch/go-reassign v0.2.0 // indirect github.com/cyphar/filepath-securejoin v0.2.4 // indirect @@ -103,19 +103,19 @@ require ( github.com/inconshreveable/mousetrap v1.1.0 // indirect github.com/itchyny/timefmt-go v0.1.5 // indirect github.com/jbenet/go-context v0.0.0-20150711004518-d14ea06fba99 // indirect - github.com/jgautheron/goconst v1.7.0 // indirect + github.com/jgautheron/goconst v1.7.1 // indirect github.com/jingyugao/rowserrcheck v1.1.1 // indirect github.com/jirfag/go-printf-func-name v0.0.0-20200119135958-7558a9eaa5af // indirect github.com/jjti/go-spancheck v0.5.3 // indirect github.com/julz/importas v0.1.0 // indirect - github.com/karamaru-alpha/copyloopvar v1.0.8 // indirect + github.com/karamaru-alpha/copyloopvar v1.0.10 // indirect github.com/kevinburke/ssh_config v1.2.0 // indirect github.com/kisielk/errcheck v1.7.0 // indirect - github.com/kkHAIKE/contextcheck v1.1.4 // indirect + github.com/kkHAIKE/contextcheck v1.1.5 // indirect github.com/kulti/thelper v0.6.3 // indirect github.com/kunwardeep/paralleltest v1.0.10 // indirect github.com/kyoh86/exportloopref v0.1.11 // indirect - github.com/ldez/gomoddirectives v0.2.3 // indirect + github.com/ldez/gomoddirectives v0.2.4 // indirect github.com/ldez/tagliatelle v0.5.0 // indirect github.com/leonklingele/grouper v1.1.1 // indirect github.com/lufeee/execinquery v1.2.1 // indirect @@ -134,7 +134,7 @@ require ( github.com/nakabonne/nestif v0.3.1 // indirect github.com/nishanths/exhaustive v0.12.0 // indirect github.com/nishanths/predeclared v0.2.2 // indirect - github.com/nunnatsa/ginkgolinter v0.16.1 // indirect + github.com/nunnatsa/ginkgolinter v0.16.2 // indirect github.com/olekukonko/tablewriter v0.0.5 // indirect github.com/pelletier/go-toml/v2 v2.2.0 // indirect github.com/pjbgf/sha1cd v0.3.0 // indirect diff --git a/internal/tools/go.sum b/internal/tools/go.sum index d11341ddf29..972a2037403 100644 --- a/internal/tools/go.sum +++ b/internal/tools/go.sum @@ -124,8 +124,8 @@ github.com/chavacava/garif v0.1.0/go.mod h1:XMyYCkEL58DF0oyW4qDjjnPWONs2HBqYKI+U github.com/chzyer/logex v1.1.10/go.mod h1:+Ywpsq7O8HXn0nuIou7OrIPyXbp3wmkHB+jjWRnGsAI= github.com/chzyer/readline v0.0.0-20180603132655-2972be24d48e/go.mod h1:nSuG5e5PlCu98SY8svDHJxuZscDgtXS6KTTbou5AhLI= github.com/chzyer/test v0.0.0-20180213035817-a1ea475d72b1/go.mod h1:Q3SI9o4m/ZMnBNeIyt5eFwwo7qiLfzFZmjNmxjkiQlU= -github.com/ckaznocha/intrange v0.1.0 h1:ZiGBhvrdsKpoEfzh9CjBfDSZof6QB0ORY5tXasUtiew= -github.com/ckaznocha/intrange v0.1.0/go.mod h1:Vwa9Ekex2BrEQMg6zlrWwbs/FtYw7eS5838Q7UjK7TQ= +github.com/ckaznocha/intrange v0.1.1 h1:gHe4LfqCspWkh8KpJFs20fJz3XRHFBFUV9yI7Itu83Q= +github.com/ckaznocha/intrange v0.1.1/go.mod h1:RWffCw/vKBwHeOEwWdCikAtY0q4gGt8VhJZEEA5n+RE= github.com/client9/misspell v0.3.4 h1:ta993UF76GwbvJcIo3Y68y/M3WxlpEHPWIGDkJYwzJI= github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw= github.com/cloudflare/circl v1.3.3/go.mod h1:5XYMA4rFBvNIrhs50XuiBJ15vF2pZn4nnUKZrLbUZFA= @@ -258,8 +258,8 @@ github.com/golangci/dupl v0.0.0-20180902072040-3e9179ac440a h1:w8hkcTqaFpzKqonE9 github.com/golangci/dupl v0.0.0-20180902072040-3e9179ac440a/go.mod h1:ryS0uhF+x9jgbj/N71xsEqODy9BN81/GonCZiOzirOk= github.com/golangci/gofmt v0.0.0-20231018234816-f50ced29576e h1:ULcKCDV1LOZPFxGZaA6TlQbiM3J2GCPnkx/bGF6sX/g= github.com/golangci/gofmt v0.0.0-20231018234816-f50ced29576e/go.mod h1:Pm5KhLPA8gSnQwrQ6ukebRcapGb/BG9iUkdaiCcGHJM= -github.com/golangci/golangci-lint v1.57.1 h1:cqhpzkzjDwdN12rfMf1SUyyKyp88a1SltNqEYGS0nJw= -github.com/golangci/golangci-lint v1.57.1/go.mod h1:zLcHhz3NHc88T5zV2j75lyc0zH3LdOPOybblYa4p0oI= +github.com/golangci/golangci-lint v1.57.2 h1:NNhxfZyL5He1WWDrIvl1a4n5bvWZBcgAqBwlJAAgLTw= +github.com/golangci/golangci-lint v1.57.2/go.mod h1:ApiG3S3Ca23QyfGp5BmsorTiVxJpr5jGiNS0BkdSidg= github.com/golangci/misspell v0.4.1 h1:+y73iSicVy2PqyX7kmUefHusENlrP9YwuHZHPLGQj/g= github.com/golangci/misspell v0.4.1/go.mod h1:9mAN1quEo3DlpbaIKKyEvRxK1pwqR9s/Sea1bJCtlNI= github.com/golangci/plugin-module-register v0.1.1 h1:TCmesur25LnyJkpsVrupv1Cdzo+2f7zX0H6Jkw1Ol6c= @@ -336,8 +336,8 @@ github.com/jbenet/go-context v0.0.0-20150711004518-d14ea06fba99 h1:BQSFePA1RWJOl github.com/jbenet/go-context v0.0.0-20150711004518-d14ea06fba99/go.mod h1:1lJo3i6rXxKeerYnT8Nvf0QmHCRC1n8sfWVwXF2Frvo= github.com/jcchavezs/porto v0.6.0 h1:AgQLGwsXaxDkPj4Y+paFkVGLAR4n/1RRF0xV5UKinwg= github.com/jcchavezs/porto v0.6.0/go.mod h1:fESH0gzDHiutHRdX2hv27ojnOVFco37hg1W6E9EZF4A= -github.com/jgautheron/goconst v1.7.0 h1:cEqH+YBKLsECnRSd4F4TK5ri8t/aXtt/qoL0Ft252B0= -github.com/jgautheron/goconst v1.7.0/go.mod h1:aAosetZ5zaeC/2EfMeRswtxUFBpe2Hr7HzkgX4fanO4= +github.com/jgautheron/goconst v1.7.1 h1:VpdAG7Ca7yvvJk5n8dMwQhfEZJh95kl/Hl9S1OI5Jkk= +github.com/jgautheron/goconst v1.7.1/go.mod h1:aAosetZ5zaeC/2EfMeRswtxUFBpe2Hr7HzkgX4fanO4= github.com/jingyugao/rowserrcheck v1.1.1 h1:zibz55j/MJtLsjP1OF4bSdgXxwL1b+Vn7Tjzq7gFzUs= github.com/jingyugao/rowserrcheck v1.1.1/go.mod h1:4yvlZSDb3IyDTUZJUmpZfm2Hwok+Dtp+nu2qOq+er9c= github.com/jirfag/go-printf-func-name v0.0.0-20200119135958-7558a9eaa5af h1:KA9BjwUk7KlCh6S9EAGWBt1oExIUv9WyNCiRz5amv48= @@ -355,16 +355,16 @@ github.com/julienschmidt/httprouter v1.2.0/go.mod h1:SYymIcj16QtmaHHD7aYtjjsJG7V github.com/julienschmidt/httprouter v1.3.0/go.mod h1:JR6WtHb+2LUe8TCKY3cZOxFyyO8IZAc4RVcycCCAKdM= github.com/julz/importas v0.1.0 h1:F78HnrsjY3cR7j0etXy5+TU1Zuy7Xt08X/1aJnH5xXY= github.com/julz/importas v0.1.0/go.mod h1:oSFU2R4XK/P7kNBrnL/FEQlDGN1/6WoxXEjSSXO0DV0= -github.com/karamaru-alpha/copyloopvar v1.0.8 h1:gieLARwuByhEMxRwM3GRS/juJqFbLraftXIKDDNJ50Q= -github.com/karamaru-alpha/copyloopvar v1.0.8/go.mod h1:u7CIfztblY0jZLOQZgH3oYsJzpC2A7S6u/lfgSXHy0k= +github.com/karamaru-alpha/copyloopvar v1.0.10 h1:8HYDy6KQYqTmD7JuhZMWS1nwPru9889XI24ROd/+WXI= +github.com/karamaru-alpha/copyloopvar v1.0.10/go.mod h1:u7CIfztblY0jZLOQZgH3oYsJzpC2A7S6u/lfgSXHy0k= github.com/kevinburke/ssh_config v1.2.0 h1:x584FjTGwHzMwvHx18PXxbBVzfnxogHaAReU4gf13a4= github.com/kevinburke/ssh_config v1.2.0/go.mod h1:CT57kijsi8u/K/BOFA39wgDQJ9CxiF4nAY/ojJ6r6mM= github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8= github.com/kisielk/errcheck v1.7.0 h1:+SbscKmWJ5mOK/bO1zS60F5I9WwZDWOfRsC4RwfwRV0= github.com/kisielk/errcheck v1.7.0/go.mod h1:1kLL+jV4e+CFfueBmI1dSK2ADDyQnlrnrY/FqKluHJQ= github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= -github.com/kkHAIKE/contextcheck v1.1.4 h1:B6zAaLhOEEcjvUgIYEqystmnFk1Oemn8bvJhbt0GMb8= -github.com/kkHAIKE/contextcheck v1.1.4/go.mod h1:1+i/gWqokIa+dm31mqGLZhZJ7Uh44DJGZVmr6QRBNJg= +github.com/kkHAIKE/contextcheck v1.1.5 h1:CdnJh63tcDe53vG+RebdpdXJTc9atMgGqdx8LXxiilg= +github.com/kkHAIKE/contextcheck v1.1.5/go.mod h1:O930cpht4xb1YQpK+1+AgoM3mFsvxr7uyFptcnWTYUA= github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= github.com/konsorten/go-windows-terminal-sequences v1.0.3/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= github.com/kr/logfmt v0.0.0-20140226030751-b84e30acd515/go.mod h1:+0opPa2QZZtGFBFZlji/RkVcI2GknAs/DXo4wKdlNEc= @@ -381,8 +381,8 @@ github.com/kunwardeep/paralleltest v1.0.10 h1:wrodoaKYzS2mdNVnc4/w31YaXFtsc21PCT github.com/kunwardeep/paralleltest v1.0.10/go.mod h1:2C7s65hONVqY7Q5Efj5aLzRCNLjw2h4eMc9EcypGjcY= github.com/kyoh86/exportloopref v0.1.11 h1:1Z0bcmTypkL3Q4k+IDHMWTcnCliEZcaPiIe0/ymEyhQ= github.com/kyoh86/exportloopref v0.1.11/go.mod h1:qkV4UF1zGl6EkF1ox8L5t9SwyeBAZ3qLMd6up458uqA= -github.com/ldez/gomoddirectives v0.2.3 h1:y7MBaisZVDYmKvt9/l1mjNCiSA1BVn34U0ObUcJwlhA= -github.com/ldez/gomoddirectives v0.2.3/go.mod h1:cpgBogWITnCfRq2qGoDkKMEVSaarhdBr6g8G04uz6d0= +github.com/ldez/gomoddirectives v0.2.4 h1:j3YjBIjEBbqZ0NKtBNzr8rtMHTOrLPeiwTkfUJZ3alg= +github.com/ldez/gomoddirectives v0.2.4/go.mod h1:oWu9i62VcQDYp9EQ0ONTfqLNh+mDLWWDO+SO0qSQw5g= github.com/ldez/tagliatelle v0.5.0 h1:epgfuYt9v0CG3fms0pEgIMNPuFf/LpPIfjk4kyqSioo= github.com/ldez/tagliatelle v0.5.0/go.mod h1:rj1HmWiL1MiKQuOONhd09iySTEkUuE/8+5jtPYz9xa4= github.com/leonklingele/grouper v1.1.1 h1:suWXRU57D4/Enn6pXR0QVqqWWrnJ9Osrz+5rjt8ivzU= @@ -431,8 +431,8 @@ github.com/nishanths/exhaustive v0.12.0 h1:vIY9sALmw6T/yxiASewa4TQcFsVYZQQRUQJhK github.com/nishanths/exhaustive v0.12.0/go.mod h1:mEZ95wPIZW+x8kC4TgC+9YCUgiST7ecevsVDTgc2obs= github.com/nishanths/predeclared v0.2.2 h1:V2EPdZPliZymNAn79T8RkNApBjMmVKh5XRpLm/w98Vk= github.com/nishanths/predeclared v0.2.2/go.mod h1:RROzoN6TnGQupbC+lqggsOlcgysk3LMK/HI84Mp280c= -github.com/nunnatsa/ginkgolinter v0.16.1 h1:uDIPSxgVHZ7PgbJElRDGzymkXH+JaF7mjew+Thjnt6Q= -github.com/nunnatsa/ginkgolinter v0.16.1/go.mod h1:4tWRinDN1FeJgU+iJANW/kz7xKN5nYRAOfJDQUS9dOQ= +github.com/nunnatsa/ginkgolinter v0.16.2 h1:8iLqHIZvN4fTLDC0Ke9tbSZVcyVHoBs0HIbnVSxfHJk= +github.com/nunnatsa/ginkgolinter v0.16.2/go.mod h1:4tWRinDN1FeJgU+iJANW/kz7xKN5nYRAOfJDQUS9dOQ= github.com/olekukonko/tablewriter v0.0.5 h1:P2Ga83D34wi1o9J6Wh1mRuqd4mF/x/lgBS7N7AbDhec= github.com/olekukonko/tablewriter v0.0.5/go.mod h1:hPp6KlRPjbx+hW8ykQs1w3UBbZlj6HuIJcUGPhkA7kY= github.com/onsi/ginkgo/v2 v2.15.0 h1:79HwNRBAZHOEwrczrgSOPy+eFTTlIGELKy5as+ClttY= From e6e44dee90ec354369e98bb32e176878d95fea63 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Robert=20Paj=C4=85k?= Date: Tue, 2 Apr 2024 10:50:07 +0200 Subject: [PATCH 08/11] log: Add String method to Value and KeyValue (#5117) --- CHANGELOG.md | 1 + log/keyvalue.go | 43 +++++++++++++++++++++++++++++++++++++++++++ log/keyvalue_test.go | 19 +++++++++++++++++++ 3 files changed, 63 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index c6a9a9c1aad..3a5af468a04 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -24,6 +24,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm - Add support for `Summary` metrics in the `go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp` and `go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc` exporters. (#5100) - Add `otel.scope.name` and `otel.scope.version` tags to spans exported by `go.opentelemetry.io/otel/exporters/zipkin`. (#5108) - Add support for `AddLink` to `go.opentelemetry.io/otel/bridge/opencensus`. (#5116) +- Add `String` method to `Value` and `KeyValue` in `go.opentelemetry.io/otel/log`. (#5117) ### Changed diff --git a/log/keyvalue.go b/log/keyvalue.go index da3d55c4fc8..10920d21f4a 100644 --- a/log/keyvalue.go +++ b/log/keyvalue.go @@ -8,8 +8,10 @@ package log // import "go.opentelemetry.io/otel/log" import ( "bytes" "errors" + "fmt" "math" "slices" + "strconv" "unsafe" "go.opentelemetry.io/otel/internal/global" @@ -265,6 +267,39 @@ func (v Value) Equal(w Value) bool { } } +// String returns Value's value as a string, formatted like [fmt.Sprint]. +// +// The returned string is meant for debugging; +// the string representation is not stable. +func (v Value) String() string { + switch v.Kind() { + case KindString: + return v.asString() + case KindInt64: + return strconv.FormatInt(int64(v.num), 10) + case KindFloat64: + return strconv.FormatFloat(v.asFloat64(), 'g', -1, 64) + case KindBool: + return strconv.FormatBool(v.asBool()) + case KindBytes: + return fmt.Sprint(v.asBytes()) + case KindMap: + return fmt.Sprint(v.asMap()) + case KindSlice: + return fmt.Sprint(v.asSlice()) + case KindEmpty: + return "" + default: + // Try to handle this as gracefully as possible. + // + // Don't panic here. The goal here is to have developers find this + // first if a slog.Kind is is not handled. It is + // preferable to have user's open issue asking why their attributes + // have a "unhandled: " prefix than say that their code is panicking. + return fmt.Sprintf("", v.Kind()) + } +} + // A KeyValue is a key-value pair used to represent a log attribute (a // superset of [go.opentelemetry.io/otel/attribute.KeyValue]) and map item. type KeyValue struct { @@ -321,3 +356,11 @@ func Map(key string, value ...KeyValue) KeyValue { func Empty(key string) KeyValue { return KeyValue{key, Value{}} } + +// String returns key-value pair as a string, formatted like "key:value". +// +// The returned string is meant for debugging; +// the string representation is not stable. +func (a KeyValue) String() string { + return fmt.Sprintf("%s:%s", a.Key, a.Value) +} diff --git a/log/keyvalue_test.go b/log/keyvalue_test.go index f7c6602de74..2f0211160cf 100644 --- a/log/keyvalue_test.go +++ b/log/keyvalue_test.go @@ -264,6 +264,25 @@ func TestEmpty(t *testing.T) { t.Run("AsMap", testErrKind(v.AsMap, "AsMap", k)) } +func TestValueString(t *testing.T) { + for _, test := range []struct { + v log.Value + want string + }{ + {log.Int64Value(-3), "-3"}, + {log.Float64Value(.15), "0.15"}, + {log.BoolValue(true), "true"}, + {log.StringValue("foo"), "foo"}, + {log.BytesValue([]byte{2, 4, 6}), "[2 4 6]"}, + {log.SliceValue(log.IntValue(3), log.StringValue("foo")), "[3 foo]"}, + {log.MapValue(log.Int("a", 1), log.Bool("b", true)), "[a:1 b:true]"}, + {log.Value{}, ""}, + } { + got := test.v.String() + assert.Equal(t, test.want, got) + } +} + type logSink struct { logr.LogSink From c4dffbf88824956077883b900d6bb25dd7bf08e1 Mon Sep 17 00:00:00 2001 From: Tyler Yahn Date: Tue, 2 Apr 2024 07:44:38 -0700 Subject: [PATCH 09/11] Add chunkExporter (#5104) * Add chunker exporter The batching log processor needs to be able to export payloads in chuncks. This adds a chunker type that will forward all Shutdown and ForceFlush calls to the embedded exporter and chunk data passed to Export. * Concurrent safe testExporter * Add test for zero size * Fix lint * Refactor chunker into chunkExporter * Remove ExportTrigger --- sdk/log/exporter.go | 31 ++++++++++++++++++++ sdk/log/exporter_test.go | 61 ++++++++++++++++++++++++++++++++++++++++ 2 files changed, 92 insertions(+) diff --git a/sdk/log/exporter.go b/sdk/log/exporter.go index 9f85f8a1fd9..a73fc3bb718 100644 --- a/sdk/log/exporter.go +++ b/sdk/log/exporter.go @@ -54,6 +54,37 @@ func (noopExporter) Shutdown(context.Context) error { return nil } func (noopExporter) ForceFlush(context.Context) error { return nil } +// chunkExporter wraps an Exporter's Export method so it is called with +// appropriately sized export payloads. Any payload larger than a defined size +// is chunked into smaller payloads and exported sequentially. +type chunkExporter struct { + Exporter + + // size is the maximum batch size exported. + size int +} + +// newChunkExporter wraps exporter. Calls to the Export will have their records +// payload chuncked so they do not exceed size. If size is less than or equal +// to 0, exporter is returned directly. +func newChunkExporter(exporter Exporter, size int) Exporter { + if size <= 0 { + return exporter + } + return &chunkExporter{Exporter: exporter, size: size} +} + +// Export exports records in chuncks no larger than c.size. +func (c chunkExporter) Export(ctx context.Context, records []Record) error { + n := len(records) + for i, j := 0, min(c.size, n); i < n; i, j = i+c.size, min(j+c.size, n) { + if err := c.Exporter.Export(ctx, records[i:j]); err != nil { + return err + } + } + return nil +} + // timeoutExporter wraps an Exporter and ensures any call to Export will have a // timeout for the context. type timeoutExporter struct { diff --git a/sdk/log/exporter_test.go b/sdk/log/exporter_test.go index 3c37b83ad38..6ae635fb7fe 100644 --- a/sdk/log/exporter_test.go +++ b/sdk/log/exporter_test.go @@ -12,6 +12,7 @@ import ( "time" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" "go.opentelemetry.io/otel" "go.opentelemetry.io/otel/log" @@ -115,6 +116,66 @@ func (e *testExporter) ForceFlushN() int { return int(atomic.LoadInt32(e.forceFlushN)) } +func TestChunker(t *testing.T) { + t.Run("ZeroSize", func(t *testing.T) { + exp := newTestExporter(nil) + t.Cleanup(exp.Stop) + c := newChunkExporter(exp, 0) + const size = 100 + _ = c.Export(context.Background(), make([]Record, size)) + + assert.Equal(t, 1, exp.ExportN()) + records := exp.Records() + assert.Len(t, records, 1) + assert.Len(t, records[0], size) + }) + + t.Run("ForceFlush", func(t *testing.T) { + exp := newTestExporter(nil) + t.Cleanup(exp.Stop) + c := newChunkExporter(exp, 0) + _ = c.ForceFlush(context.Background()) + assert.Equal(t, 1, exp.ForceFlushN(), "ForceFlush not passed through") + }) + + t.Run("Shutdown", func(t *testing.T) { + exp := newTestExporter(nil) + t.Cleanup(exp.Stop) + c := newChunkExporter(exp, 0) + _ = c.Shutdown(context.Background()) + assert.Equal(t, 1, exp.ShutdownN(), "Shutdown not passed through") + }) + + t.Run("Chunk", func(t *testing.T) { + exp := newTestExporter(nil) + t.Cleanup(exp.Stop) + c := newChunkExporter(exp, 10) + assert.NoError(t, c.Export(context.Background(), make([]Record, 5))) + assert.NoError(t, c.Export(context.Background(), make([]Record, 25))) + + wantLens := []int{5, 10, 10, 5} + records := exp.Records() + require.Len(t, records, len(wantLens), "chunks") + for i, n := range wantLens { + assert.Lenf(t, records[i], n, "chunk %d", i) + } + }) + + t.Run("ExportError", func(t *testing.T) { + exp := newTestExporter(assert.AnError) + t.Cleanup(exp.Stop) + c := newChunkExporter(exp, 0) + ctx := context.Background() + records := make([]Record, 25) + err := c.Export(ctx, records) + assert.ErrorIs(t, err, assert.AnError, "no chunking") + + c = newChunkExporter(exp, 10) + err = c.Export(ctx, records) + assert.ErrorIs(t, err, assert.AnError, "with chunking") + }) +} + func TestExportSync(t *testing.T) { eventuallyDone := func(t *testing.T, done chan struct{}) { assert.Eventually(t, func() bool { From 5449f083aa4af79313d5fe5fecd554af2cb7bbbc Mon Sep 17 00:00:00 2001 From: Tyler Yahn Date: Tue, 2 Apr 2024 08:36:18 -0700 Subject: [PATCH 10/11] Add the `bufferExporter` (#5119) * Add the bufferExporter * Fix TestExportSync Reset default ErrorHandler * Comment * Clean up tests * Remove context arg from EnqueueExport * Join wrapped exporter error --- sdk/log/exporter.go | 133 ++++++++++++++++++ sdk/log/exporter_test.go | 289 ++++++++++++++++++++++++++++++++++++++- 2 files changed, 419 insertions(+), 3 deletions(-) diff --git a/sdk/log/exporter.go b/sdk/log/exporter.go index a73fc3bb718..e3c2cd91421 100644 --- a/sdk/log/exporter.go +++ b/sdk/log/exporter.go @@ -5,6 +5,10 @@ package log // import "go.opentelemetry.io/otel/sdk/log" import ( "context" + "errors" + "fmt" + "sync" + "sync/atomic" "time" "go.opentelemetry.io/otel" @@ -158,3 +162,132 @@ func (e exportData) respond(err error) { } } } + +// bufferExporter provides asynchronous and synchronous export functionality by +// buffering export requests. +type bufferExporter struct { + Exporter + + input chan exportData + inputMu sync.Mutex + + done chan struct{} + stopped atomic.Bool +} + +// newBufferExporter returns a new bufferExporter that wraps exporter. The +// returned bufferExporter will buffer at most size number of export requests. +// If size is less than zero, zero will be used (i.e. only synchronous +// exporting will be supported). +func newBufferExporter(exporter Exporter, size int) *bufferExporter { + if size < 0 { + size = 0 + } + input := make(chan exportData, size) + return &bufferExporter{ + Exporter: exporter, + + input: input, + done: exportSync(input, exporter), + } +} + +var errStopped = errors.New("exporter stopped") + +func (e *bufferExporter) enqueue(ctx context.Context, records []Record, rCh chan<- error) error { + data := exportData{ctx, records, rCh} + + e.inputMu.Lock() + defer e.inputMu.Unlock() + + // Check stopped before enqueueing now that e.inputMu is held. This + // prevents sends on a closed chan when Shutdown is called concurrently. + if e.stopped.Load() { + return errStopped + } + + select { + case e.input <- data: + case <-ctx.Done(): + return ctx.Err() + } + return nil +} + +// EnqueueExport enqueues an export of records in the context of ctx to be +// performed asynchronously. This will return true if the exported is +// successfully enqueued, false otherwise. +func (e *bufferExporter) EnqueueExport(records []Record) bool { + if len(records) == 0 { + // Nothing to enqueue, do not waste input space. + return true + } + return e.enqueue(context.Background(), records, nil) == nil +} + +// Export synchronously exports records in the context of ctx. This will not +// return until the export has been completed. +func (e *bufferExporter) Export(ctx context.Context, records []Record) error { + if len(records) == 0 { + return nil + } + + resp := make(chan error, 1) + err := e.enqueue(ctx, records, resp) + if err != nil { + if errors.Is(err, errStopped) { + return nil + } + return fmt.Errorf("%w: dropping %d records", err, len(records)) + } + + select { + case err := <-resp: + return err + case <-ctx.Done(): + return ctx.Err() + } +} + +// ForceFlush flushes buffered exports. Any existing exports that is buffered +// is flushed before this returns. +func (e *bufferExporter) ForceFlush(ctx context.Context) error { + resp := make(chan error, 1) + err := e.enqueue(ctx, nil, resp) + if err != nil { + if errors.Is(err, errStopped) { + return nil + } + return err + } + + select { + case <-resp: + case <-ctx.Done(): + return ctx.Err() + } + return e.Exporter.ForceFlush(ctx) +} + +// Shutdown shuts down e. +// +// Any buffered exports are flushed before this returns. +// +// All calls to EnqueueExport or Exporter will return nil without any export +// after this is called. +func (e *bufferExporter) Shutdown(ctx context.Context) error { + if e.stopped.Swap(true) { + return nil + } + e.inputMu.Lock() + defer e.inputMu.Unlock() + + // No more sends will be made. + close(e.input) + select { + case <-e.done: + case <-ctx.Done(): + return errors.Join(ctx.Err(), e.Exporter.Shutdown(ctx)) + } + return e.Exporter.Shutdown(ctx) +} diff --git a/sdk/log/exporter_test.go b/sdk/log/exporter_test.go index 6ae635fb7fe..85c12860409 100644 --- a/sdk/log/exporter_test.go +++ b/sdk/log/exporter_test.go @@ -5,6 +5,8 @@ package log import ( "context" + "io" + stdlog "log" "slices" "sync" "sync/atomic" @@ -33,8 +35,10 @@ type testExporter struct { // Counts of method calls. exportN, shutdownN, forceFlushN *int32 - input chan instruction - done chan struct{} + stopped atomic.Bool + inputMu sync.Mutex + input chan instruction + done chan struct{} } func newTestExporter(err error) *testExporter { @@ -85,7 +89,11 @@ func (e *testExporter) Export(ctx context.Context, r []Record) error { return ctx.Err() } } - e.input <- instruction{Record: &r} + e.inputMu.Lock() + defer e.inputMu.Unlock() + if !e.stopped.Load() { + e.input <- instruction{Record: &r} + } return e.Err } @@ -94,6 +102,12 @@ func (e *testExporter) ExportN() int { } func (e *testExporter) Stop() { + if e.stopped.Swap(true) { + return + } + e.inputMu.Lock() + defer e.inputMu.Unlock() + close(e.input) <-e.done } @@ -192,6 +206,12 @@ func TestExportSync(t *testing.T) { var got error handler := otel.ErrorHandlerFunc(func(err error) { got = err }) otel.SetErrorHandler(handler) + t.Cleanup(func() { + l := stdlog.New(io.Discard, "", stdlog.LstdFlags) + otel.SetErrorHandler(otel.ErrorHandlerFunc(func(err error) { + l.Print(err) + })) + }) in := make(chan exportData, 1) exp := newTestExporter(assert.AnError) @@ -304,3 +324,266 @@ func TestTimeoutExporter(t *testing.T) { close(out) }) } + +func TestBufferExporter(t *testing.T) { + t.Run("ConcurrentSafe", func(t *testing.T) { + const goRoutines = 10 + + exp := newTestExporter(nil) + t.Cleanup(exp.Stop) + e := newBufferExporter(exp, goRoutines) + + ctx := context.Background() + records := make([]Record, 10) + + stop := make(chan struct{}) + var wg sync.WaitGroup + for i := 0; i < goRoutines; i++ { + wg.Add(1) + go func() { + defer wg.Done() + for { + select { + case <-stop: + return + default: + _ = e.EnqueueExport(records) + _ = e.Export(ctx, records) + _ = e.ForceFlush(ctx) + } + } + }() + } + + assert.Eventually(t, func() bool { + return exp.ExportN() > 0 + }, 2*time.Second, time.Microsecond) + + assert.NoError(t, e.Shutdown(ctx)) + close(stop) + wg.Wait() + }) + + t.Run("Shutdown", func(t *testing.T) { + t.Run("Multiple", func(t *testing.T) { + exp := newTestExporter(nil) + t.Cleanup(exp.Stop) + e := newBufferExporter(exp, 1) + + assert.NoError(t, e.Shutdown(context.Background())) + assert.Equal(t, 1, exp.ShutdownN(), "first Shutdown") + + assert.NoError(t, e.Shutdown(context.Background())) + assert.Equal(t, 1, exp.ShutdownN(), "second Shutdown") + }) + + t.Run("ContextCancelled", func(t *testing.T) { + exp := newTestExporter(assert.AnError) + t.Cleanup(exp.Stop) + + trigger := make(chan struct{}) + exp.ExportTrigger = trigger + t.Cleanup(func() { close(trigger) }) + e := newBufferExporter(exp, 1) + + ctx, cancel := context.WithCancel(context.Background()) + cancel() + + err := e.Shutdown(ctx) + assert.ErrorIs(t, err, context.Canceled) + assert.ErrorIs(t, err, assert.AnError) + }) + + t.Run("Error", func(t *testing.T) { + exp := newTestExporter(assert.AnError) + t.Cleanup(exp.Stop) + + e := newBufferExporter(exp, 1) + assert.ErrorIs(t, e.Shutdown(context.Background()), assert.AnError) + }) + }) + + t.Run("ForceFlush", func(t *testing.T) { + t.Run("Multiple", func(t *testing.T) { + exp := newTestExporter(nil) + t.Cleanup(exp.Stop) + e := newBufferExporter(exp, 2) + + ctx := context.Background() + records := make([]Record, 1) + require.NoError(t, e.enqueue(ctx, records, nil), "enqueue") + + assert.NoError(t, e.ForceFlush(ctx), "ForceFlush records") + assert.Equal(t, 1, exp.ExportN(), "Export number incremented") + assert.Len(t, exp.Records(), 1, "exported Record batches") + + // Nothing to flush. + assert.NoError(t, e.ForceFlush(ctx), "ForceFlush empty") + assert.Equal(t, 1, exp.ExportN(), "Export number changed") + assert.Len(t, exp.Records(), 0, "exported non-zero Records") + }) + + t.Run("ContextCancelled", func(t *testing.T) { + exp := newTestExporter(nil) + t.Cleanup(exp.Stop) + + trigger := make(chan struct{}) + exp.ExportTrigger = trigger + t.Cleanup(func() { close(trigger) }) + e := newBufferExporter(exp, 1) + + ctx, cancel := context.WithCancel(context.Background()) + require.True(t, e.EnqueueExport(make([]Record, 1))) + + got := make(chan error, 1) + go func() { got <- e.ForceFlush(ctx) }() + require.Eventually(t, func() bool { + return exp.ExportN() > 0 + }, 2*time.Second, time.Microsecond) + cancel() // Canceled before export response. + err := <-got + assert.ErrorIs(t, err, context.Canceled, "enqueued") + _ = e.Shutdown(ctx) + + // Zero length buffer + e = newBufferExporter(exp, 0) + assert.ErrorIs(t, e.ForceFlush(ctx), context.Canceled, "not enqueued") + }) + + t.Run("Error", func(t *testing.T) { + exp := newTestExporter(assert.AnError) + t.Cleanup(exp.Stop) + + e := newBufferExporter(exp, 1) + assert.ErrorIs(t, e.ForceFlush(context.Background()), assert.AnError) + }) + + t.Run("Stopped", func(t *testing.T) { + exp := newTestExporter(nil) + t.Cleanup(exp.Stop) + + e := newBufferExporter(exp, 1) + + ctx := context.Background() + _ = e.Shutdown(ctx) + assert.NoError(t, e.ForceFlush(ctx)) + }) + }) + + t.Run("Export", func(t *testing.T) { + t.Run("ZeroRecords", func(t *testing.T) { + exp := newTestExporter(nil) + t.Cleanup(exp.Stop) + e := newBufferExporter(exp, 1) + + assert.NoError(t, e.Export(context.Background(), nil)) + assert.Equal(t, 0, exp.ExportN()) + }) + + t.Run("Multiple", func(t *testing.T) { + exp := newTestExporter(nil) + t.Cleanup(exp.Stop) + e := newBufferExporter(exp, 1) + + ctx := context.Background() + records := make([]Record, 1) + records[0].SetBody(log.BoolValue(true)) + + assert.NoError(t, e.Export(ctx, records)) + + n := exp.ExportN() + assert.Equal(t, 1, n, "first Export number") + assert.Equal(t, [][]Record{records}, exp.Records()) + + assert.NoError(t, e.Export(ctx, records)) + assert.Equal(t, n+1, exp.ExportN(), "second Export number") + assert.Equal(t, [][]Record{records}, exp.Records()) + }) + + t.Run("ContextCancelled", func(t *testing.T) { + exp := newTestExporter(nil) + t.Cleanup(exp.Stop) + + trigger := make(chan struct{}) + exp.ExportTrigger = trigger + t.Cleanup(func() { close(trigger) }) + e := newBufferExporter(exp, 1) + + records := make([]Record, 1) + ctx, cancel := context.WithCancel(context.Background()) + + got := make(chan error, 1) + go func() { got <- e.Export(ctx, records) }() + require.Eventually(t, func() bool { + return exp.ExportN() > 0 + }, 2*time.Second, time.Microsecond) + cancel() // Canceled before export response. + err := <-got + assert.ErrorIs(t, err, context.Canceled, "enqueued") + _ = e.Shutdown(ctx) + + // Zero length buffer + e = newBufferExporter(exp, 0) + assert.ErrorIs(t, e.Export(ctx, records), context.Canceled, "not enqueued") + }) + + t.Run("Error", func(t *testing.T) { + exp := newTestExporter(assert.AnError) + t.Cleanup(exp.Stop) + + e := newBufferExporter(exp, 1) + ctx, records := context.Background(), make([]Record, 1) + assert.ErrorIs(t, e.Export(ctx, records), assert.AnError) + }) + + t.Run("Stopped", func(t *testing.T) { + exp := newTestExporter(nil) + t.Cleanup(exp.Stop) + + e := newBufferExporter(exp, 1) + + ctx := context.Background() + _ = e.Shutdown(ctx) + assert.NoError(t, e.Export(ctx, make([]Record, 1))) + assert.Equal(t, 0, exp.ExportN(), "Export called") + }) + }) + + t.Run("EnqueueExport", func(t *testing.T) { + t.Run("ZeroRecords", func(t *testing.T) { + exp := newTestExporter(nil) + t.Cleanup(exp.Stop) + e := newBufferExporter(exp, 1) + + assert.True(t, e.EnqueueExport(nil)) + e.ForceFlush(context.Background()) + assert.Equal(t, 0, exp.ExportN(), "empty batch enqueued") + }) + + t.Run("Multiple", func(t *testing.T) { + exp := newTestExporter(nil) + t.Cleanup(exp.Stop) + e := newBufferExporter(exp, 2) + + records := make([]Record, 1) + records[0].SetBody(log.BoolValue(true)) + + assert.True(t, e.EnqueueExport(records)) + assert.True(t, e.EnqueueExport(records)) + e.ForceFlush(context.Background()) + + n := exp.ExportN() + assert.Equal(t, 2, n, "Export number") + assert.Equal(t, [][]Record{records, records}, exp.Records()) + }) + + t.Run("Stopped", func(t *testing.T) { + exp := newTestExporter(nil) + t.Cleanup(exp.Stop) + e := newBufferExporter(exp, 1) + + _ = e.Shutdown(context.Background()) + assert.False(t, e.EnqueueExport(make([]Record, 1))) + }) + }) +} From 6c6e1e7416e996690a5d2dbd117ed9c8bb1a45cd Mon Sep 17 00:00:00 2001 From: Tyler Yahn Date: Wed, 3 Apr 2024 04:53:16 -0700 Subject: [PATCH 11/11] Add queue for BatchingProcessor (#5131) --- sdk/log/batch.go | 87 ++++++++++++++++++++++++++++++++ sdk/log/batch_test.go | 113 ++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 200 insertions(+) diff --git a/sdk/log/batch.go b/sdk/log/batch.go index a17f92f5ed8..bb85a2a34fb 100644 --- a/sdk/log/batch.go +++ b/sdk/log/batch.go @@ -4,7 +4,9 @@ package log // import "go.opentelemetry.io/otel/sdk/log" import ( + "container/ring" "context" + "sync" "time" ) @@ -76,6 +78,91 @@ func (b *BatchingProcessor) ForceFlush(ctx context.Context) error { return nil } +// queue holds a queue of logging records. +// +// When the queue becomes full, the oldest records in the queue are +// overwritten. +type queue struct { + sync.Mutex + + cap, len int + read, write *ring.Ring +} + +func newQueue(size int) *queue { + r := ring.New(size) + return &queue{ + cap: size, + read: r, + write: r, + } +} + +// Enqueue adds r to the queue. The queue size, including the addition of r, is +// returned. +// +// If enqueueing r will exceed the capacity of q, the oldest Record held in q +// will be dropped and r retained. +func (q *queue) Enqueue(r Record) int { + q.Lock() + defer q.Unlock() + + q.write.Value = r + q.write = q.write.Next() + + q.len++ + if q.len > q.cap { + // Overflow. Advance read to be the new "oldest". + q.len = q.cap + q.read = q.read.Next() + } + return q.len +} + +// TryDequeue attempts to dequeue up to len(buf) Records. The available Records +// will be assigned into buf and passed to write. If write fails, returning +// false, the Records will not be removed from the queue. If write succeeds, +// returning true, the dequeued Records are removed from the queue. The number +// of Records remaining in the queue are returned. +// +// When write is called the lock of q is held. The write function must not call +// other methods of this q that acquire the lock. +func (q *queue) TryDequeue(buf []Record, write func([]Record) bool) int { + q.Lock() + defer q.Unlock() + + origRead := q.read + + n := min(len(buf), q.len) + for i := 0; i < n; i++ { + buf[i] = q.read.Value.(Record) + q.read = q.read.Next() + } + + if write(buf[:n]) { + q.len -= n + } else { + q.read = origRead + } + return q.len +} + +// Flush returns all the Records held in the queue and resets it to be +// empty. +func (q *queue) Flush() []Record { + q.Lock() + defer q.Unlock() + + out := make([]Record, q.len) + for i := range out { + out[i] = q.read.Value.(Record) + q.read = q.read.Next() + } + q.len = 0 + + return out +} + type batchingConfig struct { maxQSize setting[int] expInterval setting[time.Duration] diff --git a/sdk/log/batch_test.go b/sdk/log/batch_test.go index 111e2dea374..adbdb1d8bce 100644 --- a/sdk/log/batch_test.go +++ b/sdk/log/batch_test.go @@ -4,13 +4,17 @@ package log // import "go.opentelemetry.io/otel/sdk/log" import ( + "slices" "strconv" + "sync" "testing" "time" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/log" ) func TestNewBatchingConfig(t *testing.T) { @@ -126,3 +130,112 @@ func TestNewBatchingConfig(t *testing.T) { }) } } + +func TestQueue(t *testing.T) { + var r Record + r.SetBody(log.BoolValue(true)) + + t.Run("newQueue", func(t *testing.T) { + const size = 1 + q := newQueue(size) + assert.Equal(t, q.len, 0) + assert.Equal(t, size, q.cap, "capacity") + assert.Equal(t, size, q.read.Len(), "read ring") + assert.Same(t, q.read, q.write, "different rings") + }) + + t.Run("Enqueue", func(t *testing.T) { + const size = 2 + q := newQueue(size) + + var notR Record + notR.SetBody(log.IntValue(10)) + + assert.Equal(t, 1, q.Enqueue(notR), "incomplete batch") + assert.Equal(t, 1, q.len, "length") + assert.Equal(t, size, q.cap, "capacity") + + assert.Equal(t, 2, q.Enqueue(r), "complete batch") + assert.Equal(t, 2, q.len, "length") + assert.Equal(t, size, q.cap, "capacity") + + assert.Equal(t, 2, q.Enqueue(r), "overflow batch") + assert.Equal(t, 2, q.len, "length") + assert.Equal(t, size, q.cap, "capacity") + + assert.Equal(t, []Record{r, r}, q.Flush(), "flushed Records") + }) + + t.Run("Flush", func(t *testing.T) { + const size = 2 + q := newQueue(size) + q.write.Value = r + q.write = q.write.Next() + q.len = 1 + + assert.Equal(t, []Record{r}, q.Flush(), "flushed") + }) + + t.Run("TryFlush", func(t *testing.T) { + const size = 3 + q := newQueue(size) + for i := 0; i < size-1; i++ { + q.write.Value = r + q.write = q.write.Next() + q.len++ + } + + buf := make([]Record, 1) + f := func([]Record) bool { return false } + assert.Equal(t, size-1, q.TryDequeue(buf, f), "not flushed") + require.Equal(t, size-1, q.len, "length") + require.NotSame(t, q.read, q.write, "read ring advanced") + + var flushed []Record + f = func(r []Record) bool { + flushed = append(flushed, r...) + return true + } + if assert.Equal(t, size-2, q.TryDequeue(buf, f), "did not flush len(buf)") { + assert.Equal(t, []Record{r}, flushed, "Records") + } + + buf = slices.Grow(buf, size) + flushed = flushed[:0] + if assert.Equal(t, 0, q.TryDequeue(buf, f), "did not flush len(queue)") { + assert.Equal(t, []Record{r}, flushed, "Records") + } + }) + + t.Run("ConcurrentSafe", func(t *testing.T) { + const goRoutines = 10 + + flushed := make(chan []Record, goRoutines) + out := make([]Record, 0, goRoutines) + done := make(chan struct{}) + go func() { + defer close(done) + for recs := range flushed { + out = append(out, recs...) + } + }() + + var wg sync.WaitGroup + wg.Add(goRoutines) + + b := newQueue(goRoutines) + for i := 0; i < goRoutines; i++ { + go func() { + defer wg.Done() + b.Enqueue(Record{}) + flushed <- b.Flush() + }() + } + + wg.Wait() + close(flushed) + <-done + + assert.Len(t, out, goRoutines, "flushed Records") + }) +}