diff --git a/examples/simple_plugin/go.mod b/examples/simple_plugin/go.mod index 4f9d30a43c..c87476d3f7 100644 --- a/examples/simple_plugin/go.mod +++ b/examples/simple_plugin/go.mod @@ -31,7 +31,7 @@ require ( github.com/buger/jsonparser v1.1.1 // indirect github.com/cenkalti/backoff/v4 v4.3.0 // indirect github.com/cloudquery/cloudquery-api-go v1.13.0 // indirect - github.com/cloudquery/plugin-pb-go v1.22.2 // indirect + github.com/cloudquery/plugin-pb-go v1.22.3-0.20240911130524-a689b180bf1a // indirect github.com/cloudquery/plugin-sdk/v2 v2.7.0 // indirect github.com/davecgh/go-spew v1.1.1 // indirect github.com/ghodss/yaml v1.0.0 // indirect @@ -54,6 +54,7 @@ require ( github.com/oapi-codegen/runtime v1.1.1 // indirect github.com/pierrec/lz4/v4 v4.1.21 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect + github.com/samber/lo v1.47.0 // indirect github.com/santhosh-tekuri/jsonschema/v6 v6.0.1 // indirect github.com/spf13/cobra v1.8.1 // indirect github.com/spf13/pflag v1.0.5 // indirect diff --git a/examples/simple_plugin/go.sum b/examples/simple_plugin/go.sum index d995993be1..5d80071b1c 100644 --- a/examples/simple_plugin/go.sum +++ b/examples/simple_plugin/go.sum @@ -48,8 +48,8 @@ github.com/cenkalti/backoff/v4 v4.3.0 h1:MyRJ/UdXutAwSAT+s3wNd7MfTIcy71VQueUuFK3 github.com/cenkalti/backoff/v4 v4.3.0/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyYozVcomhLiZE= github.com/cloudquery/cloudquery-api-go v1.13.0 h1:/yv9iaCUFjcmIgkLjY3iPVEWajtGFyFzaSaykszdJmo= github.com/cloudquery/cloudquery-api-go v1.13.0/go.mod h1:5oo8HHnv2Y7NgcVvZn59xFlYKJUyeP0tcN8JH3IP2Aw= -github.com/cloudquery/plugin-pb-go v1.22.2 h1:qGQnhQNK3weMfyb1e7HZoEialP76htSuO0pPMiXCrH8= -github.com/cloudquery/plugin-pb-go v1.22.2/go.mod h1:G6F9D2mDA3lUDuSxGDLsiG6dBdi6uWZhddlCabmKdOc= +github.com/cloudquery/plugin-pb-go v1.22.3-0.20240911130524-a689b180bf1a h1:rldWTSn2YDOhbLJiDR9wI94do6xPtXIfqbyu2a1z3W0= +github.com/cloudquery/plugin-pb-go v1.22.3-0.20240911130524-a689b180bf1a/go.mod h1:G6F9D2mDA3lUDuSxGDLsiG6dBdi6uWZhddlCabmKdOc= github.com/cloudquery/plugin-sdk/v2 v2.7.0 h1:hRXsdEiaOxJtsn/wZMFQC9/jPfU1MeMK3KF+gPGqm7U= github.com/cloudquery/plugin-sdk/v2 v2.7.0/go.mod h1:pAX6ojIW99b/Vg4CkhnsGkRIzNaVEceYMR+Bdit73ug= github.com/coreos/go-systemd/v22 v22.5.0/go.mod h1:Y58oyj3AT4RCenI/lSvhwexgC+NSVTIJ3seZv2GcEnc= @@ -126,6 +126,8 @@ github.com/rs/xid v1.5.0/go.mod h1:trrq9SKmegXys3aeAKXMUTdJsYXVwGY3RLcfgqegfbg= github.com/rs/zerolog v1.33.0 h1:1cU2KZkvPxNyfgEmhHAz/1A9Bz+llsdYzklWFzgp0r8= github.com/rs/zerolog v1.33.0/go.mod h1:/7mN4D5sKwJLZQ2b/znpjC3/GQWY/xaDXUM0kKWRHss= github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM= +github.com/samber/lo v1.47.0 h1:z7RynLwP5nbyRscyvcD043DWYoOcYRv3mV8lBeqOCLc= +github.com/samber/lo v1.47.0/go.mod h1:RmDH9Ct32Qy3gduHQuKJ3gW1fMHAnE/fAzQuf6He5cU= github.com/santhosh-tekuri/jsonschema/v6 v6.0.1 h1:PKK9DyHxif4LZo+uQSgXNqs0jj5+xZwwfKHgph2lxBw= github.com/santhosh-tekuri/jsonschema/v6 v6.0.1/go.mod h1:JXeL+ps8p7/KNMjDQk3TCwPpBy0wYklyWTfbkIzdIFU= github.com/spf13/cobra v1.8.1 h1:e5/vxKd/rZsfSJMUX1agtjeTDf+qv1/JdBF8gg5k9ZM= diff --git a/go.mod b/go.mod index e3cfc0980f..5e870241aa 100644 --- a/go.mod +++ b/go.mod @@ -10,7 +10,7 @@ require ( github.com/aws/aws-sdk-go-v2/service/marketplacemetering v1.23.6 github.com/bradleyjkemp/cupaloy/v2 v2.8.0 github.com/cloudquery/cloudquery-api-go v1.13.0 - github.com/cloudquery/plugin-pb-go v1.22.2 + github.com/cloudquery/plugin-pb-go v1.22.3-0.20240911130524-a689b180bf1a github.com/cloudquery/plugin-sdk/v2 v2.7.0 github.com/goccy/go-json v0.10.3 github.com/golang/mock v1.6.0 @@ -20,6 +20,7 @@ require ( github.com/hashicorp/go-retryablehttp v0.7.7 github.com/invopop/jsonschema v0.12.0 github.com/rs/zerolog v1.33.0 + github.com/samber/lo v1.47.0 github.com/santhosh-tekuri/jsonschema/v6 v6.0.1 github.com/spf13/cobra v1.8.1 github.com/stretchr/testify v1.9.0 diff --git a/go.sum b/go.sum index 6a83682914..f22b2256e7 100644 --- a/go.sum +++ b/go.sum @@ -48,8 +48,8 @@ github.com/cenkalti/backoff/v4 v4.3.0 h1:MyRJ/UdXutAwSAT+s3wNd7MfTIcy71VQueUuFK3 github.com/cenkalti/backoff/v4 v4.3.0/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyYozVcomhLiZE= github.com/cloudquery/cloudquery-api-go v1.13.0 h1:/yv9iaCUFjcmIgkLjY3iPVEWajtGFyFzaSaykszdJmo= github.com/cloudquery/cloudquery-api-go v1.13.0/go.mod h1:5oo8HHnv2Y7NgcVvZn59xFlYKJUyeP0tcN8JH3IP2Aw= -github.com/cloudquery/plugin-pb-go v1.22.2 h1:qGQnhQNK3weMfyb1e7HZoEialP76htSuO0pPMiXCrH8= -github.com/cloudquery/plugin-pb-go v1.22.2/go.mod h1:G6F9D2mDA3lUDuSxGDLsiG6dBdi6uWZhddlCabmKdOc= +github.com/cloudquery/plugin-pb-go v1.22.3-0.20240911130524-a689b180bf1a h1:rldWTSn2YDOhbLJiDR9wI94do6xPtXIfqbyu2a1z3W0= +github.com/cloudquery/plugin-pb-go v1.22.3-0.20240911130524-a689b180bf1a/go.mod h1:G6F9D2mDA3lUDuSxGDLsiG6dBdi6uWZhddlCabmKdOc= github.com/cloudquery/plugin-sdk/v2 v2.7.0 h1:hRXsdEiaOxJtsn/wZMFQC9/jPfU1MeMK3KF+gPGqm7U= github.com/cloudquery/plugin-sdk/v2 v2.7.0/go.mod h1:pAX6ojIW99b/Vg4CkhnsGkRIzNaVEceYMR+Bdit73ug= github.com/coreos/go-systemd/v22 v22.5.0/go.mod h1:Y58oyj3AT4RCenI/lSvhwexgC+NSVTIJ3seZv2GcEnc= @@ -126,6 +126,8 @@ github.com/rs/xid v1.5.0/go.mod h1:trrq9SKmegXys3aeAKXMUTdJsYXVwGY3RLcfgqegfbg= github.com/rs/zerolog v1.33.0 h1:1cU2KZkvPxNyfgEmhHAz/1A9Bz+llsdYzklWFzgp0r8= github.com/rs/zerolog v1.33.0/go.mod h1:/7mN4D5sKwJLZQ2b/znpjC3/GQWY/xaDXUM0kKWRHss= github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM= +github.com/samber/lo v1.47.0 h1:z7RynLwP5nbyRscyvcD043DWYoOcYRv3mV8lBeqOCLc= +github.com/samber/lo v1.47.0/go.mod h1:RmDH9Ct32Qy3gduHQuKJ3gW1fMHAnE/fAzQuf6He5cU= github.com/santhosh-tekuri/jsonschema/v6 v6.0.1 h1:PKK9DyHxif4LZo+uQSgXNqs0jj5+xZwwfKHgph2lxBw= github.com/santhosh-tekuri/jsonschema/v6 v6.0.1/go.mod h1:JXeL+ps8p7/KNMjDQk3TCwPpBy0wYklyWTfbkIzdIFU= github.com/spf13/cobra v1.8.1 h1:e5/vxKd/rZsfSJMUX1agtjeTDf+qv1/JdBF8gg5k9ZM= diff --git a/internal/servers/plugin/v3/plugin.go b/internal/servers/plugin/v3/plugin.go index 917fe3c58b..8dc33d74d2 100644 --- a/internal/servers/plugin/v3/plugin.go +++ b/internal/servers/plugin/v3/plugin.go @@ -178,6 +178,12 @@ func (s *Server) Sync(req *pb.Sync_Request, stream pb.Plugin_SyncServer) error { Connection: req.Backend.Connection, } } + if req.Shard != nil { + syncOptions.Shard = &plugin.Shard{ + Num: req.Shard.Num, + Total: req.Shard.Total, + } + } go func() { defer flushMetrics() diff --git a/plugin/plugin_source.go b/plugin/plugin_source.go index 5593a8c115..6bdbfc8e1b 100644 --- a/plugin/plugin_source.go +++ b/plugin/plugin_source.go @@ -15,12 +15,18 @@ type BackendOptions struct { Connection string } +type Shard struct { + Num int32 + Total int32 +} + type SyncOptions struct { Tables []string SkipTables []string SkipDependentTables bool DeterministicCQID bool BackendOptions *BackendOptions + Shard *Shard } type SourceClient interface { diff --git a/scheduler/scheduler.go b/scheduler/scheduler.go index ceea8f853b..24499f1ea8 100644 --- a/scheduler/scheduler.go +++ b/scheduler/scheduler.go @@ -13,6 +13,7 @@ import ( "github.com/cloudquery/plugin-sdk/v4/message" "github.com/cloudquery/plugin-sdk/v4/schema" "github.com/rs/zerolog" + "github.com/samber/lo" "github.com/thoas/go-funk" "go.opentelemetry.io/otel" "go.opentelemetry.io/otel/attribute" @@ -90,6 +91,12 @@ func WithInvocationID(invocationID string) Option { } } +func WithShard(num int32, total int32) SyncOption { + return func(s *syncClient) { + s.shard = &shard{num: num, total: total} + } +} + type Client interface { ID() string } @@ -119,6 +126,11 @@ type Scheduler struct { invocationID string } +type shard struct { + num int32 + total int32 +} + type syncClient struct { tables schema.Tables client schema.ClientMeta @@ -128,6 +140,8 @@ type syncClient struct { metrics *Metrics logger zerolog.Logger invocationID string + + shard *shard } func NewScheduler(opts ...Option) *Scheduler { @@ -346,3 +360,23 @@ func maxDepth(tables schema.Tables) uint64 { } return depth } + +func shardTableClients(tableClients []tableClient, shard *shard) []tableClient { + if shard == nil || len(tableClients) == 0 { + return tableClients + } + num := int(shard.num) + total := int(shard.total) + chunkSize := len(tableClients) / total + if chunkSize == 0 { + chunkSize = 1 + } + chunks := lo.Chunk(tableClients, chunkSize) + if num > len(chunks) { + return nil + } + if len(chunks) > total && num == total { + return append(chunks[num-1], chunks[num]...) + } + return chunks[num-1] +} diff --git a/scheduler/scheduler_debug.go b/scheduler/scheduler_debug.go index c47a4b5baf..3c737522d7 100644 --- a/scheduler/scheduler_debug.go +++ b/scheduler/scheduler_debug.go @@ -67,6 +67,7 @@ func (s *syncClient) syncTest(ctx context.Context, syncMultiplier int, resolvedR } } shuffle(allClients, seed) + allClients = shardTableClients(allClients, s.shard) var wg sync.WaitGroup for _, tc := range allClients { diff --git a/scheduler/scheduler_dfs.go b/scheduler/scheduler_dfs.go index 2a0ed9853c..cd210cae25 100644 --- a/scheduler/scheduler_dfs.go +++ b/scheduler/scheduler_dfs.go @@ -40,27 +40,34 @@ func (s *syncClient) syncDfs(ctx context.Context, resolvedResources chan<- *sche s.metrics.initWithClients(table, clients) } - var wg sync.WaitGroup + tableClients := make([]tableClient, 0) for i, table := range s.tables { - table := table - clients := preInitialisedClients[i] - for _, client := range clients { - client := client - if err := s.scheduler.tableSems[0].Acquire(ctx, 1); err != nil { - // This means context was cancelled - wg.Wait() - return - } - wg.Add(1) - go func() { - defer wg.Done() - defer s.scheduler.tableSems[0].Release(1) - // not checking for error here as nothing much todo. - // the error is logged and this happens when context is cancelled - s.resolveTableDfs(ctx, table, client, nil, resolvedResources, 1) - }() + for _, client := range preInitialisedClients[i] { + tableClients = append(tableClients, tableClient{table: table, client: client}) } } + tableClients = shardTableClients(tableClients, s.shard) + + var wg sync.WaitGroup + for _, tc := range tableClients { + table := tc.table + cl := tc.client + if err := s.scheduler.tableSems[0].Acquire(ctx, 1); err != nil { + // This means context was cancelled + wg.Wait() + return + } + wg.Add(1) + go func() { + defer wg.Done() + defer s.scheduler.tableSems[0].Release(1) + // not checking for error here as nothing much to do. + // the error is logged and this happens when context is cancelled + // Round Robin currently uses the DFS algorithm to resolve the tables, but this + // may change in the future. + s.resolveTableDfs(ctx, table, cl, nil, resolvedResources, 1) + }() + } // Wait for all the worker goroutines to finish wg.Wait() diff --git a/scheduler/scheduler_round_robin.go b/scheduler/scheduler_round_robin.go index 68eb3695d2..bc445a13ca 100644 --- a/scheduler/scheduler_round_robin.go +++ b/scheduler/scheduler_round_robin.go @@ -37,6 +37,7 @@ func (s *syncClient) syncRoundRobin(ctx context.Context, resolvedResources chan< } tableClients := roundRobinInterleave(s.tables, preInitialisedClients) + tableClients = shardTableClients(tableClients, s.shard) var wg sync.WaitGroup for _, tc := range tableClients { diff --git a/scheduler/scheduler_shuffle.go b/scheduler/scheduler_shuffle.go index dfb8737a30..91d5da6c3b 100644 --- a/scheduler/scheduler_shuffle.go +++ b/scheduler/scheduler_shuffle.go @@ -45,6 +45,7 @@ func (s *syncClient) syncShuffle(ctx context.Context, resolvedResources chan<- * // so users have a little bit of control over the randomization. seed := hashTableNames(tableNames) shuffle(tableClients, seed) + tableClients = shardTableClients(tableClients, s.shard) var wg sync.WaitGroup for _, tc := range tableClients { diff --git a/scheduler/scheduler_test.go b/scheduler/scheduler_test.go index 53b2e6bf5e..bfd166e9b1 100644 --- a/scheduler/scheduler_test.go +++ b/scheduler/scheduler_test.go @@ -481,3 +481,120 @@ func TestScheduler_Cancellation(t *testing.T) { } } } + +func Test_shardTableClients(t *testing.T) { + type testCase struct { + name string + tableClients []tableClient + shard *shard + expected []tableClient + } + + tests := []testCase{ + { + name: "nil shard returns all table clients", + tableClients: []tableClient{ + {client: &testExecutionClient{}, table: &schema.Table{Name: "table_1"}}, + {client: &testExecutionClient{}, table: &schema.Table{Name: "table_2"}}, + {client: &testExecutionClient{}, table: &schema.Table{Name: "table_3"}}, + {client: &testExecutionClient{}, table: &schema.Table{Name: "table_4"}}, + }, + expected: []tableClient{ + {client: &testExecutionClient{}, table: &schema.Table{Name: "table_1"}}, + {client: &testExecutionClient{}, table: &schema.Table{Name: "table_2"}}, + {client: &testExecutionClient{}, table: &schema.Table{Name: "table_3"}}, + {client: &testExecutionClient{}, table: &schema.Table{Name: "table_4"}}, + }, + }, + { + name: "nil table clients", + tableClients: nil, + shard: &shard{num: 1, total: 2}, + expected: nil, + }, + { + name: "empty table clients", + tableClients: []tableClient{}, + shard: &shard{num: 1, total: 2}, + expected: []tableClient{}, + }, + { + name: "even shard 1 of 2", + tableClients: []tableClient{ + {client: &testExecutionClient{}, table: &schema.Table{Name: "table_1"}}, + {client: &testExecutionClient{}, table: &schema.Table{Name: "table_2"}}, + {client: &testExecutionClient{}, table: &schema.Table{Name: "table_3"}}, + {client: &testExecutionClient{}, table: &schema.Table{Name: "table_4"}}, + }, + shard: &shard{num: 1, total: 2}, + expected: []tableClient{ + {client: &testExecutionClient{}, table: &schema.Table{Name: "table_1"}}, + {client: &testExecutionClient{}, table: &schema.Table{Name: "table_2"}}, + }, + }, + { + name: "even shard 2 of 2", + tableClients: []tableClient{ + {client: &testExecutionClient{}, table: &schema.Table{Name: "table_1"}}, + {client: &testExecutionClient{}, table: &schema.Table{Name: "table_2"}}, + {client: &testExecutionClient{}, table: &schema.Table{Name: "table_3"}}, + {client: &testExecutionClient{}, table: &schema.Table{Name: "table_4"}}, + }, + shard: &shard{num: 2, total: 2}, + expected: []tableClient{ + {client: &testExecutionClient{}, table: &schema.Table{Name: "table_3"}}, + {client: &testExecutionClient{}, table: &schema.Table{Name: "table_4"}}, + }, + }, + { + name: "uneven split 1 of 2", + tableClients: []tableClient{ + {client: &testExecutionClient{}, table: &schema.Table{Name: "table_1"}}, + {client: &testExecutionClient{}, table: &schema.Table{Name: "table_2"}}, + {client: &testExecutionClient{}, table: &schema.Table{Name: "table_3"}}, + {client: &testExecutionClient{}, table: &schema.Table{Name: "table_4"}}, + {client: &testExecutionClient{}, table: &schema.Table{Name: "table_5"}}, + }, + shard: &shard{num: 1, total: 2}, + expected: []tableClient{ + {client: &testExecutionClient{}, table: &schema.Table{Name: "table_1"}}, + {client: &testExecutionClient{}, table: &schema.Table{Name: "table_2"}}, + }, + }, + { + name: "uneven split 2 of 2", + tableClients: []tableClient{ + {client: &testExecutionClient{}, table: &schema.Table{Name: "table_1"}}, + {client: &testExecutionClient{}, table: &schema.Table{Name: "table_2"}}, + {client: &testExecutionClient{}, table: &schema.Table{Name: "table_3"}}, + {client: &testExecutionClient{}, table: &schema.Table{Name: "table_4"}}, + {client: &testExecutionClient{}, table: &schema.Table{Name: "table_5"}}, + }, + shard: &shard{num: 2, total: 2}, + expected: []tableClient{ + {client: &testExecutionClient{}, table: &schema.Table{Name: "table_3"}}, + {client: &testExecutionClient{}, table: &schema.Table{Name: "table_4"}}, + {client: &testExecutionClient{}, table: &schema.Table{Name: "table_5"}}, + }, + }, + { + name: "more shards than table clients", + tableClients: []tableClient{ + {client: &testExecutionClient{}, table: &schema.Table{Name: "table_1"}}, + {client: &testExecutionClient{}, table: &schema.Table{Name: "table_2"}}, + {client: &testExecutionClient{}, table: &schema.Table{Name: "table_3"}}, + {client: &testExecutionClient{}, table: &schema.Table{Name: "table_4"}}, + }, + shard: &shard{num: 5, total: 100}, + expected: nil, + }, + } + + for _, tc := range tests { + tc := tc + t.Run(tc.name, func(t *testing.T) { + actual := shardTableClients(tc.tableClients, tc.shard) + require.Equal(t, tc.expected, actual) + }) + } +}