Skip to content

Commit

Permalink
Merge pull request #271 from maxekman/tracing
Browse files Browse the repository at this point in the history
Tracing with OpenTracing
  • Loading branch information
maxekman authored Nov 26, 2020
2 parents 1bb2b0c + 2e01cf0 commit 82599db
Show file tree
Hide file tree
Showing 40 changed files with 1,271 additions and 281 deletions.
10 changes: 5 additions & 5 deletions eventbus/gcp/eventbus.go
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,7 @@ func (b *EventBus) handle(ctx context.Context, m eh.EventMatcher, h eh.EventHand
select {
case b.errCh <- eh.EventBusError{Err: err, Ctx: ctx}:
default:
log.Printf("missed error in GCP event bus: %s", err)
log.Printf("eventhorizon: missed error in GCP event bus: %s", err)
}
// Retry the receive loop if there was an error.
time.Sleep(time.Second)
Expand All @@ -230,7 +230,7 @@ func (b *EventBus) handler(m eh.EventMatcher, h eh.EventHandler) func(ctx contex
select {
case b.errCh <- eh.EventBusError{Err: err, Ctx: ctx}:
default:
log.Printf("missed error in GCP event bus: %s", err)
log.Printf("eventhorizon: missed error in GCP event bus: %s", err)
}
msg.Nack()
return
Expand All @@ -244,7 +244,7 @@ func (b *EventBus) handler(m eh.EventMatcher, h eh.EventHandler) func(ctx contex
select {
case b.errCh <- eh.EventBusError{Err: err, Ctx: ctx}:
default:
log.Printf("missed error in GCP event bus: %s", err)
log.Printf("eventhorizon: missed error in GCP event bus: %s", err)
}
msg.Nack()
return
Expand All @@ -254,7 +254,7 @@ func (b *EventBus) handler(m eh.EventMatcher, h eh.EventHandler) func(ctx contex
select {
case b.errCh <- eh.EventBusError{Err: err, Ctx: ctx}:
default:
log.Printf("missed error in GCP event bus: %s", err)
log.Printf("eventhorizon: missed error in GCP event bus: %s", err)
}
msg.Nack()
return
Expand All @@ -277,7 +277,7 @@ func (b *EventBus) handler(m eh.EventMatcher, h eh.EventHandler) func(ctx contex
select {
case b.errCh <- eh.EventBusError{Err: err, Ctx: ctx, Event: event}:
default:
log.Printf("missed error in GCP event bus: %s", err)
log.Printf("eventhorizon: missed error in GCP event bus: %s", err)
}
msg.Nack()
return
Expand Down
67 changes: 67 additions & 0 deletions eventbus/tracing/context.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
// Copyright (c) 2020 - The Event Horizon authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package tracing

import (
"context"
"encoding/json"
"log"

eh "github.com/looplab/eventhorizon"
"github.com/opentracing/opentracing-go"
"github.com/opentracing/opentracing-go/ext"
)

// The string keys to marshal the context.
const (
tracingSpanKeyStr = "eh_tracing_span"
)

func init() {
eh.RegisterContextMarshaler(func(ctx context.Context, vals map[string]interface{}) {
if span := opentracing.SpanFromContext(ctx); span != nil {
tracer := opentracing.GlobalTracer()
carrier := opentracing.TextMapCarrier{}
if err := tracer.Inject(span.Context(), opentracing.TextMap, &carrier); err != nil {
log.Printf("eventhorizon: could not inject tracing span: %s", err)
return
}
js, err := json.Marshal(carrier)
if err != nil {
log.Printf("eventhorizon: could not marshal tracing span: %s", err)
return
}
vals[tracingSpanKeyStr] = string(js)
}
})
eh.RegisterContextUnmarshaler(func(ctx context.Context, vals map[string]interface{}) context.Context {
if js, ok := vals[tracingSpanKeyStr].(string); ok {
tracer := opentracing.GlobalTracer()
carrier := opentracing.TextMapCarrier{}
if err := json.Unmarshal([]byte(js), &carrier); err != nil {
log.Printf("eventhorizon: could not unmarshal tracing span: %s", err)
return ctx
}
parentSpanContext, err := tracer.Extract(opentracing.TextMap, carrier)
if err != nil && err != opentracing.ErrSpanContextNotFound {
log.Printf("eventhorizon: could not extract tracing span: %s", err)
return ctx
}
span := tracer.StartSpan("eventbus", ext.RPCServerOption(parentSpanContext))
ctx = opentracing.ContextWithSpan(ctx, span)
}
return ctx
})
}
53 changes: 53 additions & 0 deletions eventbus/tracing/eventbus.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
// Copyright (c) 2020 - The Event Horizon authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package tracing

import (
"context"

eh "github.com/looplab/eventhorizon"
"github.com/looplab/eventhorizon/middleware/eventhandler/tracing"
)

// EventBus is an event bus wrapper that adds tracing.
type EventBus struct {
eh.EventBus
h eh.EventHandler
}

// NewEventBus creates a EventBus.
func NewEventBus(eventBus eh.EventBus) *EventBus {
return &EventBus{
EventBus: eventBus,
// Wrap the eh.EventHandler part of the bus with tracing middleware,
// set as producer to set the correct tags.
h: eh.UseEventHandlerMiddleware(eventBus, tracing.NewMiddleware()),
}
}

// HandleEvent implements the HandleEvent method of the eventhorizon.EventHandler interface.
func (b *EventBus) HandleEvent(ctx context.Context, event eh.Event) error {
return b.h.HandleEvent(ctx, event)
}

// AddHandler implements the AddHandler method of the eventhorizon.EventBus interface.
func (b *EventBus) AddHandler(ctx context.Context, m eh.EventMatcher, h eh.EventHandler) error {
if h == nil {
return eh.ErrMissingHandler
}
// Wrap the handlers in tracing middleware.
h = eh.UseEventHandlerMiddleware(h, tracing.NewMiddleware())
return b.EventBus.AddHandler(ctx, m, h)
}
78 changes: 78 additions & 0 deletions eventbus/tracing/eventbus_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
// Copyright (c) 2020 - The Event Horizon authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package tracing

import (
"testing"
"time"

"github.com/looplab/eventhorizon/eventbus"
"github.com/looplab/eventhorizon/eventbus/local"
)

func TestEventBus(t *testing.T) {
group := local.NewGroup()
if group == nil {
t.Fatal("there should be a group")
}
innerBus1 := local.NewEventBus(group)
if innerBus1 == nil {
t.Fatal("there should be a bus")
}
innerBus2 := local.NewEventBus(group)
if innerBus2 == nil {
t.Fatal("there should be a bus")
}

bus1 := NewEventBus(innerBus1)
if bus1 == nil {
t.Fatal("there should be a bus")
}

bus2 := NewEventBus(innerBus2)
if bus2 == nil {
t.Fatal("there should be a bus")
}

eventbus.AcceptanceTest(t, bus1, bus2, time.Second)
}

func TestEventBusLoad(t *testing.T) {
innerBus := local.NewEventBus(nil)
if innerBus == nil {
t.Fatal("there should be a bus")
}

bus := NewEventBus(innerBus)
if bus == nil {
t.Fatal("there should be a bus")
}

eventbus.LoadTest(t, bus)
}

func BenchmarkEventBus(b *testing.B) {
innerBus := local.NewEventBus(nil)
if innerBus == nil {
b.Fatal("there should be a bus")
}

bus := NewEventBus(innerBus)
if bus == nil {
b.Fatal("there should be a bus")
}

eventbus.Benchmark(b, bus)
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

package trace
package recorder

import (
"context"
Expand All @@ -21,12 +21,12 @@ import (
eh "github.com/looplab/eventhorizon"
)

// EventStore wraps an EventStore and adds debug tracing.
// EventStore wraps an EventStore and adds debug event recording.
type EventStore struct {
eh.EventStore
tracing bool
trace []eh.Event
traceMu sync.RWMutex
recording bool
record []eh.Event
recordMu sync.RWMutex
}

// NewEventStore creates a new EventStore.
Expand All @@ -37,7 +37,7 @@ func NewEventStore(eventStore eh.EventStore) *EventStore {

return &EventStore{
EventStore: eventStore,
trace: make([]eh.Event, 0),
record: make([]eh.Event, 0),
}
}

Expand All @@ -47,44 +47,44 @@ func (s *EventStore) Save(ctx context.Context, events []eh.Event, originalVersio
return err
}

// Only trace events that are successfully saved.
s.traceMu.Lock()
defer s.traceMu.Unlock()
if s.tracing {
s.trace = append(s.trace, events...)
// Only record events that are successfully saved.
s.recordMu.Lock()
defer s.recordMu.Unlock()
if s.recording {
s.record = append(s.record, events...)
}

return nil
}

// StartTracing starts the tracing of events.
func (s *EventStore) StartTracing() {
s.traceMu.Lock()
defer s.traceMu.Unlock()
// StartRecording starts recording of handled events.
func (s *EventStore) StartRecording() {
s.recordMu.Lock()
defer s.recordMu.Unlock()

s.tracing = true
s.recording = true
}

// StopTracing stops the tracing of events.
func (s *EventStore) StopTracing() {
s.traceMu.Lock()
defer s.traceMu.Unlock()
// StopRecording stops recording of handled events.
func (s *EventStore) StopRecording() {
s.recordMu.Lock()
defer s.recordMu.Unlock()

s.tracing = false
s.recording = false
}

// GetTrace returns the events that happened during the tracing.
func (s *EventStore) GetTrace() []eh.Event {
s.traceMu.RLock()
defer s.traceMu.RUnlock()
// GetRecord returns the events that happened during the recording.
func (s *EventStore) GetRecord() []eh.Event {
s.recordMu.RLock()
defer s.recordMu.RUnlock()

return s.trace
return s.record
}

// ResetTrace resets the trace.
// ResetTrace resets the record.
func (s *EventStore) ResetTrace() {
s.traceMu.Lock()
defer s.traceMu.Unlock()
s.recordMu.Lock()
defer s.recordMu.Unlock()

s.trace = make([]eh.Event, 0)
s.record = make([]eh.Event, 0)
}
Loading

0 comments on commit 82599db

Please sign in to comment.