feat: Add sync sharding #1891

Merged · 4 commits · Sep 18, 2024
1 change: 1 addition & 0 deletions examples/simple_plugin/go.mod
@@ -54,6 +54,7 @@ require (
github.com/oapi-codegen/runtime v1.1.1 // indirect
github.com/pierrec/lz4/v4 v4.1.21 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/samber/lo v1.47.0 // indirect
github.com/santhosh-tekuri/jsonschema/v6 v6.0.1 // indirect
github.com/spf13/cobra v1.8.1 // indirect
github.com/spf13/pflag v1.0.5 // indirect
2 changes: 2 additions & 0 deletions examples/simple_plugin/go.sum
@@ -126,6 +126,8 @@ github.com/rs/xid v1.5.0/go.mod h1:trrq9SKmegXys3aeAKXMUTdJsYXVwGY3RLcfgqegfbg=
github.com/rs/zerolog v1.33.0 h1:1cU2KZkvPxNyfgEmhHAz/1A9Bz+llsdYzklWFzgp0r8=
github.com/rs/zerolog v1.33.0/go.mod h1:/7mN4D5sKwJLZQ2b/znpjC3/GQWY/xaDXUM0kKWRHss=
github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM=
github.com/samber/lo v1.47.0 h1:z7RynLwP5nbyRscyvcD043DWYoOcYRv3mV8lBeqOCLc=
github.com/samber/lo v1.47.0/go.mod h1:RmDH9Ct32Qy3gduHQuKJ3gW1fMHAnE/fAzQuf6He5cU=
github.com/santhosh-tekuri/jsonschema/v6 v6.0.1 h1:PKK9DyHxif4LZo+uQSgXNqs0jj5+xZwwfKHgph2lxBw=
github.com/santhosh-tekuri/jsonschema/v6 v6.0.1/go.mod h1:JXeL+ps8p7/KNMjDQk3TCwPpBy0wYklyWTfbkIzdIFU=
github.com/spf13/cobra v1.8.1 h1:e5/vxKd/rZsfSJMUX1agtjeTDf+qv1/JdBF8gg5k9ZM=
1 change: 1 addition & 0 deletions go.mod
@@ -22,6 +22,7 @@ require (
github.com/hashicorp/go-retryablehttp v0.7.7
github.com/invopop/jsonschema v0.12.0
github.com/rs/zerolog v1.33.0
github.com/samber/lo v1.47.0
github.com/santhosh-tekuri/jsonschema/v6 v6.0.1
github.com/spf13/cobra v1.8.1
github.com/stretchr/testify v1.9.0
2 changes: 2 additions & 0 deletions go.sum
@@ -126,6 +126,8 @@ github.com/rs/xid v1.5.0/go.mod h1:trrq9SKmegXys3aeAKXMUTdJsYXVwGY3RLcfgqegfbg=
github.com/rs/zerolog v1.33.0 h1:1cU2KZkvPxNyfgEmhHAz/1A9Bz+llsdYzklWFzgp0r8=
github.com/rs/zerolog v1.33.0/go.mod h1:/7mN4D5sKwJLZQ2b/znpjC3/GQWY/xaDXUM0kKWRHss=
github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM=
github.com/samber/lo v1.47.0 h1:z7RynLwP5nbyRscyvcD043DWYoOcYRv3mV8lBeqOCLc=
github.com/samber/lo v1.47.0/go.mod h1:RmDH9Ct32Qy3gduHQuKJ3gW1fMHAnE/fAzQuf6He5cU=
github.com/santhosh-tekuri/jsonschema/v6 v6.0.1 h1:PKK9DyHxif4LZo+uQSgXNqs0jj5+xZwwfKHgph2lxBw=
github.com/santhosh-tekuri/jsonschema/v6 v6.0.1/go.mod h1:JXeL+ps8p7/KNMjDQk3TCwPpBy0wYklyWTfbkIzdIFU=
github.com/spf13/cobra v1.8.1 h1:e5/vxKd/rZsfSJMUX1agtjeTDf+qv1/JdBF8gg5k9ZM=
6 changes: 6 additions & 0 deletions internal/servers/plugin/v3/plugin.go
@@ -178,6 +178,12 @@ func (s *Server) Sync(req *pb.Sync_Request, stream pb.Plugin_SyncServer) error {
Connection: req.Backend.Connection,
}
}
if req.Shard != nil {
syncOptions.Shard = &plugin.Shard{
Num: req.Shard.Num,
Total: req.Shard.Total,
}
}

go func() {
defer flushMetrics()
6 changes: 6 additions & 0 deletions plugin/plugin_source.go
@@ -15,12 +15,18 @@ type BackendOptions struct {
Connection string
}

type Shard struct {
Num int32
Total int32
}

type SyncOptions struct {
Tables []string
SkipTables []string
SkipDependentTables bool
DeterministicCQID bool
BackendOptions *BackendOptions
Shard *Shard
}

type SourceClient interface {
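For illustration only (not part of this PR): a source plugin that distributes its own work, rather than relying on the scheduler, could consult the new Shard field directly. The sketch below assumes the SourceClient.Sync shape from this file (SyncOptions plus a message channel from the SDK's plugin and message packages); the fetchAccounts and syncAccount helpers are hypothetical.

// Hedged sketch: plugin-side use of SyncOptions.Shard. fetchAccounts and
// syncAccount are hypothetical helpers, not part of the SDK.
func syncSharded(ctx context.Context, options plugin.SyncOptions, res chan<- message.SyncMessage) error {
	accounts := fetchAccounts() // must be deterministic and identically ordered across shards
	for i, account := range accounts {
		// Shard.Num is 1-based: shard n handles indexes n-1, n-1+total, n-1+2*total, ...
		if options.Shard != nil && int32(i)%options.Shard.Total != options.Shard.Num-1 {
			continue // this index belongs to another shard
		}
		if err := syncAccount(ctx, account, res); err != nil {
			return err
		}
	}
	return nil
}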
35 changes: 35 additions & 0 deletions scheduler/scheduler.go
@@ -13,6 +13,7 @@ import (
"github.com/cloudquery/plugin-sdk/v4/message"
"github.com/cloudquery/plugin-sdk/v4/schema"
"github.com/rs/zerolog"
"github.com/samber/lo"
"github.com/thoas/go-funk"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
@@ -90,6 +91,12 @@ func WithInvocationID(invocationID string) Option {
}
}

func WithShard(num int32, total int32) SyncOption {
return func(s *syncClient) {
s.shard = &shard{num: num, total: total}
}
}
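Not part of this diff, but to show how the option is intended to be consumed: a hedged sketch of a caller running shard 1 of 3. The Scheduler.Sync call shape (client, tables, message channel, variadic SyncOption) is assumed from the existing SDK and may differ; ctx, client and tables are assumed to be set up elsewhere.

// Hedged sketch: run only shard 1 of 3 on this worker.
s := scheduler.NewScheduler()
msgs := make(chan message.SyncMessage)
go func() {
	defer close(msgs)
	if err := s.Sync(ctx, client, tables, msgs, scheduler.WithShard(1, 3)); err != nil {
		panic(err) // sketch only; handle the error properly in real code
	}
}()
for range msgs {
	// consume the sync messages produced by this shard
}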

type Client interface {
ID() string
}
Expand Down Expand Up @@ -119,6 +126,11 @@ type Scheduler struct {
invocationID string
}

type shard struct {
num int32
total int32
}

type syncClient struct {
tables schema.Tables
client schema.ClientMeta
@@ -128,6 +140,8 @@ type syncClient struct {
metrics *Metrics
logger zerolog.Logger
invocationID string

shard *shard
}

func NewScheduler(opts ...Option) *Scheduler {
@@ -346,3 +360,24 @@ func maxDepth(tables schema.Tables) uint64 {
}
return depth
}

func shardTableClients(tableClients []tableClient, shard *shard) []tableClient {
// For sharding to work as expected, tableClients must be deterministic between different shards.
if shard == nil || len(tableClients) == 0 {
return tableClients
}
num := int(shard.num)
total := int(shard.total)
chunkSize := len(tableClients) / total
if chunkSize == 0 {
chunkSize = 1
}
chunks := lo.Chunk(tableClients, chunkSize)
if num > len(chunks) {
return nil
}
if len(chunks) > total && num == total {
return append(chunks[num-1], chunks[num]...)
}
return chunks[num-1]
}
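To make the remainder handling concrete: with five table/client pairs and total = 2, chunkSize is 5/2 = 2, so lo.Chunk yields three chunks for two shards. Shard 1 takes the first chunk, and because len(chunks) > total, the last shard also absorbs the trailing chunk. A standalone sketch of the same arithmetic, with plain ints standing in for tableClient values (matching the "uneven split" test cases below):

package main

import (
	"fmt"

	"github.com/samber/lo"
)

// Illustrates the chunking arithmetic used by shardTableClients for 5 items and 2 shards.
func main() {
	items := []int{1, 2, 3, 4, 5}
	total := 2
	chunkSize := len(items) / total // 2
	chunks := lo.Chunk(items, chunkSize)
	fmt.Println(chunks)                          // [[1 2] [3 4] [5]]
	fmt.Println(chunks[0])                       // shard 1 of 2 -> [1 2]
	fmt.Println(append(chunks[1], chunks[2]...)) // shard 2 of 2 -> [3 4 5] (absorbs the leftover chunk)
}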
1 change: 1 addition & 0 deletions scheduler/scheduler_debug.go
@@ -67,6 +67,7 @@ func (s *syncClient) syncTest(ctx context.Context, syncMultiplier int, resolvedR
}
}
shuffle(allClients, seed)
allClients = shardTableClients(allClients, s.shard)

var wg sync.WaitGroup
for _, tc := range allClients {
43 changes: 25 additions & 18 deletions scheduler/scheduler_dfs.go
@@ -40,27 +40,34 @@ func (s *syncClient) syncDfs(ctx context.Context, resolvedResources chan<- *sche
s.metrics.initWithClients(table, clients)
}

var wg sync.WaitGroup
tableClients := make([]tableClient, 0)
Member Author commented:

Historically the DFS scheduler didn't need to build table/client pairs, since we didn't do any sorting in the DFS scheduler. Because of the sharding support, we now need to build the table/client pairs first, so we can shard them before the sync starts.

for i, table := range s.tables {
table := table
clients := preInitialisedClients[i]
for _, client := range clients {
client := client
if err := s.scheduler.tableSems[0].Acquire(ctx, 1); err != nil {
// This means context was cancelled
wg.Wait()
return
}
wg.Add(1)
go func() {
defer wg.Done()
defer s.scheduler.tableSems[0].Release(1)
// not checking for error here as nothing much todo.
// the error is logged and this happens when context is cancelled
s.resolveTableDfs(ctx, table, client, nil, resolvedResources, 1)
}()
for _, client := range preInitialisedClients[i] {
tableClients = append(tableClients, tableClient{table: table, client: client})
}
}
tableClients = shardTableClients(tableClients, s.shard)

var wg sync.WaitGroup
for _, tc := range tableClients {
table := tc.table
cl := tc.client
if err := s.scheduler.tableSems[0].Acquire(ctx, 1); err != nil {
// This means context was cancelled
wg.Wait()
return
}
wg.Add(1)
go func() {
defer wg.Done()
defer s.scheduler.tableSems[0].Release(1)
// not checking for error here as nothing much to do.
// the error is logged and this happens when context is cancelled
// Round Robin currently uses the DFS algorithm to resolve the tables, but this
// may change in the future.
s.resolveTableDfs(ctx, table, cl, nil, resolvedResources, 1)
}()
}

// Wait for all the worker goroutines to finish
wg.Wait()
1 change: 1 addition & 0 deletions scheduler/scheduler_round_robin.go
@@ -37,6 +37,7 @@ func (s *syncClient) syncRoundRobin(ctx context.Context, resolvedResources chan<
}

tableClients := roundRobinInterleave(s.tables, preInitialisedClients)
tableClients = shardTableClients(tableClients, s.shard)

var wg sync.WaitGroup
for _, tc := range tableClients {
1 change: 1 addition & 0 deletions scheduler/scheduler_shuffle.go
@@ -44,6 +44,7 @@ func (s *syncClient) syncShuffle(ctx context.Context, resolvedResources chan<- *
// however, if the table order changes, the seed will change and the shuffle order will be different,
// so users have a little bit of control over the randomization.
seed := hashTableNames(tableNames)
tableClients = shardTableClients(tableClients, s.shard)
Contributor commented:

I do see that shuffle is deterministic (at the moment), but I still think it's a bad idea to shard after shuffling. I'd move it before the shuffle.

Member Author commented:

OK, let me try switching the order and re-run the tests. We shuffle (this is the default in AWS) to avoid rate limits. I don't think sharding before shuffling will make a difference in that respect, but I'll re-test.

Member Author commented:

> Don't think sharding before shuffling will make a difference in that aspect but I'll re-test

I think it will be fine, since we round-robin before we shuffle anyway.

Member Author commented:

Did a bit of testing and it looks good, so we can shard before shuffle.

Contributor commented:

Is there any case where collecting the tables could be non-deterministic? Normally the tables are hardcoded in a plugin, so it should not be the case. If there is a plugin where the tables are dynamic and they could change between syncs (e.g. if they are discovered by a non-deterministic API), sharding would not work.

In either case, I think the deterministic requirement is worth a one-liner comment.

Member Author (@erezrokah, Sep 18, 2024) commented:

> Is there any case where collecting the tables could be non-deterministic?

This is a good point, and definitely a limitation of this approach:

  • It can happen due to a bug in the plugin (https://github.com/cloudquery/cloudquery-private/pull/4299).
  • Plugins with dynamic tables don't use the scheduler; they do their own thing, so they would need to implement sharding on the plugin side (if needed; e.g. for the Postgres source it's probably better to use a stronger machine instead of sharding).
  • I can think of other cases, e.g. someone creating an AWS account after shard 1/2 discovery and before shard 2/2 discovery. If we discover all accounts, that would mess up the sharding. A solution would be to hard-code the accounts in the spec to avoid it.

Member Author commented:

Added a comment about the requirement.

shuffle(tableClients, seed)

var wg sync.WaitGroup
117 changes: 117 additions & 0 deletions scheduler/scheduler_test.go
@@ -481,3 +481,120 @@ func TestScheduler_Cancellation(t *testing.T) {
}
}
}

func Test_shardTableClients(t *testing.T) {
type testCase struct {
name string
tableClients []tableClient
shard *shard
expected []tableClient
}

tests := []testCase{
{
name: "nil shard returns all table clients",
tableClients: []tableClient{
{client: &testExecutionClient{}, table: &schema.Table{Name: "table_1"}},
{client: &testExecutionClient{}, table: &schema.Table{Name: "table_2"}},
{client: &testExecutionClient{}, table: &schema.Table{Name: "table_3"}},
{client: &testExecutionClient{}, table: &schema.Table{Name: "table_4"}},
},
expected: []tableClient{
{client: &testExecutionClient{}, table: &schema.Table{Name: "table_1"}},
{client: &testExecutionClient{}, table: &schema.Table{Name: "table_2"}},
{client: &testExecutionClient{}, table: &schema.Table{Name: "table_3"}},
{client: &testExecutionClient{}, table: &schema.Table{Name: "table_4"}},
},
},
{
name: "nil table clients",
tableClients: nil,
shard: &shard{num: 1, total: 2},
expected: nil,
},
{
name: "empty table clients",
tableClients: []tableClient{},
shard: &shard{num: 1, total: 2},
expected: []tableClient{},
},
{
name: "even shard 1 of 2",
tableClients: []tableClient{
{client: &testExecutionClient{}, table: &schema.Table{Name: "table_1"}},
{client: &testExecutionClient{}, table: &schema.Table{Name: "table_2"}},
{client: &testExecutionClient{}, table: &schema.Table{Name: "table_3"}},
{client: &testExecutionClient{}, table: &schema.Table{Name: "table_4"}},
},
shard: &shard{num: 1, total: 2},
expected: []tableClient{
{client: &testExecutionClient{}, table: &schema.Table{Name: "table_1"}},
{client: &testExecutionClient{}, table: &schema.Table{Name: "table_2"}},
},
},
{
name: "even shard 2 of 2",
tableClients: []tableClient{
{client: &testExecutionClient{}, table: &schema.Table{Name: "table_1"}},
{client: &testExecutionClient{}, table: &schema.Table{Name: "table_2"}},
{client: &testExecutionClient{}, table: &schema.Table{Name: "table_3"}},
{client: &testExecutionClient{}, table: &schema.Table{Name: "table_4"}},
},
shard: &shard{num: 2, total: 2},
expected: []tableClient{
{client: &testExecutionClient{}, table: &schema.Table{Name: "table_3"}},
{client: &testExecutionClient{}, table: &schema.Table{Name: "table_4"}},
},
},
{
name: "uneven split 1 of 2",
tableClients: []tableClient{
{client: &testExecutionClient{}, table: &schema.Table{Name: "table_1"}},
{client: &testExecutionClient{}, table: &schema.Table{Name: "table_2"}},
{client: &testExecutionClient{}, table: &schema.Table{Name: "table_3"}},
{client: &testExecutionClient{}, table: &schema.Table{Name: "table_4"}},
{client: &testExecutionClient{}, table: &schema.Table{Name: "table_5"}},
},
shard: &shard{num: 1, total: 2},
expected: []tableClient{
{client: &testExecutionClient{}, table: &schema.Table{Name: "table_1"}},
{client: &testExecutionClient{}, table: &schema.Table{Name: "table_2"}},
},
},
{
name: "uneven split 2 of 2",
tableClients: []tableClient{
{client: &testExecutionClient{}, table: &schema.Table{Name: "table_1"}},
{client: &testExecutionClient{}, table: &schema.Table{Name: "table_2"}},
{client: &testExecutionClient{}, table: &schema.Table{Name: "table_3"}},
{client: &testExecutionClient{}, table: &schema.Table{Name: "table_4"}},
{client: &testExecutionClient{}, table: &schema.Table{Name: "table_5"}},
},
shard: &shard{num: 2, total: 2},
expected: []tableClient{
{client: &testExecutionClient{}, table: &schema.Table{Name: "table_3"}},
{client: &testExecutionClient{}, table: &schema.Table{Name: "table_4"}},
{client: &testExecutionClient{}, table: &schema.Table{Name: "table_5"}},
},
},
{
name: "more shards than table clients",
tableClients: []tableClient{
{client: &testExecutionClient{}, table: &schema.Table{Name: "table_1"}},
{client: &testExecutionClient{}, table: &schema.Table{Name: "table_2"}},
{client: &testExecutionClient{}, table: &schema.Table{Name: "table_3"}},
{client: &testExecutionClient{}, table: &schema.Table{Name: "table_4"}},
},
shard: &shard{num: 5, total: 100},
expected: nil,
},
}

for _, tc := range tests {
tc := tc
t.Run(tc.name, func(t *testing.T) {
actual := shardTableClients(tc.tableClients, tc.shard)
require.Equal(t, tc.expected, actual)
})
}
}