diff --git a/.github/workflows/main.yml b/.github/workflows/main.yml
index 2d82e2576..ae66bfbe5 100644
--- a/.github/workflows/main.yml
+++ b/.github/workflows/main.yml
@@ -13,7 +13,7 @@ jobs:
     runs-on: ubuntu-latest
     strategy:
       matrix:
-        go: [ '1.20', '1.23' ]
+        go: [ '1.22', '1.23' ]
     steps:
       - uses: actions/checkout@v3
      - uses: actions/setup-go@v4
@@ -29,11 +29,11 @@ jobs:
     name: integration-testscontainers
     strategy:
       matrix:
-        go: [ '1.20', '1.23' ]
+        go: [ '1.22', '1.23' ]
         cassandra_version: [ '4.0.8', '4.1.1' ]
         auth: [ "false" ]
         compressor: [ "snappy" ]
-        tags: [ "cassandra", "integration"]
+        tags: [ "cassandra", "integration", "tc"]
     steps:
       - uses: actions/checkout@v3
       - uses: actions/setup-go@v4
@@ -45,7 +45,7 @@ jobs:
           echo "args=$args" >> $GITHUB_ENV
       - name: run
         run: |
-          go test -v -tags "${{ matrix.tags }} gocql_debug" -timeout=5m -race ${{ env.args }}
+          go test -v -tags "${{ matrix.tags }} gocql_debug" -timeout=10m -race ${{ env.args }}
 
   integration-auth-testscontainers:
     needs: build
@@ -53,7 +53,7 @@ jobs:
     name: integration-auth-testscontainers
     strategy:
       matrix:
-        go: [ '1.20', '1.23' ]
+        go: [ '1.22', '1.23' ]
         cassandra_version: [ '4.0.8', '4.1.1' ]
         compressor: [ "snappy" ]
     steps:
diff --git a/cassandra_test.go b/cassandra_test.go
index 9401eb1e6..9a4fe0609 100644
--- a/cassandra_test.go
+++ b/cassandra_test.go
@@ -628,7 +628,6 @@ func TestBatch(t *testing.T) {
 }
 
 func TestUnpreparedBatch(t *testing.T) {
-	t.Skip("FLAKE skipping")
 	session := createSession(t)
 	defer session.Close()
 
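With the new `tc` tag in the matrix, the testcontainers-backed suite runs under the same `go test` step as the other integration jobs. Locally, the equivalent should be roughly `go test -v -tags "tc gocql_debug" -timeout=10m -race` with a Docker daemon available — `TestMain` (see main_test.go below) provisions the Cassandra containers itself, and the remaining CI flags come from `${{ env.args }}`, mirroring the matrix values.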
diff --git a/control_ccm_test.go b/control_ccm_test.go
index 426a59aef..48f5362e5 100644
--- a/control_ccm_test.go
+++ b/control_ccm_test.go
@@ -1,5 +1,5 @@
-//go:build ccm
-// +build ccm
+//go:build tc
+// +build tc
 
 /*
  * Licensed to the Apache Software Foundation (ASF) under one
@@ -28,17 +28,16 @@
 package gocql
 
 import (
+	"context"
 	"fmt"
 	"sync"
 	"testing"
 	"time"
-
-	"github.com/gocql/gocql/internal/ccm"
 )
 
 type TestHostFilter struct {
 	mu           sync.Mutex
-	allowedHosts map[string]ccm.Host
+	allowedHosts map[string]TChost
 }
 
 func (f *TestHostFilter) Accept(h *HostInfo) bool {
@@ -48,37 +47,27 @@ func (f *TestHostFilter) Accept(h *HostInfo) bool {
 	return ok
 }
 
-func (f *TestHostFilter) SetAllowedHosts(hosts map[string]ccm.Host) {
+func (f *TestHostFilter) SetAllowedHosts(hosts map[string]TChost) {
 	f.mu.Lock()
 	defer f.mu.Unlock()
 	f.allowedHosts = hosts
 }
 
 func TestControlConn_ReconnectRefreshesRing(t *testing.T) {
-	if err := ccm.AllUp(); err != nil {
-		t.Fatal(err)
-	}
-
-	allCcmHosts, err := ccm.Status()
-	if err != nil {
-		t.Fatal(err)
-	}
+	ctx := context.Background()
 
-	if len(allCcmHosts) < 2 {
+	if len(cassNodes) < 2 {
 		t.Skip("this test requires at least 2 nodes")
 	}
 
-	allAllowedHosts := map[string]ccm.Host{}
-	var firstNode *ccm.Host
-	for _, node := range allCcmHosts {
-		if firstNode == nil {
-			firstNode = &node
-		}
+	allAllowedHosts := map[string]TChost{}
+	for _, node := range cassNodes {
 		allAllowedHosts[node.Addr] = node
 	}
 
-	allowedHosts := map[string]ccm.Host{
-		firstNode.Addr: *firstNode,
+	firstNode := cassNodes["node1"]
+	allowedHosts := map[string]TChost{
+		firstNode.Addr: firstNode,
 	}
 
 	testFilter := &TestHostFilter{allowedHosts: allowedHosts}
@@ -99,9 +88,9 @@ func TestControlConn_ReconnectRefreshesRing(t *testing.T) {
 	ccHost := controlConnection.host
 
 	var ccHostName string
-	for _, node := range allCcmHosts {
+	for name, node := range cassNodes {
 		if node.Addr == ccHost.ConnectAddress().String() {
-			ccHostName = node.Name
+			ccHostName = name
 			break
 		}
 	}
@@ -110,25 +99,15 @@ func TestControlConn_ReconnectRefreshesRing(t *testing.T) {
 		t.Fatal("could not find name of control host")
 	}
 
-	if err := ccm.NodeDown(ccHostName); err != nil {
-		t.Fatal()
+	if err := cassNodes[ccHostName].TC.Stop(ctx, nil); err != nil {
+		t.Fatal(err)
 	}
 
-	defer func() {
-		ccmStatus, err := ccm.Status()
-		if err != nil {
-			t.Logf("could not bring nodes back up after test: %v", err)
-			return
-		}
-		for _, node := range ccmStatus {
-			if node.State == ccm.NodeStateDown {
-				err = ccm.NodeUp(node.Name)
-				if err != nil {
-					t.Logf("could not bring node %v back up after test: %v", node.Name, err)
-				}
-			}
+	defer func(ctx context.Context) {
+		if err := restoreCluster(ctx); err != nil {
+			t.Fatalf("couldn't restore the cluster: %v", err)
 		}
-	}()
+	}(ctx)
 
 	assertNodeDown := func() error {
 		hosts := session.ring.currentHosts()
@@ -159,19 +138,19 @@ func TestControlConn_ReconnectRefreshesRing(t *testing.T) {
 	}
 
 	if assertErr != nil {
-		t.Fatal(err)
+		t.Fatal(assertErr)
 	}
 
 	testFilter.SetAllowedHosts(allAllowedHosts)
 
-	if err = ccm.NodeUp(ccHostName); err != nil {
+	if err := restoreCluster(ctx); err != nil {
 		t.Fatal(err)
 	}
 
 	assertNodeUp := func() error {
 		hosts := session.ring.currentHosts()
-		if len(hosts) != len(allCcmHosts) {
-			return fmt.Errorf("expected %v hosts in ring but there were %v", len(allCcmHosts), len(hosts))
+		if len(hosts) != len(cassNodes) {
+			return fmt.Errorf("expected %v hosts in ring but there were %v", len(cassNodes), len(hosts))
 		}
 		for _, host := range hosts {
 			if !host.IsUp() {
@@ -181,8 +160,8 @@ func TestControlConn_ReconnectRefreshesRing(t *testing.T) {
 		session.pool.mu.RLock()
 		poolsLen := len(session.pool.hostConnPools)
 		session.pool.mu.RUnlock()
-		if poolsLen != len(allCcmHosts) {
-			return fmt.Errorf("expected %v connection pool but there were %v", len(allCcmHosts), poolsLen)
+		if poolsLen != len(cassNodes) {
+			return fmt.Errorf("expected %v connection pools but there were %v", len(cassNodes), poolsLen)
 		}
 		return nil
 	}
@@ -196,6 +175,6 @@ func TestControlConn_ReconnectRefreshesRing(t *testing.T) {
 	}
 
 	if assertErr != nil {
-		t.Fatal(err)
+		t.Fatal(assertErr)
 	}
 }
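The two `t.Fatal(assertErr)` fixes in this file matter because the retry loops around `assertNodeDown`/`assertNodeUp` sit in unchanged context and are not visible in the hunks; previously a stale `err` was reported instead of the failing assertion. A minimal sketch of that polling pattern — the deadline and interval values here are illustrative assumptions, not taken from the file:

```go
// Poll a cluster-state assertion until it passes or a deadline expires.
// assertNodeDown is a func() error as defined in the test above.
var assertErr error
deadline := time.Now().Add(2 * time.Minute) // illustrative timeout
for time.Now().Before(deadline) {
	assertErr = assertNodeDown()
	if assertErr == nil {
		break
	}
	time.Sleep(500 * time.Millisecond) // illustrative poll interval
}
if assertErr != nil {
	t.Fatal(assertErr) // report the assertion failure, not a stale err
}
```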
diff --git a/events_ccm_test.go b/events_ccm_test.go
index a105985bc..e84168722 100644
--- a/events_ccm_test.go
+++ b/events_ccm_test.go
@@ -1,5 +1,5 @@
-//go:build (ccm && ignore) || ignore
-// +build ccm,ignore ignore
+//go:build tc
+// +build tc
 
 /*
  * Licensed to the Apache Software Foundation (ASF) under one
@@ -28,83 +28,52 @@
 package gocql
 
 import (
-	"log"
+	"context"
 	"testing"
 	"time"
-
-	"github.com/gocql/gocql/internal/ccm"
 )
 
 func TestEventDiscovery(t *testing.T) {
-	t.Skip("FLAKE skipping")
-	if err := ccm.AllUp(); err != nil {
-		t.Fatal(err)
-	}
-
 	session := createSession(t)
 	defer session.Close()
 
-	status, err := ccm.Status()
-	if err != nil {
-		t.Fatal(err)
-	}
-	t.Logf("status=%+v\n", status)
-
-	session.pool.mu.RLock()
-	poolHosts := session.pool.hostConnPools // TODO: replace with session.ring
-	t.Logf("poolhosts=%+v\n", poolHosts)
 	// check we discovered all the nodes in the ring
-	for _, host := range status {
-		if _, ok := poolHosts[host.Addr]; !ok {
-			t.Errorf("did not discover %q", host.Addr)
+	for _, node := range cassNodes {
+		host := session.ring.getHost(node.ID)
+		if host == nil {
+			t.Errorf("did not discover %q", node.Addr)
+		}
+		if t.Failed() {
+			t.FailNow()
 		}
-	}
-	session.pool.mu.RUnlock()
-	if t.Failed() {
-		t.FailNow()
 	}
 }
 
 func TestEventNodeDownControl(t *testing.T) {
-	t.Skip("FLAKE skipping")
+	ctx := context.Background()
 	const targetNode = "node1"
-	if err := ccm.AllUp(); err != nil {
-		t.Fatal(err)
-	}
-
-	status, err := ccm.Status()
-	if err != nil {
-		t.Fatal(err)
-	}
+	node := cassNodes[targetNode]
 
 	cluster := createCluster()
-	cluster.Hosts = []string{status[targetNode].Addr}
+	cluster.Hosts = []string{node.Addr}
 	session := createSessionFromCluster(cluster, t)
 	defer session.Close()
 
-	t.Log("marking " + targetNode + " as down")
-	if err := ccm.NodeDown(targetNode); err != nil {
+	t.Logf("marking node %q down", targetNode)
+	if err := node.TC.Stop(ctx, nil); err != nil {
 		t.Fatal(err)
 	}
+	defer func(ctx context.Context) {
+		if err := restoreCluster(ctx); err != nil {
+			t.Fatalf("couldn't restore the cluster: %v", err)
+		}
+	}(ctx)
 
-	t.Logf("status=%+v\n", status)
-	t.Logf("marking node %q down: %v\n", targetNode, status[targetNode])
-
-	time.Sleep(5 * time.Second)
-
-	session.pool.mu.RLock()
-
-	poolHosts := session.pool.hostConnPools
-	node := status[targetNode]
-	t.Logf("poolhosts=%+v\n", poolHosts)
-
-	if _, ok := poolHosts[node.Addr]; ok {
-		session.pool.mu.RUnlock()
+	if _, ok := getPool(session.pool, node.ID); ok {
 		t.Fatal("node not removed after remove event")
 	}
-	session.pool.mu.RUnlock()
 
-	host := session.ring.getHost(node.Addr)
+	host := session.ring.getHost(node.ID)
 	if host == nil {
 		t.Fatal("node not in metadata ring")
 	} else if host.IsUp() {
@@ -113,40 +82,28 @@ func TestEventNodeDownControl(t *testing.T) {
 }
 
 func TestEventNodeDown(t *testing.T) {
-	t.Skip("FLAKE skipping")
+	ctx := context.Background()
+
 	const targetNode = "node3"
-	if err := ccm.AllUp(); err != nil {
-		t.Fatal(err)
-	}
+	node := cassNodes[targetNode]
 
 	session := createSession(t)
 	defer session.Close()
 
-	if err := ccm.NodeDown(targetNode); err != nil {
+	if err := node.TC.Stop(ctx, nil); err != nil {
 		t.Fatal(err)
 	}
+	defer func(ctx context.Context) {
+		if err := restoreCluster(ctx); err != nil {
+			t.Fatalf("couldn't restore the cluster: %v", err)
+		}
+	}(ctx)
 
-	status, err := ccm.Status()
-	if err != nil {
-		t.Fatal(err)
-	}
-	t.Logf("status=%+v\n", status)
-	t.Logf("marking node %q down: %v\n", targetNode, status[targetNode])
-
-	time.Sleep(5 * time.Second)
-
-	session.pool.mu.RLock()
-	defer session.pool.mu.RUnlock()
-
-	poolHosts := session.pool.hostConnPools
-	node := status[targetNode]
-	t.Logf("poolhosts=%+v\n", poolHosts)
-
-	if _, ok := poolHosts[node.Addr]; ok {
-		t.Fatal("node not removed after remove event")
+	if _, ok := getPool(session.pool, node.ID); ok {
+		t.Errorf("node not removed after remove event")
 	}
 
-	host := session.ring.getHost(node.Addr)
+	host := session.ring.getHost(node.ID)
 	if host == nil {
 		t.Fatal("node not in metadata ring")
 	} else if host.IsUp() {
@@ -155,55 +112,39 @@ func TestEventNodeDown(t *testing.T) {
 }
 
 func TestEventNodeUp(t *testing.T) {
-	t.Skip("FLAKE skipping")
-	if err := ccm.AllUp(); err != nil {
-		t.Fatal(err)
-	}
-
-	status, err := ccm.Status()
-	if err != nil {
-		t.Fatal(err)
-	}
-	log.Printf("status=%+v\n", status)
+	ctx := context.Background()
 
 	session := createSession(t)
 	defer session.Close()
 
 	const targetNode = "node2"
-	node := status[targetNode]
+	node := cassNodes[targetNode]
 
-	_, ok := session.pool.getPool(node.Addr)
-	if !ok {
-		session.pool.mu.RLock()
-		t.Errorf("target pool not in connection pool: addr=%q pools=%v", status[targetNode].Addr, session.pool.hostConnPools)
-		session.pool.mu.RUnlock()
+	if _, ok := getPool(session.pool, node.ID); !ok {
+		t.Errorf("target pool not in connection pool: addr=%q pools=%v", node.Addr, session.pool.hostConnPools)
 		t.FailNow()
 	}
 
-	if err := ccm.NodeDown(targetNode); err != nil {
+	if err := node.TC.Stop(ctx, nil); err != nil {
 		t.Fatal(err)
 	}
 
-	time.Sleep(5 * time.Second)
-
-	_, ok = session.pool.getPool(node.Addr)
-	if ok {
+	if _, ok := getPool(session.pool, node.ID); ok {
 		t.Fatal("node not removed after remove event")
 	}
 
-	if err := ccm.NodeUp(targetNode); err != nil {
-		t.Fatal(err)
+	if err := restoreCluster(ctx); err != nil {
+		t.Fatalf("couldn't restore the cluster: %v", err)
 	}
 
 	// cassandra < 2.2 needs 10 seconds to start up the binary service
 	time.Sleep(15 * time.Second)
 
-	_, ok = session.pool.getPool(node.Addr)
-	if !ok {
+	if _, ok := getPool(session.pool, node.ID); !ok {
 		t.Fatal("node not added after node added event")
 	}
 
-	host := session.ring.getHost(node.Addr)
+	host := session.ring.getHost(node.ID)
 	if host == nil {
 		t.Fatal("node not in metadata ring")
 	} else if !host.IsUp() {
@@ -212,29 +153,23 @@ func TestEventNodeUp(t *testing.T) {
 }
 
 func TestEventFilter(t *testing.T) {
-	t.Skip("FLAKE skipping")
-	if err := ccm.AllUp(); err != nil {
-		t.Fatal(err)
-	}
-
-	status, err := ccm.Status()
-	if err != nil {
-		t.Fatal(err)
-	}
-	log.Printf("status=%+v\n", status)
+	ctx := context.Background()
 
 	cluster := createCluster()
-	cluster.HostFilter = WhiteListHostFilter(status["node1"].Addr)
+
+	whiteListedNodeName := "node1"
+	whiteListedNode := cassNodes[whiteListedNodeName]
+	cluster.HostFilter = WhiteListHostFilter(whiteListedNode.Addr)
+
 	session := createSessionFromCluster(cluster, t)
 	defer session.Close()
 
-	if _, ok := session.pool.getPool(status["node1"].Addr); !ok {
-		t.Errorf("should have %v in pool but dont", "node1")
+	if _, ok := getPool(session.pool, whiteListedNode.ID); !ok {
+		t.Errorf("should have %v in pool but don't", whiteListedNodeName)
 	}
 
 	for _, host := range [...]string{"node2", "node3"} {
-		_, ok := session.pool.getPool(status[host].Addr)
-		if ok {
+		if _, ok := getPool(session.pool, cassNodes[host].ID); ok {
 			t.Errorf("should not have %v in pool", host)
 		}
 	}
@@ -243,19 +178,18 @@ func TestEventFilter(t *testing.T) {
 		t.FailNow()
 	}
 
-	if err := ccm.NodeDown("node2"); err != nil {
+	shutdownNode := cassNodes["node2"]
+
+	if err := shutdownNode.TC.Stop(ctx, nil); err != nil {
 		t.Fatal(err)
 	}
 
-	time.Sleep(5 * time.Second)
-
-	if err := ccm.NodeUp("node2"); err != nil {
-		t.Fatal(err)
+	if err := restoreCluster(ctx); err != nil {
+		t.Fatalf("couldn't restore the cluster: %v", err)
 	}
 
-	time.Sleep(15 * time.Second)
 	for _, host := range [...]string{"node2", "node3"} {
-		_, ok := session.pool.getPool(status[host].Addr)
+		_, ok := getPool(session.pool, cassNodes[host].ID)
 		if ok {
 			t.Errorf("should not have %v in pool", host)
 		}
@@ -264,51 +198,35 @@ func TestEventFilter(t *testing.T) {
 	if t.Failed() {
 		t.FailNow()
 	}
-
 }
 
 func TestEventDownQueryable(t *testing.T) {
-	t.Skip("FLAKE skipping")
-	if err := ccm.AllUp(); err != nil {
-		t.Fatal(err)
-	}
+	ctx := context.Background()
 
-	status, err := ccm.Status()
-	if err != nil {
-		t.Fatal(err)
-	}
-	log.Printf("status=%+v\n", status)
-
-	const targetNode = "node1"
-
-	addr := status[targetNode].Addr
+	targetNode := cassNodes["node1"]
 
 	cluster := createCluster()
-	cluster.Hosts = []string{addr}
-	cluster.HostFilter = WhiteListHostFilter(addr)
+	cluster.Hosts = []string{targetNode.Addr}
+	cluster.HostFilter = WhiteListHostFilter(targetNode.Addr)
 	session := createSessionFromCluster(cluster, t)
 	defer session.Close()
 
-	if pool, ok := session.pool.getPool(addr); !ok {
-		t.Fatalf("should have %v in pool but dont", addr)
+	if pool, ok := getPool(session.pool, targetNode.ID); !ok {
+		t.Fatalf("should have %v in pool but don't", targetNode.Addr)
 	} else if !pool.host.IsUp() {
 		t.Fatalf("host is not up %v", pool.host)
 	}
 
-	if err := ccm.NodeDown(targetNode); err != nil {
+	if err := targetNode.TC.Stop(ctx, nil); err != nil {
 		t.Fatal(err)
 	}
 
-	time.Sleep(5 * time.Second)
-
-	if err := ccm.NodeUp(targetNode); err != nil {
-		t.Fatal(err)
+	if err := restoreCluster(ctx); err != nil {
+		t.Fatalf("couldn't restore the cluster: %v", err)
 	}
 
-	time.Sleep(15 * time.Second)
-
-	if pool, ok := session.pool.getPool(addr); !ok {
-		t.Fatalf("should have %v in pool but dont", addr)
+	if pool, ok := getPool(session.pool, targetNode.ID); !ok {
+		t.Fatalf("should have %v in pool but don't", targetNode.Addr)
 	} else if !pool.host.IsUp() {
 		t.Fatalf("host is not up %v", pool.host)
 	}
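A note on the `TC.Stop(ctx, nil)` calls used throughout these tests: testcontainers-go's `Container.Stop` takes a `*time.Duration` as its second argument, and `nil` falls back to the library's default stop timeout. A test that wants a tighter bound could pass one explicitly — a small sketch with an illustrative value:

```go
// Stop accepts an optional timeout; nil means the library default.
timeout := 30 * time.Second // illustrative, not taken from the source
if err := node.TC.Stop(ctx, &timeout); err != nil {
	t.Fatal(err)
}
```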
diff --git a/go.mod b/go.mod
index f75b3c00a..53ab9193e 100644
--- a/go.mod
+++ b/go.mod
@@ -17,12 +17,12 @@ //
 
 module github.com/gocql/gocql
 
-go 1.21
+go 1.23
 
 require (
 	github.com/golang/snappy v0.0.4
 	github.com/hailocab/go-hostpool v0.0.0-20160125115350-e80d13ce29ed
-	github.com/testcontainers/testcontainers-go v0.32.0
+	github.com/testcontainers/testcontainers-go v0.33.0
 	gopkg.in/inf.v0 v0.9.1
 )
 
@@ -30,16 +30,13 @@ require (
 	dario.cat/mergo v1.0.0 // indirect
 	github.com/Azure/go-ansiterm v0.0.0-20210617225240-d185dfc1b5a1 // indirect
 	github.com/Microsoft/go-winio v0.6.2 // indirect
-	github.com/Microsoft/hcsshim v0.11.5 // indirect
-	github.com/bitly/go-hostpool v0.1.0 // indirect
 	github.com/bmizerany/assert v0.0.0-20160611221934-b7ed37b82869 // indirect
 	github.com/cenkalti/backoff/v4 v4.2.1 // indirect
 	github.com/containerd/containerd v1.7.18 // indirect
-	github.com/containerd/errdefs v0.1.0 // indirect
 	github.com/containerd/log v0.1.0 // indirect
+	github.com/containerd/platforms v0.2.1 // indirect
 	github.com/cpuguy83/dockercfg v0.3.1 // indirect
 	github.com/distribution/reference v0.6.0 // indirect
-	github.com/docker/docker v27.0.3+incompatible // indirect
 	github.com/docker/go-connections v0.5.0 // indirect
 	github.com/docker/go-units v0.5.0 // indirect
 	github.com/felixge/httpsnoop v1.0.4 // indirect
@@ -72,9 +69,11 @@ require (
 	go.opentelemetry.io/otel v1.24.0 // indirect
 	go.opentelemetry.io/otel/metric v1.24.0 // indirect
 	go.opentelemetry.io/otel/trace v1.24.0 // indirect
-	golang.org/x/crypto v0.23.0 // indirect
+	golang.org/x/crypto v0.22.0 // indirect
 	golang.org/x/sys v0.21.0 // indirect
-	google.golang.org/genproto/googleapis/rpc v0.0.0-20240318140521-94a12d6c2237 // indirect
-	google.golang.org/grpc v1.64.1 // indirect
-	google.golang.org/protobuf v1.33.0 // indirect
+)
+
+require (
+	github.com/bitly/go-hostpool v0.1.0 // indirect
+	github.com/docker/docker v27.1.1+incompatible
 )
diff --git a/go.sum b/go.sum
index 65a8c701a..9f0781765 100644
--- a/go.sum
+++ b/go.sum
@@ -6,8 +6,6 @@ github.com/Azure/go-ansiterm v0.0.0-20210617225240-d185dfc1b5a1 h1:UQHMgLO+TxOEl
 github.com/Azure/go-ansiterm v0.0.0-20210617225240-d185dfc1b5a1/go.mod h1:xomTg63KZ2rFqZQzSB4Vz2SUXa1BpHTVz9L5PTmPC4E=
 github.com/Microsoft/go-winio v0.6.2 h1:F2VQgta7ecxGYO8k3ZZz3RS8fVIXVxONVUPlNERoyfY=
 github.com/Microsoft/go-winio v0.6.2/go.mod h1:yd8OoFMLzJbo9gZq8j5qaps8bJ9aShtEA8Ipt1oGCvU=
-github.com/Microsoft/hcsshim v0.11.5 h1:haEcLNpj9Ka1gd3B3tAEs9CpE0c+1IhoL59w/exYU38=
-github.com/Microsoft/hcsshim v0.11.5/go.mod h1:MV8xMfmECjl5HdO7U/3/hFVnkmSBjAjmA09d4bExKcU=
 github.com/bitly/go-hostpool v0.1.0 h1:XKmsF6k5el6xHG3WPJ8U0Ku/ye7njX7W81Ng7O2ioR0=
 github.com/bitly/go-hostpool v0.1.0/go.mod h1:4gOCgp6+NZnVqlKyZ/iBZFTAJKembaVENUpMkpg42fw=
 github.com/bmizerany/assert v0.0.0-20160611221934-b7ed37b82869 h1:DDGfHa7BWjL4YnC6+E63dPcxHo2sUxDIu8g3QgEJdRY=
@@ -16,10 +14,10 @@ github.com/cenkalti/backoff/v4 v4.2.1 h1:y4OZtCnogmCPw98Zjyt5a6+QwPLGkiQsYW5oUqy
 github.com/cenkalti/backoff/v4 v4.2.1/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyYozVcomhLiZE=
 github.com/containerd/containerd v1.7.18 h1:jqjZTQNfXGoEaZdW1WwPU0RqSn1Bm2Ay/KJPUuO8nao=
 github.com/containerd/containerd v1.7.18/go.mod h1:IYEk9/IO6wAPUz2bCMVUbsfXjzw5UNP5fLz4PsUygQ4=
-github.com/containerd/errdefs v0.1.0 h1:m0wCRBiu1WJT/Fr+iOoQHMQS/eP5myQ8lCv4Dz5ZURM=
-github.com/containerd/errdefs v0.1.0/go.mod h1:YgWiiHtLmSeBrvpw+UfPijzbLaB77mEG1WwJTDETIV0=
 github.com/containerd/log v0.1.0 h1:TCJt7ioM2cr/tfR8GPbGf9/VRAX8D2B4PjzCpfX540I=
 github.com/containerd/log v0.1.0/go.mod h1:VRRf09a7mHDIRezVKTRCrOq78v577GXq3bSa3EhrzVo=
+github.com/containerd/platforms v0.2.1 h1:zvwtM3rz2YHPQsF2CHYM8+KtB5dvhISiXh5ZpSBQv6A=
+github.com/containerd/platforms v0.2.1/go.mod h1:XHCb+2/hzowdiut9rkudds9bE5yJ7npe7dG/wG+uFPw=
 github.com/cpuguy83/dockercfg v0.3.1 h1:/FpZ+JaygUR/lZP2NlFI2DVfrOEMAIKP5wWEJdoYe9E=
 github.com/cpuguy83/dockercfg v0.3.1/go.mod h1:sugsbF4//dDlL/i+S+rtpIWp+5h0BHJHfjj5/jFyUJc=
 github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E=
@@ -30,8 +28,8 @@ github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c
 github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
 github.com/distribution/reference v0.6.0 h1:0IXCQ5g4/QMHHkarYzh5l+u8T3t73zM5QvfrDyIgxBk=
 github.com/distribution/reference v0.6.0/go.mod h1:BbU0aIcezP1/5jX/8MP0YiH4SdvB5Y4f/wlDRiLyi3E=
-github.com/docker/docker v27.0.3+incompatible h1:aBGI9TeQ4MPlhquTQKq9XbK79rKFVwXNUAYz9aXyEBE=
-github.com/docker/docker v27.0.3+incompatible/go.mod h1:eEKB0N0r5NX/I1kEveEz05bcu8tLC/8azJZsviup8Sk=
+github.com/docker/docker v27.1.1+incompatible h1:hO/M4MtV36kzKldqnA37IWhebRA+LnqqcqDja6kVaKY=
+github.com/docker/docker v27.1.1+incompatible/go.mod h1:eEKB0N0r5NX/I1kEveEz05bcu8tLC/8azJZsviup8Sk=
 github.com/docker/go-connections v0.5.0 h1:USnMq7hx7gwdVZq1L49hLXaFtUdTADjXGp+uj1Br63c=
 github.com/docker/go-connections v0.5.0/go.mod h1:ov60Kzw0kKElRwhNs9UlUHAE/F9Fe6GLaXnqyDdmEXc=
 github.com/docker/go-units v0.5.0 h1:69rxXcBk27SvSaaxTtLh/8llcHD8vYHT7WSdRZ/jvr4=
@@ -113,8 +111,8 @@ github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO
 github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo=
 github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg=
 github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
-github.com/testcontainers/testcontainers-go v0.32.0 h1:ug1aK08L3gCHdhknlTTwWjPHPS+/alvLJU/DRxTD/ME=
-github.com/testcontainers/testcontainers-go v0.32.0/go.mod h1:CRHrzHLQhlXUsa5gXjTOfqIEJcrK5+xMDmBr/WMI88E=
+github.com/testcontainers/testcontainers-go v0.33.0 h1:zJS9PfXYT5O0ZFXM2xxXfk4J5UMw/kRiISng037Gxdw=
+github.com/testcontainers/testcontainers-go v0.33.0/go.mod h1:W80YpTa8D5C3Yy16icheD01UTDu+LmXIA2Keo+jWtT8=
 github.com/tklauser/go-sysconf v0.3.12 h1:0QaGUFOdQaIVdPgfITYzaTegZvdCjmYO52cSFAEVmqU=
 github.com/tklauser/go-sysconf v0.3.12/go.mod h1:Ho14jnntGE1fpdOqQEEaiKRpvIavV0hSfmBq8nJbHYI=
 github.com/tklauser/numcpus v0.6.1 h1:ng9scYS7az0Bk4OZLvrNXNSAO2Pxr1XXRAPyjhIx+Fk=
@@ -142,16 +140,16 @@ go.opentelemetry.io/proto/otlp v1.0.0/go.mod h1:Sy6pihPLfYHkr3NkUbEhGHFhINUSI/v8
 golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
 golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
 golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
-golang.org/x/crypto v0.23.0 h1:dIJU/v2J8Mdglj/8rJ6UUOM3Zc9zLZxVZwwxMooUSAI=
-golang.org/x/crypto v0.23.0/go.mod h1:CKFgDieR+mRhux2Lsu27y0fO304Db0wZe70UKqHu0v8=
+golang.org/x/crypto v0.22.0 h1:g1v0xeRhjcugydODzvb3mEM9SQ0HGp9s/nh3COQ/C30=
+golang.org/x/crypto v0.22.0/go.mod h1:vr6Su+7cTlO45qkww3VDJlzDn0ctJvRgYbC2NvXHt+M=
 golang.org/x/mod v0.2.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
 golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
 golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
 golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
 golang.org/x/net v0.0.0-20200226121028-0de0cce0169b/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
 golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU=
-golang.org/x/net v0.26.0 h1:soB7SVo0PWrY4vPW/+ay0jKDNScG2X9wFeYlXIvJsOQ=
-golang.org/x/net v0.26.0/go.mod h1:5YKkiSynbBIh3p6iOc/vibscux0x38BZDkn8sCUPxHE=
+golang.org/x/net v0.23.0 h1:7EYJ93RZ9vYSZAIb2x3lnuvqO5zneoD6IvWjuhfxjTs=
+golang.org/x/net v0.23.0/go.mod h1:JKghWKKOSdJwpW2GEx0Ja7fmaKnMsbu+MWVZTokSYmg=
 golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
 golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
 golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
@@ -167,12 +165,12 @@ golang.org/x/sys v0.11.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
 golang.org/x/sys v0.15.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
 golang.org/x/sys v0.21.0 h1:rF+pYz3DAGSQAxAu1CbC7catZg4ebC4UIeIhKxBZvws=
 golang.org/x/sys v0.21.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
-golang.org/x/term v0.20.0 h1:VnkxpohqXaOBYJtBmEppKUG6mXpi+4O6purfc2+sMhw=
-golang.org/x/term v0.20.0/go.mod h1:8UkIAJTvZgivsXaD6/pH6U9ecQzZ45awqEOzuCvwpFY=
+golang.org/x/term v0.19.0 h1:+ThwsDv+tYfnJFhF4L8jITxu1tdTWRTZpdsWgEgjL6Q=
+golang.org/x/term v0.19.0/go.mod h1:2CuTdWZ7KHSQwUzKva0cbMg6q2DMI3Mmxp+gKJbskEk=
 golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
 golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
-golang.org/x/text v0.16.0 h1:a94ExnEXNtEwYLGJSIUxnWoxoRz/ZcCsV63ROupILh4=
-golang.org/x/text v0.16.0/go.mod h1:GhwF1Be+LQoKShO3cGOHzqOgRrGaYc9AvblQOmPVHnI=
+golang.org/x/text v0.14.0 h1:ScX5w1eTa3QqT8oi6+ziP7dTV1S2+ALU0bI+0zXKWiQ=
+golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU=
 golang.org/x/time v0.0.0-20220210224613-90d013bbcef8 h1:vVKdlvoWBphwdxWKrFZEuM0kGgGLxUOYcY4U/2Vjg44=
 golang.org/x/time v0.0.0-20220210224613-90d013bbcef8/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
 golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
@@ -184,10 +182,10 @@ golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8T
 golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
 golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
 google.golang.org/genproto v0.0.0-20230920204549-e6e6cdab5c13 h1:vlzZttNJGVqTsRFU9AmdnrcO1Znh8Ew9kCD//yjigk0=
-google.golang.org/genproto/googleapis/api v0.0.0-20240318140521-94a12d6c2237 h1:RFiFrvy37/mpSpdySBDrUdipW/dHwsRwh3J3+A9VgT4=
-google.golang.org/genproto/googleapis/api v0.0.0-20240318140521-94a12d6c2237/go.mod h1:Z5Iiy3jtmioajWHDGFk7CeugTyHtPvMHA4UTmUkyalE=
-google.golang.org/genproto/googleapis/rpc v0.0.0-20240318140521-94a12d6c2237 h1:NnYq6UN9ReLM9/Y01KWNOWyI5xQ9kbIms5GGJVwS/Yc=
-google.golang.org/genproto/googleapis/rpc v0.0.0-20240318140521-94a12d6c2237/go.mod h1:WtryC6hu0hhx87FDGxWCDptyssuo68sk10vYjF+T9fY=
+google.golang.org/genproto/googleapis/api v0.0.0-20230913181813-007df8e322eb h1:lK0oleSc7IQsUxO3U5TjL9DWlsxpEBemh+zpB7IqhWI=
+google.golang.org/genproto/googleapis/api v0.0.0-20230913181813-007df8e322eb/go.mod h1:KjSP20unUpOx5kyQUFa7k4OJg0qeJ7DEZflGDu2p6Bk=
+google.golang.org/genproto/googleapis/rpc v0.0.0-20231002182017-d307bd883b97 h1:6GQBEOdGkX6MMTLT9V+TjtIRZCw9VPD5Z+yHY9wMgS0=
+google.golang.org/genproto/googleapis/rpc v0.0.0-20231002182017-d307bd883b97/go.mod h1:v7nGkzlmW8P3n/bKmWBn2WpBjpOEx8Q6gMueudAmKfY=
 google.golang.org/grpc v1.64.1 h1:LKtvyfbX3UGVPFcGqJ9ItpVWW6oN/2XqTxfAnwRRXiA=
 google.golang.org/grpc v1.64.1/go.mod h1:hiQF4LFZelK2WKaP6W0L92zGHtiQdZxk8CrSdvyjeP0=
 google.golang.org/protobuf v1.33.0 h1:uNO2rsAINq/JlFpSdYEKIZ0uKD/R9cpdv0T+yoGwGmI=
diff --git a/main_test.go b/main_test.go
index c8cf5811f..78223a2a4 100644
--- a/main_test.go
+++ b/main_test.go
@@ -1,5 +1,29 @@
-//go:build cassandra || integration
-// +build cassandra integration
+//go:build cassandra || integration || tc
+// +build cassandra integration tc
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/*
+ * Content before git sha 34fdeebefcbf183ed7f916f931aa0586fdaa1b40
+ * Copyright (c) 2016, The Gocql authors,
+ * provided under the BSD-3-Clause License.
+ * See the NOTICE file distributed with this work for additional information.
+ */
 
 package gocql
 
@@ -7,21 +31,57 @@
 import (
 	"context"
 	"flag"
 	"fmt"
+	"io"
 	"log"
 	"os"
 	"strconv"
+	"strings"
 	"testing"
 	"time"
 
 	"github.com/testcontainers/testcontainers-go"
+	"github.com/testcontainers/testcontainers-go/network"
 	"github.com/testcontainers/testcontainers-go/wait"
 )
 
+type TChost struct {
+	TC   testcontainers.Container
+	Addr string
+	ID   string
+}
+
+var cassNodes = make(map[string]TChost)
+var networkName string
+
 func TestMain(m *testing.M) {
 	ctx := context.Background()
 
 	flag.Parse()
 
+	net, err := network.New(ctx)
+	if err != nil {
+		log.Fatal("cannot create network: ", err)
+	}
+	networkName = net.Name
+
+	// collect cass nodes into a cluster
+	*flagCluster = ""
+	for i := 1; i <= *clusterSize; i++ {
+		err = NodeUpTC(ctx, i)
+		if err != nil {
+			log.Fatalf("Failed to start Cassandra node %d: %v", i, err)
+		}
+	}
+	// remove the last comma
+	*flagCluster = (*flagCluster)[:len(*flagCluster)-1]
+
+	// run all tests
+	code := m.Run()
+
+	os.Exit(code)
+}
+
+func NodeUpTC(ctx context.Context, number int) error {
 	cassandraVersion := flagCassVersion.String()[1:]
 
 	jvmOpts := "-Dcassandra.test.fail_writes_ks=test -Dcassandra.custom_query_handler_class=org.apache.cassandra.cql3.CustomPayloadMirroringQueryHandler"
@@ -32,7 +92,7 @@ func TestMain(m *testing.M) {
 
 	env := map[string]string{
 		"JVM_OPTS":        jvmOpts,
-		"CASSANDRA_SEEDS": "cassandra1",
+		"CASSANDRA_SEEDS": "node1",
 		"CASSANDRA_DC":    "datacenter1",
 		"HEAP_NEWSIZE":    "100M",
 		"MAX_HEAP_SIZE":   "256M",
@@ -45,92 +105,88 @@ func TestMain(m *testing.M) {
 		env["AUTH_TEST"] = "true"
 	}
 
-	networkRequest := testcontainers.GenericNetworkRequest{
-		NetworkRequest: testcontainers.NetworkRequest{
-			Name: "cassandra",
-		},
-	}
-	cassandraNetwork, err := testcontainers.GenericNetwork(ctx, networkRequest)
-	if err != nil {
-		log.Fatalf("Failed to create network: %s", err)
-	}
-	defer cassandraNetwork.Remove(ctx)
-
-	// Function to create a Cassandra container (node)
-	createCassandraContainer := func(number int) (string, error) {
-		req := testcontainers.ContainerRequest{
-			Image:        "cassandra:" + cassandraVersion,
-			ExposedPorts: []string{"9042/tcp"},
-			Env:          env,
-			Files: []testcontainers.ContainerFile{
-				{
-					HostFilePath:      "./testdata/pki/.keystore",
-					ContainerFilePath: "testdata/.keystore",
-					FileMode:          0o777,
-				},
-				{
-					HostFilePath:      "./testdata/pki/.truststore",
-					ContainerFilePath: "testdata/.truststore",
-					FileMode:          0o777,
-				},
-				{
-					HostFilePath:      "update_container_cass_config.sh",
-					ContainerFilePath: "/update_container_cass_config.sh",
-					FileMode:          0o777,
-				},
-			},
+	// the config script is needed by every node's post-start hook, so it is
+	// always mounted; the PKI material is only needed for the SSL suite
+	fs := []testcontainers.ContainerFile{
+		{
+			HostFilePath:      "update_container_cass_config.sh",
+			ContainerFilePath: "/update_container_cass_config.sh",
+			FileMode:          0o777,
+		},
+	}
+	if *flagRunSslTest {
+		env["RUN_SSL_TEST"] = "true"
+		fs = append(fs,
+			testcontainers.ContainerFile{
+				HostFilePath:      "./testdata/pki/.keystore",
+				ContainerFilePath: "testdata/.keystore",
+				FileMode:          0o777,
+			},
+			testcontainers.ContainerFile{
+				HostFilePath:      "./testdata/pki/.truststore",
+				ContainerFilePath: "testdata/.truststore",
+				FileMode:          0o777,
+			},
+			testcontainers.ContainerFile{
+				HostFilePath:      "./testdata/pki/cqlshrc",
+				ContainerFilePath: "/root/.cassandra/cqlshrc",
+				FileMode:          0o777,
+			},
+			testcontainers.ContainerFile{
+				HostFilePath:      "./testdata/pki/gocql.crt",
+				ContainerFilePath: "/root/.cassandra/gocql.crt",
+				FileMode:          0o777,
+			},
+			testcontainers.ContainerFile{
+				HostFilePath:      "./testdata/pki/gocql.key",
+				ContainerFilePath: "/root/.cassandra/gocql.key",
+				FileMode:          0o777,
+			},
+			testcontainers.ContainerFile{
+				HostFilePath:      "./testdata/pki/ca.crt",
+				ContainerFilePath: "/root/.cassandra/ca.crt",
+				FileMode:          0o777,
+			},
+		)
+	}
-
-			Networks: []string{"cassandra"},
-			LifecycleHooks: []testcontainers.ContainerLifecycleHooks{{
-				PostStarts: []testcontainers.ContainerHook{
-					func(ctx context.Context, c testcontainers.Container) error {
-						// wait for cassandra config.yaml to initialize
-						time.Sleep(100 * time.Millisecond)
-
-						code, _, err := c.Exec(ctx, []string{"bash", "./update_container_cass_config.sh"})
-						if err != nil {
-							return err
-						}
-						if code != 0 {
-							return fmt.Errorf("script ./update_container_cass_config.sh exited with code %d", code)
-						}
-						return nil
-					},
-				},
-			}},
-			WaitingFor: wait.ForLog("Startup complete").WithStartupTimeout(2 * time.Minute),
-			Name:       "cassandra" + strconv.Itoa(number),
-		}
-		container, err := testcontainers.GenericContainer(ctx, testcontainers.GenericContainerRequest{
-			ContainerRequest: req,
-			Started:          true,
-		})
-		if err != nil {
-			return "", err
-		}
-
-		ip, err := container.ContainerIP(ctx)
-		if err != nil {
-			return "", err
-		}
-
-		return ip, nil
-	}
-
-	// collect cass nodes into a cluster
-	*flagCluster = ""
-	for i := 0; i < *clusterSize; i++ {
-		ip, err := createCassandraContainer(i + 1)
-		if err != nil {
-			log.Fatalf("Failed to start Cassandra node %d: %v", i+1, err)
-		}
-
-		// if not the last iteration
-		if i != *clusterSize-1 {
-			ip += ","
-		}
-
-		*flagCluster += ip
-	}
+	req := testcontainers.ContainerRequest{
+		Image:    "cassandra:" + cassandraVersion,
+		Env:      env,
+		Files:    fs,
+		Networks: []string{networkName},
+		LifecycleHooks: []testcontainers.ContainerLifecycleHooks{{
+			PostStarts: []testcontainers.ContainerHook{
+				func(ctx context.Context, c testcontainers.Container) error {
+					// wait for cassandra config.yaml to initialize
+					time.Sleep(100 * time.Millisecond)
+
+					_, body, err := c.Exec(ctx, []string{"bash", "./update_container_cass_config.sh"})
+					if err != nil {
+						return err
+					}
+
+					data, _ := io.ReadAll(body)
+					if ok := strings.Contains(string(data), "Cassandra configuration modified successfully."); !ok {
+						return fmt.Errorf("./update_container_cass_config.sh didn't complete successfully: %v", string(data))
+					}
+
+					return nil
+				},
+			},
+		}},
+		WaitingFor: wait.ForLog("Startup complete").WithStartupTimeout(2 * time.Minute),
+		Name:       "node" + strconv.Itoa(number),
+	}
+
+	container, err := testcontainers.GenericContainer(ctx, testcontainers.GenericContainerRequest{
+		ContainerRequest: req,
+		Started:          true,
+	})
+	if err != nil {
+		return err
+	}
+
+	cIP, err := container.ContainerIP(ctx)
+	if err != nil {
+		return err
+	}
 
 	if *flagRunAuthTest {
@@ -138,8 +194,85 @@ func TestMain(m *testing.M) {
 		time.Sleep(10 * time.Second)
 	}
 
-	// run all tests
-	code := m.Run()
+	hostID, err := getCassNodeID(ctx, container, cIP)
+	if err != nil {
+		return err
+	}
 
-	os.Exit(code)
+	cassNodes[req.Name] = TChost{
+		TC:   container,
+		Addr: cIP,
+		ID:   hostID,
+	}
+
+	*flagCluster += cIP + ","
+
+	return nil
+}
+
+func getCassNodeID(ctx context.Context, container testcontainers.Container, ip string) (string, error) {
+	var cmd []string
+	if *flagRunSslTest {
+		cmd = []string{"cqlsh", ip, "9042", "--ssl", "ip", "/root/.cassandra/cqlshrc", "-e", "SELECT host_id FROM system.local;"}
+	} else {
+		cmd = []string{"cqlsh", "-e", "SELECT host_id FROM system.local;"}
+	}
+
+	_, reader, err := container.Exec(ctx, cmd)
+	if err != nil {
+		return "", fmt.Errorf("failed to execute cqlsh command: %v", err)
+	}
+	b, err := io.ReadAll(reader)
+	if err != nil {
+		return "", err
+	}
+	output := string(b)
+
+	lines := strings.Split(output, "\n")
+	if len(lines) < 4 {
+		return "", fmt.Errorf("unexpected output format, less than 4 lines: %v", lines)
+	}
+	hostID := strings.TrimSpace(lines[3])
+
+	return hostID, nil
+}
+
+// restoreCluster restarts any stopped nodes and waits until each one reports it is
+// fully bootstrapped, so tests that stop nodes leave a healthy cluster behind them.
+func restoreCluster(ctx context.Context) error {
+	var cmd []string
+	for _, container := range cassNodes {
+		if running := container.TC.IsRunning(); running {
+			continue
+		}
+		if err := container.TC.Start(ctx); err != nil {
+			return fmt.Errorf("cannot start a container: %v", err)
+		}
+
+		if *flagRunSslTest {
+			cmd = []string{"cqlsh", container.Addr, "9042", "--ssl", "ip", "/root/.cassandra/cqlshrc", "-e", "SELECT bootstrapped FROM system.local"}
+		} else {
+			cmd = []string{"cqlsh", "-e", "SELECT bootstrapped FROM system.local"}
+		}
+
+		err := wait.ForExec(cmd).WithResponseMatcher(func(body io.Reader) bool {
+			data, _ := io.ReadAll(body)
+			return strings.Contains(string(data), "COMPLETED")
+		}).WaitUntilReady(ctx, container.TC)
+		if err != nil {
+			return fmt.Errorf("cannot wait until fully bootstrapped: %v", err)
+		}
+		time.Sleep(5 * time.Second)
+	}
+
+	return nil
+}
+
+// getPool is a test helper that mirrors (p *policyConnPool).getPool: it looks up a
+// host's connection pool by host ID while holding the pool lock, for readability in tests.
+func getPool(p *policyConnPool, hostID string) (pool *hostConnPool, ok bool) {
+	p.mu.RLock()
+	pool, ok = p.hostConnPools[hostID]
+	p.mu.RUnlock()
+	return
+}
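The `lines[3]` index in `getCassNodeID` leans on cqlsh's usual rendering of a single-row result: a leading blank line, the column header, a separator, then the row. Roughly, with an illustrative UUID:

```go
// Expected shape of the cqlsh output split on "\n" in getCassNodeID:
//
//	lines[0]: ""                                       (leading blank line)
//	lines[1]: " host_id"                               (column header)
//	lines[2]: "--------------------------------------" (separator)
//	lines[3]: " 1b232ed8-..."                          (the row; TrimSpace yields the host ID)
```

If cqlsh's formatting ever changes, the `len(lines) < 4` guard fails with the raw output in the error, which makes the assumption easy to spot.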
diff --git a/testdata/pki/cqlshrc b/testdata/pki/cqlshrc
new file mode 100644
index 000000000..9939dda67
--- /dev/null
+++ b/testdata/pki/cqlshrc
@@ -0,0 +1,13 @@
+[ssl]
+certfile = /root/.cassandra/gocql.crt
+userkey = /root/.cassandra/gocql.key
+usercert = /root/.cassandra/gocql.crt
+ca_certs = /root/.cassandra/ca.crt
+validate = false
+
+[connection]
+factory = cqlshlib.ssl.ssl_transport_factory
+
+[authentication]
+username = cassandra
+password = cassandra
diff --git a/update_container_cass_config.sh b/update_container_cass_config.sh
index f9310dd40..c3bcf0611 100755
--- a/update_container_cass_config.sh
+++ b/update_container_cass_config.sh
@@ -59,12 +59,6 @@ update_property() {
 configure_cassandra() {
   local keypath="testdata"
   local conf=(
-    "client_encryption_options.enabled:true"
-    "client_encryption_options.keystore:$keypath/.keystore"
-    "client_encryption_options.keystore_password:cassandra"
-    "client_encryption_options.require_client_auth:true"
-    "client_encryption_options.truststore:$keypath/.truststore"
-    "client_encryption_options.truststore_password:cassandra"
     "concurrent_reads:2"
     "concurrent_writes:2"
   )
@@ -76,6 +70,17 @@ configure_cassandra() {
     )
   fi
 
+  if [[ $RUN_SSL_TEST == true ]]; then
+    conf+=(
+      "client_encryption_options.enabled:true"
+      "client_encryption_options.keystore:$keypath/.keystore"
+      "client_encryption_options.keystore_password:cassandra"
+      "client_encryption_options.require_client_auth:true"
+      "client_encryption_options.truststore:$keypath/.truststore"
+      "client_encryption_options.truststore_password:cassandra"
+    )
+  fi
+
   if [[ $CASS_VERSION == 3.*.* ]]; then
     conf+=(
       "rpc_server_type:sync"