Fix OTLP testing flake: signal connection from mock collector #1816

Merged
merged 6 commits
Apr 17, 2021
Changes from 2 commits
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -47,6 +47,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm
Additionally, this tag is overridden, as specified in the OTel specification, if the event contains an attribute with that key. (#1768)
- Zipkin Exporter: Ensure mapping between OTel and Zipkin span data complies with the specification. (#1688)
- Fixed typo for default service name in Jaeger Exporter. (#1797)
- Fix flaky OTLP test for the reconnection of the client connection. (#1527, TBD)

### Changed

79 changes: 69 additions & 10 deletions exporters/otlp/otlpgrpc/mock_collector_test.go
@@ -18,6 +18,7 @@ import (
"context"
"fmt"
"net"
"runtime"
"sync"
"testing"
"time"
@@ -107,7 +108,8 @@ type mockCollector struct {
metricSvc *mockMetricService

endpoint string
stopFunc func() error
ln *listener
stopFunc func()
stopOnce sync.Once
}

@@ -119,8 +121,9 @@ var errAlreadyStopped = fmt.Errorf("already stopped")
func (mc *mockCollector) stop() error {
var err = errAlreadyStopped
mc.stopOnce.Do(func() {
err = nil
if mc.stopFunc != nil {
err = mc.stopFunc()
mc.stopFunc()
}
})
// Give it some time to shut down.
@@ -174,6 +177,19 @@ func (mc *mockCollector) GetMetrics() []*metricpb.Metric {
return mc.getMetrics()
}

// WaitForConn will wait indefinitely for a connection to be established
// with the mockCollector before returning.
func (mc *mockCollector) WaitForConn() {
for {
select {
case <-mc.ln.C:
return
default:
runtime.Gosched()
}
}
}

// runMockCollector is a helper function to create a mock Collector
func runMockCollector(t *testing.T) *mockCollector {
return runMockCollectorAtEndpoint(t, "localhost:0")
@@ -189,19 +205,62 @@ func runMockCollectorAtEndpoint(t *testing.T, endpoint string) *mockCollector {
mc := makeMockCollector(t)
collectortracepb.RegisterTraceServiceServer(srv, mc.traceSvc)
collectormetricpb.RegisterMetricsServiceServer(srv, mc.metricSvc)
mc.ln = newListener(ln)
go func() {
_ = srv.Serve(ln)
_ = srv.Serve((net.Listener)(mc.ln))
}()

deferFunc := func() error {
srv.Stop()
return ln.Close()
mc.endpoint = ln.Addr().String()
// srv.Stop calls Close on mc.ln.
mc.stopFunc = srv.Stop

return mc
}

type listener struct {
wrapped net.Listener

C chan struct{}

closed chan struct{}
closeOnce sync.Once
}

func newListener(wrapped net.Listener) *listener {
return &listener{
wrapped: wrapped,
C: make(chan struct{}, 1),
closed: make(chan struct{}),
}
}

_, collectorPortStr, _ := net.SplitHostPort(ln.Addr().String())
func (l *listener) Accept() (net.Conn, error) {
conn, err := l.wrapped.Accept()

// If the listener has been closed, do not allow callers of WaitForConn to
// wait for a connection that will never come. This is not part of the select
// statement below because select evaluates its cases in a non-deterministic
// order.
select {
case <-l.closed:
// Close l.C here so we do not have a race between the Close function
// and the write below.
l.closeOnce.Do(func() { close(l.C) })
return conn, err
default:
}

mc.endpoint = "localhost:" + collectorPortStr
mc.stopFunc = deferFunc
select {
case l.C <- struct{}{}:
default:
// If C is full, assume nobody is listening and move on.
}
return conn, err
}

return mc
func (l *listener) Close() error {
close(l.closed)
return l.wrapped.Close()
}

func (l *listener) Addr() net.Addr { return l.wrapped.Addr() }
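
The listener wrapper above carries the whole fix: Accept pushes a token into a buffered channel, and WaitForConn blocks on that channel instead of guessing how long a reconnect takes. Below is a minimal, self-contained sketch of the same signal-on-accept pattern, stripped of the gRPC and exporter machinery; signalListener, newSignalListener, and TestSignalListener are illustrative names, not code from this PR.

```go
package sketch

import (
	"net"
	"testing"
	"time"
)

// signalListener is a hypothetical stand-in for the PR's listener type: it
// wraps a net.Listener and signals each accepted connection on C.
type signalListener struct {
	net.Listener
	C chan struct{}
}

func newSignalListener(l net.Listener) *signalListener {
	// Buffer of 1 so Accept never blocks waiting for a reader.
	return &signalListener{Listener: l, C: make(chan struct{}, 1)}
}

func (l *signalListener) Accept() (net.Conn, error) {
	conn, err := l.Listener.Accept()
	if err == nil {
		select {
		case l.C <- struct{}{}:
		default:
			// Channel full and nobody waiting; drop the signal.
		}
	}
	return conn, err
}

func TestSignalListener(t *testing.T) {
	inner, err := net.Listen("tcp", "localhost:0")
	if err != nil {
		t.Fatal(err)
	}
	ln := newSignalListener(inner)
	defer ln.Close()

	// Server side: accept a single connection, as srv.Serve would.
	go func() {
		if conn, err := ln.Accept(); err == nil {
			conn.Close()
		}
	}()

	// Client side: dial the listener, as the exporter does on (re)connect.
	go func() {
		if conn, err := net.Dial("tcp", ln.Addr().String()); err == nil {
			conn.Close()
		}
	}()

	// Deterministic wait for the connection, with a safety timeout so a
	// regression cannot hang the test suite.
	select {
	case <-ln.C:
	case <-time.After(5 * time.Second):
		t.Fatal("timed out waiting for connection")
	}
}
```

The buffered channel plus the select/default in Accept mirrors the design choice in the diff: the server side never blocks on a slow or absent reader, while a single waiter is still guaranteed to observe the first accepted connection.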
84 changes: 20 additions & 64 deletions exporters/otlp/otlpgrpc/otlp_integration_test.go
@@ -18,7 +18,6 @@ import (
"context"
"fmt"
"net"
"runtime"
"strings"
"testing"
"time"
@@ -149,44 +148,22 @@ func TestNewExporter_collectorConnectionDiesThenReconnectsWhenInRestMode(t *test
ctx := context.Background()
exp := newGRPCExporter(t, ctx, mc.endpoint,
otlpgrpc.WithReconnectionPeriod(reconnectionPeriod))
defer func() {
_ = exp.Shutdown(ctx)
}()
defer func() { require.NoError(t, exp.Shutdown(ctx)) }()

// Wait for a connection.
mc.WaitForConn()

// We'll now stop the collector right away to simulate a connection
// dying in the midst of communication or never having existed in the first place.
_ = mc.stop()
require.NoError(t, mc.stop())

// The first export will send a disconnected message to the channel on export failure,
// triggering an almost immediate reconnection.
require.Error(
t,
exp.ExportSpans(ctx, []*sdktrace.SpanSnapshot{{Name: "in the midst"}}),
"transport: Error while dialing dial tcp %s: connect: connection refused",
mc.endpoint,
)

// Give the exporter sometime to reconnect
func() {
timer := time.After(reconnectionPeriod * 10)
for {
select {
case <-timer:
return
default:
runtime.Gosched()
}
}
}()
require.Error(t, exp.ExportSpans(ctx, []*sdktrace.SpanSnapshot{{Name: "in the midst"}}))

// The second export will detect the connection issue, change the state of the exporter to disconnected, and
// send a message to the disconnected channel, but this time the reconnection goroutine will be in rest mode (not listening to the disconnected channel).
require.Error(
t,
exp.ExportSpans(ctx, []*sdktrace.SpanSnapshot{{Name: "in the midst"}}),
"transport: Error while dialing dial tcp %s: connect: connection refused2",
mc.endpoint,
)
require.Error(t, exp.ExportSpans(ctx, []*sdktrace.SpanSnapshot{{Name: "in the midst"}}))

// As a result, the exporter is in a disconnected state, waiting for a disconnection message in order to reconnect.

@@ -195,17 +172,7 @@ func TestNewExporter_collectorConnectionDiesThenReconnectsWhenInRestMode(t *test

// Make sure the reconnection loop reaches its beginning and goes back to waiting mode;
// after hitting the beginning of the loop it should reconnect.
func() {
timer := time.After(reconnectionPeriod * 10)
for {
select {
case <-timer:
return
default:
runtime.Gosched()
}
}
}()
nmc.WaitForConn()

n := 10
for i := 0; i < n; i++ {
@@ -226,52 +193,39 @@ func TestNewExporter_collectorConnectionDiesThenReconnects(t *testing.T) {
if g, w := len(dSpans), 0; g != w {
t.Fatalf("Disconnected collector: spans: got %d want %d", g, w)
}

require.NoError(t, nmc.stop())
}

func TestNewExporter_collectorConnectionDiesThenReconnects(t *testing.T) {
mc := runMockCollector(t)

reconnectionPeriod := 20 * time.Millisecond
reconnectionPeriod := 50 * time.Millisecond
ctx := context.Background()
exp := newGRPCExporter(t, ctx, mc.endpoint,
otlpgrpc.WithReconnectionPeriod(reconnectionPeriod))
defer func() {
_ = exp.Shutdown(ctx)
}()
defer func() { require.NoError(t, exp.Shutdown(ctx)) }()

mc.WaitForConn()

// We'll now stop the collector right away to simulate a connection
// dying in the midst of communication or never having existed in the first place.
_ = mc.stop()
require.NoError(t, mc.stop())

// In the test below, we'll stop the collector many times,
// while exporting traces and test to ensure that we can
// reconnect.
for j := 0; j < 3; j++ {

// No endpoint up.
require.Error(
t,
exp.ExportSpans(ctx, []*sdktrace.SpanSnapshot{{Name: "in the midst"}}),
"transport: Error while dialing dial tcp %s: connect: connection refused",
mc.endpoint,
)
require.Error(t, exp.ExportSpans(ctx, []*sdktrace.SpanSnapshot{{Name: "in the midst"}}))

// Now resurrect the collector by making a new one but reusing the
// old endpoint, and the exporter should reconnect automatically.
nmc := runMockCollectorAtEndpoint(t, mc.endpoint)

// Give the exporter sometime to reconnect
func() {
timer := time.After(reconnectionPeriod * 10)
for {
select {
case <-timer:
return
default:
runtime.Gosched()
}
}
}()
nmc.WaitForConn()

n := 10
for i := 0; i < n; i++ {
@@ -289,7 +243,9 @@ func TestNewExporter_collectorConnectionDiesThenReconnects(t *testing.T) {
if g, w := len(dSpans), 0; g != w {
t.Fatalf("Round #%d: Disconnected collector: spans: got %d want %d", j, g, w)
}
_ = nmc.stop()

// Disconnect for the next try.
require.NoError(t, nmc.stop())
}
}

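
For reference, here is a condensed, hypothetical sketch of the reconnect flow these tests now exercise, with the deterministic WaitForConn handshake replacing the old reconnectionPeriod * 10 polling loops. It assumes it sits in the same test package as the helpers in the diff (runMockCollector, runMockCollectorAtEndpoint, newGRPCExporter, and the mockCollector fields and methods shown above); it is illustrative only, not the PR's test code.

```go
package otlpgrpc_test

import (
	"context"
	"testing"
	"time"

	"github.com/stretchr/testify/require"

	"go.opentelemetry.io/otel/exporters/otlp/otlpgrpc"
	sdktrace "go.opentelemetry.io/otel/sdk/trace"
)

// TestReconnectFlowSketch is a hypothetical, condensed version of the
// reconnect tests above.
func TestReconnectFlowSketch(t *testing.T) {
	mc := runMockCollector(t)
	ctx := context.Background()

	exp := newGRPCExporter(t, ctx, mc.endpoint,
		otlpgrpc.WithReconnectionPeriod(50*time.Millisecond))
	defer func() { require.NoError(t, exp.Shutdown(ctx)) }()

	// Block until the exporter has actually dialed the mock collector.
	mc.WaitForConn()

	// Kill the collector; exporting is now expected to fail.
	require.NoError(t, mc.stop())
	require.Error(t, exp.ExportSpans(ctx, []*sdktrace.SpanSnapshot{{Name: "down"}}))

	// Resurrect the collector on the same endpoint and wait for the
	// exporter's reconnection loop to re-establish the connection before
	// asserting anything else.
	nmc := runMockCollectorAtEndpoint(t, mc.endpoint)
	nmc.WaitForConn()

	// Exports are expected to succeed again, as in the tests above.
	require.NoError(t, exp.ExportSpans(ctx, []*sdktrace.SpanSnapshot{{Name: "up"}}))
	require.NoError(t, nmc.stop())
}
```

Waiting on the listener's signal rather than sleeping for a multiple of the reconnection period is what removes the flake: the test proceeds exactly when the connection exists, no sooner and no later.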