Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/lint_golang.yml
Original file line number Diff line number Diff line change
Expand Up @@ -31,5 +31,5 @@ jobs:
- name: golangci-lint
uses: golangci/golangci-lint-action@v3
with:
version: v1.54.2
version: v1.55.2
args: --verbose
4 changes: 4 additions & 0 deletions .golangci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,10 @@ linters-settings:
disabled: true
- name: nested-structs
disabled: true
- name: import-alias-naming
disabled: true
- name: unchecked-type-assertion
disabled: true

gofmt:
rewrite-rules:
Expand Down
3 changes: 3 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ require (
github.com/opencontainers/image-spec v1.0.2
github.com/rs/zerolog v1.29.1
github.com/schollz/progressbar/v3 v3.13.1
github.com/stretchr/testify v1.8.4
github.com/thoas/go-funk v0.9.3
golang.org/x/exp v0.0.0-20231006140011-7918f672742d
google.golang.org/grpc v1.58.3
Expand All @@ -36,6 +37,7 @@ require (
github.com/bytedance/sonic v1.10.2 // indirect
github.com/chenzhuoyu/base64x v0.0.0-20230717121745-296ad89f973d // indirect
github.com/chenzhuoyu/iasm v0.9.0 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/deepmap/oapi-codegen v1.15.0 // indirect
github.com/distribution/reference v0.5.0 // indirect
github.com/docker/distribution v2.8.3+incompatible // indirect
Expand Down Expand Up @@ -84,6 +86,7 @@ require (
github.com/pelletier/go-toml/v2 v2.0.8 // indirect
github.com/pierrec/lz4/v4 v4.1.18 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/rivo/uniseg v0.2.0 // indirect
github.com/russross/blackfriday/v2 v2.1.0 // indirect
github.com/schollz/closestmatch v2.1.0+incompatible // indirect
Expand Down
13 changes: 7 additions & 6 deletions managedplugin/download.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,20 +85,21 @@ func getURLLocation(ctx context.Context, org string, name string, version string
}
resp.Body.Close()
// Check server response
switch {
case resp.StatusCode == http.StatusNotFound:
switch resp.StatusCode {
case http.StatusOK:
return nil
case http.StatusNotFound:
return err404
case resp.StatusCode == http.StatusUnauthorized:
case http.StatusUnauthorized:
fmt.Printf("Failed downloading %s with status code %d. Retrying\n", downloadURL, resp.StatusCode)
return err401
case resp.StatusCode == http.StatusTooManyRequests:
case http.StatusTooManyRequests:
fmt.Printf("Failed downloading %s with status code %d. Retrying\n", downloadURL, resp.StatusCode)
return err429
case resp.StatusCode >= http.StatusBadRequest: // anything that's not 200 or 30*
default:
fmt.Printf("Failed downloading %s with status code %d\n", downloadURL, resp.StatusCode)
return fmt.Errorf("statusCode %d", resp.StatusCode)
}
return nil
}, retry.RetryIf(func(err error) bool {
return err == err401 || err == err429
}),
Expand Down
8 changes: 4 additions & 4 deletions managedplugin/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,13 +96,13 @@ type Client struct {

// typ will be deprecated soon but now required for a transition period
func NewClients(ctx context.Context, typ PluginType, specs []Config, opts ...Option) (Clients, error) {
clients := make(Clients, len(specs))
for i, spec := range specs {
clients := make(Clients, 0, len(specs))
for _, spec := range specs {
client, err := NewClient(ctx, typ, spec, opts...)
if err != nil {
return nil, err
return clients, err // previous entries in clients determine which plugins were successfully created
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is needed so that we can check which client has failed by looking at len(clients)

}
clients[i] = client
clients = append(clients, client)
}
return clients, nil
}
Expand Down
2 changes: 1 addition & 1 deletion managedplugin/registry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ func TestRegistryJsonMarshalUnmarshal(t *testing.T) {
}
}

func TestRegistryYamlMarshalUnmarsahl(t *testing.T) {
func TestRegistryYamlMarshalUnmarshal(t *testing.T) {
b, err := yaml.Marshal(RegistryGrpc)
if err != nil {
t.Fatal("failed to marshal registry:", err)
Expand Down
13 changes: 10 additions & 3 deletions specs/destination.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,14 @@ type Destination struct {
BatchSizeBytes int `json:"batch_size_bytes,omitempty"`
Spec any `json:"spec,omitempty"`
PKMode PKMode `json:"pk_mode,omitempty"`

// registryInferred is a flag that indicates whether the registry was inferred from an empty value
registryInferred bool
}

func (d *Destination) SetDefaults(defaultBatchSize, defaultBatchSizeBytes int) {
if d.Registry.String() == "" {
d.Registry = RegistryGithub
if d.registryInferred && d.Registry == 0 {
d.Registry = RegistryCloudQuery
}
if d.BatchSize == 0 {
d.BatchSize = defaultBatchSize
Expand Down Expand Up @@ -59,7 +62,7 @@ func (d *Destination) Validate() error {
return fmt.Errorf(msg)
}

if d.Registry == RegistryGithub {
if d.Registry.NeedVersion() {
if d.Version == "" {
return fmt.Errorf("version is required")
}
Expand All @@ -86,3 +89,7 @@ func (d Destination) VersionString() string {
}
return fmt.Sprintf("%s (%s@%s)", d.Name, pathParts[1], d.Version)
}

func (d Destination) RegistryInferred() bool {
return d.registryInferred
}
30 changes: 23 additions & 7 deletions specs/destination_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ package specs
import (
"testing"

"github.com/google/go-cmp/cmp"
"github.com/stretchr/testify/require"
)

type testDestinationSpec struct {
Expand Down Expand Up @@ -77,9 +77,7 @@ func TestDestinationUnmarshalSpec(t *testing.T) {
}

source := spec.Spec.(*Source)
if cmp.Diff(source, tc.source) != "" {
t.Fatalf("expected:%v got:%v", tc.source, source)
}
require.Equal(t, tc.source, source)
})
}
}
Expand Down Expand Up @@ -159,6 +157,26 @@ spec:
name: test
path: cloudquery/test
version: v1.1.0
`,
"",
&Destination{
Name: "test",
Registry: RegistryCloudQuery,
Path: "cloudquery/test",
Version: "v1.1.0",
BatchSize: 10000,
BatchSizeBytes: 10000000,
registryInferred: true,
},
},
{
"success github",
`kind: destination
spec:
name: test
registry: github
path: cloudquery/test
version: v1.1.0
`,
"",
&Destination{
Expand Down Expand Up @@ -191,9 +209,7 @@ func TestDestinationUnmarshalSpecValidate(t *testing.T) {
return
}

if cmp.Diff(destination, tc.destination) != "" {
t.Fatalf("expected:\n%v\ngot:\n%v\n", tc.destination, destination)
}
require.Equal(t, tc.destination, destination)
})
}
}
Expand Down
4 changes: 4 additions & 0 deletions specs/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,10 @@ func (r *Registry) UnmarshalJSON(data []byte) (err error) {
return nil
}

func (r Registry) NeedVersion() bool {
return r == RegistryGithub || r == RegistryCloudQuery
}

func RegistryFromString(s string) (Registry, error) {
switch s {
case "github":
Expand Down
2 changes: 1 addition & 1 deletion specs/registry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ func TestRegistryJsonMarshalUnmarshal(t *testing.T) {
}
}

func TestRegistryYamlMarshalUnmarsahl(t *testing.T) {
func TestRegistryYamlMarshalUnmarshal(t *testing.T) {
b, err := yaml.Marshal(RegistryGrpc)
if err != nil {
t.Fatal("failed to marshal registry:", err)
Expand Down
15 changes: 11 additions & 4 deletions specs/source.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ type Source struct {
// For the local registry the path will be the path to the binary: ./path/to/binary
// For the gRPC registry the path will be the address of the gRPC server: host:port
Path string `json:"path,omitempty"`
// Registry can be github,local,grpc.
// Registry can be github,local,grpc,cloudquery
Registry Registry `json:"registry,omitempty"`
Concurrency uint64 `json:"concurrency,omitempty"`
TableConcurrency uint64 `json:"table_concurrency,omitempty"` // deprecated: use Concurrency instead
Expand All @@ -52,11 +52,14 @@ type Source struct {
// DeterministicCQID is a flag that indicates whether the source plugin should generate a random UUID as the value of _cq_id
// or whether it should calculate a UUID that is a hash of the primary keys (if they exist) or the entire resource.
DeterministicCQID bool `json:"deterministic_cq_id,omitempty"`

// registryInferred is a flag that indicates whether the registry was inferred from an empty value
registryInferred bool
}

func (s *Source) SetDefaults() {
if s.Registry.String() == "" {
s.Registry = RegistryGithub
if s.registryInferred && s.Registry == 0 {
s.Registry = RegistryCloudQuery
}
if s.Backend.String() == "" {
s.Backend = BackendNone
Expand Down Expand Up @@ -117,7 +120,7 @@ func (s *Source) Validate() error {
return fmt.Errorf("tables configuration is required. Hint: set the tables you want to sync by adding `tables: [...]` or use `cloudquery tables` to list available tables")
}

if s.Registry == RegistryGithub {
if s.Registry.NeedVersion() {
if s.Version == "" {
return fmt.Errorf("version is required")
}
Expand Down Expand Up @@ -147,3 +150,7 @@ func (s Source) VersionString() string {
}
return fmt.Sprintf("%s (%s@%s)", s.Name, pathParts[1], s.Version)
}

func (s Source) RegistryInferred() bool {
return s.registryInferred
}
51 changes: 36 additions & 15 deletions specs/source_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ package specs
import (
"testing"

"github.com/google/go-cmp/cmp"
"github.com/stretchr/testify/require"
)

var sourceUnmarshalSpecTestCases = []struct {
Expand Down Expand Up @@ -58,9 +58,7 @@ func TestSourceUnmarshalSpec(t *testing.T) {
}

source := spec.Spec.(*Source)
if cmp.Diff(source, tc.source) != "" {
t.Fatalf("expected:%v got:%v", tc.source, source)
}
require.Equal(t, tc.source, source)
})
}
}
Expand Down Expand Up @@ -146,14 +144,15 @@ spec:
`,
"",
&Source{
Name: "test",
Registry: RegistryGithub,
Path: "cloudquery/test",
Concurrency: defaultConcurrency,
Version: "v1.1.0",
Destinations: []string{"test"},
Scheduler: SchedulerRoundRobin,
Tables: []string{"test"},
Name: "test",
Registry: RegistryCloudQuery,
Path: "cloudquery/test",
Concurrency: defaultConcurrency,
Version: "v1.1.0",
Destinations: []string{"test"},
Scheduler: SchedulerRoundRobin,
Tables: []string{"test"},
registryInferred: true,
},
},
{
Expand All @@ -165,6 +164,30 @@ spec:
version: v1.1.0
destinations: ["test"]
tables: ["test"]
`,
"",
&Source{
Name: "test",
Registry: RegistryCloudQuery,
Path: "cloudquery/test",
Concurrency: defaultConcurrency,
Version: "v1.1.0",
Destinations: []string{"test"},
Scheduler: SchedulerDFS,
Tables: []string{"test"},
registryInferred: true,
},
},
{
"success github",
`kind: source
spec:
name: test
path: cloudquery/test
registry: github
version: v1.1.0
destinations: ["test"]
tables: ["test"]
`,
"",
&Source{
Expand Down Expand Up @@ -199,9 +222,7 @@ func TestSourceUnmarshalSpecValidate(t *testing.T) {
return
}

if cmp.Diff(source, tc.source) != "" {
t.Fatalf("expected:%v got:%v", tc.source, source)
}
require.Equal(t, tc.source, source)
})
}
}
Expand Down
17 changes: 16 additions & 1 deletion specs/spec.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,22 @@ func (s *Spec) UnmarshalJSON(data []byte) error {
dec = json.NewDecoder(bytes.NewReader(b))
dec.UseNumber()
dec.DisallowUnknownFields()
return dec.Decode(s.Spec)
if err = dec.Decode(s.Spec); err != nil {
return err
}

var r struct {
Registry string `json:"registry"`
}
if err := json.Unmarshal(b, &r); err == nil && r.Registry == "" {
switch s.Kind {
case KindSource:
s.Spec.(*Source).registryInferred = true
case KindDestination:
s.Spec.(*Destination).registryInferred = true
}
}
return nil
}

func UnmarshalJSONStrict(b []byte, out any) error {
Expand Down