Skip to content

Commit

Permalink
Merge pull request moby#28762 from cpuguy83/logger_ring_buffer
Browse files Browse the repository at this point in the history
Implement optional ring buffer for container logs
  • Loading branch information
LK4D4 committed Feb 1, 2017
2 parents 3138a8f + 3f4fccb commit dc20f2a
Show file tree
Hide file tree
Showing 21 changed files with 668 additions and 86 deletions.
11 changes: 11 additions & 0 deletions api/types/container/host_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,17 @@ func (rp *RestartPolicy) IsSame(tp *RestartPolicy) bool {
return rp.Name == tp.Name && rp.MaximumRetryCount == tp.MaximumRetryCount
}

// LogMode is a type to define the available modes for logging
// These modes affect how logs are handled when log messages start piling up.
type LogMode string

// Available logging modes
const (
LogModeUnset = ""
LogModeBlocking LogMode = "blocking"
LogModeNonBlock LogMode = "non-blocking"
)

// LogConfig represents the logging configuration of the container.
type LogConfig struct {
Type string
Expand Down
21 changes: 19 additions & 2 deletions container/container.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ import (
"github.com/docker/docker/runconfig"
"github.com/docker/docker/volume"
"github.com/docker/go-connections/nat"
"github.com/docker/go-units"
"github.com/docker/libnetwork"
"github.com/docker/libnetwork/netlabel"
"github.com/docker/libnetwork/options"
Expand Down Expand Up @@ -316,7 +317,7 @@ func (container *Container) CheckpointDir() string {
// StartLogger starts a new logger driver for the container.
func (container *Container) StartLogger() (logger.Logger, error) {
cfg := container.HostConfig.LogConfig
c, err := logger.GetLogDriver(cfg.Type)
initDriver, err := logger.GetLogDriver(cfg.Type)
if err != nil {
return nil, fmt.Errorf("failed to get logging factory: %v", err)
}
Expand All @@ -341,7 +342,23 @@ func (container *Container) StartLogger() (logger.Logger, error) {
return nil, err
}
}
return c(info)

l, err := initDriver(info)
if err != nil {
return nil, err
}

if containertypes.LogMode(cfg.Config["mode"]) == containertypes.LogModeNonBlock {
bufferSize := int64(-1)
if s, exists := cfg.Config["max-buffer-size"]; exists {
bufferSize, err = units.RAMInBytes(s)
if err != nil {
return nil, err
}
}
l = logger.NewRingLogger(l, info, bufferSize)
}
return l, nil
}

// GetProcessLabel returns the process label for the container.
Expand Down
4 changes: 2 additions & 2 deletions daemon/logger/awslogs/cloudwatchlogs.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,8 +203,7 @@ func (l *logStream) Log(msg *logger.Message) error {
l.lock.RLock()
defer l.lock.RUnlock()
if !l.closed {
// buffer up the data, making sure to copy the Line data
l.messages <- logger.CopyMessage(msg)
l.messages <- msg
}
return nil
}
Expand Down Expand Up @@ -347,6 +346,7 @@ func (l *logStream) collectBatch() {
})
bytes += (lineBytes + perEventBytes)
}
logger.PutMessage(msg)
}
}
}
Expand Down
16 changes: 10 additions & 6 deletions daemon/logger/copier.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@ func (c *Copier) copySrc(name string, src io.Reader) {
buf := make([]byte, bufSize)
n := 0
eof := false
msg := &Message{Source: name}

for {
select {
Expand Down Expand Up @@ -77,14 +76,16 @@ func (c *Copier) copySrc(name string, src io.Reader) {
}
// Break up the data that we've buffered up into lines, and log each in turn.
p := 0
for q := bytes.Index(buf[p:n], []byte{'\n'}); q >= 0; q = bytes.Index(buf[p:n], []byte{'\n'}) {
msg.Line = buf[p : p+q]
msg.Timestamp = time.Now().UTC()
msg.Partial = false
for q := bytes.IndexByte(buf[p:n], '\n'); q >= 0; q = bytes.IndexByte(buf[p:n], '\n') {
select {
case <-c.closed:
return
default:
msg := NewMessage()
msg.Source = name
msg.Timestamp = time.Now().UTC()
msg.Line = append(msg.Line, buf[p:p+q]...)

if logErr := c.dst.Log(msg); logErr != nil {
logrus.Errorf("Failed to log msg %q for logger %s: %s", msg.Line, c.dst.Name(), logErr)
}
Expand All @@ -96,9 +97,12 @@ func (c *Copier) copySrc(name string, src io.Reader) {
// noting that it's a partial log line.
if eof || (p == 0 && n == len(buf)) {
if p < n {
msg.Line = buf[p:n]
msg := NewMessage()
msg.Source = name
msg.Timestamp = time.Now().UTC()
msg.Line = append(msg.Line, buf[p:n]...)
msg.Partial = true

if logErr := c.dst.Log(msg); logErr != nil {
logrus.Errorf("Failed to log msg %q for logger %s: %s", msg.Line, c.dst.Name(), logErr)
}
Expand Down
2 changes: 1 addition & 1 deletion daemon/logger/copier_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,7 @@ func TestCopierSlow(t *testing.T) {
type BenchmarkLoggerDummy struct {
}

func (l *BenchmarkLoggerDummy) Log(m *Message) error { return nil }
func (l *BenchmarkLoggerDummy) Log(m *Message) error { PutMessage(m); return nil }

func (l *BenchmarkLoggerDummy) Close() error { return nil }

Expand Down
4 changes: 3 additions & 1 deletion daemon/logger/etwlogs/etwlogs_windows.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,9 @@ func (etwLogger *etwLogs) Log(msg *logger.Message) error {
logrus.Error(errorMessage)
return errors.New(errorMessage)
}
return callEventWriteString(createLogMessage(etwLogger, msg))
m := createLogMessage(etwLogger, msg)
logger.PutMessage(msg)
return callEventWriteString(m)
}

// Close closes the logger by unregistering the ETW provider.
Expand Down
33 changes: 32 additions & 1 deletion daemon/logger/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,10 @@ package logger
import (
"fmt"
"sync"

containertypes "github.com/docker/docker/api/types/container"
units "github.com/docker/go-units"
"github.com/pkg/errors"
)

// Creator builds a logging driver instance with given context.
Expand Down Expand Up @@ -85,20 +89,47 @@ func GetLogDriver(name string) (Creator, error) {
return factory.get(name)
}

var builtInLogOpts = map[string]bool{
"mode": true,
"max-buffer-size": true,
}

// ValidateLogOpts checks the options for the given log driver. The
// options supported are specific to the LogDriver implementation.
func ValidateLogOpts(name string, cfg map[string]string) error {
if name == "none" {
return nil
}

switch containertypes.LogMode(cfg["mode"]) {
case containertypes.LogModeBlocking, containertypes.LogModeNonBlock, containertypes.LogModeUnset:
default:
return fmt.Errorf("logger: logging mode not supported: %s", cfg["mode"])
}

if s, ok := cfg["max-buffer-size"]; ok {
if containertypes.LogMode(cfg["mode"]) != containertypes.LogModeNonBlock {
return fmt.Errorf("logger: max-buffer-size option is only supported with 'mode=%s'", containertypes.LogModeNonBlock)
}
if _, err := units.RAMInBytes(s); err != nil {
return errors.Wrap(err, "error parsing option max-buffer-size")
}
}

if !factory.driverRegistered(name) {
return fmt.Errorf("logger: no log driver named '%s' is registered", name)
}

filteredOpts := make(map[string]string, len(builtInLogOpts))
for k, v := range cfg {
if !builtInLogOpts[k] {
filteredOpts[k] = v
}
}

validator := factory.getLogOptValidator(name)
if validator != nil {
return validator(cfg)
return validator(filteredOpts)
}
return nil
}
5 changes: 4 additions & 1 deletion daemon/logger/fluentd/fluentd.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,9 +151,12 @@ func (f *fluentd) Log(msg *logger.Message) error {
for k, v := range f.extra {
data[k] = v
}

ts := msg.Timestamp
logger.PutMessage(msg)
// fluent-logger-golang buffers logs from failures and disconnections,
// and these are transferred again automatically.
return f.writer.PostWithTime(f.tag, msg.Timestamp, data)
return f.writer.PostWithTime(f.tag, ts, data)
}

func (f *fluentd) Close() error {
Expand Down
8 changes: 6 additions & 2 deletions daemon/logger/gcplogs/gcplogging.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,12 +194,16 @@ func ValidateLogOpts(cfg map[string]string) error {
}

func (l *gcplogs) Log(m *logger.Message) error {
data := string(m.Line)
ts := m.Timestamp
logger.PutMessage(m)

l.logger.Log(logging.Entry{
Timestamp: m.Timestamp,
Timestamp: ts,
Payload: &dockerLogEntry{
Instance: l.instance,
Container: l.container,
Data: string(m.Line),
Data: data,
},
})
return nil
Expand Down
1 change: 1 addition & 0 deletions daemon/logger/gelf/gelf.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,7 @@ func (s *gelfLogger) Log(msg *logger.Message) error {
Level: level,
RawExtra: s.rawExtra,
}
logger.PutMessage(msg)

if err := s.writer.WriteMessage(&m); err != nil {
return fmt.Errorf("gelf: cannot send GELF message: %v", err)
Expand Down
8 changes: 6 additions & 2 deletions daemon/logger/journald/journald.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,10 +105,14 @@ func (s *journald) Log(msg *logger.Message) error {
if msg.Partial {
vars["CONTAINER_PARTIAL_MESSAGE"] = "true"
}

line := string(msg.Line)
logger.PutMessage(msg)

if msg.Source == "stderr" {
return journal.Send(string(msg.Line), journal.PriErr, vars)
return journal.Send(line, journal.PriErr, vars)
}
return journal.Send(string(msg.Line), journal.PriInfo, vars)
return journal.Send(line, journal.PriInfo, vars)
}

func (s *journald) Name() string {
Expand Down
1 change: 1 addition & 0 deletions daemon/logger/jsonfilelog/jsonfilelog.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ func (l *JSONFileLogger) Log(msg *logger.Message) error {
Created: timestamp,
RawAttrs: l.extra,
}).MarshalJSONBuf(l.buf)
logger.PutMessage(msg)
if err != nil {
l.mu.Unlock()
return err
Expand Down
4 changes: 3 additions & 1 deletion daemon/logger/logentries/logentries.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,9 @@ func (f *logentries) Log(msg *logger.Message) error {
for k, v := range f.extra {
data[k] = v
}
f.writer.Println(f.tag, msg.Timestamp, data)
ts := msg.Timestamp
logger.PutMessage(msg)
f.writer.Println(f.tag, ts, data)
return nil
}

Expand Down
39 changes: 23 additions & 16 deletions daemon/logger/logger.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,24 @@ const (
logWatcherBufferSize = 4096
)

var messagePool = &sync.Pool{New: func() interface{} { return &Message{Line: make([]byte, 0, 256)} }}

// NewMessage returns a new message from the message sync.Pool
func NewMessage() *Message {
return messagePool.Get().(*Message)
}

// PutMessage puts the specified message back n the message pool.
// The message fields are reset before putting into the pool.
func PutMessage(msg *Message) {
msg.reset()
messagePool.Put(msg)
}

// Message is datastructure that represents piece of output produced by some
// container. The Line member is a slice of an array whose contents can be
// changed after a log driver's Log() method returns.
// Any changes made to this struct must also be updated in the `reset` function
type Message struct {
Line []byte
Source string
Expand All @@ -37,22 +52,14 @@ type Message struct {
Partial bool
}

// CopyMessage creates a copy of the passed-in Message which will remain
// unchanged if the original is changed. Log drivers which buffer Messages
// rather than dispatching them during their Log() method should use this
// function to obtain a Message whose Line member's contents won't change.
func CopyMessage(msg *Message) *Message {
m := new(Message)
m.Line = make([]byte, len(msg.Line))
copy(m.Line, msg.Line)
m.Source = msg.Source
m.Timestamp = msg.Timestamp
m.Partial = msg.Partial
m.Attrs = make(LogAttributes)
for k, v := range msg.Attrs {
m.Attrs[k] = v
}
return m
// reset sets the message back to default values
// This is used when putting a message back into the message pool.
// Any changes to the `Message` struct should be reflected here.
func (m *Message) reset() {
m.Line = m.Line[:0]
m.Source = ""
m.Attrs = nil
m.Partial = false
}

// LogAttributes is used to hold the extra attributes available in the log message
Expand Down
26 changes: 0 additions & 26 deletions daemon/logger/logger_test.go

This file was deleted.

Loading

0 comments on commit dc20f2a

Please sign in to comment.