Skip to content

Commit

Permalink
GODRIVER-2586 Add log messages to CMAP spec (#1165)
Browse files Browse the repository at this point in the history
Co-authored-by: Matt Dale <9760375+matthewdale@users.noreply.github.com>
  • Loading branch information
prestonvasquez and matthewdale authored Feb 27, 2023
1 parent a0bd0e5 commit 038249a
Show file tree
Hide file tree
Showing 21 changed files with 2,134 additions and 287 deletions.
1 change: 1 addition & 0 deletions event/monitoring.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@ type PoolEvent struct {
// ServiceID is only set if the Type is PoolCleared and the server is deployed behind a load balancer. This field
// can be used to distinguish between individual servers in a load balanced deployment.
ServiceID *primitive.ObjectID `json:"serviceId"`
Error error `json:"error"`
}

// PoolMonitor is a function that allows the user to gain access to events occurring in the pool
Expand Down
116 changes: 99 additions & 17 deletions internal/logger/component.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,60 @@ import (
)

const (
CommandFailed = "Command failed"
CommandStarted = "Command started"
CommandSucceeded = "Command succeeded"
CommandFailed = "Command failed"
CommandStarted = "Command started"
CommandSucceeded = "Command succeeded"
ConnectionPoolCreated = "Connection pool created"
ConnectionPoolReady = "Connection pool ready"
ConnectionPoolCleared = "Connection pool cleared"
ConnectionPoolClosed = "Connection pool closed"
ConnectionCreated = "Connection created"
ConnectionReady = "Connection ready"
ConnectionClosed = "Connection closed"
ConnectionCheckoutStarted = "Connection checkout started"
ConnectionCheckoutFailed = "Connection checkout failed"
ConnectionCheckedOut = "Connection checked out"
ConnectionCheckedIn = "Connection checked in"
)

const (
KeyCommand = "command"
KeyCommandName = "commandName"
KeyDatabaseName = "databaseName"
KeyDriverConnectionID = "driverConnectionId"
KeyDurationMS = "durationMS"
KeyError = "error"
KeyFailure = "failure"
KeyMaxConnecting = "maxConnecting"
KeyMaxIdleTimeMS = "maxIdleTimeMS"
KeyMaxPoolSize = "maxPoolSize"
KeyMessage = "message"
KeyMinPoolSize = "minPoolSize"
KeyOperationID = "operationId"
KeyReason = "reason"
KeyReply = "reply"
KeyRequestID = "requestId"
KeyServerConnectionID = "serverConnectionId"
KeyServerHost = "serverHost"
KeyServerPort = "serverPort"
KeyServiceID = "serviceId"
KeyTimestamp = "timestamp"
)

type KeyValues []interface{}

func (kvs *KeyValues) Add(key string, value interface{}) {
*kvs = append(*kvs, key, value)
}

const (
ReasonConnClosedStale = "Connection became stale because the pool was cleared"
ReasonConnClosedIdle = "Connection has been available but unused for longer than the configured max idle time"
ReasonConnClosedError = "An error occurred while using the connection"
ReasonConnClosedPoolClosed = "Connection pool was closed"
ReasonConnCheckoutFailedTimout = "Wait queue timeout elapsed without a connection becoming available"
ReasonConnCheckoutFailedError = "An error occurred while trying to establish a new connection"
ReasonConnCheckoutFailedPoolClosed = "Connection pool was closed"
)

// Component is an enumeration representing the "components" which can be
Expand Down Expand Up @@ -87,31 +138,62 @@ type Command struct {
// structured logging.
func SerializeCommand(cmd Command, extraKeysAndValues ...interface{}) []interface{} {
// Initialize the boilerplate keys and values.
keysAndValues := append([]interface{}{
"commandName", cmd.Name,
"driverConnectionId", cmd.DriverConnectionID,
"message", cmd.Message,
"operationId", cmd.OperationID,
"requestId", cmd.RequestID,
"serverHost", cmd.ServerHost,
}, extraKeysAndValues...)
keysAndValues := KeyValues{
KeyCommandName, cmd.Name,
KeyDriverConnectionID, cmd.DriverConnectionID,
KeyMessage, cmd.Message,
KeyOperationID, cmd.OperationID,
KeyRequestID, cmd.RequestID,
KeyServerHost, cmd.ServerHost,
}

// Add the extra keys and values.
for i := 0; i < len(extraKeysAndValues); i += 2 {
keysAndValues.Add(extraKeysAndValues[i].(string), extraKeysAndValues[i+1])
}

// Add the optional keys and values.
port, err := strconv.ParseInt(cmd.ServerPort, 0, 32)
if err == nil {
keysAndValues = append(keysAndValues, "serverPort", port)
keysAndValues.Add(KeyServerPort, port)
}

// Add the "serverConnectionId" if it is not nil.
if cmd.ServerConnectionID != nil {
keysAndValues = append(keysAndValues,
"serverConnectionId", *cmd.ServerConnectionID)
keysAndValues.Add(KeyServerConnectionID, *cmd.ServerConnectionID)
}

// Add the "serviceId" if it is not nil.
if cmd.ServiceID != nil {
keysAndValues = append(keysAndValues,
"serviceId", cmd.ServiceID.Hex())
keysAndValues.Add(KeyServiceID, cmd.ServiceID.Hex())
}

return keysAndValues
}

// Connection contains data that all connection log messages MUST contain.
type Connection struct {
Message string // Message associated with the connection
ServerHost string // Hostname or IP address for the server
ServerPort string // Port for the server
}

// SerializeConnection serializes a ConnectionMessage into a slice of keys
// and values that can be passed to a logger.
func SerializeConnection(conn Connection, extraKeysAndValues ...interface{}) []interface{} {
// Initialize the boilerplate keys and values.
keysAndValues := KeyValues{
KeyMessage, conn.Message,
KeyServerHost, conn.ServerHost,
}

// Add the optional keys and values.
for i := 0; i < len(extraKeysAndValues); i += 2 {
keysAndValues.Add(extraKeysAndValues[i].(string), extraKeysAndValues[i+1])
}

port, err := strconv.ParseInt(conn.ServerPort, 0, 32)
if err == nil {
keysAndValues.Add(KeyServerPort, port)
}

return keysAndValues
Expand Down
93 changes: 27 additions & 66 deletions internal/logger/io_sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,91 +7,52 @@
package logger

import (
"encoding/json"
"io"
"log"
"sync"
"time"
)

// IOSink writes to an io.Writer using the standard library logging solution and
// is the default sink for the logger, with the default IO being os.Stderr.
// IOSink writes a JSON-encoded message to the io.Writer.
type IOSink struct {
log *log.Logger
enc *json.Encoder

// encMu protects the encoder from concurrent writes. While the logger
// itself does not concurrently write to the sink, the sink may be used
// concurrently within the driver.
encMu sync.Mutex
}

// Compile-time check to ensure osSink implements the LogSink interface.
// Compile-time check to ensure IOSink implements the LogSink interface.
var _ LogSink = &IOSink{}

// NewIOSink will create a new IOSink that writes to the provided io.Writer.
// NewIOSink will create an IOSink object that writes JSON messages to the
// provided io.Writer.
func NewIOSink(out io.Writer) *IOSink {
return &IOSink{
log: log.New(out, "", log.LstdFlags),
enc: json.NewEncoder(out),
}
}

func logCommandMessageStarted(log *log.Logger, kvMap map[string]interface{}) {
format := "Command %q started on database %q using a connection with " +
"server-generated ID %d to %s:%d. The requestID is %d and " +
"the operation ID is %d. Command: %s"

log.Printf(format,
kvMap["commandName"],
kvMap["databaseName"],
kvMap["serverConnectionId"],
kvMap["serverHost"],
kvMap["serverPort"],
kvMap["requestId"],
kvMap["operationId"],
kvMap["command"])

}

func logCommandMessageSucceeded(log *log.Logger, kvMap map[string]interface{}) {
format := "Command %q succeeded in %d ms using server-generated ID " +
"%d to %s:%d. The requestID is %d and the operation ID is " +
"%d. Command reply: %s"
// Info will write a JSON-encoded message to the io.Writer.
func (sink *IOSink) Info(_ int, msg string, keysAndValues ...interface{}) {
kvMap := make(map[string]interface{}, len(keysAndValues)/2+2)

log.Printf(format,
kvMap["commandName"],
kvMap["duration"],
kvMap["serverConnectionId"],
kvMap["serverHost"],
kvMap["serverPort"],
kvMap["requestId"],
kvMap["operationId"],
kvMap["reply"])
}
kvMap[KeyTimestamp] = time.Now().UnixNano()
kvMap[KeyMessage] = msg

func logCommandMessageFailed(log *log.Logger, kvMap map[string]interface{}) {
format := "Command %q failed in %d ms using a connection with " +
"server-generated ID %d to %s:%d. The requestID is %d and " +
"the operation ID is %d. Error: %s"

log.Printf(format,
kvMap["commandName"],
kvMap["duration"],
kvMap["serverConnectionID"],
kvMap["serverHost"],
kvMap["serverPort"],
kvMap["requestId"],
kvMap["operationId"],
kvMap["failure"])
}

func (osSink *IOSink) Info(_ int, msg string, keysAndValues ...interface{}) {
kvMap := make(map[string]interface{})
for i := 0; i < len(keysAndValues); i += 2 {
kvMap[keysAndValues[i].(string)] = keysAndValues[i+1]
}

switch msg {
case CommandStarted:
logCommandMessageStarted(osSink.log, kvMap)
case CommandSucceeded:
logCommandMessageSucceeded(osSink.log, kvMap)
case CommandFailed:
logCommandMessageFailed(osSink.log, kvMap)
}
sink.encMu.Lock()
defer sink.encMu.Unlock()

_ = sink.enc.Encode(kvMap)
}

func (osSink *IOSink) Error(err error, msg string, kv ...interface{}) {
osSink.Info(0, msg, kv...)
// Error will write a JSON-encoded error message tot he io.Writer.
func (sink *IOSink) Error(err error, msg string, kv ...interface{}) {
kv = append(kv, KeyError, err.Error())
sink.Info(0, msg, kv...)
}
79 changes: 77 additions & 2 deletions internal/logger/logger_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,12 @@
package logger

import (
"bytes"
"encoding/json"
"fmt"
"os"
"reflect"
"sync"
"testing"
)

Expand All @@ -33,12 +37,83 @@ func BenchmarkLogger(b *testing.B) {
b.Fatal(err)
}

for i := 0; i < b.N; i++ {
logger.Print(LevelInfo, ComponentCommand, "foo", "bar", "baz")
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
logger.Print(LevelInfo, ComponentCommand, "foo", "bar", "baz")
}
})
})
}

func mockKeyValues(length int) (KeyValues, map[string]interface{}) {
keysAndValues := KeyValues{}
m := map[string]interface{}{}

for i := 0; i < length; i++ {
keyName := fmt.Sprintf("key%d", i)
valueName := fmt.Sprintf("value%d", i)

keysAndValues.Add(keyName, valueName)
m[keyName] = valueName
}

return keysAndValues, m
}

func BenchmarkIOSinkInfo(b *testing.B) {
keysAndValues, _ := mockKeyValues(10)

b.ReportAllocs()
b.ResetTimer()

sink := NewIOSink(bytes.NewBuffer(nil))

b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
sink.Info(0, "foo", keysAndValues...)
}
})
}

func TestIOSinkInfo(t *testing.T) {
t.Parallel()

const threshold = 1000

mockKeyValues, kvmap := mockKeyValues(10)

buf := new(bytes.Buffer)
sink := NewIOSink(buf)

wg := sync.WaitGroup{}
wg.Add(threshold)

for i := 0; i < threshold; i++ {
go func() {
defer wg.Done()

sink.Info(0, "foo", mockKeyValues...)
}()
}

wg.Wait()

dec := json.NewDecoder(buf)
for dec.More() {
var m map[string]interface{}
if err := dec.Decode(&m); err != nil {
t.Fatalf("error unmarshaling JSON: %v", err)
}

delete(m, KeyTimestamp)
delete(m, KeyMessage)

if !reflect.DeepEqual(m, kvmap) {
t.Fatalf("expected %v, got %v", kvmap, m)
}
}
}

func TestSelectMaxDocumentLength(t *testing.T) {
t.Parallel()

Expand Down
Loading

0 comments on commit 038249a

Please sign in to comment.