Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix OTLP testing flake: signal connection from mock collector #1816

Merged
merged 6 commits into from
Apr 17, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm
Additionally, this tag is overridden, as specified in the OTel specification, if the event contains an attribute with that key. (#1768)
- Zipkin Exporter: Ensure mapping between OTel and Zipkin span data complies with the specification. (#1688)
- Fixed typo for default service name in Jaeger Exporter. (#1797)
- Fix flaky OTLP test for the reconnection of the client connection. (#1527, TBD)

### Changed

Expand Down
75 changes: 65 additions & 10 deletions exporters/otlp/otlpgrpc/mock_collector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ import (
"context"
"fmt"
"net"
"runtime"
"strings"
"sync"
"testing"
"time"
Expand Down Expand Up @@ -107,7 +109,8 @@ type mockCollector struct {
metricSvc *mockMetricService

endpoint string
stopFunc func() error
ln *listener
stopFunc func()
stopOnce sync.Once
}

Expand All @@ -119,8 +122,9 @@ var errAlreadyStopped = fmt.Errorf("already stopped")
func (mc *mockCollector) stop() error {
var err = errAlreadyStopped
mc.stopOnce.Do(func() {
err = nil
if mc.stopFunc != nil {
err = mc.stopFunc()
mc.stopFunc()
}
})
// Give it some time to shut down.
Expand Down Expand Up @@ -189,19 +193,70 @@ func runMockCollectorAtEndpoint(t *testing.T, endpoint string) *mockCollector {
mc := makeMockCollector(t)
collectortracepb.RegisterTraceServiceServer(srv, mc.traceSvc)
collectormetricpb.RegisterMetricsServiceServer(srv, mc.metricSvc)
mc.ln = newListener(ln)
go func() {
_ = srv.Serve(ln)
_ = srv.Serve((net.Listener)(mc.ln))
}()

deferFunc := func() error {
srv.Stop()
return ln.Close()
mc.endpoint = ln.Addr().String()
// srv.Stop calls Close on mc.ln.
mc.stopFunc = srv.Stop

return mc
}

// listener wraps a net.Listener so that tests can observe when a connection
// has been accepted, via the C channel, instead of sleeping or polling the
// exporter state.
type listener struct {
	closeOnce sync.Once
	wrapped   net.Listener
	C         chan struct{}
}

// newListener returns a listener wrapping the passed net.Listener.
func newListener(wrapped net.Listener) *listener {
	return &listener{
		wrapped: wrapped,
		// Buffer of one so a single Accept can signal without blocking even
		// when nobody is waiting yet.
		C: make(chan struct{}, 1),
	}
}

// Close closes the wrapped listener.
func (l *listener) Close() error { return l.wrapped.Close() }

// Addr returns the wrapped listener's network address.
func (l *listener) Addr() net.Addr { return l.wrapped.Addr() }

// Accept waits for and returns the next connection to the listener. It will
// send a signal on l.C that a connection has been made before returning.
func (l *listener) Accept() (net.Conn, error) {
	conn, err := l.wrapped.Accept()
	if err != nil {
		// Go 1.16 exported net.ErrClosed that could clean up this check, but to
		// remain backwards compatible with previous versions of Go that we
		// support the following string evaluation is used instead to keep in line
		// with the previously recommended way to check this:
		// https://github.com/golang/go/issues/4373#issuecomment-353076799
		if strings.Contains(err.Error(), "use of closed network connection") {
			// If the listener has been closed, do not allow callers of
			// WaitForConn to wait for a connection that will never come.
			l.closeOnce.Do(func() { close(l.C) })
		}
		return conn, err
	}

	select {
	case l.C <- struct{}{}:
	default:
		// If C is full, assume nobody is listening and move on.
	}
	return conn, nil
}

// WaitForConn will wait indefinitely for a connection to be established with
// the listener before returning.
func (l *listener) WaitForConn() {
	for {
		select {
		case <-l.C:
			return
		default:
			// Yield the processor instead of spinning hot while we poll for
			// the accept signal.
			runtime.Gosched()
		}
	}
}
84 changes: 20 additions & 64 deletions exporters/otlp/otlpgrpc/otlp_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ import (
"context"
"fmt"
"net"
"runtime"
"strings"
"testing"
"time"
Expand Down Expand Up @@ -149,44 +148,22 @@ func TestNewExporter_collectorConnectionDiesThenReconnectsWhenInRestMode(t *test
ctx := context.Background()
exp := newGRPCExporter(t, ctx, mc.endpoint,
otlpgrpc.WithReconnectionPeriod(reconnectionPeriod))
defer func() {
_ = exp.Shutdown(ctx)
}()
defer func() { require.NoError(t, exp.Shutdown(ctx)) }()

// Wait for a connection.
mc.ln.WaitForConn()

// We'll now stop the collector right away to simulate a connection
// dying in the midst of communication or even not existing before.
_ = mc.stop()
require.NoError(t, mc.stop())

// first export, it will send disconnected message to the channel on export failure,
// trigger almost immediate reconnection
require.Error(
t,
exp.ExportSpans(ctx, []*sdktrace.SpanSnapshot{{Name: "in the midst"}}),
"transport: Error while dialing dial tcp %s: connect: connection refused",
mc.endpoint,
)

// Give the exporter sometime to reconnect
func() {
timer := time.After(reconnectionPeriod * 10)
for {
select {
case <-timer:
return
default:
runtime.Gosched()
}
}
}()
require.Error(t, exp.ExportSpans(ctx, []*sdktrace.SpanSnapshot{{Name: "in the midst"}}))

// second export, it will detect the connection issue, change the state of the exporter to disconnected and
// send a message to the disconnected channel, but this time the reconnection goroutine will be in rest mode (not listening to the disconnected channel)
require.Error(
t,
exp.ExportSpans(ctx, []*sdktrace.SpanSnapshot{{Name: "in the midst"}}),
"transport: Error while dialing dial tcp %s: connect: connection refused2",
mc.endpoint,
)
require.Error(t, exp.ExportSpans(ctx, []*sdktrace.SpanSnapshot{{Name: "in the midst"}}))

// as a result we have exporter in disconnected state waiting for disconnection message to reconnect

Expand All @@ -195,17 +172,7 @@ func TestNewExporter_collectorConnectionDiesThenReconnectsWhenInRestMode(t *test

// make sure reconnection loop hits beginning and goes back to waiting mode
// after hitting beginning of the loop it should reconnect
func() {
timer := time.After(reconnectionPeriod * 10)
for {
select {
case <-timer:
return
default:
runtime.Gosched()
}
}
}()
nmc.ln.WaitForConn()

n := 10
for i := 0; i < n; i++ {
Expand All @@ -226,52 +193,39 @@ func TestNewExporter_collectorConnectionDiesThenReconnectsWhenInRestMode(t *test
if g, w := len(dSpans), 0; g != w {
t.Fatalf("Disconnected collector: spans: got %d want %d", g, w)
}

require.NoError(t, nmc.Stop())
}

func TestNewExporter_collectorConnectionDiesThenReconnects(t *testing.T) {
mc := runMockCollector(t)

reconnectionPeriod := 20 * time.Millisecond
reconnectionPeriod := 50 * time.Millisecond
ctx := context.Background()
exp := newGRPCExporter(t, ctx, mc.endpoint,
otlpgrpc.WithReconnectionPeriod(reconnectionPeriod))
defer func() {
_ = exp.Shutdown(ctx)
}()
defer func() { require.NoError(t, exp.Shutdown(ctx)) }()

mc.ln.WaitForConn()

// We'll now stop the collector right away to simulate a connection
// dying in the midst of communication or even not existing before.
_ = mc.stop()
require.NoError(t, mc.stop())

// In the test below, we'll stop the collector many times,
// while exporting traces and test to ensure that we can
// reconnect.
for j := 0; j < 3; j++ {

// No endpoint up.
require.Error(
t,
exp.ExportSpans(ctx, []*sdktrace.SpanSnapshot{{Name: "in the midst"}}),
"transport: Error while dialing dial tcp %s: connect: connection refused",
mc.endpoint,
)
require.Error(t, exp.ExportSpans(ctx, []*sdktrace.SpanSnapshot{{Name: "in the midst"}}))

// Now resurrect the collector by making a new one but reusing the
// old endpoint, and the collector should reconnect automatically.
nmc := runMockCollectorAtEndpoint(t, mc.endpoint)

// Give the exporter sometime to reconnect
func() {
timer := time.After(reconnectionPeriod * 10)
for {
select {
case <-timer:
return
default:
runtime.Gosched()
}
}
}()
nmc.ln.WaitForConn()

n := 10
for i := 0; i < n; i++ {
Expand All @@ -289,7 +243,9 @@ func TestNewExporter_collectorConnectionDiesThenReconnects(t *testing.T) {
if g, w := len(dSpans), 0; g != w {
t.Fatalf("Round #%d: Disconnected collector: spans: got %d want %d", j, g, w)
}
_ = nmc.stop()

// Disconnect for the next try.
require.NoError(t, nmc.stop())
}
}

Expand Down