Skip to content

Commit

Permalink
Add support for asynchronous logging (#27)
Browse files Browse the repository at this point in the history
* Add Async-mode constructors

* Move sending of raven packet to its own func

* Set default timeout to 1s in async mode

* Add async plumbing

* Add async tests

* Add Flush() function
  • Loading branch information
flimzy authored and evalphobia committed Dec 14, 2016
1 parent f1b8721 commit d5bbafe
Show file tree
Hide file tree
Showing 2 changed files with 146 additions and 0 deletions.
70 changes: 70 additions & 0 deletions async_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
package logrus_sentry

import (
"net/http"
"sync"
"testing"
"time"

"github.com/Sirupsen/logrus"
)

func TestParallelLogging(t *testing.T) {
WithTestDSN(t, func(dsn string, pch <-chan *resultPacket) {
logger := getTestLogger()

hook, err := NewAsyncSentryHook(dsn, []logrus.Level{
logrus.ErrorLevel,
})

if err != nil {
t.Fatal(err.Error())
}
logger.Hooks.Add(hook)

wg := &sync.WaitGroup{}

// start draining messages
var logsReceived int
const logCount = 10
go func() {
for i := 0; i < logCount; i++ {
timeoutCh := time.After(hook.Timeout * 2)
var packet *resultPacket
select {
case packet = <-pch:
case <-timeoutCh:
t.Fatalf("Waited %s without a response", hook.Timeout*2)
}
if packet.Logger != logger_name {
t.Errorf("logger should have been %s, was %s", logger_name, packet.Logger)
}

if packet.ServerName != server_name {
t.Errorf("server_name should have been %s, was %s", server_name, packet.ServerName)
}
logsReceived++
wg.Done()
}
}()

req, _ := http.NewRequest("GET", "url", nil)
log := logger.WithFields(logrus.Fields{
"server_name": server_name,
"logger": logger_name,
"http_request": req,
})

for i := 0; i < logCount; i++ {
wg.Add(1)
go func() {
log.Error(message)
}()
}

wg.Wait()
if logCount != logsReceived {
t.Errorf("Sent %d logs, received %d", logCount, logsReceived)
}
})
}
76 changes: 76 additions & 0 deletions sentry.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"encoding/json"
"fmt"
"runtime"
"sync"
"time"

"github.com/Sirupsen/logrus"
Expand All @@ -22,6 +23,11 @@ var (
}
)

// BufSize controls the number of logs that can be in progress before logging
// will start blocking. Set logrus_sentry.BufSize = <value> _before_ calling
// NewAsync*().
var BufSize uint = 8192

// SentryHook delivers logs to a sentry server.
type SentryHook struct {
// Timeout sets the time to wait for a delivery error from the sentry server.
Expand All @@ -35,6 +41,11 @@ type SentryHook struct {

ignoreFields map[string]struct{}
extraFilters map[string]func(interface{}) interface{}

asynchronous bool
buf chan *raven.Packet
wg sync.WaitGroup
mu sync.RWMutex
}

// The Stacktracer interface allows an error type to return a raven.Stacktrace.
Expand Down Expand Up @@ -107,11 +118,45 @@ func NewWithClientSentryHook(client *raven.Client, levels []logrus.Level) (*Sent
}, nil
}

// NewAsyncSentryHook creates a hook same as NewSentryHook, but in asynchronous
// mode. This method sets the timeout to 1000 milliseconds.
func NewAsyncSentryHook(DSN string, levels []logrus.Level) (*SentryHook, error) {
hook, err := NewSentryHook(DSN, levels)
return setAsync(hook), err
}

// NewAsyncWithTagsSentryHook creates a hook same as NewWithTagsSentryHook, but
// in asynchronous mode. This method sets the timeout to 1000 milliseconds.
func NewAsyncWithTagsSentryHook(DSN string, tags map[string]string, levels []logrus.Level) (*SentryHook, error) {
hook, err := NewWithTagsSentryHook(DSN, tags, levels)
return setAsync(hook), err
}

// NewAsyncWithClientSentryHook creates a hook same as NewWithClientSentryHook,
// but in asynchronous mode. This method sets the timeout to 1000 milliseconds.
func NewAsyncWithClientSentryHook(client *raven.Client, levels []logrus.Level) (*SentryHook, error) {
hook, err := NewWithClientSentryHook(client, levels)
return setAsync(hook), err
}

func setAsync(hook *SentryHook) *SentryHook {
if hook == nil {
return nil
}
hook.Timeout = 1 * time.Second
hook.asynchronous = true
hook.buf = make(chan *raven.Packet, BufSize)
go hook.fire() // Log in background
return hook
}

// Fire is called when an event should be sent to sentry
// Special fields that sentry uses to give more information to the server
// are extracted from entry.Data (if they are found)
// These fields are: error, logger, server_name, http_request, tags
func (hook *SentryHook) Fire(entry *logrus.Entry) error {
hook.mu.RLock() // Allow multiple go routines to log simultaneously
defer hook.mu.RUnlock()
packet := raven.NewPacket(entry.Message)
packet.Timestamp = raven.Timestamp(entry.Time)
packet.Level = severityMap[entry.Level]
Expand Down Expand Up @@ -167,6 +212,37 @@ func (hook *SentryHook) Fire(entry *logrus.Entry) error {
}
}

if hook.asynchronous {
hook.wg.Add(1)
hook.buf <- packet
return nil
}
return hook.sendPacket(packet)
}

func (hook *SentryHook) fire() {
for {
packet := <-hook.buf
if err := hook.sendPacket(packet); err != nil {
fmt.Println(err)
}
hook.wg.Done()
}
}

// Flush waits for the log queue to empty. This function only does anything in
// asynchronous mode.
func (hook *SentryHook) Flush() {
if !hook.asynchronous {
return
}
hook.mu.Lock() // Claim exclusive access; any logging goroutines will block until the flush completes
defer hook.mu.Unlock()

hook.wg.Wait()
}

func (hook *SentryHook) sendPacket(packet *raven.Packet) error {
_, errCh := hook.client.Capture(packet, nil)
timeout := hook.Timeout
if timeout != 0 {
Expand Down

0 comments on commit d5bbafe

Please sign in to comment.