Skip to content

Commit

Permalink
feat: Add sync sharding options
Browse files Browse the repository at this point in the history
  • Loading branch information
erezrokah committed Sep 17, 2024
1 parent 6d70b88 commit bd09fa9
Show file tree
Hide file tree
Showing 12 changed files with 203 additions and 24 deletions.
3 changes: 2 additions & 1 deletion examples/simple_plugin/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ require (
github.com/buger/jsonparser v1.1.1 // indirect
github.com/cenkalti/backoff/v4 v4.3.0 // indirect
github.com/cloudquery/cloudquery-api-go v1.13.0 // indirect
github.com/cloudquery/plugin-pb-go v1.22.2 // indirect
github.com/cloudquery/plugin-pb-go v1.22.3-0.20240911130524-a689b180bf1a // indirect
github.com/cloudquery/plugin-sdk/v2 v2.7.0 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/ghodss/yaml v1.0.0 // indirect
Expand All @@ -54,6 +54,7 @@ require (
github.com/oapi-codegen/runtime v1.1.1 // indirect
github.com/pierrec/lz4/v4 v4.1.21 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/samber/lo v1.47.0 // indirect
github.com/santhosh-tekuri/jsonschema/v6 v6.0.1 // indirect
github.com/spf13/cobra v1.8.1 // indirect
github.com/spf13/pflag v1.0.5 // indirect
Expand Down
6 changes: 4 additions & 2 deletions examples/simple_plugin/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,8 @@ github.com/cenkalti/backoff/v4 v4.3.0 h1:MyRJ/UdXutAwSAT+s3wNd7MfTIcy71VQueUuFK3
github.com/cenkalti/backoff/v4 v4.3.0/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyYozVcomhLiZE=
github.com/cloudquery/cloudquery-api-go v1.13.0 h1:/yv9iaCUFjcmIgkLjY3iPVEWajtGFyFzaSaykszdJmo=
github.com/cloudquery/cloudquery-api-go v1.13.0/go.mod h1:5oo8HHnv2Y7NgcVvZn59xFlYKJUyeP0tcN8JH3IP2Aw=
github.com/cloudquery/plugin-pb-go v1.22.2 h1:qGQnhQNK3weMfyb1e7HZoEialP76htSuO0pPMiXCrH8=
github.com/cloudquery/plugin-pb-go v1.22.2/go.mod h1:G6F9D2mDA3lUDuSxGDLsiG6dBdi6uWZhddlCabmKdOc=
github.com/cloudquery/plugin-pb-go v1.22.3-0.20240911130524-a689b180bf1a h1:rldWTSn2YDOhbLJiDR9wI94do6xPtXIfqbyu2a1z3W0=
github.com/cloudquery/plugin-pb-go v1.22.3-0.20240911130524-a689b180bf1a/go.mod h1:G6F9D2mDA3lUDuSxGDLsiG6dBdi6uWZhddlCabmKdOc=
github.com/cloudquery/plugin-sdk/v2 v2.7.0 h1:hRXsdEiaOxJtsn/wZMFQC9/jPfU1MeMK3KF+gPGqm7U=
github.com/cloudquery/plugin-sdk/v2 v2.7.0/go.mod h1:pAX6ojIW99b/Vg4CkhnsGkRIzNaVEceYMR+Bdit73ug=
github.com/coreos/go-systemd/v22 v22.5.0/go.mod h1:Y58oyj3AT4RCenI/lSvhwexgC+NSVTIJ3seZv2GcEnc=
Expand Down Expand Up @@ -126,6 +126,8 @@ github.com/rs/xid v1.5.0/go.mod h1:trrq9SKmegXys3aeAKXMUTdJsYXVwGY3RLcfgqegfbg=
github.com/rs/zerolog v1.33.0 h1:1cU2KZkvPxNyfgEmhHAz/1A9Bz+llsdYzklWFzgp0r8=
github.com/rs/zerolog v1.33.0/go.mod h1:/7mN4D5sKwJLZQ2b/znpjC3/GQWY/xaDXUM0kKWRHss=
github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM=
github.com/samber/lo v1.47.0 h1:z7RynLwP5nbyRscyvcD043DWYoOcYRv3mV8lBeqOCLc=
github.com/samber/lo v1.47.0/go.mod h1:RmDH9Ct32Qy3gduHQuKJ3gW1fMHAnE/fAzQuf6He5cU=
github.com/santhosh-tekuri/jsonschema/v6 v6.0.1 h1:PKK9DyHxif4LZo+uQSgXNqs0jj5+xZwwfKHgph2lxBw=
github.com/santhosh-tekuri/jsonschema/v6 v6.0.1/go.mod h1:JXeL+ps8p7/KNMjDQk3TCwPpBy0wYklyWTfbkIzdIFU=
github.com/spf13/cobra v1.8.1 h1:e5/vxKd/rZsfSJMUX1agtjeTDf+qv1/JdBF8gg5k9ZM=
Expand Down
3 changes: 2 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ require (
github.com/aws/aws-sdk-go-v2/service/marketplacemetering v1.23.6
github.com/bradleyjkemp/cupaloy/v2 v2.8.0
github.com/cloudquery/cloudquery-api-go v1.13.0
github.com/cloudquery/plugin-pb-go v1.22.2
github.com/cloudquery/plugin-pb-go v1.22.3-0.20240911130524-a689b180bf1a
github.com/cloudquery/plugin-sdk/v2 v2.7.0
github.com/goccy/go-json v0.10.3
github.com/golang/mock v1.6.0
Expand All @@ -20,6 +20,7 @@ require (
github.com/hashicorp/go-retryablehttp v0.7.7
github.com/invopop/jsonschema v0.12.0
github.com/rs/zerolog v1.33.0
github.com/samber/lo v1.47.0
github.com/santhosh-tekuri/jsonschema/v6 v6.0.1
github.com/spf13/cobra v1.8.1
github.com/stretchr/testify v1.9.0
Expand Down
6 changes: 4 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,8 @@ github.com/cenkalti/backoff/v4 v4.3.0 h1:MyRJ/UdXutAwSAT+s3wNd7MfTIcy71VQueUuFK3
github.com/cenkalti/backoff/v4 v4.3.0/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyYozVcomhLiZE=
github.com/cloudquery/cloudquery-api-go v1.13.0 h1:/yv9iaCUFjcmIgkLjY3iPVEWajtGFyFzaSaykszdJmo=
github.com/cloudquery/cloudquery-api-go v1.13.0/go.mod h1:5oo8HHnv2Y7NgcVvZn59xFlYKJUyeP0tcN8JH3IP2Aw=
github.com/cloudquery/plugin-pb-go v1.22.2 h1:qGQnhQNK3weMfyb1e7HZoEialP76htSuO0pPMiXCrH8=
github.com/cloudquery/plugin-pb-go v1.22.2/go.mod h1:G6F9D2mDA3lUDuSxGDLsiG6dBdi6uWZhddlCabmKdOc=
github.com/cloudquery/plugin-pb-go v1.22.3-0.20240911130524-a689b180bf1a h1:rldWTSn2YDOhbLJiDR9wI94do6xPtXIfqbyu2a1z3W0=
github.com/cloudquery/plugin-pb-go v1.22.3-0.20240911130524-a689b180bf1a/go.mod h1:G6F9D2mDA3lUDuSxGDLsiG6dBdi6uWZhddlCabmKdOc=
github.com/cloudquery/plugin-sdk/v2 v2.7.0 h1:hRXsdEiaOxJtsn/wZMFQC9/jPfU1MeMK3KF+gPGqm7U=
github.com/cloudquery/plugin-sdk/v2 v2.7.0/go.mod h1:pAX6ojIW99b/Vg4CkhnsGkRIzNaVEceYMR+Bdit73ug=
github.com/coreos/go-systemd/v22 v22.5.0/go.mod h1:Y58oyj3AT4RCenI/lSvhwexgC+NSVTIJ3seZv2GcEnc=
Expand Down Expand Up @@ -126,6 +126,8 @@ github.com/rs/xid v1.5.0/go.mod h1:trrq9SKmegXys3aeAKXMUTdJsYXVwGY3RLcfgqegfbg=
github.com/rs/zerolog v1.33.0 h1:1cU2KZkvPxNyfgEmhHAz/1A9Bz+llsdYzklWFzgp0r8=
github.com/rs/zerolog v1.33.0/go.mod h1:/7mN4D5sKwJLZQ2b/znpjC3/GQWY/xaDXUM0kKWRHss=
github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM=
github.com/samber/lo v1.47.0 h1:z7RynLwP5nbyRscyvcD043DWYoOcYRv3mV8lBeqOCLc=
github.com/samber/lo v1.47.0/go.mod h1:RmDH9Ct32Qy3gduHQuKJ3gW1fMHAnE/fAzQuf6He5cU=
github.com/santhosh-tekuri/jsonschema/v6 v6.0.1 h1:PKK9DyHxif4LZo+uQSgXNqs0jj5+xZwwfKHgph2lxBw=
github.com/santhosh-tekuri/jsonschema/v6 v6.0.1/go.mod h1:JXeL+ps8p7/KNMjDQk3TCwPpBy0wYklyWTfbkIzdIFU=
github.com/spf13/cobra v1.8.1 h1:e5/vxKd/rZsfSJMUX1agtjeTDf+qv1/JdBF8gg5k9ZM=
Expand Down
6 changes: 6 additions & 0 deletions internal/servers/plugin/v3/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,12 @@ func (s *Server) Sync(req *pb.Sync_Request, stream pb.Plugin_SyncServer) error {
Connection: req.Backend.Connection,
}
}
if req.Shard != nil {
syncOptions.Shard = &plugin.Shard{
Num: req.Shard.Num,
Total: req.Shard.Total,
}
}

go func() {
defer flushMetrics()
Expand Down
6 changes: 6 additions & 0 deletions plugin/plugin_source.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,18 @@ type BackendOptions struct {
Connection string
}

// Shard identifies the part of a sharded sync that a single run is responsible for.
// The v3 gRPC server fills it in from the Shard field of the incoming sync request.
type Shard struct {
	// Num is the number of this shard.
	// NOTE(review): the scheduler's sharding logic treats shard numbers as 1-based — confirm callers send 1..Total.
	Num int32
	// Total is the number of shards the sync is divided into.
	Total int32
}

// SyncOptions groups the options that apply to a single sync.
type SyncOptions struct {
	// Tables names the tables selected for the sync.
	// NOTE(review): matching/wildcard semantics are decided by the consumer and are not visible here.
	Tables []string
	// SkipTables names the tables excluded from the sync (same caveat as Tables).
	SkipTables []string
	// SkipDependentTables presumably excludes tables that depend on the selected ones — TODO confirm against the consumer.
	SkipDependentTables bool
	// DeterministicCQID presumably requests reproducible CQ IDs for synced rows — TODO confirm against the consumer.
	DeterministicCQID bool
	// BackendOptions is optional; the gRPC server only sets it when the request carries a backend section.
	BackendOptions *BackendOptions
	// Shard is optional; the gRPC server only sets it when the request carries a shard, and nil means the sync is not sharded.
	Shard *Shard
}

type SourceClient interface {
Expand Down
34 changes: 34 additions & 0 deletions scheduler/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/cloudquery/plugin-sdk/v4/message"
"github.com/cloudquery/plugin-sdk/v4/schema"
"github.com/rs/zerolog"
"github.com/samber/lo"
"github.com/thoas/go-funk"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
Expand Down Expand Up @@ -90,6 +91,12 @@ func WithInvocationID(invocationID string) Option {
}
}

// WithShard returns a SyncOption that limits a sync to one shard of the work.
//
// num is the 1-based number of the shard to process and total is the number of
// shards the table clients are divided into. The option records both values on
// the syncClient; the schedulers later use them to pick this shard's portion of
// the table clients.
func WithShard(num int32, total int32) SyncOption {
	apply := func(sc *syncClient) {
		// A fresh shard value is allocated on every application so that
		// sync clients never share the same pointer.
		sc.shard = &shard{
			num:   num,
			total: total,
		}
	}
	return apply
}

type Client interface {
ID() string
}
Expand Down Expand Up @@ -119,6 +126,11 @@ type Scheduler struct {
invocationID string
}

// shard is the scheduler's internal sharding setting; WithShard stores it on the syncClient.
type shard struct {
	// num is the 1-based number of this shard (shardTableClients selects the chunk at index num-1).
	num int32
	// total is the number of shards the table clients are split across.
	total int32
}

type syncClient struct {
tables schema.Tables
client schema.ClientMeta
Expand All @@ -128,6 +140,8 @@ type syncClient struct {
metrics *Metrics
logger zerolog.Logger
invocationID string

shard *shard
}

func NewScheduler(opts ...Option) *Scheduler {
Expand Down Expand Up @@ -346,3 +360,23 @@ func maxDepth(tables schema.Tables) uint64 {
}
return depth
}

// shardTableClients returns the portion of tableClients owned by the given shard.
//
// The list is cut into consecutive chunks of len(tableClients)/shard.total
// entries (at least one entry per chunk). Shard numbers are 1-based: shard N
// owns the N-th chunk, and the final shard (num == total) additionally owns
// everything the integer division left over, so that all shards together
// cover every entry exactly once.
//
// Special cases:
//   - a nil shard, a non-positive shard.total, or an empty list returns
//     tableClients unchanged (no sharding is applied);
//   - a shard number outside [1, total] returns nil;
//   - when there are more shards than entries, the shards past
//     len(tableClients) return nil.
//
// Every shard must be handed the same tableClients in the same order;
// the split is purely positional.
func shardTableClients(tableClients []tableClient, shard *shard) []tableClient {
	// total <= 0 would divide by zero (or hand lo.Chunk a non-positive size,
	// which panics), so it is treated as "sharding not configured".
	if shard == nil || shard.total <= 0 || len(tableClients) == 0 {
		return tableClients
	}
	num := int(shard.num)
	total := int(shard.total)
	// Shard numbers are 1-based; anything outside [1, total] owns nothing.
	// Without this guard num <= 0 would index chunks[-1] and panic.
	if num < 1 || num > total {
		return nil
	}
	chunkSize := len(tableClients) / total
	if chunkSize == 0 {
		// More shards than entries: the first len(tableClients) shards get
		// one entry each and the remaining shards get nothing.
		chunkSize = 1
	}
	chunks := lo.Chunk(tableClients, chunkSize)
	if num > len(chunks) {
		return nil
	}
	if num == total {
		// The last shard takes its own chunk plus the whole remainder.
		// Merging only chunks[num-1] and chunks[num] is not enough: the
		// remainder can span several chunks (e.g. 5 entries over 3 shards
		// yields 5 single-entry chunks), and the trailing ones would then
		// belong to no shard at all.
		start := (num - 1) * chunkSize
		end := len(tableClients)
		// Cap the capacity so an append by the caller cannot write into the
		// backing array of tableClients.
		return tableClients[start:end:end]
	}
	return chunks[num-1]
}
1 change: 1 addition & 0 deletions scheduler/scheduler_debug.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ func (s *syncClient) syncTest(ctx context.Context, syncMultiplier int, resolvedR
}
}
shuffle(allClients, seed)
allClients = shardTableClients(allClients, s.shard)

var wg sync.WaitGroup
for _, tc := range allClients {
Expand Down
43 changes: 25 additions & 18 deletions scheduler/scheduler_dfs.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,27 +40,34 @@ func (s *syncClient) syncDfs(ctx context.Context, resolvedResources chan<- *sche
s.metrics.initWithClients(table, clients)
}

var wg sync.WaitGroup
tableClients := make([]tableClient, 0)
for i, table := range s.tables {
table := table
clients := preInitialisedClients[i]
for _, client := range clients {
client := client
if err := s.scheduler.tableSems[0].Acquire(ctx, 1); err != nil {
// This means context was cancelled
wg.Wait()
return
}
wg.Add(1)
go func() {
defer wg.Done()
defer s.scheduler.tableSems[0].Release(1)
// not checking for error here as nothing much todo.
// the error is logged and this happens when context is cancelled
s.resolveTableDfs(ctx, table, client, nil, resolvedResources, 1)
}()
for _, client := range preInitialisedClients[i] {
tableClients = append(tableClients, tableClient{table: table, client: client})
}
}
tableClients = shardTableClients(tableClients, s.shard)

var wg sync.WaitGroup
for _, tc := range tableClients {
table := tc.table
cl := tc.client
if err := s.scheduler.tableSems[0].Acquire(ctx, 1); err != nil {
// This means context was cancelled
wg.Wait()
return
}
wg.Add(1)
go func() {
defer wg.Done()
defer s.scheduler.tableSems[0].Release(1)
// not checking for error here as nothing much to do.
// the error is logged and this happens when context is cancelled
// Round Robin currently uses the DFS algorithm to resolve the tables, but this
// may change in the future.
s.resolveTableDfs(ctx, table, cl, nil, resolvedResources, 1)
}()
}

// Wait for all the worker goroutines to finish
wg.Wait()
Expand Down
1 change: 1 addition & 0 deletions scheduler/scheduler_round_robin.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ func (s *syncClient) syncRoundRobin(ctx context.Context, resolvedResources chan<
}

tableClients := roundRobinInterleave(s.tables, preInitialisedClients)
tableClients = shardTableClients(tableClients, s.shard)

var wg sync.WaitGroup
for _, tc := range tableClients {
Expand Down
1 change: 1 addition & 0 deletions scheduler/scheduler_shuffle.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ func (s *syncClient) syncShuffle(ctx context.Context, resolvedResources chan<- *
// so users have a little bit of control over the randomization.
seed := hashTableNames(tableNames)
shuffle(tableClients, seed)
tableClients = shardTableClients(tableClients, s.shard)

var wg sync.WaitGroup
for _, tc := range tableClients {
Expand Down
117 changes: 117 additions & 0 deletions scheduler/scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -481,3 +481,120 @@ func TestScheduler_Cancellation(t *testing.T) {
}
}
}

// Test_shardTableClients checks how shardTableClients divides a list of table
// clients between shards: no sharding, nil and empty input, even and uneven
// splits across two shards, and a shard number beyond the available entries.
func Test_shardTableClients(t *testing.T) {
	// clientsFor builds one tableClient per table name, in the given order,
	// each with its own fresh test client.
	clientsFor := func(names ...string) []tableClient {
		built := make([]tableClient, 0, len(names))
		for _, name := range names {
			built = append(built, tableClient{client: &testExecutionClient{}, table: &schema.Table{Name: name}})
		}
		return built
	}

	fourTables := []string{"table_1", "table_2", "table_3", "table_4"}
	fiveTables := []string{"table_1", "table_2", "table_3", "table_4", "table_5"}

	cases := []struct {
		name         string
		tableClients []tableClient
		shard        *shard
		expected     []tableClient
	}{
		{
			name:         "nil shard returns all table clients",
			tableClients: clientsFor(fourTables...),
			expected:     clientsFor(fourTables...),
		},
		{
			// nil and empty inputs are spelled out literally: require.Equal
			// distinguishes a nil slice from an empty one.
			name:         "nil table clients",
			tableClients: nil,
			shard:        &shard{num: 1, total: 2},
			expected:     nil,
		},
		{
			name:         "empty table clients",
			tableClients: []tableClient{},
			shard:        &shard{num: 1, total: 2},
			expected:     []tableClient{},
		},
		{
			name:         "even shard 1 of 2",
			tableClients: clientsFor(fourTables...),
			shard:        &shard{num: 1, total: 2},
			expected:     clientsFor("table_1", "table_2"),
		},
		{
			name:         "even shard 2 of 2",
			tableClients: clientsFor(fourTables...),
			shard:        &shard{num: 2, total: 2},
			expected:     clientsFor("table_3", "table_4"),
		},
		{
			name:         "uneven split 1 of 2",
			tableClients: clientsFor(fiveTables...),
			shard:        &shard{num: 1, total: 2},
			expected:     clientsFor("table_1", "table_2"),
		},
		{
			// The last shard also receives the leftover entry.
			name:         "uneven split 2 of 2",
			tableClients: clientsFor(fiveTables...),
			shard:        &shard{num: 2, total: 2},
			expected:     clientsFor("table_3", "table_4", "table_5"),
		},
		{
			name:         "more shards than table clients",
			tableClients: clientsFor(fourTables...),
			shard:        &shard{num: 5, total: 100},
			expected:     nil,
		},
	}

	for _, tt := range cases {
		tt := tt
		t.Run(tt.name, func(t *testing.T) {
			got := shardTableClients(tt.tableClients, tt.shard)
			require.Equal(t, tt.expected, got)
		})
	}
}

0 comments on commit bd09fa9

Please sign in to comment.