Skip to content

Commit

Permalink
Make user-supplied sinks operate on URIs (uber-go#606)
Browse files Browse the repository at this point in the history
For future extensibility, make user-supplied factories for log sinks
operate on URIs. Each user-supplied factory owns a scheme, and
double-registering constructors for a scheme is an error. For
back-compat, zap automatically registers a factory for the `file` scheme
and treats URIs without a scheme as though they were for files.
  • Loading branch information
akshayjshah authored Jul 19, 2018
1 parent 7e7e266 commit a01e410
Show file tree
Hide file tree
Showing 5 changed files with 242 additions and 122 deletions.
6 changes: 3 additions & 3 deletions config.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,10 +74,10 @@ type Config struct {
// EncoderConfig sets options for the chosen encoder. See
// zapcore.EncoderConfig for details.
EncoderConfig zapcore.EncoderConfig `json:"encoderConfig" yaml:"encoderConfig"`
// OutputPaths is a list of paths to write logging output to. See Open for
// details.
// OutputPaths is a list of URLs or file paths to write logging output to.
// See Open for details.
OutputPaths []string `json:"outputPaths" yaml:"outputPaths"`
// ErrorOutputPaths is a list of paths to write internal logger errors to.
// ErrorOutputPaths is a list of URLs to write internal logger errors to.
// The default is standard error.
//
// Note that this setting only affects internal errors; for sample code that
Expand Down
125 changes: 96 additions & 29 deletions sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,15 +24,19 @@ import (
"errors"
"fmt"
"io"
"net/url"
"os"
"strings"
"sync"

"go.uber.org/zap/zapcore"
)

const schemeFile = "file"

var (
_sinkMutex sync.RWMutex
_sinkFactories map[string]func() (Sink, error)
_sinkFactories map[string]func(*url.URL) (Sink, error) // keyed by scheme
)

func init() {
Expand All @@ -42,18 +46,10 @@ func init() {
func resetSinkRegistry() {
_sinkMutex.Lock()
defer _sinkMutex.Unlock()
_sinkFactories = map[string]func() (Sink, error){
"stdout": func() (Sink, error) { return nopCloserSink{os.Stdout}, nil },
"stderr": func() (Sink, error) { return nopCloserSink{os.Stderr}, nil },
}
}

type errSinkNotFound struct {
key string
}

func (e *errSinkNotFound) Error() string {
return fmt.Sprintf("no sink found for %q", e.key)
_sinkFactories = map[string]func(*url.URL) (Sink, error){
schemeFile: newFileSink,
}
}

// Sink defines the interface to write to and close logger destinations.
Expand All @@ -62,33 +58,104 @@ type Sink interface {
io.Closer
}

// RegisterSink adds a Sink at the given key so it can be referenced
// in config OutputPaths.
func RegisterSink(key string, sinkFactory func() (Sink, error)) error {
type nopCloserSink struct{ zapcore.WriteSyncer }

func (nopCloserSink) Close() error { return nil }

type errSinkNotFound struct {
scheme string
}

func (e *errSinkNotFound) Error() string {
return fmt.Sprintf("no sink found for scheme %q", e.scheme)
}

// RegisterSink registers a user-supplied factory for all sinks with a
// particular scheme.
//
// All schemes must be ASCII, valid under section 3.1 of RFC 3986
// (https://tools.ietf.org/html/rfc3986#section-3.1), and must not already
// have a factory registered. Zap automatically registers a factory for the
// "file" scheme.
func RegisterSink(scheme string, factory func(*url.URL) (Sink, error)) error {
_sinkMutex.Lock()
defer _sinkMutex.Unlock()
if key == "" {
return errors.New("sink key cannot be blank")

if scheme == "" {
return errors.New("can't register a sink factory for empty string")
}
normalized, err := normalizeScheme(scheme)
if err != nil {
return fmt.Errorf("%q is not a valid scheme: %v", scheme, err)
}
if _, ok := _sinkFactories[key]; ok {
return fmt.Errorf("sink already registered for key %q", key)
if _, ok := _sinkFactories[normalized]; ok {
return fmt.Errorf("sink factory already registered for scheme %q", normalized)
}
_sinkFactories[key] = sinkFactory
_sinkFactories[normalized] = factory
return nil
}

// newSink invokes the registered sink factory to create and return the
// sink for the given key. Returns errSinkNotFound if the key cannot be found.
func newSink(key string) (Sink, error) {
func newSink(rawURL string) (Sink, error) {
u, err := url.Parse(rawURL)
if err != nil {
return nil, fmt.Errorf("can't parse %q as a URL: %v", rawURL, err)
}
if u.Scheme == "" {
u.Scheme = schemeFile
}

_sinkMutex.RLock()
defer _sinkMutex.RUnlock()
sinkFactory, ok := _sinkFactories[key]
factory, ok := _sinkFactories[u.Scheme]
_sinkMutex.RUnlock()
if !ok {
return nil, &errSinkNotFound{key}
return nil, &errSinkNotFound{u.Scheme}
}
return sinkFactory()
return factory(u)
}

type nopCloserSink struct{ zapcore.WriteSyncer }
func newFileSink(u *url.URL) (Sink, error) {
if u.User != nil {
return nil, fmt.Errorf("user and password not allowed with file URLs: got %v", u)
}
if u.Fragment != "" {
return nil, fmt.Errorf("fragments not allowed with file URLs: got %v", u)
}
if u.RawQuery != "" {
return nil, fmt.Errorf("query parameters not allowed with file URLs: got %v", u)
}
// Error messages are better if we check hostname and port separately.
if u.Port() != "" {
return nil, fmt.Errorf("ports not allowed with file URLs: got %v", u)
}
if hn := u.Hostname(); hn != "" && hn != "localhost" {
return nil, fmt.Errorf("file URLs must leave host empty or use localhost: got %v", u)
}
switch u.Path {
case "stdout":
return nopCloserSink{os.Stdout}, nil
case "stderr":
return nopCloserSink{os.Stderr}, nil
}
return os.OpenFile(u.Path, os.O_WRONLY|os.O_APPEND|os.O_CREATE, 0644)
}

func (nopCloserSink) Close() error { return nil }
func normalizeScheme(s string) (string, error) {
// https://tools.ietf.org/html/rfc3986#section-3.1
s = strings.ToLower(s)
if first := s[0]; 'a' > first || 'z' < first {
return "", errors.New("must start with a letter")
}
for i := 1; i < len(s); i++ { // iterate over bytes, not runes
c := s[i]
switch {
case 'a' <= c && c <= 'z':
continue
case '0' <= c && c <= '9':
continue
case c == '.' || c == '+' || c == '-':
continue
}
return "", fmt.Errorf("may not contain %q", c)
}
return s, nil
}
94 changes: 59 additions & 35 deletions sink_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,56 +21,80 @@
package zap

import (
"errors"
"os"
"bytes"
"io/ioutil"
"net/url"
"strings"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"go.uber.org/zap/zapcore"
)

func TestRegisterSink(t *testing.T) {
tests := []struct {
name string
key string
factory func() (Sink, error)
wantError bool
}{
{"valid", "valid", func() (Sink, error) { return nopCloserSink{os.Stdout}, nil }, false},
{"empty", "", func() (Sink, error) { return nopCloserSink{os.Stdout}, nil }, true},
{"stdout", "stdout", func() (Sink, error) { return nopCloserSink{os.Stdout}, nil }, true},
}
const (
memScheme = "m"
nopScheme = "no-op.1234"
)
var memCalls, nopCalls int

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
err := RegisterSink(tt.key, tt.factory)
if tt.wantError {
assert.NotNil(t, err)
} else {
assert.Nil(t, err)
assert.NotNil(t, _sinkFactories[tt.key], "expected the factory to be present")
}
})
buf := bytes.NewBuffer(nil)
memFactory := func(u *url.URL) (Sink, error) {
assert.Equal(t, u.Scheme, memScheme, "Scheme didn't match registration.")
memCalls++
return nopCloserSink{zapcore.AddSync(buf)}, nil
}
nopFactory := func(u *url.URL) (Sink, error) {
assert.Equal(t, u.Scheme, nopScheme, "Scheme didn't match registration.")
nopCalls++
return nopCloserSink{zapcore.AddSync(ioutil.Discard)}, nil
}
}

func TestNewSink(t *testing.T) {
defer resetSinkRegistry()
errTestSink := errors.New("test erroring")
err := RegisterSink("errors", func() (Sink, error) { return nil, errTestSink })
assert.Nil(t, err)

require.NoError(t, RegisterSink(strings.ToUpper(memScheme), memFactory), "Failed to register scheme %q.", memScheme)
require.NoError(t, RegisterSink(nopScheme, nopFactory), "Failed to register scheme %q.", memScheme)

sink, close, err := Open(
memScheme+"://somewhere",
nopScheme+"://somewhere-else",
)
assert.NoError(t, err, "Unexpected error opening URLs with registered schemes.")

defer close()

assert.Equal(t, 1, memCalls, "Unexpected number of calls to memory factory.")
assert.Equal(t, 1, nopCalls, "Unexpected number of calls to no-op factory.")

_, err = sink.Write([]byte("foo"))
assert.NoError(t, err, "Failed to write to combined WriteSyncer.")
assert.Equal(t, "foo", buf.String(), "Unexpected buffer contents.")
}

func TestRegisterSinkErrors(t *testing.T) {
nopFactory := func(_ *url.URL) (Sink, error) {
return nopCloserSink{zapcore.AddSync(ioutil.Discard)}, nil
}
tests := []struct {
key string
err error
scheme string
err string
}{
{"stdout", nil},
{"errors", errTestSink},
{"nonexistent", &errSinkNotFound{"nonexistent"}},
{"", "empty string"},
{"FILE", "already registered"},
{"42", "not a valid scheme"},
{"http*", "not a valid scheme"},
}

for _, tt := range tests {
t.Run(tt.key, func(t *testing.T) {
_, err := newSink(tt.key)
assert.Equal(t, tt.err, err)
t.Run("scheme-"+tt.scheme, func(t *testing.T) {
defer resetSinkRegistry()

err := RegisterSink(tt.scheme, nopFactory)
if assert.Error(t, err, "expected error") {
assert.Contains(t, err.Error(), tt.err, "unexpected error")
}
})
}
}
45 changes: 22 additions & 23 deletions writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,22 +21,33 @@
package zap

import (
"fmt"
"io"
"io/ioutil"
"os"

"go.uber.org/zap/zapcore"

"go.uber.org/multierr"
)

// Open is a high-level wrapper that takes a variadic number of paths, opens or
// creates each of the specified files, and combines them into a locked
// Open is a high-level wrapper that takes a variadic number of URLs, opens or
// creates each of the specified resources, and combines them into a locked
// WriteSyncer. It also returns any error encountered and a function to close
// any opened files.
//
// Passing no paths returns a no-op WriteSyncer. The special paths "stdout" and
// "stderr" are interpreted as os.Stdout and os.Stderr, respectively.
// Passing no URLs returns a no-op WriteSyncer. Zap handles URLs without a
// scheme and URLs with the "file" scheme. Third-party code may register
// factories for other schemes using RegisterSink.
//
// URLs with the "file" scheme must use absolute paths on the local
// filesystem. No user, password, port, fragments, or query parameters are
// allowed, and the hostname must be empty or "localhost".
//
// Since it's common to write logs to the local filesystem, URLs without a
// scheme (e.g., "/var/log/foo.log") are treated as local file paths. Without
// a scheme, the special paths "stdout" and "stderr" are interpreted as
// os.Stdout and os.Stderr. When specified without a scheme, relative file
// paths also work.
func Open(paths ...string) (zapcore.WriteSyncer, func(), error) {
writers, close, err := open(paths)
if err != nil {
Expand All @@ -48,36 +59,24 @@ func Open(paths ...string) (zapcore.WriteSyncer, func(), error) {
}

func open(paths []string) ([]zapcore.WriteSyncer, func(), error) {
var openErr error
writers := make([]zapcore.WriteSyncer, 0, len(paths))
closers := make([]io.Closer, 0, len(paths))
close := func() {
for _, c := range closers {
c.Close()
}
}

var openErr error
for _, path := range paths {
sink, err := newSink(path)
if err == nil {
// Using a registered sink constructor.
writers = append(writers, sink)
closers = append(closers, sink)
continue
}
if _, ok := err.(*errSinkNotFound); ok {
// No named sink constructor, use key as path to log file.
f, e := os.OpenFile(path, os.O_WRONLY|os.O_APPEND|os.O_CREATE, 0644)
openErr = multierr.Append(openErr, e)
if e == nil {
writers = append(writers, f)
closers = append(closers, f)
}
if err != nil {
openErr = multierr.Append(openErr, fmt.Errorf("couldn't open sink %q: %v", path, err))
continue
}
// Sink constructor failed.
openErr = multierr.Append(openErr, err)
writers = append(writers, sink)
closers = append(closers, sink)
}

if openErr != nil {
close()
return writers, nil, openErr
Expand Down
Loading

0 comments on commit a01e410

Please sign in to comment.