Skip to content

Commit

Permalink
Mitigate "gocql: no hosts available in the pool" cannot recover witho…
Browse files Browse the repository at this point in the history
  • Loading branch information
mfamador committed Mar 23, 2023
1 parent 33d6551 commit 0b98374
Showing 1 changed file with 34 additions and 0 deletions.
34 changes: 34 additions & 0 deletions internal/impl/cassandra/output.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,7 @@ type cassandraWriter struct {
backoffMax time.Duration

session *gocql.Session
conn *gocql.ClusterConfig
connLock sync.RWMutex

argsMapping *mapping.Executor
Expand Down Expand Up @@ -192,6 +193,21 @@ func (c *cassandraWriter) parseArgs(mgr bundle.NewManagement) error {
return nil
}

var (
sessLock sync.Mutex
reopenSession bool
)

type observer struct{}

func (observer) ObserveConnect(info gocql.ObservedConnect) {
if info.Err != nil {
sessLock.Lock()
reopenSession = true
sessLock.Unlock()
}
}

func (c *cassandraWriter) Connect(ctx context.Context) error {
c.connLock.Lock()
defer c.connLock.Unlock()
Expand All @@ -214,10 +230,12 @@ func (c *cassandraWriter) Connect(ctx context.Context) error {
Password: c.conf.PasswordAuthenticator.Password,
}
}
conn.ConnectObserver = observer{}
conn.DisableInitialHostLookup = c.conf.DisableInitialHostLookup
if conn.Consistency, err = gocql.ParseConsistencyWrapper(c.conf.Consistency); err != nil {
return fmt.Errorf("parsing consistency: %w", err)
}
c.conn = conn

conn.RetryPolicy = &decorator{
NumRetries: int(c.conf.Config.MaxRetries),
Expand Down Expand Up @@ -249,6 +267,22 @@ func (c *cassandraWriter) WriteBatch(ctx context.Context, msg message.Batch) err
return component.ErrNotConnected
}

sessLock.Lock()
needReOpen := reopenSession
sessLock.Unlock()
if needReOpen {
c.log.Debugln("reopen the cassandra session")
var err error
c.session.Close()
c.session, err = c.conn.CreateSession()
if err != nil {
c.log.Debugf("error reopening session: %w", err)
}
sessLock.Lock()
reopenSession = false
sessLock.Unlock()
}

if msg.Len() == 1 {
return c.writeRow(session, msg)
}
Expand Down

0 comments on commit 0b98374

Please sign in to comment.