From 054abff3b67bb5d66323e5418a43c845a3eac8a1 Mon Sep 17 00:00:00 2001 From: Brian Goff Date: Tue, 22 Nov 2016 21:55:27 -0500 Subject: [PATCH 1/2] Implement optional ring buffer for container logs This allows the user to set a logging mode to "blocking" (default), or "non-blocking", which uses the ring buffer as a proxy to the real log driver. This allows a container to never be blocked on stdio at the cost of dropping log messages. Introduces 2 new log-opts that works for all drivers, `log-mode` and `log-size`. `log-mode` takes a value of "blocking", or "non-blocking" I chose not to implement this as a bool since it is difficult to determine if the mode was set to false vs just not set... especially difficult when merging the default daemon config with the container config. `log-size` takes a size string, e.g. `2MB`, which sets the max size of the ring buffer. When the max size is reached, it will start dropping log messages. ``` BenchmarkRingLoggerThroughputNoReceiver-8 2000000000 36.2 ns/op 856.35 MB/s 0 B/op 0 allocs/op BenchmarkRingLoggerThroughputWithReceiverDelay0-8 300000000 156 ns/op 198.48 MB/s 32 B/op 0 allocs/op BenchmarkRingLoggerThroughputConsumeDelay1-8 2000000000 36.1 ns/op 857.80 MB/s 0 B/op 0 allocs/op BenchmarkRingLoggerThroughputConsumeDelay10-8 1000000000 36.2 ns/op 856.53 MB/s 0 B/op 0 allocs/op BenchmarkRingLoggerThroughputConsumeDelay50-8 2000000000 34.7 ns/op 894.65 MB/s 0 B/op 0 allocs/op BenchmarkRingLoggerThroughputConsumeDelay100-8 2000000000 35.1 ns/op 883.91 MB/s 0 B/op 0 allocs/op BenchmarkRingLoggerThroughputConsumeDelay300-8 1000000000 35.9 ns/op 863.90 MB/s 0 B/op 0 allocs/op BenchmarkRingLoggerThroughputConsumeDelay500-8 2000000000 35.8 ns/op 866.88 MB/s 0 B/op 0 allocs/op ``` Signed-off-by: Brian Goff --- api/types/container/host_config.go | 11 + container/container.go | 21 +- daemon/logger/awslogs/cloudwatchlogs.go | 2 +- daemon/logger/copier.go | 19 +- daemon/logger/factory.go | 33 ++- daemon/logger/logger.go | 18 -- daemon/logger/logger_test.go | 26 --- daemon/logger/ring.go | 210 +++++++++++++++++ daemon/logger/ring_test.go | 299 ++++++++++++++++++++++++ daemon/logger/splunk/splunk_test.go | 40 ++-- docs/api/version-history.md | 2 + 11 files changed, 607 insertions(+), 74 deletions(-) delete mode 100644 daemon/logger/logger_test.go create mode 100644 daemon/logger/ring.go create mode 100644 daemon/logger/ring_test.go diff --git a/api/types/container/host_config.go b/api/types/container/host_config.go index d4a0c575f78c0..15a84b82d7a03 100644 --- a/api/types/container/host_config.go +++ b/api/types/container/host_config.go @@ -223,6 +223,17 @@ func (rp *RestartPolicy) IsSame(tp *RestartPolicy) bool { return rp.Name == tp.Name && rp.MaximumRetryCount == tp.MaximumRetryCount } +// LogMode is a type to define the available modes for logging +// These modes affect how logs are handled when log messages start piling up. +type LogMode string + +// Available logging modes +const ( + LogModeUnset = "" + LogModeBlocking LogMode = "blocking" + LogModeNonBlock LogMode = "non-blocking" +) + // LogConfig represents the logging configuration of the container. type LogConfig struct { Type string diff --git a/container/container.go b/container/container.go index 311953685ee46..9c9b07fe1d46f 100644 --- a/container/container.go +++ b/container/container.go @@ -37,6 +37,7 @@ import ( "github.com/docker/docker/runconfig" "github.com/docker/docker/volume" "github.com/docker/go-connections/nat" + "github.com/docker/go-units" "github.com/docker/libnetwork" "github.com/docker/libnetwork/netlabel" "github.com/docker/libnetwork/options" @@ -316,7 +317,7 @@ func (container *Container) CheckpointDir() string { // StartLogger starts a new logger driver for the container. func (container *Container) StartLogger() (logger.Logger, error) { cfg := container.HostConfig.LogConfig - c, err := logger.GetLogDriver(cfg.Type) + initDriver, err := logger.GetLogDriver(cfg.Type) if err != nil { return nil, fmt.Errorf("failed to get logging factory: %v", err) } @@ -341,7 +342,23 @@ func (container *Container) StartLogger() (logger.Logger, error) { return nil, err } } - return c(info) + + l, err := initDriver(info) + if err != nil { + return nil, err + } + + if containertypes.LogMode(cfg.Config["mode"]) == containertypes.LogModeNonBlock { + bufferSize := int64(-1) + if s, exists := cfg.Config["max-buffer-size"]; exists { + bufferSize, err = units.RAMInBytes(s) + if err != nil { + return nil, err + } + } + l = logger.NewRingLogger(l, info, bufferSize) + } + return l, nil } // GetProcessLabel returns the process label for the container. diff --git a/daemon/logger/awslogs/cloudwatchlogs.go b/daemon/logger/awslogs/cloudwatchlogs.go index 78995f3fad19f..e27b3809413d6 100644 --- a/daemon/logger/awslogs/cloudwatchlogs.go +++ b/daemon/logger/awslogs/cloudwatchlogs.go @@ -204,7 +204,7 @@ func (l *logStream) Log(msg *logger.Message) error { defer l.lock.RUnlock() if !l.closed { // buffer up the data, making sure to copy the Line data - l.messages <- logger.CopyMessage(msg) + l.messages <- msg } return nil } diff --git a/daemon/logger/copier.go b/daemon/logger/copier.go index 10ab46e162b81..e8b06e58fecb5 100644 --- a/daemon/logger/copier.go +++ b/daemon/logger/copier.go @@ -47,7 +47,6 @@ func (c *Copier) copySrc(name string, src io.Reader) { buf := make([]byte, bufSize) n := 0 eof := false - msg := &Message{Source: name} for { select { @@ -78,13 +77,16 @@ func (c *Copier) copySrc(name string, src io.Reader) { // Break up the data that we've buffered up into lines, and log each in turn. p := 0 for q := bytes.Index(buf[p:n], []byte{'\n'}); q >= 0; q = bytes.Index(buf[p:n], []byte{'\n'}) { - msg.Line = buf[p : p+q] - msg.Timestamp = time.Now().UTC() - msg.Partial = false select { case <-c.closed: return default: + msg := &Message{ + Source: name, + Timestamp: time.Now().UTC(), + } + msg.Line = append(msg.Line, buf[p:p+q]...) + if logErr := c.dst.Log(msg); logErr != nil { logrus.Errorf("Failed to log msg %q for logger %s: %s", msg.Line, c.dst.Name(), logErr) } @@ -96,9 +98,14 @@ func (c *Copier) copySrc(name string, src io.Reader) { // noting that it's a partial log line. if eof || (p == 0 && n == len(buf)) { if p < n { - msg.Line = buf[p:n] - msg.Timestamp = time.Now().UTC() + msg := &Message{ + Source: name, + Timestamp: time.Now().UTC(), + Partial: true, + } + msg.Line = append(msg.Line, buf[p:n]...) msg.Partial = true + if logErr := c.dst.Log(msg); logErr != nil { logrus.Errorf("Failed to log msg %q for logger %s: %s", msg.Line, c.dst.Name(), logErr) } diff --git a/daemon/logger/factory.go b/daemon/logger/factory.go index 678e7cb223208..192d3e0df2bed 100644 --- a/daemon/logger/factory.go +++ b/daemon/logger/factory.go @@ -3,6 +3,10 @@ package logger import ( "fmt" "sync" + + containertypes "github.com/docker/docker/api/types/container" + units "github.com/docker/go-units" + "github.com/pkg/errors" ) // Creator builds a logging driver instance with given context. @@ -85,6 +89,11 @@ func GetLogDriver(name string) (Creator, error) { return factory.get(name) } +var builtInLogOpts = map[string]bool{ + "mode": true, + "max-buffer-size": true, +} + // ValidateLogOpts checks the options for the given log driver. The // options supported are specific to the LogDriver implementation. func ValidateLogOpts(name string, cfg map[string]string) error { @@ -92,13 +101,35 @@ func ValidateLogOpts(name string, cfg map[string]string) error { return nil } + switch containertypes.LogMode(cfg["mode"]) { + case containertypes.LogModeBlocking, containertypes.LogModeNonBlock, containertypes.LogModeUnset: + default: + return fmt.Errorf("logger: logging mode not supported: %s", cfg["mode"]) + } + + if s, ok := cfg["max-buffer-size"]; ok { + if containertypes.LogMode(cfg["mode"]) != containertypes.LogModeNonBlock { + return fmt.Errorf("logger: max-buffer-size option is only supported with 'mode=%s'", containertypes.LogModeNonBlock) + } + if _, err := units.RAMInBytes(s); err != nil { + return errors.Wrap(err, "error parsing option max-buffer-size") + } + } + if !factory.driverRegistered(name) { return fmt.Errorf("logger: no log driver named '%s' is registered", name) } + filteredOpts := make(map[string]string, len(builtInLogOpts)) + for k, v := range cfg { + if !builtInLogOpts[k] { + filteredOpts[k] = v + } + } + validator := factory.getLogOptValidator(name) if validator != nil { - return validator(cfg) + return validator(filteredOpts) } return nil } diff --git a/daemon/logger/logger.go b/daemon/logger/logger.go index 672f57f81984f..dadd6f7cfc5dd 100644 --- a/daemon/logger/logger.go +++ b/daemon/logger/logger.go @@ -37,24 +37,6 @@ type Message struct { Partial bool } -// CopyMessage creates a copy of the passed-in Message which will remain -// unchanged if the original is changed. Log drivers which buffer Messages -// rather than dispatching them during their Log() method should use this -// function to obtain a Message whose Line member's contents won't change. -func CopyMessage(msg *Message) *Message { - m := new(Message) - m.Line = make([]byte, len(msg.Line)) - copy(m.Line, msg.Line) - m.Source = msg.Source - m.Timestamp = msg.Timestamp - m.Partial = msg.Partial - m.Attrs = make(LogAttributes) - for k, v := range msg.Attrs { - m.Attrs[k] = v - } - return m -} - // LogAttributes is used to hold the extra attributes available in the log message // Primarily used for converting the map type to string and sorting. type LogAttributes map[string]string diff --git a/daemon/logger/logger_test.go b/daemon/logger/logger_test.go deleted file mode 100644 index 16e1514d2d01b..0000000000000 --- a/daemon/logger/logger_test.go +++ /dev/null @@ -1,26 +0,0 @@ -package logger - -import ( - "reflect" - "testing" - "time" -) - -func TestCopyMessage(t *testing.T) { - msg := &Message{ - Line: []byte("test line."), - Source: "stdout", - Timestamp: time.Now(), - Attrs: LogAttributes{ - "key1": "val1", - "key2": "val2", - "key3": "val3", - }, - Partial: true, - } - - m := CopyMessage(msg) - if !reflect.DeepEqual(m, msg) { - t.Fatalf("CopyMessage failed to copy message") - } -} diff --git a/daemon/logger/ring.go b/daemon/logger/ring.go new file mode 100644 index 0000000000000..9748a558c5920 --- /dev/null +++ b/daemon/logger/ring.go @@ -0,0 +1,210 @@ +package logger + +import ( + "errors" + "sync" + "sync/atomic" + + "github.com/Sirupsen/logrus" +) + +const ( + defaultRingMaxSize = 1e6 // 1MB +) + +// RingLogger is a ring buffer that implements the Logger interface. +// This is used when lossy logging is OK. +type RingLogger struct { + buffer *messageRing + l Logger + logInfo Info + closeFlag int32 +} + +type ringWithReader struct { + *RingLogger +} + +func (r *ringWithReader) ReadLogs(cfg ReadConfig) *LogWatcher { + reader, ok := r.l.(LogReader) + if !ok { + // something is wrong if we get here + panic("expected log reader") + } + return reader.ReadLogs(cfg) +} + +func newRingLogger(driver Logger, logInfo Info, maxSize int64) *RingLogger { + l := &RingLogger{ + buffer: newRing(maxSize), + l: driver, + logInfo: logInfo, + } + go l.run() + return l +} + +// NewRingLogger creates a new Logger that is implemented as a RingBuffer wrapping +// the passed in logger. +func NewRingLogger(driver Logger, logInfo Info, maxSize int64) Logger { + if maxSize < 0 { + maxSize = defaultRingMaxSize + } + l := newRingLogger(driver, logInfo, maxSize) + if _, ok := driver.(LogReader); ok { + return &ringWithReader{l} + } + return l +} + +// Log queues messages into the ring buffer +func (r *RingLogger) Log(msg *Message) error { + if r.closed() { + return errClosed + } + return r.buffer.Enqueue(msg) +} + +// Name returns the name of the underlying logger +func (r *RingLogger) Name() string { + return r.l.Name() +} + +func (r *RingLogger) closed() bool { + return atomic.LoadInt32(&r.closeFlag) == 1 +} + +func (r *RingLogger) setClosed() { + atomic.StoreInt32(&r.closeFlag, 1) +} + +// Close closes the logger +func (r *RingLogger) Close() error { + r.setClosed() + r.buffer.Close() + // empty out the queue + for _, msg := range r.buffer.Drain() { + if err := r.l.Log(msg); err != nil { + logrus.WithField("driver", r.l.Name()).WithField("container", r.logInfo.ContainerID).Errorf("Error writing log message: %v", r.l) + break + } + } + return r.l.Close() +} + +// run consumes messages from the ring buffer and forwards them to the underling +// logger. +// This is run in a goroutine when the RingLogger is created +func (r *RingLogger) run() { + for { + if r.closed() { + return + } + msg, err := r.buffer.Dequeue() + if err != nil { + // buffer is closed + return + } + if err := r.l.Log(msg); err != nil { + logrus.WithField("driver", r.l.Name()).WithField("container", r.logInfo.ContainerID).Errorf("Error writing log message: %v", r.l) + } + } +} + +type messageRing struct { + mu sync.Mutex + // singals callers of `Dequeue` to wake up either on `Close` or when a new `Message` is added + wait *sync.Cond + + sizeBytes int64 // current buffer size + maxBytes int64 // max buffer size size + queue []*Message + closed bool +} + +func newRing(maxBytes int64) *messageRing { + queueSize := 1000 + if maxBytes == 0 || maxBytes == 1 { + // With 0 or 1 max byte size, the maximum size of the queue would only ever be 1 + // message long. + queueSize = 1 + } + + r := &messageRing{queue: make([]*Message, 0, queueSize), maxBytes: maxBytes} + r.wait = sync.NewCond(&r.mu) + return r +} + +// Enqueue adds a message to the buffer queue +// If the message is too big for the buffer it drops the oldest messages to make room +// If there are no messages in the queue and the message is still too big, it adds the message anyway. +func (r *messageRing) Enqueue(m *Message) error { + mSize := int64(len(m.Line)) + + r.mu.Lock() + if r.closed { + r.mu.Unlock() + return errClosed + } + if mSize+r.sizeBytes > r.maxBytes && len(r.queue) > 0 { + r.wait.Signal() + r.mu.Unlock() + return nil + } + + r.queue = append(r.queue, m) + r.sizeBytes += mSize + r.wait.Signal() + r.mu.Unlock() + return nil +} + +// Dequeue pulls a message off the queue +// If there are no messages, it waits for one. +// If the buffer is closed, it will return immediately. +func (r *messageRing) Dequeue() (*Message, error) { + r.mu.Lock() + for len(r.queue) == 0 && !r.closed { + r.wait.Wait() + } + + if r.closed { + r.mu.Unlock() + return nil, errClosed + } + + msg := r.queue[0] + r.queue = r.queue[1:] + r.sizeBytes -= int64(len(msg.Line)) + r.mu.Unlock() + return msg, nil +} + +var errClosed = errors.New("closed") + +// Close closes the buffer ensuring no new messages can be added. +// Any callers waiting to dequeue a message will be woken up. +func (r *messageRing) Close() { + r.mu.Lock() + if r.closed { + r.mu.Unlock() + return + } + + r.closed = true + r.wait.Broadcast() + r.mu.Unlock() + return +} + +// Drain drains all messages from the queue. +// This can be used after `Close()` to get any remaining messages that were in queue. +func (r *messageRing) Drain() []*Message { + r.mu.Lock() + ls := make([]*Message, 0, len(r.queue)) + ls = append(ls, r.queue...) + r.sizeBytes = 0 + r.queue = r.queue[:0] + r.mu.Unlock() + return ls +} diff --git a/daemon/logger/ring_test.go b/daemon/logger/ring_test.go new file mode 100644 index 0000000000000..ed08cc4222590 --- /dev/null +++ b/daemon/logger/ring_test.go @@ -0,0 +1,299 @@ +package logger + +import ( + "context" + "strconv" + "testing" + "time" +) + +type mockLogger struct{ c chan *Message } + +func (l *mockLogger) Log(msg *Message) error { + l.c <- msg + return nil +} + +func (l *mockLogger) Name() string { + return "mock" +} + +func (l *mockLogger) Close() error { + return nil +} + +func TestRingLogger(t *testing.T) { + mockLog := &mockLogger{make(chan *Message)} // no buffer on this channel + ring := newRingLogger(mockLog, Info{}, 1) + defer ring.setClosed() + + // this should never block + ring.Log(&Message{Line: []byte("1")}) + ring.Log(&Message{Line: []byte("2")}) + ring.Log(&Message{Line: []byte("3")}) + + select { + case msg := <-mockLog.c: + if string(msg.Line) != "1" { + t.Fatalf("got unexpected msg: %q", string(msg.Line)) + } + case <-time.After(100 * time.Millisecond): + t.Fatal("timeout reading log message") + } + + select { + case msg := <-mockLog.c: + t.Fatalf("expected no more messages in the queue, got: %q", string(msg.Line)) + default: + } +} + +func TestRingCap(t *testing.T) { + r := newRing(5) + for i := 0; i < 10; i++ { + // queue messages with "0" to "10" + // the "5" to "10" messages should be dropped since we only allow 5 bytes in the buffer + if err := r.Enqueue(&Message{Line: []byte(strconv.Itoa(i))}); err != nil { + t.Fatal(err) + } + } + + // should have messages in the queue for "5" to "10" + for i := 0; i < 5; i++ { + m, err := r.Dequeue() + if err != nil { + t.Fatal(err) + } + if string(m.Line) != strconv.Itoa(i) { + t.Fatalf("got unexpected message for iter %d: %s", i, string(m.Line)) + } + } + + // queue a message that's bigger than the buffer cap + if err := r.Enqueue(&Message{Line: []byte("hello world")}); err != nil { + t.Fatal(err) + } + + // queue another message that's bigger than the buffer cap + if err := r.Enqueue(&Message{Line: []byte("eat a banana")}); err != nil { + t.Fatal(err) + } + + m, err := r.Dequeue() + if err != nil { + t.Fatal(err) + } + if string(m.Line) != "hello world" { + t.Fatalf("got unexpected message: %s", string(m.Line)) + } + if len(r.queue) != 0 { + t.Fatalf("expected queue to be empty, got: %d", len(r.queue)) + } +} + +func TestRingClose(t *testing.T) { + r := newRing(1) + if err := r.Enqueue(&Message{Line: []byte("hello")}); err != nil { + t.Fatal(err) + } + r.Close() + if err := r.Enqueue(&Message{}); err != errClosed { + t.Fatalf("expected errClosed, got: %v", err) + } + if len(r.queue) != 1 { + t.Fatal("expected empty queue") + } + if m, err := r.Dequeue(); err == nil || m != nil { + t.Fatal("exepcted err on Dequeue after close") + } + + ls := r.Drain() + if len(ls) != 1 { + t.Fatalf("expected one message: %v", ls) + } + if string(ls[0].Line) != "hello" { + t.Fatalf("got unexpected message: %s", string(ls[0].Line)) + } +} + +func TestRingDrain(t *testing.T) { + r := newRing(5) + for i := 0; i < 5; i++ { + if err := r.Enqueue(&Message{Line: []byte(strconv.Itoa(i))}); err != nil { + t.Fatal(err) + } + } + + ls := r.Drain() + if len(ls) != 5 { + t.Fatal("got unexpected length after drain") + } + + for i := 0; i < 5; i++ { + if string(ls[i].Line) != strconv.Itoa(i) { + t.Fatalf("got unexpected message at position %d: %s", i, string(ls[i].Line)) + } + } + if r.sizeBytes != 0 { + t.Fatalf("expected buffer size to be 0 after drain, got: %d", r.sizeBytes) + } + + ls = r.Drain() + if len(ls) != 0 { + t.Fatalf("expected 0 messages on 2nd drain: %v", ls) + } + +} + +type nopLogger struct{} + +func (nopLogger) Name() string { return "nopLogger" } +func (nopLogger) Close() error { return nil } +func (nopLogger) Log(*Message) error { return nil } + +func BenchmarkRingLoggerThroughputNoReceiver(b *testing.B) { + mockLog := &mockLogger{make(chan *Message)} + defer mockLog.Close() + l := NewRingLogger(mockLog, Info{}, -1) + msg := &Message{Line: []byte("hello humans and everyone else!")} + b.SetBytes(int64(len(msg.Line))) + + for i := 0; i < b.N; i++ { + if err := l.Log(msg); err != nil { + b.Fatal(err) + } + } +} + +func BenchmarkRingLoggerThroughputWithReceiverDelay0(b *testing.B) { + l := NewRingLogger(nopLogger{}, Info{}, -1) + msg := &Message{Line: []byte("hello humans and everyone else!")} + b.SetBytes(int64(len(msg.Line))) + + for i := 0; i < b.N; i++ { + if err := l.Log(msg); err != nil { + b.Fatal(err) + } + } +} + +func consumeWithDelay(delay time.Duration, c <-chan *Message) (cancel func()) { + started := make(chan struct{}) + ctx, cancel := context.WithCancel(context.Background()) + go func() { + close(started) + ticker := time.NewTicker(delay) + for range ticker.C { + select { + case <-ctx.Done(): + ticker.Stop() + return + case <-c: + } + } + }() + <-started + return cancel +} + +func BenchmarkRingLoggerThroughputConsumeDelay1(b *testing.B) { + mockLog := &mockLogger{make(chan *Message)} + defer mockLog.Close() + l := NewRingLogger(mockLog, Info{}, -1) + msg := &Message{Line: []byte("hello humans and everyone else!")} + b.SetBytes(int64(len(msg.Line))) + + cancel := consumeWithDelay(1*time.Millisecond, mockLog.c) + defer cancel() + + for i := 0; i < b.N; i++ { + if err := l.Log(msg); err != nil { + b.Fatal(err) + } + } +} + +func BenchmarkRingLoggerThroughputConsumeDelay10(b *testing.B) { + mockLog := &mockLogger{make(chan *Message)} + defer mockLog.Close() + l := NewRingLogger(mockLog, Info{}, -1) + msg := &Message{Line: []byte("hello humans and everyone else!")} + b.SetBytes(int64(len(msg.Line))) + + cancel := consumeWithDelay(10*time.Millisecond, mockLog.c) + defer cancel() + + for i := 0; i < b.N; i++ { + if err := l.Log(msg); err != nil { + b.Fatal(err) + } + } +} + +func BenchmarkRingLoggerThroughputConsumeDelay50(b *testing.B) { + mockLog := &mockLogger{make(chan *Message)} + defer mockLog.Close() + l := NewRingLogger(mockLog, Info{}, -1) + msg := &Message{Line: []byte("hello humans and everyone else!")} + b.SetBytes(int64(len(msg.Line))) + + cancel := consumeWithDelay(50*time.Millisecond, mockLog.c) + defer cancel() + + for i := 0; i < b.N; i++ { + if err := l.Log(msg); err != nil { + b.Fatal(err) + } + } +} + +func BenchmarkRingLoggerThroughputConsumeDelay100(b *testing.B) { + mockLog := &mockLogger{make(chan *Message)} + defer mockLog.Close() + l := NewRingLogger(mockLog, Info{}, -1) + msg := &Message{Line: []byte("hello humans and everyone else!")} + b.SetBytes(int64(len(msg.Line))) + + cancel := consumeWithDelay(100*time.Millisecond, mockLog.c) + defer cancel() + + for i := 0; i < b.N; i++ { + if err := l.Log(msg); err != nil { + b.Fatal(err) + } + } +} + +func BenchmarkRingLoggerThroughputConsumeDelay300(b *testing.B) { + mockLog := &mockLogger{make(chan *Message)} + defer mockLog.Close() + l := NewRingLogger(mockLog, Info{}, -1) + msg := &Message{Line: []byte("hello humans and everyone else!")} + b.SetBytes(int64(len(msg.Line))) + + cancel := consumeWithDelay(300*time.Millisecond, mockLog.c) + defer cancel() + + for i := 0; i < b.N; i++ { + if err := l.Log(msg); err != nil { + b.Fatal(err) + } + } +} + +func BenchmarkRingLoggerThroughputConsumeDelay500(b *testing.B) { + mockLog := &mockLogger{make(chan *Message)} + defer mockLog.Close() + l := NewRingLogger(mockLog, Info{}, -1) + msg := &Message{Line: []byte("hello humans and everyone else!")} + b.SetBytes(int64(len(msg.Line))) + + cancel := consumeWithDelay(500*time.Millisecond, mockLog.c) + defer cancel() + + for i := 0; i < b.N; i++ { + if err := l.Log(msg); err != nil { + b.Fatal(err) + } + } +} diff --git a/daemon/logger/splunk/splunk_test.go b/daemon/logger/splunk/splunk_test.go index e7e3d68744bb8..90d50516270ed 100644 --- a/daemon/logger/splunk/splunk_test.go +++ b/daemon/logger/splunk/splunk_test.go @@ -133,11 +133,11 @@ func TestDefault(t *testing.T) { } message1Time := time.Now() - if err := loggerDriver.Log(&logger.Message{[]byte("{\"a\":\"b\"}"), "stdout", message1Time, nil, false}); err != nil { + if err := loggerDriver.Log(&logger.Message{Line: []byte("{\"a\":\"b\"}"), Source: "stdout", Timestamp: message1Time}); err != nil { t.Fatal(err) } message2Time := time.Now() - if err := loggerDriver.Log(&logger.Message{[]byte("notajson"), "stdout", message2Time, nil, false}); err != nil { + if err := loggerDriver.Log(&logger.Message{Line: []byte("notajson"), Source: "stdout", Timestamp: message2Time}); err != nil { t.Fatal(err) } @@ -262,7 +262,7 @@ func TestInlineFormatWithNonDefaultOptions(t *testing.T) { } messageTime := time.Now() - if err := loggerDriver.Log(&logger.Message{[]byte("1"), "stdout", messageTime, nil, false}); err != nil { + if err := loggerDriver.Log(&logger.Message{Line: []byte("1"), Source: "stdout", Timestamp: messageTime}); err != nil { t.Fatal(err) } @@ -361,11 +361,11 @@ func TestJsonFormat(t *testing.T) { } message1Time := time.Now() - if err := loggerDriver.Log(&logger.Message{[]byte("{\"a\":\"b\"}"), "stdout", message1Time, nil, false}); err != nil { + if err := loggerDriver.Log(&logger.Message{Line: []byte("{\"a\":\"b\"}"), Source: "stdout", Timestamp: message1Time}); err != nil { t.Fatal(err) } message2Time := time.Now() - if err := loggerDriver.Log(&logger.Message{[]byte("notjson"), "stdout", message2Time, nil, false}); err != nil { + if err := loggerDriver.Log(&logger.Message{Line: []byte("notjson"), Source: "stdout", Timestamp: message2Time}); err != nil { t.Fatal(err) } @@ -478,11 +478,11 @@ func TestRawFormat(t *testing.T) { } message1Time := time.Now() - if err := loggerDriver.Log(&logger.Message{[]byte("{\"a\":\"b\"}"), "stdout", message1Time, nil, false}); err != nil { + if err := loggerDriver.Log(&logger.Message{Line: []byte("{\"a\":\"b\"}"), Source: "stdout", Timestamp: message1Time}); err != nil { t.Fatal(err) } message2Time := time.Now() - if err := loggerDriver.Log(&logger.Message{[]byte("notjson"), "stdout", message2Time, nil, false}); err != nil { + if err := loggerDriver.Log(&logger.Message{Line: []byte("notjson"), Source: "stdout", Timestamp: message2Time}); err != nil { t.Fatal(err) } @@ -592,11 +592,11 @@ func TestRawFormatWithLabels(t *testing.T) { } message1Time := time.Now() - if err := loggerDriver.Log(&logger.Message{[]byte("{\"a\":\"b\"}"), "stdout", message1Time, nil, false}); err != nil { + if err := loggerDriver.Log(&logger.Message{Line: []byte("{\"a\":\"b\"}"), Source: "stdout", Timestamp: message1Time}); err != nil { t.Fatal(err) } message2Time := time.Now() - if err := loggerDriver.Log(&logger.Message{[]byte("notjson"), "stdout", message2Time, nil, false}); err != nil { + if err := loggerDriver.Log(&logger.Message{Line: []byte("notjson"), Source: "stdout", Timestamp: message2Time}); err != nil { t.Fatal(err) } @@ -705,11 +705,11 @@ func TestRawFormatWithoutTag(t *testing.T) { } message1Time := time.Now() - if err := loggerDriver.Log(&logger.Message{[]byte("{\"a\":\"b\"}"), "stdout", message1Time, nil, false}); err != nil { + if err := loggerDriver.Log(&logger.Message{Line: []byte("{\"a\":\"b\"}"), Source: "stdout", Timestamp: message1Time}); err != nil { t.Fatal(err) } message2Time := time.Now() - if err := loggerDriver.Log(&logger.Message{[]byte("notjson"), "stdout", message2Time, nil, false}); err != nil { + if err := loggerDriver.Log(&logger.Message{Line: []byte("notjson"), Source: "stdout", Timestamp: message2Time}); err != nil { t.Fatal(err) } @@ -790,7 +790,7 @@ func TestBatching(t *testing.T) { } for i := 0; i < defaultStreamChannelSize*4; i++ { - if err := loggerDriver.Log(&logger.Message{[]byte(fmt.Sprintf("%d", i)), "stdout", time.Now(), nil, false}); err != nil { + if err := loggerDriver.Log(&logger.Message{Line: []byte(fmt.Sprintf("%d", i)), Source: "stdout", Timestamp: time.Now()}); err != nil { t.Fatal(err) } } @@ -856,7 +856,7 @@ func TestFrequency(t *testing.T) { } for i := 0; i < 10; i++ { - if err := loggerDriver.Log(&logger.Message{[]byte(fmt.Sprintf("%d", i)), "stdout", time.Now(), nil, false}); err != nil { + if err := loggerDriver.Log(&logger.Message{Line: []byte(fmt.Sprintf("%d", i)), Source: "stdout", Timestamp: time.Now()}); err != nil { t.Fatal(err) } time.Sleep(15 * time.Millisecond) @@ -937,7 +937,7 @@ func TestOneMessagePerRequest(t *testing.T) { } for i := 0; i < 10; i++ { - if err := loggerDriver.Log(&logger.Message{[]byte(fmt.Sprintf("%d", i)), "stdout", time.Now(), nil, false}); err != nil { + if err := loggerDriver.Log(&logger.Message{Line: []byte(fmt.Sprintf("%d", i)), Source: "stdout", Timestamp: time.Now()}); err != nil { t.Fatal(err) } } @@ -1045,7 +1045,7 @@ func TestSkipVerify(t *testing.T) { } for i := 0; i < defaultStreamChannelSize*2; i++ { - if err := loggerDriver.Log(&logger.Message{[]byte(fmt.Sprintf("%d", i)), "stdout", time.Now(), nil, false}); err != nil { + if err := loggerDriver.Log(&logger.Message{Line: []byte(fmt.Sprintf("%d", i)), Source: "stdout", Timestamp: time.Now()}); err != nil { t.Fatal(err) } } @@ -1057,7 +1057,7 @@ func TestSkipVerify(t *testing.T) { hec.simulateServerError = false for i := defaultStreamChannelSize * 2; i < defaultStreamChannelSize*4; i++ { - if err := loggerDriver.Log(&logger.Message{[]byte(fmt.Sprintf("%d", i)), "stdout", time.Now(), nil, false}); err != nil { + if err := loggerDriver.Log(&logger.Message{Line: []byte(fmt.Sprintf("%d", i)), Source: "stdout", Timestamp: time.Now()}); err != nil { t.Fatal(err) } } @@ -1127,7 +1127,7 @@ func TestBufferMaximum(t *testing.T) { } for i := 0; i < 11; i++ { - if err := loggerDriver.Log(&logger.Message{[]byte(fmt.Sprintf("%d", i)), "stdout", time.Now(), nil, false}); err != nil { + if err := loggerDriver.Log(&logger.Message{Line: []byte(fmt.Sprintf("%d", i)), Source: "stdout", Timestamp: time.Now()}); err != nil { t.Fatal(err) } } @@ -1216,7 +1216,7 @@ func TestServerAlwaysDown(t *testing.T) { } for i := 0; i < 5; i++ { - if err := loggerDriver.Log(&logger.Message{[]byte(fmt.Sprintf("%d", i)), "stdout", time.Now(), nil, false}); err != nil { + if err := loggerDriver.Log(&logger.Message{Line: []byte(fmt.Sprintf("%d", i)), Source: "stdout", Timestamp: time.Now()}); err != nil { t.Fatal(err) } } @@ -1269,7 +1269,7 @@ func TestCannotSendAfterClose(t *testing.T) { t.Fatal(err) } - if err := loggerDriver.Log(&logger.Message{[]byte("message1"), "stdout", time.Now(), nil, false}); err != nil { + if err := loggerDriver.Log(&logger.Message{Line: []byte("message1"), Source: "stdout", Timestamp: time.Now()}); err != nil { t.Fatal(err) } @@ -1278,7 +1278,7 @@ func TestCannotSendAfterClose(t *testing.T) { t.Fatal(err) } - if err := loggerDriver.Log(&logger.Message{[]byte("message2"), "stdout", time.Now(), nil, false}); err == nil { + if err := loggerDriver.Log(&logger.Message{Line: []byte("message2"), Source: "stdout", Timestamp: time.Now()}); err == nil { t.Fatal("Driver should not allow to send messages after close") } diff --git a/docs/api/version-history.md b/docs/api/version-history.md index b295eda91c0ac..e8cb046d0ccad 100644 --- a/docs/api/version-history.md +++ b/docs/api/version-history.md @@ -82,6 +82,8 @@ keywords: "API, Docker, rcli, REST, documentation" * `GET /secrets/{id}` returns information on the secret `id`. * `POST /secrets/{id}/update` updates the secret `id`. * `POST /services/(id or name)/update` now accepts service name or prefix of service id as a parameter. +* `POST /containers/create` added 2 built-in log-opts that work on all logging drivers, +`mode` (`blocking`|`non-blocking`), and `max-buffer-size` (e.g. `2m`) which enables a non-blocking log buffer. ## v1.24 API changes From 3f4fccb65f0ef286c9c4e0f01c4ae7bb09a6ad89 Mon Sep 17 00:00:00 2001 From: Brian Goff Date: Mon, 12 Dec 2016 09:54:20 -0500 Subject: [PATCH 2/2] Use sync.Pool for logger Messages This reduces allocs and bytes used per log entry significantly as well as some improvement to time per log operation. Each log driver, however, must put messages back in the pool once they are finished with the message. Signed-off-by: Brian Goff --- daemon/logger/awslogs/cloudwatchlogs.go | 2 +- daemon/logger/copier.go | 17 +++++++--------- daemon/logger/copier_test.go | 2 +- daemon/logger/etwlogs/etwlogs_windows.go | 4 +++- daemon/logger/fluentd/fluentd.go | 5 ++++- daemon/logger/gcplogs/gcplogging.go | 8 ++++++-- daemon/logger/gelf/gelf.go | 1 + daemon/logger/journald/journald.go | 8 ++++++-- daemon/logger/jsonfilelog/jsonfilelog.go | 1 + daemon/logger/logentries/logentries.go | 4 +++- daemon/logger/logger.go | 25 ++++++++++++++++++++++++ daemon/logger/ring.go | 10 +++++++++- daemon/logger/splunk/splunk.go | 6 +++--- daemon/logger/syslog/syslog.go | 6 ++++-- 14 files changed, 74 insertions(+), 25 deletions(-) diff --git a/daemon/logger/awslogs/cloudwatchlogs.go b/daemon/logger/awslogs/cloudwatchlogs.go index e27b3809413d6..ba9455e6acc1a 100644 --- a/daemon/logger/awslogs/cloudwatchlogs.go +++ b/daemon/logger/awslogs/cloudwatchlogs.go @@ -203,7 +203,6 @@ func (l *logStream) Log(msg *logger.Message) error { l.lock.RLock() defer l.lock.RUnlock() if !l.closed { - // buffer up the data, making sure to copy the Line data l.messages <- msg } return nil @@ -347,6 +346,7 @@ func (l *logStream) collectBatch() { }) bytes += (lineBytes + perEventBytes) } + logger.PutMessage(msg) } } } diff --git a/daemon/logger/copier.go b/daemon/logger/copier.go index e8b06e58fecb5..65d8fb148e757 100644 --- a/daemon/logger/copier.go +++ b/daemon/logger/copier.go @@ -76,15 +76,14 @@ func (c *Copier) copySrc(name string, src io.Reader) { } // Break up the data that we've buffered up into lines, and log each in turn. p := 0 - for q := bytes.Index(buf[p:n], []byte{'\n'}); q >= 0; q = bytes.Index(buf[p:n], []byte{'\n'}) { + for q := bytes.IndexByte(buf[p:n], '\n'); q >= 0; q = bytes.IndexByte(buf[p:n], '\n') { select { case <-c.closed: return default: - msg := &Message{ - Source: name, - Timestamp: time.Now().UTC(), - } + msg := NewMessage() + msg.Source = name + msg.Timestamp = time.Now().UTC() msg.Line = append(msg.Line, buf[p:p+q]...) if logErr := c.dst.Log(msg); logErr != nil { @@ -98,11 +97,9 @@ func (c *Copier) copySrc(name string, src io.Reader) { // noting that it's a partial log line. if eof || (p == 0 && n == len(buf)) { if p < n { - msg := &Message{ - Source: name, - Timestamp: time.Now().UTC(), - Partial: true, - } + msg := NewMessage() + msg.Source = name + msg.Timestamp = time.Now().UTC() msg.Line = append(msg.Line, buf[p:n]...) msg.Partial = true diff --git a/daemon/logger/copier_test.go b/daemon/logger/copier_test.go index cfd816a6ebc90..e6975e2d840c8 100644 --- a/daemon/logger/copier_test.go +++ b/daemon/logger/copier_test.go @@ -208,7 +208,7 @@ func TestCopierSlow(t *testing.T) { type BenchmarkLoggerDummy struct { } -func (l *BenchmarkLoggerDummy) Log(m *Message) error { return nil } +func (l *BenchmarkLoggerDummy) Log(m *Message) error { PutMessage(m); return nil } func (l *BenchmarkLoggerDummy) Close() error { return nil } diff --git a/daemon/logger/etwlogs/etwlogs_windows.go b/daemon/logger/etwlogs/etwlogs_windows.go index 093387452587b..9b082fe7376d3 100644 --- a/daemon/logger/etwlogs/etwlogs_windows.go +++ b/daemon/logger/etwlogs/etwlogs_windows.go @@ -76,7 +76,9 @@ func (etwLogger *etwLogs) Log(msg *logger.Message) error { logrus.Error(errorMessage) return errors.New(errorMessage) } - return callEventWriteString(createLogMessage(etwLogger, msg)) + m := createLogMessage(etwLogger, msg) + logger.PutMessage(msg) + return callEventWriteString(m) } // Close closes the logger by unregistering the ETW provider. diff --git a/daemon/logger/fluentd/fluentd.go b/daemon/logger/fluentd/fluentd.go index 8c0da26e76dca..4af3e0e6fa64a 100644 --- a/daemon/logger/fluentd/fluentd.go +++ b/daemon/logger/fluentd/fluentd.go @@ -151,9 +151,12 @@ func (f *fluentd) Log(msg *logger.Message) error { for k, v := range f.extra { data[k] = v } + + ts := msg.Timestamp + logger.PutMessage(msg) // fluent-logger-golang buffers logs from failures and disconnections, // and these are transferred again automatically. - return f.writer.PostWithTime(f.tag, msg.Timestamp, data) + return f.writer.PostWithTime(f.tag, ts, data) } func (f *fluentd) Close() error { diff --git a/daemon/logger/gcplogs/gcplogging.go b/daemon/logger/gcplogs/gcplogging.go index ff1cb39c30a59..35766999338a0 100644 --- a/daemon/logger/gcplogs/gcplogging.go +++ b/daemon/logger/gcplogs/gcplogging.go @@ -194,12 +194,16 @@ func ValidateLogOpts(cfg map[string]string) error { } func (l *gcplogs) Log(m *logger.Message) error { + data := string(m.Line) + ts := m.Timestamp + logger.PutMessage(m) + l.logger.Log(logging.Entry{ - Timestamp: m.Timestamp, + Timestamp: ts, Payload: &dockerLogEntry{ Instance: l.instance, Container: l.container, - Data: string(m.Line), + Data: data, }, }) return nil diff --git a/daemon/logger/gelf/gelf.go b/daemon/logger/gelf/gelf.go index 42b957049570d..b771cab501de0 100644 --- a/daemon/logger/gelf/gelf.go +++ b/daemon/logger/gelf/gelf.go @@ -133,6 +133,7 @@ func (s *gelfLogger) Log(msg *logger.Message) error { Level: level, RawExtra: s.rawExtra, } + logger.PutMessage(msg) if err := s.writer.WriteMessage(&m); err != nil { return fmt.Errorf("gelf: cannot send GELF message: %v", err) diff --git a/daemon/logger/journald/journald.go b/daemon/logger/journald/journald.go index 0a16aafd94fd3..712d301c68a7d 100644 --- a/daemon/logger/journald/journald.go +++ b/daemon/logger/journald/journald.go @@ -105,10 +105,14 @@ func (s *journald) Log(msg *logger.Message) error { if msg.Partial { vars["CONTAINER_PARTIAL_MESSAGE"] = "true" } + + line := string(msg.Line) + logger.PutMessage(msg) + if msg.Source == "stderr" { - return journal.Send(string(msg.Line), journal.PriErr, vars) + return journal.Send(line, journal.PriErr, vars) } - return journal.Send(string(msg.Line), journal.PriInfo, vars) + return journal.Send(line, journal.PriInfo, vars) } func (s *journald) Name() string { diff --git a/daemon/logger/jsonfilelog/jsonfilelog.go b/daemon/logger/jsonfilelog/jsonfilelog.go index eb25e419aff25..d804ed28f46d6 100644 --- a/daemon/logger/jsonfilelog/jsonfilelog.go +++ b/daemon/logger/jsonfilelog/jsonfilelog.go @@ -100,6 +100,7 @@ func (l *JSONFileLogger) Log(msg *logger.Message) error { Created: timestamp, RawAttrs: l.extra, }).MarshalJSONBuf(l.buf) + logger.PutMessage(msg) if err != nil { l.mu.Unlock() return err diff --git a/daemon/logger/logentries/logentries.go b/daemon/logger/logentries/logentries.go index 114ddd59d4354..64d6893716b87 100644 --- a/daemon/logger/logentries/logentries.go +++ b/daemon/logger/logentries/logentries.go @@ -61,7 +61,9 @@ func (f *logentries) Log(msg *logger.Message) error { for k, v := range f.extra { data[k] = v } - f.writer.Println(f.tag, msg.Timestamp, data) + ts := msg.Timestamp + logger.PutMessage(msg) + f.writer.Println(f.tag, ts, data) return nil } diff --git a/daemon/logger/logger.go b/daemon/logger/logger.go index dadd6f7cfc5dd..7172663aa0282 100644 --- a/daemon/logger/logger.go +++ b/daemon/logger/logger.go @@ -26,9 +26,24 @@ const ( logWatcherBufferSize = 4096 ) +var messagePool = &sync.Pool{New: func() interface{} { return &Message{Line: make([]byte, 0, 256)} }} + +// NewMessage returns a new message from the message sync.Pool +func NewMessage() *Message { + return messagePool.Get().(*Message) +} + +// PutMessage puts the specified message back n the message pool. +// The message fields are reset before putting into the pool. +func PutMessage(msg *Message) { + msg.reset() + messagePool.Put(msg) +} + // Message is datastructure that represents piece of output produced by some // container. The Line member is a slice of an array whose contents can be // changed after a log driver's Log() method returns. +// Any changes made to this struct must also be updated in the `reset` function type Message struct { Line []byte Source string @@ -37,6 +52,16 @@ type Message struct { Partial bool } +// reset sets the message back to default values +// This is used when putting a message back into the message pool. +// Any changes to the `Message` struct should be reflected here. +func (m *Message) reset() { + m.Line = m.Line[:0] + m.Source = "" + m.Attrs = nil + m.Partial = false +} + // LogAttributes is used to hold the extra attributes available in the log message // Primarily used for converting the map type to string and sorting. type LogAttributes map[string]string diff --git a/daemon/logger/ring.go b/daemon/logger/ring.go index 9748a558c5920..90769d71e1839 100644 --- a/daemon/logger/ring.go +++ b/daemon/logger/ring.go @@ -83,10 +83,18 @@ func (r *RingLogger) Close() error { r.setClosed() r.buffer.Close() // empty out the queue + var logErr bool for _, msg := range r.buffer.Drain() { + if logErr { + // some error logging a previous message, so re-insert to message pool + // and assume log driver is hosed + PutMessage(msg) + continue + } + if err := r.l.Log(msg); err != nil { logrus.WithField("driver", r.l.Name()).WithField("container", r.logInfo.ContainerID).Errorf("Error writing log message: %v", r.l) - break + logErr = true } } return r.l.Close() diff --git a/daemon/logger/splunk/splunk.go b/daemon/logger/splunk/splunk.go index 3ae6da71b3415..5dcd508786a55 100644 --- a/daemon/logger/splunk/splunk.go +++ b/daemon/logger/splunk/splunk.go @@ -336,7 +336,7 @@ func (l *splunkLoggerInline) Log(msg *logger.Message) error { event.Source = msg.Source message.Event = &event - + logger.PutMessage(msg) return l.queueMessageAsync(message) } @@ -354,7 +354,7 @@ func (l *splunkLoggerJSON) Log(msg *logger.Message) error { event.Source = msg.Source message.Event = &event - + logger.PutMessage(msg) return l.queueMessageAsync(message) } @@ -362,7 +362,7 @@ func (l *splunkLoggerRaw) Log(msg *logger.Message) error { message := l.createSplunkMessage(msg) message.Event = string(append(l.prefix, msg.Line...)) - + logger.PutMessage(msg) return l.queueMessageAsync(message) } diff --git a/daemon/logger/syslog/syslog.go b/daemon/logger/syslog/syslog.go index f765b7d9763de..1272b8a647c38 100644 --- a/daemon/logger/syslog/syslog.go +++ b/daemon/logger/syslog/syslog.go @@ -132,10 +132,12 @@ func New(info logger.Info) (logger.Logger, error) { } func (s *syslogger) Log(msg *logger.Message) error { + line := string(msg.Line) + logger.PutMessage(msg) if msg.Source == "stderr" { - return s.writer.Err(string(msg.Line)) + return s.writer.Err(line) } - return s.writer.Info(string(msg.Line)) + return s.writer.Info(line) } func (s *syslogger) Close() error {