CASSGO-9 Add more logging, a Structured Logger interface and log levels #1755

Open · wants to merge 10 commits into trunk
29 changes: 22 additions & 7 deletions cluster.go
@@ -257,6 +257,17 @@ type ClusterConfig struct {
// If not specified, defaults to the global gocql.Logger.
Logger StdLogger

// StructuredLogger for this ClusterConfig.
// If not specified, Logger will be used instead.
StructuredLogger AdvancedLogger

// LegacyLogLevel for this ClusterConfig, this is only applied for legacy loggers (field Logger).
// This log level is not applied to StructuredLogger because the log level is already a part of the interface
// so the implementation can decide when not to log something.
//
// If not specified, LogLevelWarn will be used as the default.
LegacyLogLevel LogLevel

// internal config for testing
disableControlConn bool
}
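
A minimal usage sketch of the new fields, assuming the usual gocql import path and that *log.Logger satisfies StdLogger via its Print/Printf/Println methods; it sticks to the legacy Logger/LegacyLogLevel pair because the AdvancedLogger interface itself is not shown in this hunk:

```go
package main

import (
	"log"
	"os"

	"github.com/gocql/gocql" // module path assumed; adjust to the fork you build against
)

func main() {
	cluster := gocql.NewCluster("127.0.0.1")

	// Legacy path: *log.Logger provides Print/Printf/Println, so it can be
	// used as a StdLogger. LegacyLogLevel gates what the adapter forwards.
	cluster.Logger = log.New(os.Stderr, "gocql: ", log.LstdFlags)
	cluster.LegacyLogLevel = gocql.LogLevelWarn // the default set by NewCluster

	session, err := cluster.CreateSession()
	if err != nil {
		log.Fatal(err)
	}
	defer session.Close()
}
```

If a StructuredLogger is assigned as well, the adapter selection in newLogger() below prefers it and LegacyLogLevel is not applied, matching the field documentation above.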
@@ -292,15 +303,19 @@ func NewCluster(hosts ...string) *ClusterConfig {
ConvictionPolicy: &SimpleConvictionPolicy{},
ReconnectionPolicy: &ConstantReconnectionPolicy{MaxRetries: 3, Interval: 1 * time.Second},
WriteCoalesceWaitTime: 200 * time.Microsecond,
LegacyLogLevel: LogLevelWarn,
}
return cfg
}

func (cfg *ClusterConfig) logger() StdLogger {
func (cfg *ClusterConfig) newLogger() loggerAdapter {
if cfg.StructuredLogger != nil {
return newInternalLoggerFromAdvancedLogger(cfg.StructuredLogger)
}
if cfg.Logger == nil {
return Logger
return newInternalLoggerFromStdLogger(Logger, cfg.LegacyLogLevel)
}
return cfg.Logger
return newInternalLoggerFromStdLogger(cfg.Logger, cfg.LegacyLogLevel)
}

// CreateSession initializes the cluster based on this config and returns a
@@ -313,14 +328,14 @@ func (cfg *ClusterConfig) CreateSession() (*Session, error) {
// if defined, to translate the given address and port into a possibly new address
// and port, If no AddressTranslator or if an error occurs, the given address and
// port will be returned.
func (cfg *ClusterConfig) translateAddressPort(addr net.IP, port int) (net.IP, int) {
func (cfg *ClusterConfig) translateAddressPort(addr net.IP, port int, logger internalLogger) (net.IP, int) {
if cfg.AddressTranslator == nil || len(addr) == 0 {
return addr, port
}
newAddr, newPort := cfg.AddressTranslator.Translate(addr, port)
if gocqlDebug {
cfg.logger().Printf("gocql: translating address '%v:%d' to '%v:%d'", addr, port, newAddr, newPort)
}
logger.Debug("translating address '%v:%d' to '%v:%d'",
NewLogField("old_addr", addr), NewLogField("old_port", port),
NewLogField("new_addr", newAddr), NewLogField("new_port", newPort))
return newAddr, newPort
}
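
Since translateAddressPort now threads an internalLogger through to the debug log, here is a rough sketch of a custom AddressTranslator it would act on; the only signature assumed is Translate(addr net.IP, port int) (net.IP, int), which matches the call site above, and the translator type and its mapping are purely illustrative:

```go
package example

import "net"

// staticNATTranslator is a hypothetical translator that rewrites node
// addresses advertised by the cluster to externally reachable ones.
type staticNATTranslator struct {
	mapping map[string]string // advertised address -> reachable address (example data)
}

// Translate matches the AddressTranslator call site used by translateAddressPort.
func (t staticNATTranslator) Translate(addr net.IP, port int) (net.IP, int) {
	if external, ok := t.mapping[addr.String()]; ok {
		if ip := net.ParseIP(external); ip != nil {
			return ip, port
		}
	}
	// No mapping: return the input unchanged, mirroring the driver's default
	// behaviour when no translator is configured.
	return addr, port
}
```

With cfg.AddressTranslator set to such a value, the old and new address/port pairs are emitted at debug level through the logger passed in by the caller.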

6 changes: 3 additions & 3 deletions cluster_test.go
@@ -60,23 +60,23 @@ func TestNewCluster_WithHosts(t *testing.T) {
func TestClusterConfig_translateAddressAndPort_NilTranslator(t *testing.T) {
cfg := NewCluster()
assertNil(t, "cluster config address translator", cfg.AddressTranslator)
newAddr, newPort := cfg.translateAddressPort(net.ParseIP("10.0.0.1"), 1234)
newAddr, newPort := cfg.translateAddressPort(net.ParseIP("10.0.0.1"), 1234, nilInternalLogger)
assertTrue(t, "same address as provided", net.ParseIP("10.0.0.1").Equal(newAddr))
assertEqual(t, "translated host and port", 1234, newPort)
}

func TestClusterConfig_translateAddressAndPort_EmptyAddr(t *testing.T) {
cfg := NewCluster()
cfg.AddressTranslator = staticAddressTranslator(net.ParseIP("10.10.10.10"), 5432)
newAddr, newPort := cfg.translateAddressPort(net.IP([]byte{}), 0)
newAddr, newPort := cfg.translateAddressPort(net.IP([]byte{}), 0, nilInternalLogger)
assertTrue(t, "translated address is still empty", len(newAddr) == 0)
assertEqual(t, "translated port", 0, newPort)
}

func TestClusterConfig_translateAddressAndPort_Success(t *testing.T) {
cfg := NewCluster()
cfg.AddressTranslator = staticAddressTranslator(net.ParseIP("10.10.10.10"), 5432)
newAddr, newPort := cfg.translateAddressPort(net.ParseIP("10.0.0.1"), 2345)
newAddr, newPort := cfg.translateAddressPort(net.ParseIP("10.0.0.1"), 2345, nilInternalLogger)
assertTrue(t, "translated address", net.ParseIP("10.10.10.10").Equal(newAddr))
assertEqual(t, "translated port", 5432, newPort)
}
50 changes: 26 additions & 24 deletions conn.go
@@ -140,30 +140,25 @@ type SslOptions struct {
}

type ConnConfig struct {
ProtoVersion int
CQLVersion string
Timeout time.Duration
WriteTimeout time.Duration
ConnectTimeout time.Duration
Dialer Dialer
HostDialer HostDialer
Compressor Compressor
Authenticator Authenticator
AuthProvider func(h *HostInfo) (Authenticator, error)
Keepalive time.Duration
Logger StdLogger
ProtoVersion int
CQLVersion string
Timeout time.Duration
WriteTimeout time.Duration
ConnectTimeout time.Duration
Dialer Dialer
HostDialer HostDialer
Compressor Compressor
Authenticator Authenticator
AuthProvider func(h *HostInfo) (Authenticator, error)
Keepalive time.Duration
Logger StdLogger
StructuredLogger AdvancedLogger
LegacyLogLevel LogLevel

tlsConfig *tls.Config
disableCoalesce bool
}

func (c *ConnConfig) logger() StdLogger {
if c.Logger == nil {
return Logger
}
return c.Logger
}

type ConnErrorHandler interface {
HandleError(conn *Conn, err error, closed bool)
}
@@ -225,7 +220,7 @@ type Conn struct {

timeouts int64

logger StdLogger
logger internalLogger
}

// connect establishes a connection to a Cassandra node using session's connection config.
@@ -289,7 +284,7 @@ func (s *Session) dialWithoutObserver(ctx context.Context, host *HostInfo, cfg *
},
ctx: ctx,
cancel: cancel,
logger: cfg.logger(),
logger: s.logger,
streamObserver: s.streamObserver,
writeTimeout: writeTimeout,
}
@@ -726,7 +721,7 @@ func (c *Conn) recv(ctx context.Context) error {
delete(c.calls, head.stream)
c.mu.Unlock()
if call == nil || !ok {
c.logger.Printf("gocql: received response for stream which has no handler: header=%v\n", head)
c.logger.Warning("received response for stream which has no handler: header=%v", NewLogField("header", head))
return c.discardFrame(head)
} else if head.stream != call.streamID {
panic(fmt.Sprintf("call has incorrect streamID: got %d expected %d", call.streamID, head.stream))
@@ -1165,12 +1160,19 @@ func (c *Conn) exec(ctx context.Context, req frameBuilder, tracer Tracer) (*fram
return resp.framer, nil
case <-timeoutCh:
close(call.timeout)
c.logger.Debug("Request timed out on connection %v (%v)",
NewLogField("host_id", c.host.HostID()), NewLogField("addr", c.host.ConnectAddress()))
c.handleTimeout()
return nil, ErrTimeoutNoResponse
case <-ctxDone:
c.logger.Debug("Request failed because context elapsed out on connection %v (%v): %v",
NewLogField("host_id", c.host.HostID()), NewLogField("addr", c.host.ConnectAddress()),
NewLogField("ctx_err", ctx.Err().Error()))
close(call.timeout)
return nil, ctx.Err()
case <-c.ctx.Done():
c.logger.Debug("Request failed because connection closed %v (%v).",
NewLogField("host_id", c.host.HostID()), NewLogField("addr", c.host.ConnectAddress()))
close(call.timeout)
return nil, ErrConnectionClosed
}
@@ -1470,7 +1472,7 @@ func (c *Conn) executeQuery(ctx context.Context, qry *Query) *Iter {
iter := &Iter{framer: framer}
if err := c.awaitSchemaAgreement(ctx); err != nil {
// TODO: should have this behind a flag
c.logger.Println(err)
c.logger.Warning("error while awaiting for schema agreement after a schema change event: %v", NewLogField("err", err.Error()))
}
// dont return an error from this, might be a good idea to give a warning
// though. The impact of this returning an error would be that the cluster
@@ -1710,7 +1712,7 @@ func (c *Conn) awaitSchemaAgreement(ctx context.Context) (err error) {
goto cont
}
if !isValidPeer(host) || host.schemaVersion == "" {
c.logger.Printf("invalid peer or peer with empty schema_version: peer=%q", host)
c.logger.Warning("invalid peer or peer with empty schema_version: peer=%s", NewLogField("peer", host.ConnectAddress()))
continue
}

2 changes: 1 addition & 1 deletion conn_test.go
@@ -709,7 +709,7 @@ func TestStream0(t *testing.T) {
conn := &Conn{
r: bufio.NewReader(&buf),
streams: streams.New(protoVersion4),
logger: &defaultLogger{},
logger: newInternalLoggerFromStdLogger(&defaultLogger{}, LogLevelNone),
}

err := conn.recv(context.Background())
60 changes: 30 additions & 30 deletions connectionpool.go
@@ -143,18 +143,20 @@ func connConfig(cfg *ClusterConfig) (*ConnConfig, error) {
}

return &ConnConfig{
ProtoVersion: cfg.ProtoVersion,
CQLVersion: cfg.CQLVersion,
Timeout: cfg.Timeout,
WriteTimeout: cfg.WriteTimeout,
ConnectTimeout: cfg.ConnectTimeout,
Dialer: cfg.Dialer,
HostDialer: hostDialer,
Compressor: cfg.Compressor,
Authenticator: cfg.Authenticator,
AuthProvider: cfg.AuthProvider,
Keepalive: cfg.SocketKeepalive,
Logger: cfg.logger(),
ProtoVersion: cfg.ProtoVersion,
CQLVersion: cfg.CQLVersion,
Timeout: cfg.Timeout,
WriteTimeout: cfg.WriteTimeout,
ConnectTimeout: cfg.ConnectTimeout,
Dialer: cfg.Dialer,
HostDialer: hostDialer,
Compressor: cfg.Compressor,
Authenticator: cfg.Authenticator,
AuthProvider: cfg.AuthProvider,
Keepalive: cfg.SocketKeepalive,
Logger: cfg.Logger,
StructuredLogger: cfg.StructuredLogger,
LegacyLogLevel: cfg.LegacyLogLevel,
}, nil
}

@@ -303,7 +305,7 @@ type hostConnPool struct {
filling bool

pos uint32
logger StdLogger
logger internalLogger
}

func (h *hostConnPool) String() string {
@@ -486,21 +488,20 @@ func (pool *hostConnPool) logConnectErr(err error) {
if opErr, ok := err.(*net.OpError); ok && (opErr.Op == "dial" || opErr.Op == "read") {
// connection refused
// these are typical during a node outage so avoid log spam.
if gocqlDebug {
pool.logger.Printf("gocql: unable to dial %q: %v\n", pool.host, err)
}
pool.logger.Debug("unable to dial %s (%s): %v",
NewLogField("host_addr", pool.host.ConnectAddress()), NewLogField("host_id", pool.host.HostID()), NewLogField("err", err.Error()))
} else if err != nil {
// unexpected error
pool.logger.Printf("error: failed to connect to %q due to error: %v", pool.host, err)
pool.logger.Debug("failed to connect to %s (%s) due to error: %v",
NewLogField("host_addr", pool.host.ConnectAddress()), NewLogField("host_id", pool.host.HostID()), NewLogField("err", err.Error()))
}
}

// transition back to a not-filling state.
func (pool *hostConnPool) fillingStopped(err error) {
if err != nil {
if gocqlDebug {
pool.logger.Printf("gocql: filling stopped %q: %v\n", pool.host.ConnectAddress(), err)
}
pool.logger.Warning("connection pool filling failed %s (%s): %v",
NewLogField("host_addr", pool.host.ConnectAddress()), NewLogField("host_id", pool.host.HostID()), NewLogField("err", err.Error()))
// wait for some time to avoid back-to-back filling
// this provides some time between failed attempts
// to fill the pool for the host to recover
@@ -516,9 +517,8 @@ func (pool *hostConnPool) fillingStopped(err error) {

// if we errored and the size is now zero, make sure the host is marked as down
// see https://github.com/apache/cassandra-gocql-driver/issues/1614
if gocqlDebug {
pool.logger.Printf("gocql: conns of pool after stopped %q: %v\n", host.ConnectAddress(), count)
}
pool.logger.Debug("conns of pool after stopped %s (%s): %v",
NewLogField("host_addr", host.ConnectAddress()), NewLogField("host_id", host.HostID()), NewLogField("count", count))
if err != nil && count == 0 {
if pool.session.cfg.ConvictionPolicy.AddFailure(err, host) {
pool.session.handleNodeDown(host.ConnectAddress(), port)
@@ -574,10 +574,11 @@ func (pool *hostConnPool) connect() (err error) {
break
}
}
if gocqlDebug {
pool.logger.Printf("gocql: connection failed %q: %v, reconnecting with %T\n",
pool.host.ConnectAddress(), err, reconnectionPolicy)
}
pool.logger.Warning("connection failed %s (%s): %v, reconnecting with %v",
NewLogField("host", pool.host.ConnectAddress()),
NewLogField("host_id", pool.host.HostID()),
NewLogField("err", err.Error()),
NewLogField("reconnectionPolicy", fmt.Sprintf("%T", reconnectionPolicy)))
time.Sleep(reconnectionPolicy.GetInterval(i))
}

@@ -624,9 +625,8 @@ func (pool *hostConnPool) HandleError(conn *Conn, err error, closed bool) {
return
}

if gocqlDebug {
pool.logger.Printf("gocql: pool connection error %q: %v\n", conn.addr, err)
}
pool.logger.Info("pool connection error %v: %v",
NewLogField("addr", conn.addr), NewLogField("err", err.Error()))

// find the connection index
for i, candidate := range pool.conns {