Skip to content

Commit

Permalink
add NewExportPipeline and InstallNewPipeline for otlp
Browse files Browse the repository at this point in the history
Signed-off-by: binjip978 <binjip978@gmail.com>
  • Loading branch information
binjip978 committed Nov 28, 2020
1 parent 55ff277 commit a6c68e7
Show file tree
Hide file tree
Showing 3 changed files with 101 additions and 0 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm

### Changed

- Add NewExportPipeline and InstallNewPipeline convince functions (#1347)
- Move the OpenCensus example into `example` directory. (#1359)
- `NewExporter` and `Start` functions in `go.opentelemetry.io/otel/exporters/otlp` now receive `context.Context` as a first parameter. (#1357)

Expand Down
56 changes: 56 additions & 0 deletions exporters/otlp/otlp.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,16 +21,24 @@ import (
"context"
"errors"
"sync"
"time"

"google.golang.org/grpc"

"go.opentelemetry.io/otel"
colmetricpb "go.opentelemetry.io/otel/exporters/otlp/internal/opentelemetry-proto-gen/collector/metrics/v1"
coltracepb "go.opentelemetry.io/otel/exporters/otlp/internal/opentelemetry-proto-gen/collector/trace/v1"
"go.opentelemetry.io/otel/exporters/otlp/internal/transform"
"go.opentelemetry.io/otel/metric"
"go.opentelemetry.io/otel/propagation"
metricsdk "go.opentelemetry.io/otel/sdk/export/metric"
"go.opentelemetry.io/otel/sdk/export/metric/aggregation"
tracesdk "go.opentelemetry.io/otel/sdk/export/trace"
"go.opentelemetry.io/otel/sdk/metric/controller/push"
"go.opentelemetry.io/otel/sdk/metric/processor/basic"
"go.opentelemetry.io/otel/sdk/metric/selector/simple"
"go.opentelemetry.io/otel/sdk/resource"
sdktrace "go.opentelemetry.io/otel/sdk/trace"
)

// Exporter is an OpenTelemetry exporter. It exports both traces and metrics
Expand Down Expand Up @@ -231,3 +239,51 @@ func (e *Exporter) uploadTraces(ctx context.Context, sdl []*tracesdk.SpanData) e
}
return err
}

func NewExportPipeline(ctx context.Context, exportOpts []ExporterOption,
resOpts []resource.Option) (*Exporter, *sdktrace.TracerProvider, *push.Controller, error) {

exp, err := NewExporter(ctx, exportOpts...)
if err != nil {
return nil, nil, nil, err
}

res, err := resource.New(ctx, resOpts...)
if err != nil {
return nil, nil, nil, err
}

bsp := sdktrace.NewBatchSpanProcessor(exp)
tracerProvider := sdktrace.NewTracerProvider(
sdktrace.WithConfig(sdktrace.Config{DefaultSampler: sdktrace.AlwaysSample()}),
sdktrace.WithResource(res),
sdktrace.WithSpanProcessor(bsp),
)

pusher := push.New(
basic.New(
simple.NewWithExactDistribution(),
exp,
),
exp,
push.WithPeriod(7*time.Second),
)

return exp, tracerProvider, pusher, nil
}

func InstallNewPipeline(ctx context.Context, exportOpts []ExporterOption,
resOpts []resource.Option) (*Exporter, *sdktrace.TracerProvider, *push.Controller, error) {

exp, tp, pusher, err := NewExportPipeline(ctx, exportOpts, resOpts)
if err != nil {
return nil, nil, nil, err
}

otel.SetTextMapPropagator(propagation.TraceContext{})
otel.SetTracerProvider(tp)
otel.SetMeterProvider(pusher.MeterProvider())
pusher.Start()

return exp, tp, pusher, err
}
44 changes: 44 additions & 0 deletions exporters/otlp/otlp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,12 @@ import (
"sync"
"testing"
"time"

"github.com/stretchr/testify/assert"

"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/sdk/resource"
sdktrace "go.opentelemetry.io/otel/sdk/trace"
)

func TestExporterShutdownHonorsTimeout(t *testing.T) {
Expand Down Expand Up @@ -113,3 +119,41 @@ func TestExporterShutdownManyTimes(t *testing.T) {
}
}
}

func TestInstallNewPipeline(t *testing.T) {
ctx := context.Background()
_, _, _, err := InstallNewPipeline(ctx, nil, nil)
assert.NoError(t, err)
assert.IsType(t, &sdktrace.TracerProvider{}, otel.GetTracerProvider())
}

func TestNewExportPipeline(t *testing.T) {
testCases := []struct {
name string
expOpts []ExporterOption
resOpts []resource.Option
testSpanSampling bool
}{
{
name: "simple pipeline",
},
}

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
_, tp, _, err := NewExportPipeline(
context.Background(),
tc.expOpts,
tc.resOpts,
)

assert.NoError(t, err)
assert.NotEqual(t, tp, otel.GetTracerProvider())

_, span := tp.Tracer("otlp test").Start(context.Background(), tc.name)
spanCtx := span.SpanContext()
assert.Equal(t, true, spanCtx.IsSampled())
span.End()
})
}
}

0 comments on commit a6c68e7

Please sign in to comment.