From 58ec0771f976cb64470b3d1dbab058ffebec45d8 Mon Sep 17 00:00:00 2001 From: Herman Schaaf Date: Mon, 5 Feb 2024 09:17:50 +0000 Subject: [PATCH] feat: Allow use of CloudQuery registry with docker plugins (#235) This change allows Docker plugins to use the CloudQuery registry, meaning that configs can now be written like: ``` path: cloudquery/typeform version: v1.2.3 ``` instead of ``` registry: docker path: docker.cloudquery.io/cloudquery/source-typeform:v1.2.3 ``` The `docker` registry can still be used if preferred, either with or without the `docker.cloudquery.io` path. --- .github/workflows/unittest.yml | 3 ++ managedplugin/download.go | 21 ++++--------- managedplugin/download_test.go | 12 +++++-- managedplugin/hub.go | 39 +++++++++++++++++++++++ managedplugin/options.go | 6 ++++ managedplugin/plugin.go | 41 +++++++++++++++++++----- managedplugin/plugin_test.go | 57 +++++++++++++++++++++++++++++++++- 7 files changed, 153 insertions(+), 26 deletions(-) create mode 100644 managedplugin/hub.go diff --git a/.github/workflows/unittest.yml b/.github/workflows/unittest.yml index 4a51cad..77a6da5 100644 --- a/.github/workflows/unittest.yml +++ b/.github/workflows/unittest.yml @@ -26,3 +26,6 @@ jobs: - run: go build ./... - name: Run tests run: make test + env: + CLOUDQUERY_TEAM_NAME: ${{ secrets.CLOUDQUERY_TEAM_NAME }} + CLOUDQUERY_API_KEY: ${{ secrets.CLOUDQUERY_API_KEY }} \ No newline at end of file diff --git a/managedplugin/download.go b/managedplugin/download.go index 7fde1f0..8e853b3 100644 --- a/managedplugin/download.go +++ b/managedplugin/download.go @@ -17,6 +17,7 @@ import ( "github.com/avast/retry-go/v4" cloudquery_api "github.com/cloudquery/cloudquery-api-go" + "github.com/rs/zerolog" "github.com/schollz/progressbar/v3" ) @@ -127,7 +128,7 @@ type HubDownloadOptions struct { PluginVersion string } -func DownloadPluginFromHub(ctx context.Context, ops HubDownloadOptions) error { +func DownloadPluginFromHub(ctx context.Context, c *cloudquery_api.ClientWithResponses, ops HubDownloadOptions) error { downloadDir := filepath.Dir(ops.LocalPath) if _, err := os.Stat(ops.LocalPath); err == nil { return nil @@ -137,7 +138,7 @@ func DownloadPluginFromHub(ctx context.Context, ops HubDownloadOptions) error { return fmt.Errorf("failed to create plugin directory %s: %w", downloadDir, err) } - pluginAsset, statusCode, err := downloadPluginAssetFromHub(ctx, ops) + pluginAsset, statusCode, err := downloadPluginAssetFromHub(ctx, c, ops) if err != nil { return fmt.Errorf("failed to get plugin metadata from hub: %w", err) } @@ -198,18 +199,7 @@ func DownloadPluginFromHub(ctx context.Context, ops HubDownloadOptions) error { return nil } -func downloadPluginAssetFromHub(ctx context.Context, ops HubDownloadOptions) (*cloudquery_api.PluginAsset, int, error) { - c, err := cloudquery_api.NewClientWithResponses(APIBaseURL(), - cloudquery_api.WithRequestEditorFn(func(ctx context.Context, req *http.Request) error { - if ops.AuthToken != "" { - req.Header.Set("Authorization", fmt.Sprintf("Bearer %s", ops.AuthToken)) - } - return nil - })) - if err != nil { - return nil, -1, fmt.Errorf("failed to create Hub API client: %w", err) - } - +func downloadPluginAssetFromHub(ctx context.Context, c *cloudquery_api.ClientWithResponses, ops HubDownloadOptions) (*cloudquery_api.PluginAsset, int, error) { target := fmt.Sprintf("%s_%s", runtime.GOOS, runtime.GOARCH) aj := "application/json" @@ -246,7 +236,7 @@ func downloadPluginAssetFromHub(ctx context.Context, ops HubDownloadOptions) (*c } } -func DownloadPluginFromGithub(ctx context.Context, localPath string, org string, name string, version string, typ PluginType) error { +func DownloadPluginFromGithub(ctx context.Context, logger zerolog.Logger, localPath string, org string, name string, version string, typ PluginType) error { downloadDir := filepath.Dir(localPath) pluginZipPath := localPath + ".zip" @@ -262,6 +252,7 @@ func DownloadPluginFromGithub(ctx context.Context, localPath string, org string, if err != nil { return fmt.Errorf("failed to get plugin url: %w", err) } + logger.Debug().Msg(fmt.Sprintf("Downloading %s", downloadURL)) if _, err := downloadFile(ctx, pluginZipPath, downloadURL); err != nil { return fmt.Errorf("failed to download plugin: %w", err) } diff --git a/managedplugin/download_test.go b/managedplugin/download_test.go index 8a416d1..c4c3ecd 100644 --- a/managedplugin/download_test.go +++ b/managedplugin/download_test.go @@ -4,6 +4,9 @@ import ( "context" "path" "testing" + + cloudquery_api "github.com/cloudquery/cloudquery-api-go" + "github.com/rs/zerolog" ) func TestDownloadPluginFromGithubIntegration(t *testing.T) { @@ -23,9 +26,10 @@ func TestDownloadPluginFromGithubIntegration(t *testing.T) { {name: "invalid community source", org: "cloudquery", plugin: "invalid-plugin", version: "v0.0.x", wantErr: true, typ: PluginSource}, {name: "invalid monorepo source", org: "not-cloudquery", plugin: "invalid-plugin", version: "v0.0.x", wantErr: true, typ: PluginSource}, } + logger := zerolog.Logger{} for _, tc := range cases { t.Run(tc.name, func(t *testing.T) { - err := DownloadPluginFromGithub(context.Background(), path.Join(tmp, tc.name), tc.org, tc.plugin, tc.version, tc.typ) + err := DownloadPluginFromGithub(context.Background(), logger, path.Join(tmp, tc.name), tc.org, tc.plugin, tc.version, tc.typ) if (err != nil) != tc.wantErr { t.Errorf("DownloadPluginFromGithub() error = %v, wantErr %v", err, tc.wantErr) return @@ -46,9 +50,13 @@ func TestDownloadPluginFromCloudQueryHub(t *testing.T) { }{ {testName: "should download test plugin from cloudquery registry", team: "cloudquery", plugin: "aws", version: "v22.18.0", typ: PluginSource}, } + c, err := cloudquery_api.NewClientWithResponses(APIBaseURL()) + if err != nil { + t.Fatalf("failed to create Hub API client: %v", err) + } for _, tc := range cases { t.Run(tc.testName, func(t *testing.T) { - err := DownloadPluginFromHub(context.Background(), HubDownloadOptions{ + err := DownloadPluginFromHub(context.Background(), c, HubDownloadOptions{ LocalPath: path.Join(tmp, tc.testName), AuthToken: "", TeamName: "", diff --git a/managedplugin/hub.go b/managedplugin/hub.go new file mode 100644 index 0000000..486bf38 --- /dev/null +++ b/managedplugin/hub.go @@ -0,0 +1,39 @@ +package managedplugin + +import ( + "context" + "fmt" + "net/http" + + cloudquery_api "github.com/cloudquery/cloudquery-api-go" + "github.com/rs/zerolog" +) + +func getHubClient(logger zerolog.Logger, ops HubDownloadOptions) (*cloudquery_api.ClientWithResponses, error) { + c, err := cloudquery_api.NewClientWithResponses(APIBaseURL(), + cloudquery_api.WithRequestEditorFn(func(ctx context.Context, req *http.Request) error { + if ops.AuthToken != "" { + req.Header.Set("Authorization", fmt.Sprintf("Bearer %s", ops.AuthToken)) + } + logger.Debug().Interface("ops", ops).Msg(fmt.Sprintf("Requesting %s %s", req.Method, req.URL)) + return nil + })) + if err != nil { + return nil, fmt.Errorf("failed to create Hub API client: %w", err) + } + return c, nil +} + +func isDockerPlugin(ctx context.Context, c *cloudquery_api.ClientWithResponses, ops HubDownloadOptions) (bool, error) { + p, err := c.GetPluginVersionWithResponse(ctx, ops.PluginTeam, cloudquery_api.PluginKind(ops.PluginKind), ops.PluginName, ops.PluginVersion) + if err != nil { + return false, fmt.Errorf("failed to get plugin information: %w", err) + } + if p.StatusCode() != http.StatusOK { + return false, fmt.Errorf("failed to get plugin information: %s", p.Status()) + } + if p.JSON200 == nil { + return false, fmt.Errorf("failed to get plugin information: response body is empty") + } + return p.JSON200.PackageType == "docker", nil +} diff --git a/managedplugin/options.go b/managedplugin/options.go index 5cd1da1..e2ffceb 100644 --- a/managedplugin/options.go +++ b/managedplugin/options.go @@ -57,3 +57,9 @@ func WithLicenseFile(licenseFile string) Option { c.licenseFile = licenseFile } } + +func WithCloudQueryDockerHost(dockerHost string) Option { + return func(c *Client) { + c.cqDockerHost = dockerHost + } +} diff --git a/managedplugin/plugin.go b/managedplugin/plugin.go index e4f0123..c13c34e 100644 --- a/managedplugin/plugin.go +++ b/managedplugin/plugin.go @@ -48,6 +48,8 @@ const ( containerServerHealthyInitialRetryDelay = 100 * time.Millisecond containerStopTimeoutSeconds = 10 + + DefaultCloudQueryDockerHost = "docker.cloudquery.io" ) // PluginType specifies if a plugin is a source or a destination. @@ -86,6 +88,7 @@ type Client struct { config Config noSentry bool noExec bool + cqDockerHost string otelEndpoint string otelEndpointInsecure bool metrics *Metrics @@ -133,11 +136,12 @@ func (c Clients) Terminate() error { // If registrySpec is Docker then client downloads the docker image, runs it and creates a gRPC connection. func NewClient(ctx context.Context, typ PluginType, config Config, opts ...Option) (*Client, error) { c := &Client{ - directory: defaultDownloadDir, - wg: &sync.WaitGroup{}, - config: config, - metrics: &Metrics{}, - registry: config.Registry, + directory: defaultDownloadDir, + wg: &sync.WaitGroup{}, + config: config, + metrics: &Metrics{}, + registry: config.Registry, + cqDockerHost: DefaultCloudQueryDockerHost, } for _, opt := range opts { opt(c) @@ -168,7 +172,7 @@ func (c *Client) downloadPlugin(ctx context.Context, typ PluginType) error { org, name := pathSplit[0], pathSplit[1] c.LocalPath = filepath.Join(c.directory, "plugins", typ.String(), org, name, c.config.Version, "plugin") c.LocalPath = WithBinarySuffix(c.LocalPath) - return DownloadPluginFromGithub(ctx, c.LocalPath, org, name, c.config.Version, typ) + return DownloadPluginFromGithub(ctx, c.logger, c.LocalPath, org, name, c.config.Version, typ) case RegistryDocker: if imageAvailable, err := isDockerImageAvailable(ctx, c.config.Path); err != nil { return err @@ -184,7 +188,8 @@ func (c *Client) downloadPlugin(ctx context.Context, typ PluginType) error { org, name := pathSplit[0], pathSplit[1] c.LocalPath = filepath.Join(c.directory, "plugins", typ.String(), org, name, c.config.Version, "plugin") c.LocalPath = WithBinarySuffix(c.LocalPath) - return DownloadPluginFromHub(ctx, HubDownloadOptions{ + + ops := HubDownloadOptions{ AuthToken: c.authToken, TeamName: c.teamName, LocalPath: c.LocalPath, @@ -192,7 +197,27 @@ func (c *Client) downloadPlugin(ctx context.Context, typ PluginType) error { PluginKind: typ.String(), PluginName: name, PluginVersion: c.config.Version, - }) + } + hubClient, err := getHubClient(c.logger, ops) + if err != nil { + return err + } + isDocker, err := isDockerPlugin(ctx, hubClient, ops) + if err != nil { + return err + } + if isDocker { + path := fmt.Sprintf(c.cqDockerHost+"/%s/%s-%s:%s", ops.PluginTeam, ops.PluginKind, ops.PluginName, ops.PluginVersion) + c.config.Registry = RegistryDocker // will be used by exec step + c.config.Path = path + if imageAvailable, err := isDockerImageAvailable(ctx, path); err != nil { + return err + } else if !imageAvailable { + return pullDockerImage(ctx, path, c.authToken, c.teamName) + } + return nil + } + return DownloadPluginFromHub(ctx, hubClient, ops) default: return fmt.Errorf("unknown registry %s", c.config.Registry.String()) } diff --git a/managedplugin/plugin_test.go b/managedplugin/plugin_test.go index ba13fa4..e061a91 100644 --- a/managedplugin/plugin_test.go +++ b/managedplugin/plugin_test.go @@ -19,7 +19,8 @@ func TestManagedPluginGitHub(t *testing.T) { Path: "cloudquery/hackernews", Version: "v1.1.4", } - clients, err := NewClients(ctx, PluginSource, []Config{cfg}, WithDirectory(tmpDir), WithNoSentry()) + clients, err := NewClients(ctx, PluginSource, []Config{cfg}, + WithDirectory(tmpDir), WithNoSentry()) if err != nil { t.Fatal(err) } @@ -94,6 +95,60 @@ func TestManagedPluginCloudQuery(t *testing.T) { } } +func TestManagedPluginCloudQueryDocker(t *testing.T) { + ctx := context.Background() + cli, err := client.NewClientWithOpts(client.FromEnv) + if err != nil { + t.Fatal(err) + } + _, err = cli.Ping(ctx) + if err != nil { + t.Skip("docker not running") + } + // note: this test requires an API key and team name to be set in the environment + if os.Getenv("CLOUDQUERY_TEAM_NAME") == "" { + t.Skip("CLOUDQUERY_TEAM_NAME not set") + } + if os.Getenv("CLOUDQUERY_API_KEY") == "" { + t.Skip("CLOUDQUERY_API_KEY not set") + } + if runtime.GOOS == "windows" { + // the docker image is not built for Windows, so would require enabling of experimental + // linux compatibility. We skip this test in CI for now. + t.Skip("this test is not supported on windows") + } + + tmpDir := t.TempDir() + cfg := Config{ + Name: "test", + Registry: RegistryCloudQuery, + Version: "v1.2.3", + Path: "cloudquery/typeform", + } + clients, err := NewClients(ctx, PluginSource, []Config{cfg}, + WithDirectory(tmpDir), WithNoSentry(), + WithAuthToken(os.Getenv("CLOUDQUERY_API_KEY")), + WithTeamName(os.Getenv("CLOUDQUERY_TEAM_NAME")), + ) + if err != nil { + t.Fatal(err) + } + testClient := clients.ClientByName("test") + if testClient == nil { + t.Fatal("test client not found") + } + v, err := testClient.Versions(ctx) + if err != nil { + t.Fatal(err) + } + if len(v) < 1 { + t.Fatal("expected at least 1 version, got 0") + } + if err := clients.Terminate(); err != nil { + t.Fatal(err) + } +} + func TestManagedPluginDocker(t *testing.T) { ctx := context.Background() cli, err := client.NewClientWithOpts(client.FromEnv)