Skip to content

Commit

Permalink
fix flaky tests
Browse files Browse the repository at this point in the history
  • Loading branch information
ribaraka committed Sep 12, 2024
1 parent 434d7c3 commit f7664d2
Show file tree
Hide file tree
Showing 9 changed files with 370 additions and 326 deletions.
10 changes: 5 additions & 5 deletions .github/workflows/main.yml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ jobs:
runs-on: ubuntu-latest
strategy:
matrix:
go: [ '1.20', '1.23' ]
go: [ '1.22', '1.23' ]
steps:
- uses: actions/checkout@v3
- uses: actions/setup-go@v4
Expand All @@ -29,11 +29,11 @@ jobs:
name: integration-testscontainers
strategy:
matrix:
go: [ '1.20', '1.23' ]
go: [ '1.22', '1.23' ]
cassandra_version: [ '4.0.8', '4.1.1' ]
auth: [ "false" ]
compressor: [ "snappy" ]
tags: [ "cassandra", "integration"]
tags: [ "cassandra", "integration", "tc"]
steps:
- uses: actions/checkout@v3
- uses: actions/setup-go@v4
Expand All @@ -45,15 +45,15 @@ jobs:
echo "args=$args" >> $GITHUB_ENV
- name: run
run: |
go test -v -tags "${{ matrix.tags }} gocql_debug" -timeout=5m -race ${{ env.args }}
go test -v -tags "${{ matrix.tags }} gocql_debug" -timeout=10m -race ${{ env.args }}
integration-auth-testscontainers:
needs: build
runs-on: ubuntu-latest
name: integration-auth-testscontainers
strategy:
matrix:
go: [ '1.20', '1.23' ]
go: [ '1.22', '1.23' ]
cassandra_version: [ '4.0.8', '4.1.1' ]
compressor: [ "snappy" ]
steps:
Expand Down
1 change: 0 additions & 1 deletion cassandra_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -628,7 +628,6 @@ func TestBatch(t *testing.T) {
}

func TestUnpreparedBatch(t *testing.T) {
t.Skip("FLAKE skipping")
session := createSession(t)
defer session.Close()

Expand Down
73 changes: 26 additions & 47 deletions control_ccm_test.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
//go:build ccm
// +build ccm
//go:build tc
// +build tc

/*
* Licensed to the Apache Software Foundation (ASF) under one
Expand Down Expand Up @@ -28,17 +28,16 @@
package gocql

import (
"context"
"fmt"
"sync"
"testing"
"time"

"github.com/gocql/gocql/internal/ccm"
)

type TestHostFilter struct {
mu sync.Mutex
allowedHosts map[string]ccm.Host
allowedHosts map[string]TChost
}

func (f *TestHostFilter) Accept(h *HostInfo) bool {
Expand All @@ -48,37 +47,27 @@ func (f *TestHostFilter) Accept(h *HostInfo) bool {
return ok
}

func (f *TestHostFilter) SetAllowedHosts(hosts map[string]ccm.Host) {
func (f *TestHostFilter) SetAllowedHosts(hosts map[string]TChost) {
f.mu.Lock()
defer f.mu.Unlock()
f.allowedHosts = hosts
}

func TestControlConn_ReconnectRefreshesRing(t *testing.T) {
if err := ccm.AllUp(); err != nil {
t.Fatal(err)
}

allCcmHosts, err := ccm.Status()
if err != nil {
t.Fatal(err)
}
ctx := context.Background()

if len(allCcmHosts) < 2 {
if len(cassNodes) < 2 {
t.Skip("this test requires at least 2 nodes")
}

allAllowedHosts := map[string]ccm.Host{}
var firstNode *ccm.Host
for _, node := range allCcmHosts {
if firstNode == nil {
firstNode = &node
}
allAllowedHosts := map[string]TChost{}
for _, node := range cassNodes {
allAllowedHosts[node.Addr] = node
}

allowedHosts := map[string]ccm.Host{
firstNode.Addr: *firstNode,
firstNode := cassNodes["node1"]
allowedHosts := map[string]TChost{
firstNode.Addr: firstNode,
}

testFilter := &TestHostFilter{allowedHosts: allowedHosts}
Expand All @@ -99,9 +88,9 @@ func TestControlConn_ReconnectRefreshesRing(t *testing.T) {
ccHost := controlConnection.host

var ccHostName string
for _, node := range allCcmHosts {
for name, node := range cassNodes {
if node.Addr == ccHost.ConnectAddress().String() {
ccHostName = node.Name
ccHostName = name
break
}
}
Expand All @@ -110,25 +99,15 @@ func TestControlConn_ReconnectRefreshesRing(t *testing.T) {
t.Fatal("could not find name of control host")
}

if err := ccm.NodeDown(ccHostName); err != nil {
if err := cassNodes[ccHostName].TC.Stop(ctx, nil); err != nil {
t.Fatal(err)
}

defer func() {
ccmStatus, err := ccm.Status()
if err != nil {
t.Logf("could not bring nodes back up after test: %v", err)
return
}
for _, node := range ccmStatus {
if node.State == ccm.NodeStateDown {
err = ccm.NodeUp(node.Name)
if err != nil {
t.Logf("could not bring node %v back up after test: %v", node.Name, err)
}
}
defer func(ctx context.Context) {
if err := restoreCluster(ctx); err != nil {
t.Fatalf("couldn't restore cluster: %v", err)
}
}()
}(ctx)

assertNodeDown := func() error {
hosts := session.ring.currentHosts()
Expand Down Expand Up @@ -159,19 +138,19 @@ func TestControlConn_ReconnectRefreshesRing(t *testing.T) {
}

if assertErr != nil {
t.Fatal(err)
t.Fatal(assertErr)
}

testFilter.SetAllowedHosts(allAllowedHosts)

if err = ccm.NodeUp(ccHostName); err != nil {
if err := restoreCluster(ctx); err != nil {
t.Fatal(err)
}

assertNodeUp := func() error {
hosts := session.ring.currentHosts()
if len(hosts) != len(allCcmHosts) {
return fmt.Errorf("expected %v hosts in ring but there were %v", len(allCcmHosts), len(hosts))
if len(hosts) != len(cassNodes) {
return fmt.Errorf("expected %v hosts in ring but there were %v", len(cassNodes), len(hosts))
}
for _, host := range hosts {
if !host.IsUp() {
Expand All @@ -181,8 +160,8 @@ func TestControlConn_ReconnectRefreshesRing(t *testing.T) {
session.pool.mu.RLock()
poolsLen := len(session.pool.hostConnPools)
session.pool.mu.RUnlock()
if poolsLen != len(allCcmHosts) {
return fmt.Errorf("expected %v connection pool but there were %v", len(allCcmHosts), poolsLen)
if poolsLen != len(cassNodes) {
return fmt.Errorf("expected %v connection pool but there were %v", len(cassNodes), poolsLen)
}
return nil
}
Expand All @@ -196,6 +175,6 @@ func TestControlConn_ReconnectRefreshesRing(t *testing.T) {
}

if assertErr != nil {
t.Fatal(err)
t.Fatal(assertErr)
}
}
Loading

0 comments on commit f7664d2

Please sign in to comment.