Skip to content

Commit

Permalink
improves replication tests
Browse files Browse the repository at this point in the history
  • Loading branch information
aliszka committed May 14, 2024
1 parent f1cc22b commit c68fd7d
Show file tree
Hide file tree
Showing 2 changed files with 61 additions and 91 deletions.
110 changes: 37 additions & 73 deletions test/acceptance/replication/crud_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ import (
"github.com/weaviate/weaviate/test/helper"
"github.com/weaviate/weaviate/test/helper/sample-schema/articles"
"github.com/weaviate/weaviate/usecases/replica"
"golang.org/x/sync/errgroup"
)

var (
Expand Down Expand Up @@ -359,25 +358,25 @@ func eventualReplicaCRUD(t *testing.T) {
})

t.Run("assert all previous data replicated to node 2", func(t *testing.T) {
resp := gqlGet(t, compose.GetWeaviateNode2().URI(), "Article", replica.Quorum)
assert.Len(t, resp, len(articleIDs))
resp = gqlGet(t, compose.GetWeaviateNode2().URI(), "Paragraph", replica.Quorum)
assert.Len(t, resp, len(paragraphIDs))
assert.EventuallyWithT(t, func(collect *assert.CollectT) {
resp := gqlGet(t, compose.GetWeaviateNode2().URI(), "Article", replica.One)
assert.Len(collect, resp, len(articleIDs))
resp = gqlGet(t, compose.GetWeaviateNode2().URI(), "Paragraph", replica.One)
assert.Len(collect, resp, len(paragraphIDs))
}, 5*time.Second, 100*time.Millisecond)
})

t.Run("RestartNode-3", func(t *testing.T) {
startNodeAt(ctx, t, compose, 3)
})

t.Run("assert all previous data replicated to node 3", func(t *testing.T) {
resp := gqlGet(t, compose.GetWeaviateNode3().URI(), "Article", replica.All)
assert.Len(t, resp, len(articleIDs))
resp = gqlGet(t, compose.GetWeaviateNode3().URI(), "Paragraph", replica.All)
assert.Len(t, resp, len(paragraphIDs))
})

t.Run("RestartCluster", func(t *testing.T) {
restartCluster(ctx, t, compose)
assert.EventuallyWithT(t, func(collect *assert.CollectT) {
resp := gqlGet(t, compose.GetWeaviateNode3().URI(), "Article", replica.All)
assert.Len(collect, resp, len(articleIDs))
resp = gqlGet(t, compose.GetWeaviateNode3().URI(), "Paragraph", replica.All)
assert.Len(collect, resp, len(paragraphIDs))
}, 5*time.Second, 100*time.Millisecond)
})

t.Run("assert any future writes are replicated", func(t *testing.T) {
Expand All @@ -395,30 +394,32 @@ func eventualReplicaCRUD(t *testing.T) {
patchObject(t, compose.GetWeaviateNode2().URI(), patch)
})

t.Run("StopNode-2", func(t *testing.T) {
stopNodeAt(ctx, t, compose, 2)
})

t.Run("PatchedOnNode-1", func(t *testing.T) {
after, err := getObjectFromNode(t, compose.GetWeaviate().URI(), "Article", articleIDs[0], "node1")
require.Nil(t, err)

newVal, ok := after.Properties.(map[string]interface{})["title"]
require.True(t, ok)
assert.Equal(t, newTitle, newVal)
require.Contains(t, after.Properties.(map[string]interface{}), "title")
assert.Equal(t, newTitle, after.Properties.(map[string]interface{})["title"])
})

t.Run("RestartNode-2", func(t *testing.T) {
startNodeAt(ctx, t, compose, 2)
t.Run("PatchedOnNode-2", func(t *testing.T) {
assert.EventuallyWithT(t, func(collect *assert.CollectT) {
after, err := getObjectFromNode(t, compose.GetWeaviateNode2().URI(), "Article", articleIDs[0], "node2")
require.Nil(collect, err)

require.Contains(collect, after.Properties.(map[string]interface{}), "title")
assert.Equal(collect, newTitle, after.Properties.(map[string]interface{})["title"])
}, 5*time.Second, 100*time.Millisecond)
})

t.Run("PatchedOnNode-3", func(t *testing.T) {
after, err := getObjectFromNode(t, compose.GetWeaviateNode3().URI(), "Article", articleIDs[0], "node3")
require.Nil(t, err)
assert.EventuallyWithT(t, func(collect *assert.CollectT) {
after, err := getObjectFromNode(t, compose.GetWeaviate().URI(), "Article", articleIDs[0], "node3")
require.Nil(collect, err)

newVal, ok := after.Properties.(map[string]interface{})["title"]
require.True(t, ok)
assert.Equal(t, newTitle, newVal)
require.Contains(collect, after.Properties.(map[string]interface{}), "title")
assert.Equal(collect, newTitle, after.Properties.(map[string]interface{})["title"])
}, 5*time.Second, 100*time.Millisecond)
})
})

Expand All @@ -427,37 +428,25 @@ func eventualReplicaCRUD(t *testing.T) {
deleteObject(t, compose.GetWeaviateNode2().URI(), "Article", articleIDs[0])
})

t.Run("StopNode-2", func(t *testing.T) {
stopNodeAt(ctx, t, compose, 2)
})

t.Run("OnNode-1", func(t *testing.T) {
_, err := getObjectFromNode(t, compose.GetWeaviate().URI(), "Article", articleIDs[0], "node1")
assert.Equal(t, &objects.ObjectsClassGetNotFound{}, err)
})

t.Run("RestartNode-2", func(t *testing.T) {
startNodeAt(ctx, t, compose, 2)
assert.EventuallyWithT(t, func(collect *assert.CollectT) {
_, err := getObjectFromNode(t, compose.GetWeaviate().URI(), "Article", articleIDs[0], "node1")
assert.Equal(collect, &objects.ObjectsClassGetNotFound{}, err)
}, 5*time.Second, 100*time.Millisecond)
})
})

t.Run("BatchAllObjects", func(t *testing.T) {
t.Run("BatchDeleteAllObjects", func(t *testing.T) {
t.Run("OnNode-2", func(t *testing.T) {
deleteObjects(t, compose.GetWeaviateNode2().URI(),
"Article", []string{"title"}, "Article#*")
})

t.Run("StopNode-2", func(t *testing.T) {
stopNodeAt(ctx, t, compose, 2)
})

t.Run("OnNode-1", func(t *testing.T) {
resp := gqlGet(t, compose.GetWeaviate().URI(), "Article", replica.One)
assert.Empty(t, resp)
})

t.Run("RestartNode-2", func(t *testing.T) {
startNodeAt(ctx, t, compose, 2)
assert.EventuallyWithT(t, func(collect *assert.CollectT) {
resp := gqlGet(t, compose.GetWeaviate().URI(), "Article", replica.One)
assert.Empty(collect, resp)
}, 5*time.Second, 100*time.Millisecond)
})
})

Expand All @@ -480,31 +469,6 @@ func eventualReplicaCRUD(t *testing.T) {
})
}

func restartCluster(ctx context.Context, t *testing.T, compose *docker.DockerCompose) {
// since node1 is the gossip "leader", node 2 and 3 must be stopped and restarted
// after node1 to re-facilitate internode communication
eg := errgroup.Group{}
eg.Go(func() error {
require.Nil(t, compose.StartAt(ctx, 1))
return nil
})
eg.Go(func() error { // restart node 2
time.Sleep(3 * time.Second) // wait for member list initialization
stopNodeAt(ctx, t, compose, 2)
require.Nil(t, compose.StartAt(ctx, 2))
return nil
})
eg.Go(func() error { // restart node 3
time.Sleep(3 * time.Second) // wait for member list initialization
stopNodeAt(ctx, t, compose, 3)
require.Nil(t, compose.StartAt(ctx, 3))
return nil
})

eg.Wait()
<-time.After(3 * time.Second) // wait for initialization
}

func stopNodeAt(ctx context.Context, t *testing.T, compose *docker.DockerCompose, index int) {
<-time.After(1 * time.Second)
require.Nil(t, compose.StopAt(ctx, index, nil))
Expand Down
42 changes: 24 additions & 18 deletions test/acceptance/replication/scale_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,17 +98,20 @@ func multiShardScaleOut(t *testing.T) {
})

t.Run("assert paragraphs were scaled out", func(t *testing.T) {
n := getNodes(t, compose.GetWeaviate().URI())
var shardsFound int
for _, node := range n.Nodes {
for _, shard := range node.Shards {
if shard.Class == paragraphClass.Class {
assert.EqualValues(t, 10, shard.ObjectCount)
shardsFound++
// shard.ObjectCount is eventually consistent, see Bucket::CountAsync()
assert.EventuallyWithT(t, func(collect *assert.CollectT) {
n := getNodes(t, compose.GetWeaviate().URI())
var shardsFound int
for _, node := range n.Nodes {
for _, shard := range node.Shards {
if shard.Class == paragraphClass.Class {
assert.EqualValues(collect, int64(10), shard.ObjectCount)
shardsFound++
}
}
}
}
assert.Equal(t, 2, shardsFound)
assert.Equal(collect, 2, shardsFound)
}, 10*time.Second, 100*time.Millisecond)
})

t.Run("scale out articles", func(t *testing.T) {
Expand All @@ -118,17 +121,20 @@ func multiShardScaleOut(t *testing.T) {
})

t.Run("assert articles were scaled out", func(t *testing.T) {
n := getNodes(t, compose.GetWeaviate().URI())
var shardsFound int
for _, node := range n.Nodes {
for _, shard := range node.Shards {
if shard.Class == articleClass.Class {
assert.EqualValues(t, 10, shard.ObjectCount)
shardsFound++
// shard.ObjectCount is eventually consistent, see Bucket::CountAsync()
assert.EventuallyWithT(t, func(collect *assert.CollectT) {
n := getNodes(t, compose.GetWeaviate().URI())
var shardsFound int
for _, node := range n.Nodes {
for _, shard := range node.Shards {
if shard.Class == articleClass.Class {
assert.EqualValues(collect, int64(10), shard.ObjectCount)
shardsFound++
}
}
}
}
assert.Equal(t, 2, shardsFound)
assert.Equal(collect, 2, shardsFound)
}, 10*time.Second, 100*time.Millisecond)
})

t.Run("kill a node and check contents of remaining node", func(t *testing.T) {
Expand Down

0 comments on commit c68fd7d

Please sign in to comment.