Skip to content

Commit

Permalink
Merge branch 'main' into otel-collector-example
Browse files Browse the repository at this point in the history
  • Loading branch information
XSAM authored Apr 25, 2024
2 parents 48a9e79 + 19ee6d4 commit 813cc77
Show file tree
Hide file tree
Showing 9 changed files with 142 additions and 69 deletions.
7 changes: 7 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,13 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm

- Add `RecordFactory` in `go.opentelemetry.io/otel/log/logtest` to facilitate testing the bridge implementations. (#5263)
- Add `RecordFactory` in `go.opentelemetry.io/otel/sdk/log/logtest` to facilitate testing the exporter and processor implementations. (#5258)
- Add example for `go.opentelemetry.io/otel/exporters/stdout/stdoutlog`. (#5242)

### Changed

- `go.opentelemetry.io/otel/exporters/stdout/stdoutlog` won't print timestamps when `WithoutTimestamps` option is set. (#5241)
- The `Shutdown` method of `Exporter` in `go.opentelemetry.io/otel/exporters/stdout/stdouttrace` ignores the context cancellation and always returns `nil`. (#5189)
- The `ForceFlush` and `Shutdown` methods of the exporter returned by `New` in `go.opentelemetry.io/otel/exporters/stdout/stdoutmetric` ignore the context cancellation and always return `nil`. (#5189)

## [1.26.0/0.48.0/0.2.0-alpha] 2024-04-24

Expand Down
32 changes: 32 additions & 0 deletions exporters/stdout/stdoutlog/example_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package stdoutlog_test

import (
"context"

"go.opentelemetry.io/otel/exporters/stdout/stdoutlog"
"go.opentelemetry.io/otel/log/global"
"go.opentelemetry.io/otel/sdk/log"
)

func Example() {
exp, err := stdoutlog.New()
if err != nil {
panic(err)
}

processor := log.NewSimpleProcessor(exp)
provider := log.NewLoggerProvider(log.WithProcessor(processor))
defer func() {
if err := provider.Shutdown(context.Background()); err != nil {
panic(err)
}
}()

global.SetLoggerProvider(provider)

// From here, the provider can be used by instrumentation to collect
// telemetry.
}
36 changes: 21 additions & 15 deletions exporters/stdout/stdoutlog/exporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ func TestExporter(t *testing.T) {

return exporter
}(),
want: getJSON(now),
want: getJSON(&now),
},
}

Expand Down Expand Up @@ -113,7 +113,7 @@ func TestExporterExport(t *testing.T) {
options: []Option{},
ctx: context.Background(),
records: records,
wantResult: getJSONs(now),
wantResult: getJSONs(&now),
},
{
name: "NoRecords",
Expand All @@ -127,21 +127,21 @@ func TestExporterExport(t *testing.T) {
options: []Option{WithPrettyPrint()},
ctx: context.Background(),
records: records,
wantResult: getPrettyJSONs(now),
wantResult: getPrettyJSONs(&now),
},
{
name: "WithoutTimestamps",
options: []Option{WithoutTimestamps()},
ctx: context.Background(),
records: records,
wantResult: getJSONs(time.Time{}),
wantResult: getJSONs(nil),
},
{
name: "WithoutTimestamps and WithPrettyPrint",
options: []Option{WithoutTimestamps(), WithPrettyPrint()},
ctx: context.Background(),
records: records,
wantResult: getPrettyJSONs(time.Time{}),
wantResult: getPrettyJSONs(nil),
},
{
name: "WithCanceledContext",
Expand Down Expand Up @@ -171,22 +171,28 @@ func TestExporterExport(t *testing.T) {
}
}

func getJSON(now time.Time) string {
serializedNow, _ := json.Marshal(now)
func getJSON(now *time.Time) string {
var timestamps string
if now != nil {
serializedNow, _ := json.Marshal(now)
timestamps = "\"Timestamp\":" + string(serializedNow) + ",\"ObservedTimestamp\":" + string(serializedNow) + ","
}

return "{\"Timestamp\":" + string(serializedNow) + ",\"ObservedTimestamp\":" + string(serializedNow) + ",\"Severity\":9,\"SeverityText\":\"INFO\",\"Body\":{},\"Attributes\":[{\"Key\":\"key\",\"Value\":{}},{\"Key\":\"key2\",\"Value\":{}},{\"Key\":\"key3\",\"Value\":{}},{\"Key\":\"key4\",\"Value\":{}},{\"Key\":\"key5\",\"Value\":{}},{\"Key\":\"bool\",\"Value\":{}}],\"TraceID\":\"0102030405060708090a0b0c0d0e0f10\",\"SpanID\":\"0102030405060708\",\"TraceFlags\":\"01\",\"Resource\":null,\"Scope\":{\"Name\":\"\",\"Version\":\"\",\"SchemaURL\":\"\"},\"AttributeValueLengthLimit\":0,\"AttributeCountLimit\":0}\n"
return "{" + timestamps + "\"Severity\":9,\"SeverityText\":\"INFO\",\"Body\":{},\"Attributes\":[{\"Key\":\"key\",\"Value\":{}},{\"Key\":\"key2\",\"Value\":{}},{\"Key\":\"key3\",\"Value\":{}},{\"Key\":\"key4\",\"Value\":{}},{\"Key\":\"key5\",\"Value\":{}},{\"Key\":\"bool\",\"Value\":{}}],\"TraceID\":\"0102030405060708090a0b0c0d0e0f10\",\"SpanID\":\"0102030405060708\",\"TraceFlags\":\"01\",\"Resource\":null,\"Scope\":{\"Name\":\"\",\"Version\":\"\",\"SchemaURL\":\"\"},\"AttributeValueLengthLimit\":0,\"AttributeCountLimit\":0}\n"
}

func getJSONs(now time.Time) string {
func getJSONs(now *time.Time) string {
return getJSON(now) + getJSON(now)
}

func getPrettyJSON(now time.Time) string {
serializedNow, _ := json.Marshal(now)
func getPrettyJSON(now *time.Time) string {
var timestamps string
if now != nil {
serializedNow, _ := json.Marshal(now)
timestamps = "\n\t\"Timestamp\": " + string(serializedNow) + ",\n\t\"ObservedTimestamp\": " + string(serializedNow) + ","
}

return `{
"Timestamp": ` + string(serializedNow) + `,
"ObservedTimestamp": ` + string(serializedNow) + `,
return `{` + timestamps + `
"Severity": 9,
"SeverityText": "INFO",
"Body": {},
Expand Down Expand Up @@ -231,7 +237,7 @@ func getPrettyJSON(now time.Time) string {
`
}

func getPrettyJSONs(now time.Time) string {
func getPrettyJSONs(now *time.Time) string {
return getPrettyJSON(now) + getPrettyJSON(now)
}

Expand Down
11 changes: 7 additions & 4 deletions exporters/stdout/stdoutlog/record.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@ import (

// recordJSON is a JSON-serializable representation of a Record.
type recordJSON struct {
Timestamp time.Time
ObservedTimestamp time.Time
Timestamp *time.Time `json:",omitempty"`
ObservedTimestamp *time.Time `json:",omitempty"`
Severity log.Severity
SeverityText string
Body log.Value
Expand Down Expand Up @@ -53,8 +53,11 @@ func (e *Exporter) newRecordJSON(r sdklog.Record) recordJSON {
})

if e.timestamps {
newRecord.Timestamp = r.Timestamp()
newRecord.ObservedTimestamp = r.ObservedTimestamp()
timestamp := r.Timestamp()
newRecord.Timestamp = &timestamp

observedTimestamp := r.ObservedTimestamp()
newRecord.ObservedTimestamp = &observedTimestamp
}

return newRecord
Expand Down
16 changes: 6 additions & 10 deletions exporters/stdout/stdoutmetric/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,12 +51,8 @@ func (e *exporter) Aggregation(k metric.InstrumentKind) metric.Aggregation {
}

func (e *exporter) Export(ctx context.Context, data *metricdata.ResourceMetrics) error {
select {
case <-ctx.Done():
// Don't do anything if the context has already timed out.
return ctx.Err()
default:
// Context is still valid, continue.
if err := ctx.Err(); err != nil {
return err
}
if e.redactTimestamps {
redactTimestamps(data)
Expand All @@ -67,18 +63,18 @@ func (e *exporter) Export(ctx context.Context, data *metricdata.ResourceMetrics)
return e.encVal.Load().(encoderHolder).Encode(data)
}

func (e *exporter) ForceFlush(ctx context.Context) error {
func (e *exporter) ForceFlush(context.Context) error {
// exporter holds no state, nothing to flush.
return ctx.Err()
return nil
}

func (e *exporter) Shutdown(ctx context.Context) error {
func (e *exporter) Shutdown(context.Context) error {
e.shutdownOnce.Do(func() {
e.encVal.Store(encoderHolder{
encoder: shutdownEncoder{},
})
})
return ctx.Err()
return nil
}

func (e *exporter) MarshalLog() interface{} {
Expand Down
45 changes: 34 additions & 11 deletions exporters/stdout/stdoutmetric/exporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,7 @@ func testEncoderOption() stdoutmetric.Option {
func testCtxErrHonored(factory func(*testing.T) func(context.Context) error) func(t *testing.T) {
return func(t *testing.T) {
t.Helper()
ctx, cancel := context.WithCancel(context.Background())
t.Cleanup(cancel)
ctx := context.Background()

t.Run("DeadlineExceeded", func(t *testing.T) {
innerCtx, innerCancel := context.WithTimeout(ctx, time.Nanosecond)
Expand All @@ -55,26 +54,50 @@ func testCtxErrHonored(factory func(*testing.T) func(context.Context) error) fun
}
}

func TestExporterHonorsContextErrors(t *testing.T) {
t.Run("Shutdown", testCtxErrHonored(func(t *testing.T) func(context.Context) error {
func testCtxErrIgnored(factory func(*testing.T) func(context.Context) error) func(t *testing.T) {
return func(t *testing.T) {
t.Helper()
ctx := context.Background()

t.Run("Canceled Ignored", func(t *testing.T) {
innerCtx, innerCancel := context.WithCancel(ctx)
innerCancel()

f := factory(t)
assert.NoError(t, f(innerCtx))
})

t.Run("NoError", func(t *testing.T) {
f := factory(t)
assert.NoError(t, f(ctx))
})
}
}

func TestExporterExportHonorsContextErrors(t *testing.T) {
t.Run("Export", testCtxErrHonored(func(t *testing.T) func(context.Context) error {
exp, err := stdoutmetric.New(testEncoderOption())
require.NoError(t, err)
return exp.Shutdown
return func(ctx context.Context) error {
data := new(metricdata.ResourceMetrics)
return exp.Export(ctx, data)
}
}))
}

t.Run("ForceFlush", testCtxErrHonored(func(t *testing.T) func(context.Context) error {
func TestExporterForceFlushIgnoresContextErrors(t *testing.T) {
t.Run("ForceFlush", testCtxErrIgnored(func(t *testing.T) func(context.Context) error {
exp, err := stdoutmetric.New(testEncoderOption())
require.NoError(t, err)
return exp.ForceFlush
}))
}

t.Run("Export", testCtxErrHonored(func(t *testing.T) func(context.Context) error {
func TestExporterShutdownIgnoresContextErrors(t *testing.T) {
t.Run("Shutdown", testCtxErrIgnored(func(t *testing.T) func(context.Context) error {
exp, err := stdoutmetric.New(testEncoderOption())
require.NoError(t, err)
return func(ctx context.Context) error {
data := new(metricdata.ResourceMetrics)
return exp.Export(ctx, data)
}
return exp.Shutdown
}))
}

Expand Down
8 changes: 3 additions & 5 deletions exporters/stdout/stdouttrace/trace.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,9 @@ type Exporter struct {

// ExportSpans writes spans in json format to stdout.
func (e *Exporter) ExportSpans(ctx context.Context, spans []trace.ReadOnlySpan) error {
if err := ctx.Err(); err != nil {
return err
}
e.stoppedMu.RLock()
stopped := e.stopped
e.stoppedMu.RUnlock()
Expand Down Expand Up @@ -88,11 +91,6 @@ func (e *Exporter) Shutdown(ctx context.Context) error {
e.stopped = true
e.stoppedMu.Unlock()

select {
case <-ctx.Done():
return ctx.Err()
default:
}
return nil
}

Expand Down
46 changes: 22 additions & 24 deletions exporters/stdout/stdouttrace/trace_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,30 +60,44 @@ func TestExporterExportSpan(t *testing.T) {
tests := []struct {
opts []stdouttrace.Option
expectNow time.Time
ctx context.Context
wantErr error
}{
{
opts: []stdouttrace.Option{stdouttrace.WithPrettyPrint()},
expectNow: now,
ctx: context.Background(),
},
{
opts: []stdouttrace.Option{stdouttrace.WithPrettyPrint(), stdouttrace.WithoutTimestamps()},
// expectNow is an empty time.Time
ctx: context.Background(),
},
{
opts: []stdouttrace.Option{},
ctx: func() context.Context {
ctx, cancel := context.WithCancel(context.Background())
cancel()
return ctx
}(),
wantErr: context.Canceled,
},
}

ctx := context.Background()
for _, tt := range tests {
// write to buffer for testing
var b bytes.Buffer
ex, err := stdouttrace.New(append(tt.opts, stdouttrace.WithWriter(&b))...)
require.Nil(t, err)

err = ex.ExportSpans(ctx, tracetest.SpanStubs{ss, ss}.Snapshots())
require.Nil(t, err)
err = ex.ExportSpans(tt.ctx, tracetest.SpanStubs{ss, ss}.Snapshots())
assert.Equal(t, tt.wantErr, err)

got := b.String()
wantone := expectedJSON(tt.expectNow)
assert.Equal(t, wantone+wantone, got)
if tt.wantErr == nil {
got := b.String()
wantone := expectedJSON(tt.expectNow)
assert.Equal(t, wantone+wantone, got)
}
}
}

Expand Down Expand Up @@ -181,23 +195,7 @@ func expectedJSON(now time.Time) string {
`
}

func TestExporterShutdownHonorsTimeout(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Minute)
defer cancel()

e, err := stdouttrace.New()
if err != nil {
t.Fatalf("failed to create exporter: %v", err)
}

innerCtx, innerCancel := context.WithTimeout(ctx, time.Nanosecond)
defer innerCancel()
<-innerCtx.Done()
err = e.Shutdown(innerCtx)
assert.ErrorIs(t, err, context.DeadlineExceeded)
}

func TestExporterShutdownHonorsCancel(t *testing.T) {
func TestExporterShutdownIgnoresContext(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Minute)
defer cancel()

Expand All @@ -209,7 +207,7 @@ func TestExporterShutdownHonorsCancel(t *testing.T) {
innerCtx, innerCancel := context.WithCancel(ctx)
innerCancel()
err = e.Shutdown(innerCtx)
assert.ErrorIs(t, err, context.Canceled)
assert.NoError(t, err)
}

func TestExporterShutdownNoError(t *testing.T) {
Expand Down
Loading

0 comments on commit 813cc77

Please sign in to comment.