diff --git a/api/types/container/host_config.go b/api/types/container/host_config.go index d4a0c575f78c0..15a84b82d7a03 100644 --- a/api/types/container/host_config.go +++ b/api/types/container/host_config.go @@ -223,6 +223,17 @@ func (rp *RestartPolicy) IsSame(tp *RestartPolicy) bool { return rp.Name == tp.Name && rp.MaximumRetryCount == tp.MaximumRetryCount } +// LogMode is a type to define the available modes for logging +// These modes affect how logs are handled when log messages start piling up. +type LogMode string + +// Available logging modes +const ( + LogModeUnset = "" + LogModeBlocking LogMode = "blocking" + LogModeNonBlock LogMode = "non-blocking" +) + // LogConfig represents the logging configuration of the container. type LogConfig struct { Type string diff --git a/container/container.go b/container/container.go index 311953685ee46..9c9b07fe1d46f 100644 --- a/container/container.go +++ b/container/container.go @@ -37,6 +37,7 @@ import ( "github.com/docker/docker/runconfig" "github.com/docker/docker/volume" "github.com/docker/go-connections/nat" + "github.com/docker/go-units" "github.com/docker/libnetwork" "github.com/docker/libnetwork/netlabel" "github.com/docker/libnetwork/options" @@ -316,7 +317,7 @@ func (container *Container) CheckpointDir() string { // StartLogger starts a new logger driver for the container. func (container *Container) StartLogger() (logger.Logger, error) { cfg := container.HostConfig.LogConfig - c, err := logger.GetLogDriver(cfg.Type) + initDriver, err := logger.GetLogDriver(cfg.Type) if err != nil { return nil, fmt.Errorf("failed to get logging factory: %v", err) } @@ -341,7 +342,23 @@ func (container *Container) StartLogger() (logger.Logger, error) { return nil, err } } - return c(info) + + l, err := initDriver(info) + if err != nil { + return nil, err + } + + if containertypes.LogMode(cfg.Config["mode"]) == containertypes.LogModeNonBlock { + bufferSize := int64(-1) + if s, exists := cfg.Config["max-buffer-size"]; exists { + bufferSize, err = units.RAMInBytes(s) + if err != nil { + return nil, err + } + } + l = logger.NewRingLogger(l, info, bufferSize) + } + return l, nil } // GetProcessLabel returns the process label for the container. diff --git a/daemon/logger/awslogs/cloudwatchlogs.go b/daemon/logger/awslogs/cloudwatchlogs.go index 78995f3fad19f..ba9455e6acc1a 100644 --- a/daemon/logger/awslogs/cloudwatchlogs.go +++ b/daemon/logger/awslogs/cloudwatchlogs.go @@ -203,8 +203,7 @@ func (l *logStream) Log(msg *logger.Message) error { l.lock.RLock() defer l.lock.RUnlock() if !l.closed { - // buffer up the data, making sure to copy the Line data - l.messages <- logger.CopyMessage(msg) + l.messages <- msg } return nil } @@ -347,6 +346,7 @@ func (l *logStream) collectBatch() { }) bytes += (lineBytes + perEventBytes) } + logger.PutMessage(msg) } } } diff --git a/daemon/logger/copier.go b/daemon/logger/copier.go index 10ab46e162b81..65d8fb148e757 100644 --- a/daemon/logger/copier.go +++ b/daemon/logger/copier.go @@ -47,7 +47,6 @@ func (c *Copier) copySrc(name string, src io.Reader) { buf := make([]byte, bufSize) n := 0 eof := false - msg := &Message{Source: name} for { select { @@ -77,14 +76,16 @@ func (c *Copier) copySrc(name string, src io.Reader) { } // Break up the data that we've buffered up into lines, and log each in turn. p := 0 - for q := bytes.Index(buf[p:n], []byte{'\n'}); q >= 0; q = bytes.Index(buf[p:n], []byte{'\n'}) { - msg.Line = buf[p : p+q] - msg.Timestamp = time.Now().UTC() - msg.Partial = false + for q := bytes.IndexByte(buf[p:n], '\n'); q >= 0; q = bytes.IndexByte(buf[p:n], '\n') { select { case <-c.closed: return default: + msg := NewMessage() + msg.Source = name + msg.Timestamp = time.Now().UTC() + msg.Line = append(msg.Line, buf[p:p+q]...) + if logErr := c.dst.Log(msg); logErr != nil { logrus.Errorf("Failed to log msg %q for logger %s: %s", msg.Line, c.dst.Name(), logErr) } @@ -96,9 +97,12 @@ func (c *Copier) copySrc(name string, src io.Reader) { // noting that it's a partial log line. if eof || (p == 0 && n == len(buf)) { if p < n { - msg.Line = buf[p:n] + msg := NewMessage() + msg.Source = name msg.Timestamp = time.Now().UTC() + msg.Line = append(msg.Line, buf[p:n]...) msg.Partial = true + if logErr := c.dst.Log(msg); logErr != nil { logrus.Errorf("Failed to log msg %q for logger %s: %s", msg.Line, c.dst.Name(), logErr) } diff --git a/daemon/logger/copier_test.go b/daemon/logger/copier_test.go index cfd816a6ebc90..e6975e2d840c8 100644 --- a/daemon/logger/copier_test.go +++ b/daemon/logger/copier_test.go @@ -208,7 +208,7 @@ func TestCopierSlow(t *testing.T) { type BenchmarkLoggerDummy struct { } -func (l *BenchmarkLoggerDummy) Log(m *Message) error { return nil } +func (l *BenchmarkLoggerDummy) Log(m *Message) error { PutMessage(m); return nil } func (l *BenchmarkLoggerDummy) Close() error { return nil } diff --git a/daemon/logger/etwlogs/etwlogs_windows.go b/daemon/logger/etwlogs/etwlogs_windows.go index 093387452587b..9b082fe7376d3 100644 --- a/daemon/logger/etwlogs/etwlogs_windows.go +++ b/daemon/logger/etwlogs/etwlogs_windows.go @@ -76,7 +76,9 @@ func (etwLogger *etwLogs) Log(msg *logger.Message) error { logrus.Error(errorMessage) return errors.New(errorMessage) } - return callEventWriteString(createLogMessage(etwLogger, msg)) + m := createLogMessage(etwLogger, msg) + logger.PutMessage(msg) + return callEventWriteString(m) } // Close closes the logger by unregistering the ETW provider. diff --git a/daemon/logger/factory.go b/daemon/logger/factory.go index 678e7cb223208..192d3e0df2bed 100644 --- a/daemon/logger/factory.go +++ b/daemon/logger/factory.go @@ -3,6 +3,10 @@ package logger import ( "fmt" "sync" + + containertypes "github.com/docker/docker/api/types/container" + units "github.com/docker/go-units" + "github.com/pkg/errors" ) // Creator builds a logging driver instance with given context. @@ -85,6 +89,11 @@ func GetLogDriver(name string) (Creator, error) { return factory.get(name) } +var builtInLogOpts = map[string]bool{ + "mode": true, + "max-buffer-size": true, +} + // ValidateLogOpts checks the options for the given log driver. The // options supported are specific to the LogDriver implementation. func ValidateLogOpts(name string, cfg map[string]string) error { @@ -92,13 +101,35 @@ func ValidateLogOpts(name string, cfg map[string]string) error { return nil } + switch containertypes.LogMode(cfg["mode"]) { + case containertypes.LogModeBlocking, containertypes.LogModeNonBlock, containertypes.LogModeUnset: + default: + return fmt.Errorf("logger: logging mode not supported: %s", cfg["mode"]) + } + + if s, ok := cfg["max-buffer-size"]; ok { + if containertypes.LogMode(cfg["mode"]) != containertypes.LogModeNonBlock { + return fmt.Errorf("logger: max-buffer-size option is only supported with 'mode=%s'", containertypes.LogModeNonBlock) + } + if _, err := units.RAMInBytes(s); err != nil { + return errors.Wrap(err, "error parsing option max-buffer-size") + } + } + if !factory.driverRegistered(name) { return fmt.Errorf("logger: no log driver named '%s' is registered", name) } + filteredOpts := make(map[string]string, len(builtInLogOpts)) + for k, v := range cfg { + if !builtInLogOpts[k] { + filteredOpts[k] = v + } + } + validator := factory.getLogOptValidator(name) if validator != nil { - return validator(cfg) + return validator(filteredOpts) } return nil } diff --git a/daemon/logger/fluentd/fluentd.go b/daemon/logger/fluentd/fluentd.go index 8c0da26e76dca..4af3e0e6fa64a 100644 --- a/daemon/logger/fluentd/fluentd.go +++ b/daemon/logger/fluentd/fluentd.go @@ -151,9 +151,12 @@ func (f *fluentd) Log(msg *logger.Message) error { for k, v := range f.extra { data[k] = v } + + ts := msg.Timestamp + logger.PutMessage(msg) // fluent-logger-golang buffers logs from failures and disconnections, // and these are transferred again automatically. - return f.writer.PostWithTime(f.tag, msg.Timestamp, data) + return f.writer.PostWithTime(f.tag, ts, data) } func (f *fluentd) Close() error { diff --git a/daemon/logger/gcplogs/gcplogging.go b/daemon/logger/gcplogs/gcplogging.go index ff1cb39c30a59..35766999338a0 100644 --- a/daemon/logger/gcplogs/gcplogging.go +++ b/daemon/logger/gcplogs/gcplogging.go @@ -194,12 +194,16 @@ func ValidateLogOpts(cfg map[string]string) error { } func (l *gcplogs) Log(m *logger.Message) error { + data := string(m.Line) + ts := m.Timestamp + logger.PutMessage(m) + l.logger.Log(logging.Entry{ - Timestamp: m.Timestamp, + Timestamp: ts, Payload: &dockerLogEntry{ Instance: l.instance, Container: l.container, - Data: string(m.Line), + Data: data, }, }) return nil diff --git a/daemon/logger/gelf/gelf.go b/daemon/logger/gelf/gelf.go index 42b957049570d..b771cab501de0 100644 --- a/daemon/logger/gelf/gelf.go +++ b/daemon/logger/gelf/gelf.go @@ -133,6 +133,7 @@ func (s *gelfLogger) Log(msg *logger.Message) error { Level: level, RawExtra: s.rawExtra, } + logger.PutMessage(msg) if err := s.writer.WriteMessage(&m); err != nil { return fmt.Errorf("gelf: cannot send GELF message: %v", err) diff --git a/daemon/logger/journald/journald.go b/daemon/logger/journald/journald.go index 0a16aafd94fd3..712d301c68a7d 100644 --- a/daemon/logger/journald/journald.go +++ b/daemon/logger/journald/journald.go @@ -105,10 +105,14 @@ func (s *journald) Log(msg *logger.Message) error { if msg.Partial { vars["CONTAINER_PARTIAL_MESSAGE"] = "true" } + + line := string(msg.Line) + logger.PutMessage(msg) + if msg.Source == "stderr" { - return journal.Send(string(msg.Line), journal.PriErr, vars) + return journal.Send(line, journal.PriErr, vars) } - return journal.Send(string(msg.Line), journal.PriInfo, vars) + return journal.Send(line, journal.PriInfo, vars) } func (s *journald) Name() string { diff --git a/daemon/logger/jsonfilelog/jsonfilelog.go b/daemon/logger/jsonfilelog/jsonfilelog.go index eb25e419aff25..d804ed28f46d6 100644 --- a/daemon/logger/jsonfilelog/jsonfilelog.go +++ b/daemon/logger/jsonfilelog/jsonfilelog.go @@ -100,6 +100,7 @@ func (l *JSONFileLogger) Log(msg *logger.Message) error { Created: timestamp, RawAttrs: l.extra, }).MarshalJSONBuf(l.buf) + logger.PutMessage(msg) if err != nil { l.mu.Unlock() return err diff --git a/daemon/logger/logentries/logentries.go b/daemon/logger/logentries/logentries.go index 114ddd59d4354..64d6893716b87 100644 --- a/daemon/logger/logentries/logentries.go +++ b/daemon/logger/logentries/logentries.go @@ -61,7 +61,9 @@ func (f *logentries) Log(msg *logger.Message) error { for k, v := range f.extra { data[k] = v } - f.writer.Println(f.tag, msg.Timestamp, data) + ts := msg.Timestamp + logger.PutMessage(msg) + f.writer.Println(f.tag, ts, data) return nil } diff --git a/daemon/logger/logger.go b/daemon/logger/logger.go index 672f57f81984f..7172663aa0282 100644 --- a/daemon/logger/logger.go +++ b/daemon/logger/logger.go @@ -26,9 +26,24 @@ const ( logWatcherBufferSize = 4096 ) +var messagePool = &sync.Pool{New: func() interface{} { return &Message{Line: make([]byte, 0, 256)} }} + +// NewMessage returns a new message from the message sync.Pool +func NewMessage() *Message { + return messagePool.Get().(*Message) +} + +// PutMessage puts the specified message back n the message pool. +// The message fields are reset before putting into the pool. +func PutMessage(msg *Message) { + msg.reset() + messagePool.Put(msg) +} + // Message is datastructure that represents piece of output produced by some // container. The Line member is a slice of an array whose contents can be // changed after a log driver's Log() method returns. +// Any changes made to this struct must also be updated in the `reset` function type Message struct { Line []byte Source string @@ -37,22 +52,14 @@ type Message struct { Partial bool } -// CopyMessage creates a copy of the passed-in Message which will remain -// unchanged if the original is changed. Log drivers which buffer Messages -// rather than dispatching them during their Log() method should use this -// function to obtain a Message whose Line member's contents won't change. -func CopyMessage(msg *Message) *Message { - m := new(Message) - m.Line = make([]byte, len(msg.Line)) - copy(m.Line, msg.Line) - m.Source = msg.Source - m.Timestamp = msg.Timestamp - m.Partial = msg.Partial - m.Attrs = make(LogAttributes) - for k, v := range msg.Attrs { - m.Attrs[k] = v - } - return m +// reset sets the message back to default values +// This is used when putting a message back into the message pool. +// Any changes to the `Message` struct should be reflected here. +func (m *Message) reset() { + m.Line = m.Line[:0] + m.Source = "" + m.Attrs = nil + m.Partial = false } // LogAttributes is used to hold the extra attributes available in the log message diff --git a/daemon/logger/logger_test.go b/daemon/logger/logger_test.go deleted file mode 100644 index 16e1514d2d01b..0000000000000 --- a/daemon/logger/logger_test.go +++ /dev/null @@ -1,26 +0,0 @@ -package logger - -import ( - "reflect" - "testing" - "time" -) - -func TestCopyMessage(t *testing.T) { - msg := &Message{ - Line: []byte("test line."), - Source: "stdout", - Timestamp: time.Now(), - Attrs: LogAttributes{ - "key1": "val1", - "key2": "val2", - "key3": "val3", - }, - Partial: true, - } - - m := CopyMessage(msg) - if !reflect.DeepEqual(m, msg) { - t.Fatalf("CopyMessage failed to copy message") - } -} diff --git a/daemon/logger/ring.go b/daemon/logger/ring.go new file mode 100644 index 0000000000000..90769d71e1839 --- /dev/null +++ b/daemon/logger/ring.go @@ -0,0 +1,218 @@ +package logger + +import ( + "errors" + "sync" + "sync/atomic" + + "github.com/Sirupsen/logrus" +) + +const ( + defaultRingMaxSize = 1e6 // 1MB +) + +// RingLogger is a ring buffer that implements the Logger interface. +// This is used when lossy logging is OK. +type RingLogger struct { + buffer *messageRing + l Logger + logInfo Info + closeFlag int32 +} + +type ringWithReader struct { + *RingLogger +} + +func (r *ringWithReader) ReadLogs(cfg ReadConfig) *LogWatcher { + reader, ok := r.l.(LogReader) + if !ok { + // something is wrong if we get here + panic("expected log reader") + } + return reader.ReadLogs(cfg) +} + +func newRingLogger(driver Logger, logInfo Info, maxSize int64) *RingLogger { + l := &RingLogger{ + buffer: newRing(maxSize), + l: driver, + logInfo: logInfo, + } + go l.run() + return l +} + +// NewRingLogger creates a new Logger that is implemented as a RingBuffer wrapping +// the passed in logger. +func NewRingLogger(driver Logger, logInfo Info, maxSize int64) Logger { + if maxSize < 0 { + maxSize = defaultRingMaxSize + } + l := newRingLogger(driver, logInfo, maxSize) + if _, ok := driver.(LogReader); ok { + return &ringWithReader{l} + } + return l +} + +// Log queues messages into the ring buffer +func (r *RingLogger) Log(msg *Message) error { + if r.closed() { + return errClosed + } + return r.buffer.Enqueue(msg) +} + +// Name returns the name of the underlying logger +func (r *RingLogger) Name() string { + return r.l.Name() +} + +func (r *RingLogger) closed() bool { + return atomic.LoadInt32(&r.closeFlag) == 1 +} + +func (r *RingLogger) setClosed() { + atomic.StoreInt32(&r.closeFlag, 1) +} + +// Close closes the logger +func (r *RingLogger) Close() error { + r.setClosed() + r.buffer.Close() + // empty out the queue + var logErr bool + for _, msg := range r.buffer.Drain() { + if logErr { + // some error logging a previous message, so re-insert to message pool + // and assume log driver is hosed + PutMessage(msg) + continue + } + + if err := r.l.Log(msg); err != nil { + logrus.WithField("driver", r.l.Name()).WithField("container", r.logInfo.ContainerID).Errorf("Error writing log message: %v", r.l) + logErr = true + } + } + return r.l.Close() +} + +// run consumes messages from the ring buffer and forwards them to the underling +// logger. +// This is run in a goroutine when the RingLogger is created +func (r *RingLogger) run() { + for { + if r.closed() { + return + } + msg, err := r.buffer.Dequeue() + if err != nil { + // buffer is closed + return + } + if err := r.l.Log(msg); err != nil { + logrus.WithField("driver", r.l.Name()).WithField("container", r.logInfo.ContainerID).Errorf("Error writing log message: %v", r.l) + } + } +} + +type messageRing struct { + mu sync.Mutex + // singals callers of `Dequeue` to wake up either on `Close` or when a new `Message` is added + wait *sync.Cond + + sizeBytes int64 // current buffer size + maxBytes int64 // max buffer size size + queue []*Message + closed bool +} + +func newRing(maxBytes int64) *messageRing { + queueSize := 1000 + if maxBytes == 0 || maxBytes == 1 { + // With 0 or 1 max byte size, the maximum size of the queue would only ever be 1 + // message long. + queueSize = 1 + } + + r := &messageRing{queue: make([]*Message, 0, queueSize), maxBytes: maxBytes} + r.wait = sync.NewCond(&r.mu) + return r +} + +// Enqueue adds a message to the buffer queue +// If the message is too big for the buffer it drops the oldest messages to make room +// If there are no messages in the queue and the message is still too big, it adds the message anyway. +func (r *messageRing) Enqueue(m *Message) error { + mSize := int64(len(m.Line)) + + r.mu.Lock() + if r.closed { + r.mu.Unlock() + return errClosed + } + if mSize+r.sizeBytes > r.maxBytes && len(r.queue) > 0 { + r.wait.Signal() + r.mu.Unlock() + return nil + } + + r.queue = append(r.queue, m) + r.sizeBytes += mSize + r.wait.Signal() + r.mu.Unlock() + return nil +} + +// Dequeue pulls a message off the queue +// If there are no messages, it waits for one. +// If the buffer is closed, it will return immediately. +func (r *messageRing) Dequeue() (*Message, error) { + r.mu.Lock() + for len(r.queue) == 0 && !r.closed { + r.wait.Wait() + } + + if r.closed { + r.mu.Unlock() + return nil, errClosed + } + + msg := r.queue[0] + r.queue = r.queue[1:] + r.sizeBytes -= int64(len(msg.Line)) + r.mu.Unlock() + return msg, nil +} + +var errClosed = errors.New("closed") + +// Close closes the buffer ensuring no new messages can be added. +// Any callers waiting to dequeue a message will be woken up. +func (r *messageRing) Close() { + r.mu.Lock() + if r.closed { + r.mu.Unlock() + return + } + + r.closed = true + r.wait.Broadcast() + r.mu.Unlock() + return +} + +// Drain drains all messages from the queue. +// This can be used after `Close()` to get any remaining messages that were in queue. +func (r *messageRing) Drain() []*Message { + r.mu.Lock() + ls := make([]*Message, 0, len(r.queue)) + ls = append(ls, r.queue...) + r.sizeBytes = 0 + r.queue = r.queue[:0] + r.mu.Unlock() + return ls +} diff --git a/daemon/logger/ring_test.go b/daemon/logger/ring_test.go new file mode 100644 index 0000000000000..ed08cc4222590 --- /dev/null +++ b/daemon/logger/ring_test.go @@ -0,0 +1,299 @@ +package logger + +import ( + "context" + "strconv" + "testing" + "time" +) + +type mockLogger struct{ c chan *Message } + +func (l *mockLogger) Log(msg *Message) error { + l.c <- msg + return nil +} + +func (l *mockLogger) Name() string { + return "mock" +} + +func (l *mockLogger) Close() error { + return nil +} + +func TestRingLogger(t *testing.T) { + mockLog := &mockLogger{make(chan *Message)} // no buffer on this channel + ring := newRingLogger(mockLog, Info{}, 1) + defer ring.setClosed() + + // this should never block + ring.Log(&Message{Line: []byte("1")}) + ring.Log(&Message{Line: []byte("2")}) + ring.Log(&Message{Line: []byte("3")}) + + select { + case msg := <-mockLog.c: + if string(msg.Line) != "1" { + t.Fatalf("got unexpected msg: %q", string(msg.Line)) + } + case <-time.After(100 * time.Millisecond): + t.Fatal("timeout reading log message") + } + + select { + case msg := <-mockLog.c: + t.Fatalf("expected no more messages in the queue, got: %q", string(msg.Line)) + default: + } +} + +func TestRingCap(t *testing.T) { + r := newRing(5) + for i := 0; i < 10; i++ { + // queue messages with "0" to "10" + // the "5" to "10" messages should be dropped since we only allow 5 bytes in the buffer + if err := r.Enqueue(&Message{Line: []byte(strconv.Itoa(i))}); err != nil { + t.Fatal(err) + } + } + + // should have messages in the queue for "5" to "10" + for i := 0; i < 5; i++ { + m, err := r.Dequeue() + if err != nil { + t.Fatal(err) + } + if string(m.Line) != strconv.Itoa(i) { + t.Fatalf("got unexpected message for iter %d: %s", i, string(m.Line)) + } + } + + // queue a message that's bigger than the buffer cap + if err := r.Enqueue(&Message{Line: []byte("hello world")}); err != nil { + t.Fatal(err) + } + + // queue another message that's bigger than the buffer cap + if err := r.Enqueue(&Message{Line: []byte("eat a banana")}); err != nil { + t.Fatal(err) + } + + m, err := r.Dequeue() + if err != nil { + t.Fatal(err) + } + if string(m.Line) != "hello world" { + t.Fatalf("got unexpected message: %s", string(m.Line)) + } + if len(r.queue) != 0 { + t.Fatalf("expected queue to be empty, got: %d", len(r.queue)) + } +} + +func TestRingClose(t *testing.T) { + r := newRing(1) + if err := r.Enqueue(&Message{Line: []byte("hello")}); err != nil { + t.Fatal(err) + } + r.Close() + if err := r.Enqueue(&Message{}); err != errClosed { + t.Fatalf("expected errClosed, got: %v", err) + } + if len(r.queue) != 1 { + t.Fatal("expected empty queue") + } + if m, err := r.Dequeue(); err == nil || m != nil { + t.Fatal("exepcted err on Dequeue after close") + } + + ls := r.Drain() + if len(ls) != 1 { + t.Fatalf("expected one message: %v", ls) + } + if string(ls[0].Line) != "hello" { + t.Fatalf("got unexpected message: %s", string(ls[0].Line)) + } +} + +func TestRingDrain(t *testing.T) { + r := newRing(5) + for i := 0; i < 5; i++ { + if err := r.Enqueue(&Message{Line: []byte(strconv.Itoa(i))}); err != nil { + t.Fatal(err) + } + } + + ls := r.Drain() + if len(ls) != 5 { + t.Fatal("got unexpected length after drain") + } + + for i := 0; i < 5; i++ { + if string(ls[i].Line) != strconv.Itoa(i) { + t.Fatalf("got unexpected message at position %d: %s", i, string(ls[i].Line)) + } + } + if r.sizeBytes != 0 { + t.Fatalf("expected buffer size to be 0 after drain, got: %d", r.sizeBytes) + } + + ls = r.Drain() + if len(ls) != 0 { + t.Fatalf("expected 0 messages on 2nd drain: %v", ls) + } + +} + +type nopLogger struct{} + +func (nopLogger) Name() string { return "nopLogger" } +func (nopLogger) Close() error { return nil } +func (nopLogger) Log(*Message) error { return nil } + +func BenchmarkRingLoggerThroughputNoReceiver(b *testing.B) { + mockLog := &mockLogger{make(chan *Message)} + defer mockLog.Close() + l := NewRingLogger(mockLog, Info{}, -1) + msg := &Message{Line: []byte("hello humans and everyone else!")} + b.SetBytes(int64(len(msg.Line))) + + for i := 0; i < b.N; i++ { + if err := l.Log(msg); err != nil { + b.Fatal(err) + } + } +} + +func BenchmarkRingLoggerThroughputWithReceiverDelay0(b *testing.B) { + l := NewRingLogger(nopLogger{}, Info{}, -1) + msg := &Message{Line: []byte("hello humans and everyone else!")} + b.SetBytes(int64(len(msg.Line))) + + for i := 0; i < b.N; i++ { + if err := l.Log(msg); err != nil { + b.Fatal(err) + } + } +} + +func consumeWithDelay(delay time.Duration, c <-chan *Message) (cancel func()) { + started := make(chan struct{}) + ctx, cancel := context.WithCancel(context.Background()) + go func() { + close(started) + ticker := time.NewTicker(delay) + for range ticker.C { + select { + case <-ctx.Done(): + ticker.Stop() + return + case <-c: + } + } + }() + <-started + return cancel +} + +func BenchmarkRingLoggerThroughputConsumeDelay1(b *testing.B) { + mockLog := &mockLogger{make(chan *Message)} + defer mockLog.Close() + l := NewRingLogger(mockLog, Info{}, -1) + msg := &Message{Line: []byte("hello humans and everyone else!")} + b.SetBytes(int64(len(msg.Line))) + + cancel := consumeWithDelay(1*time.Millisecond, mockLog.c) + defer cancel() + + for i := 0; i < b.N; i++ { + if err := l.Log(msg); err != nil { + b.Fatal(err) + } + } +} + +func BenchmarkRingLoggerThroughputConsumeDelay10(b *testing.B) { + mockLog := &mockLogger{make(chan *Message)} + defer mockLog.Close() + l := NewRingLogger(mockLog, Info{}, -1) + msg := &Message{Line: []byte("hello humans and everyone else!")} + b.SetBytes(int64(len(msg.Line))) + + cancel := consumeWithDelay(10*time.Millisecond, mockLog.c) + defer cancel() + + for i := 0; i < b.N; i++ { + if err := l.Log(msg); err != nil { + b.Fatal(err) + } + } +} + +func BenchmarkRingLoggerThroughputConsumeDelay50(b *testing.B) { + mockLog := &mockLogger{make(chan *Message)} + defer mockLog.Close() + l := NewRingLogger(mockLog, Info{}, -1) + msg := &Message{Line: []byte("hello humans and everyone else!")} + b.SetBytes(int64(len(msg.Line))) + + cancel := consumeWithDelay(50*time.Millisecond, mockLog.c) + defer cancel() + + for i := 0; i < b.N; i++ { + if err := l.Log(msg); err != nil { + b.Fatal(err) + } + } +} + +func BenchmarkRingLoggerThroughputConsumeDelay100(b *testing.B) { + mockLog := &mockLogger{make(chan *Message)} + defer mockLog.Close() + l := NewRingLogger(mockLog, Info{}, -1) + msg := &Message{Line: []byte("hello humans and everyone else!")} + b.SetBytes(int64(len(msg.Line))) + + cancel := consumeWithDelay(100*time.Millisecond, mockLog.c) + defer cancel() + + for i := 0; i < b.N; i++ { + if err := l.Log(msg); err != nil { + b.Fatal(err) + } + } +} + +func BenchmarkRingLoggerThroughputConsumeDelay300(b *testing.B) { + mockLog := &mockLogger{make(chan *Message)} + defer mockLog.Close() + l := NewRingLogger(mockLog, Info{}, -1) + msg := &Message{Line: []byte("hello humans and everyone else!")} + b.SetBytes(int64(len(msg.Line))) + + cancel := consumeWithDelay(300*time.Millisecond, mockLog.c) + defer cancel() + + for i := 0; i < b.N; i++ { + if err := l.Log(msg); err != nil { + b.Fatal(err) + } + } +} + +func BenchmarkRingLoggerThroughputConsumeDelay500(b *testing.B) { + mockLog := &mockLogger{make(chan *Message)} + defer mockLog.Close() + l := NewRingLogger(mockLog, Info{}, -1) + msg := &Message{Line: []byte("hello humans and everyone else!")} + b.SetBytes(int64(len(msg.Line))) + + cancel := consumeWithDelay(500*time.Millisecond, mockLog.c) + defer cancel() + + for i := 0; i < b.N; i++ { + if err := l.Log(msg); err != nil { + b.Fatal(err) + } + } +} diff --git a/daemon/logger/splunk/splunk.go b/daemon/logger/splunk/splunk.go index 3ae6da71b3415..5dcd508786a55 100644 --- a/daemon/logger/splunk/splunk.go +++ b/daemon/logger/splunk/splunk.go @@ -336,7 +336,7 @@ func (l *splunkLoggerInline) Log(msg *logger.Message) error { event.Source = msg.Source message.Event = &event - + logger.PutMessage(msg) return l.queueMessageAsync(message) } @@ -354,7 +354,7 @@ func (l *splunkLoggerJSON) Log(msg *logger.Message) error { event.Source = msg.Source message.Event = &event - + logger.PutMessage(msg) return l.queueMessageAsync(message) } @@ -362,7 +362,7 @@ func (l *splunkLoggerRaw) Log(msg *logger.Message) error { message := l.createSplunkMessage(msg) message.Event = string(append(l.prefix, msg.Line...)) - + logger.PutMessage(msg) return l.queueMessageAsync(message) } diff --git a/daemon/logger/splunk/splunk_test.go b/daemon/logger/splunk/splunk_test.go index e7e3d68744bb8..90d50516270ed 100644 --- a/daemon/logger/splunk/splunk_test.go +++ b/daemon/logger/splunk/splunk_test.go @@ -133,11 +133,11 @@ func TestDefault(t *testing.T) { } message1Time := time.Now() - if err := loggerDriver.Log(&logger.Message{[]byte("{\"a\":\"b\"}"), "stdout", message1Time, nil, false}); err != nil { + if err := loggerDriver.Log(&logger.Message{Line: []byte("{\"a\":\"b\"}"), Source: "stdout", Timestamp: message1Time}); err != nil { t.Fatal(err) } message2Time := time.Now() - if err := loggerDriver.Log(&logger.Message{[]byte("notajson"), "stdout", message2Time, nil, false}); err != nil { + if err := loggerDriver.Log(&logger.Message{Line: []byte("notajson"), Source: "stdout", Timestamp: message2Time}); err != nil { t.Fatal(err) } @@ -262,7 +262,7 @@ func TestInlineFormatWithNonDefaultOptions(t *testing.T) { } messageTime := time.Now() - if err := loggerDriver.Log(&logger.Message{[]byte("1"), "stdout", messageTime, nil, false}); err != nil { + if err := loggerDriver.Log(&logger.Message{Line: []byte("1"), Source: "stdout", Timestamp: messageTime}); err != nil { t.Fatal(err) } @@ -361,11 +361,11 @@ func TestJsonFormat(t *testing.T) { } message1Time := time.Now() - if err := loggerDriver.Log(&logger.Message{[]byte("{\"a\":\"b\"}"), "stdout", message1Time, nil, false}); err != nil { + if err := loggerDriver.Log(&logger.Message{Line: []byte("{\"a\":\"b\"}"), Source: "stdout", Timestamp: message1Time}); err != nil { t.Fatal(err) } message2Time := time.Now() - if err := loggerDriver.Log(&logger.Message{[]byte("notjson"), "stdout", message2Time, nil, false}); err != nil { + if err := loggerDriver.Log(&logger.Message{Line: []byte("notjson"), Source: "stdout", Timestamp: message2Time}); err != nil { t.Fatal(err) } @@ -478,11 +478,11 @@ func TestRawFormat(t *testing.T) { } message1Time := time.Now() - if err := loggerDriver.Log(&logger.Message{[]byte("{\"a\":\"b\"}"), "stdout", message1Time, nil, false}); err != nil { + if err := loggerDriver.Log(&logger.Message{Line: []byte("{\"a\":\"b\"}"), Source: "stdout", Timestamp: message1Time}); err != nil { t.Fatal(err) } message2Time := time.Now() - if err := loggerDriver.Log(&logger.Message{[]byte("notjson"), "stdout", message2Time, nil, false}); err != nil { + if err := loggerDriver.Log(&logger.Message{Line: []byte("notjson"), Source: "stdout", Timestamp: message2Time}); err != nil { t.Fatal(err) } @@ -592,11 +592,11 @@ func TestRawFormatWithLabels(t *testing.T) { } message1Time := time.Now() - if err := loggerDriver.Log(&logger.Message{[]byte("{\"a\":\"b\"}"), "stdout", message1Time, nil, false}); err != nil { + if err := loggerDriver.Log(&logger.Message{Line: []byte("{\"a\":\"b\"}"), Source: "stdout", Timestamp: message1Time}); err != nil { t.Fatal(err) } message2Time := time.Now() - if err := loggerDriver.Log(&logger.Message{[]byte("notjson"), "stdout", message2Time, nil, false}); err != nil { + if err := loggerDriver.Log(&logger.Message{Line: []byte("notjson"), Source: "stdout", Timestamp: message2Time}); err != nil { t.Fatal(err) } @@ -705,11 +705,11 @@ func TestRawFormatWithoutTag(t *testing.T) { } message1Time := time.Now() - if err := loggerDriver.Log(&logger.Message{[]byte("{\"a\":\"b\"}"), "stdout", message1Time, nil, false}); err != nil { + if err := loggerDriver.Log(&logger.Message{Line: []byte("{\"a\":\"b\"}"), Source: "stdout", Timestamp: message1Time}); err != nil { t.Fatal(err) } message2Time := time.Now() - if err := loggerDriver.Log(&logger.Message{[]byte("notjson"), "stdout", message2Time, nil, false}); err != nil { + if err := loggerDriver.Log(&logger.Message{Line: []byte("notjson"), Source: "stdout", Timestamp: message2Time}); err != nil { t.Fatal(err) } @@ -790,7 +790,7 @@ func TestBatching(t *testing.T) { } for i := 0; i < defaultStreamChannelSize*4; i++ { - if err := loggerDriver.Log(&logger.Message{[]byte(fmt.Sprintf("%d", i)), "stdout", time.Now(), nil, false}); err != nil { + if err := loggerDriver.Log(&logger.Message{Line: []byte(fmt.Sprintf("%d", i)), Source: "stdout", Timestamp: time.Now()}); err != nil { t.Fatal(err) } } @@ -856,7 +856,7 @@ func TestFrequency(t *testing.T) { } for i := 0; i < 10; i++ { - if err := loggerDriver.Log(&logger.Message{[]byte(fmt.Sprintf("%d", i)), "stdout", time.Now(), nil, false}); err != nil { + if err := loggerDriver.Log(&logger.Message{Line: []byte(fmt.Sprintf("%d", i)), Source: "stdout", Timestamp: time.Now()}); err != nil { t.Fatal(err) } time.Sleep(15 * time.Millisecond) @@ -937,7 +937,7 @@ func TestOneMessagePerRequest(t *testing.T) { } for i := 0; i < 10; i++ { - if err := loggerDriver.Log(&logger.Message{[]byte(fmt.Sprintf("%d", i)), "stdout", time.Now(), nil, false}); err != nil { + if err := loggerDriver.Log(&logger.Message{Line: []byte(fmt.Sprintf("%d", i)), Source: "stdout", Timestamp: time.Now()}); err != nil { t.Fatal(err) } } @@ -1045,7 +1045,7 @@ func TestSkipVerify(t *testing.T) { } for i := 0; i < defaultStreamChannelSize*2; i++ { - if err := loggerDriver.Log(&logger.Message{[]byte(fmt.Sprintf("%d", i)), "stdout", time.Now(), nil, false}); err != nil { + if err := loggerDriver.Log(&logger.Message{Line: []byte(fmt.Sprintf("%d", i)), Source: "stdout", Timestamp: time.Now()}); err != nil { t.Fatal(err) } } @@ -1057,7 +1057,7 @@ func TestSkipVerify(t *testing.T) { hec.simulateServerError = false for i := defaultStreamChannelSize * 2; i < defaultStreamChannelSize*4; i++ { - if err := loggerDriver.Log(&logger.Message{[]byte(fmt.Sprintf("%d", i)), "stdout", time.Now(), nil, false}); err != nil { + if err := loggerDriver.Log(&logger.Message{Line: []byte(fmt.Sprintf("%d", i)), Source: "stdout", Timestamp: time.Now()}); err != nil { t.Fatal(err) } } @@ -1127,7 +1127,7 @@ func TestBufferMaximum(t *testing.T) { } for i := 0; i < 11; i++ { - if err := loggerDriver.Log(&logger.Message{[]byte(fmt.Sprintf("%d", i)), "stdout", time.Now(), nil, false}); err != nil { + if err := loggerDriver.Log(&logger.Message{Line: []byte(fmt.Sprintf("%d", i)), Source: "stdout", Timestamp: time.Now()}); err != nil { t.Fatal(err) } } @@ -1216,7 +1216,7 @@ func TestServerAlwaysDown(t *testing.T) { } for i := 0; i < 5; i++ { - if err := loggerDriver.Log(&logger.Message{[]byte(fmt.Sprintf("%d", i)), "stdout", time.Now(), nil, false}); err != nil { + if err := loggerDriver.Log(&logger.Message{Line: []byte(fmt.Sprintf("%d", i)), Source: "stdout", Timestamp: time.Now()}); err != nil { t.Fatal(err) } } @@ -1269,7 +1269,7 @@ func TestCannotSendAfterClose(t *testing.T) { t.Fatal(err) } - if err := loggerDriver.Log(&logger.Message{[]byte("message1"), "stdout", time.Now(), nil, false}); err != nil { + if err := loggerDriver.Log(&logger.Message{Line: []byte("message1"), Source: "stdout", Timestamp: time.Now()}); err != nil { t.Fatal(err) } @@ -1278,7 +1278,7 @@ func TestCannotSendAfterClose(t *testing.T) { t.Fatal(err) } - if err := loggerDriver.Log(&logger.Message{[]byte("message2"), "stdout", time.Now(), nil, false}); err == nil { + if err := loggerDriver.Log(&logger.Message{Line: []byte("message2"), Source: "stdout", Timestamp: time.Now()}); err == nil { t.Fatal("Driver should not allow to send messages after close") } diff --git a/daemon/logger/syslog/syslog.go b/daemon/logger/syslog/syslog.go index f765b7d9763de..1272b8a647c38 100644 --- a/daemon/logger/syslog/syslog.go +++ b/daemon/logger/syslog/syslog.go @@ -132,10 +132,12 @@ func New(info logger.Info) (logger.Logger, error) { } func (s *syslogger) Log(msg *logger.Message) error { + line := string(msg.Line) + logger.PutMessage(msg) if msg.Source == "stderr" { - return s.writer.Err(string(msg.Line)) + return s.writer.Err(line) } - return s.writer.Info(string(msg.Line)) + return s.writer.Info(line) } func (s *syslogger) Close() error { diff --git a/docs/api/version-history.md b/docs/api/version-history.md index b295eda91c0ac..e8cb046d0ccad 100644 --- a/docs/api/version-history.md +++ b/docs/api/version-history.md @@ -82,6 +82,8 @@ keywords: "API, Docker, rcli, REST, documentation" * `GET /secrets/{id}` returns information on the secret `id`. * `POST /secrets/{id}/update` updates the secret `id`. * `POST /services/(id or name)/update` now accepts service name or prefix of service id as a parameter. +* `POST /containers/create` added 2 built-in log-opts that work on all logging drivers, +`mode` (`blocking`|`non-blocking`), and `max-buffer-size` (e.g. `2m`) which enables a non-blocking log buffer. ## v1.24 API changes