Skip to content

Commit

Permalink
feat: Allow use of CloudQuery registry with docker plugins (#235)
Browse files Browse the repository at this point in the history
This change allows Docker plugins to use the CloudQuery registry, meaning that configs can now be written like:

```
path: cloudquery/typeform
version: v1.2.3
```

instead of

```
registry: docker
path: docker.cloudquery.io/cloudquery/source-typeform:v1.2.3
```

The `docker` registry can still be used if preferred, either with or without the `docker.cloudquery.io` path.
  • Loading branch information
hermanschaaf committed Feb 5, 2024
1 parent 8339927 commit 58ec077
Show file tree
Hide file tree
Showing 7 changed files with 153 additions and 26 deletions.
3 changes: 3 additions & 0 deletions .github/workflows/unittest.yml
Original file line number Diff line number Diff line change
Expand Up @@ -26,3 +26,6 @@ jobs:
- run: go build ./...
- name: Run tests
run: make test
env:
CLOUDQUERY_TEAM_NAME: ${{ secrets.CLOUDQUERY_TEAM_NAME }}
CLOUDQUERY_API_KEY: ${{ secrets.CLOUDQUERY_API_KEY }}
21 changes: 6 additions & 15 deletions managedplugin/download.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (

"github.com/avast/retry-go/v4"
cloudquery_api "github.com/cloudquery/cloudquery-api-go"
"github.com/rs/zerolog"
"github.com/schollz/progressbar/v3"
)

Expand Down Expand Up @@ -127,7 +128,7 @@ type HubDownloadOptions struct {
PluginVersion string
}

func DownloadPluginFromHub(ctx context.Context, ops HubDownloadOptions) error {
func DownloadPluginFromHub(ctx context.Context, c *cloudquery_api.ClientWithResponses, ops HubDownloadOptions) error {
downloadDir := filepath.Dir(ops.LocalPath)
if _, err := os.Stat(ops.LocalPath); err == nil {
return nil
Expand All @@ -137,7 +138,7 @@ func DownloadPluginFromHub(ctx context.Context, ops HubDownloadOptions) error {
return fmt.Errorf("failed to create plugin directory %s: %w", downloadDir, err)
}

pluginAsset, statusCode, err := downloadPluginAssetFromHub(ctx, ops)
pluginAsset, statusCode, err := downloadPluginAssetFromHub(ctx, c, ops)
if err != nil {
return fmt.Errorf("failed to get plugin metadata from hub: %w", err)
}
Expand Down Expand Up @@ -198,18 +199,7 @@ func DownloadPluginFromHub(ctx context.Context, ops HubDownloadOptions) error {
return nil
}

func downloadPluginAssetFromHub(ctx context.Context, ops HubDownloadOptions) (*cloudquery_api.PluginAsset, int, error) {
c, err := cloudquery_api.NewClientWithResponses(APIBaseURL(),
cloudquery_api.WithRequestEditorFn(func(ctx context.Context, req *http.Request) error {
if ops.AuthToken != "" {
req.Header.Set("Authorization", fmt.Sprintf("Bearer %s", ops.AuthToken))
}
return nil
}))
if err != nil {
return nil, -1, fmt.Errorf("failed to create Hub API client: %w", err)
}

func downloadPluginAssetFromHub(ctx context.Context, c *cloudquery_api.ClientWithResponses, ops HubDownloadOptions) (*cloudquery_api.PluginAsset, int, error) {
target := fmt.Sprintf("%s_%s", runtime.GOOS, runtime.GOARCH)
aj := "application/json"

Expand Down Expand Up @@ -246,7 +236,7 @@ func downloadPluginAssetFromHub(ctx context.Context, ops HubDownloadOptions) (*c
}
}

func DownloadPluginFromGithub(ctx context.Context, localPath string, org string, name string, version string, typ PluginType) error {
func DownloadPluginFromGithub(ctx context.Context, logger zerolog.Logger, localPath string, org string, name string, version string, typ PluginType) error {
downloadDir := filepath.Dir(localPath)
pluginZipPath := localPath + ".zip"

Expand All @@ -262,6 +252,7 @@ func DownloadPluginFromGithub(ctx context.Context, localPath string, org string,
if err != nil {
return fmt.Errorf("failed to get plugin url: %w", err)
}
logger.Debug().Msg(fmt.Sprintf("Downloading %s", downloadURL))
if _, err := downloadFile(ctx, pluginZipPath, downloadURL); err != nil {
return fmt.Errorf("failed to download plugin: %w", err)
}
Expand Down
12 changes: 10 additions & 2 deletions managedplugin/download_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@ import (
"context"
"path"
"testing"

cloudquery_api "github.com/cloudquery/cloudquery-api-go"
"github.com/rs/zerolog"
)

func TestDownloadPluginFromGithubIntegration(t *testing.T) {
Expand All @@ -23,9 +26,10 @@ func TestDownloadPluginFromGithubIntegration(t *testing.T) {
{name: "invalid community source", org: "cloudquery", plugin: "invalid-plugin", version: "v0.0.x", wantErr: true, typ: PluginSource},
{name: "invalid monorepo source", org: "not-cloudquery", plugin: "invalid-plugin", version: "v0.0.x", wantErr: true, typ: PluginSource},
}
logger := zerolog.Logger{}
for _, tc := range cases {
t.Run(tc.name, func(t *testing.T) {
err := DownloadPluginFromGithub(context.Background(), path.Join(tmp, tc.name), tc.org, tc.plugin, tc.version, tc.typ)
err := DownloadPluginFromGithub(context.Background(), logger, path.Join(tmp, tc.name), tc.org, tc.plugin, tc.version, tc.typ)
if (err != nil) != tc.wantErr {
t.Errorf("DownloadPluginFromGithub() error = %v, wantErr %v", err, tc.wantErr)
return
Expand All @@ -46,9 +50,13 @@ func TestDownloadPluginFromCloudQueryHub(t *testing.T) {
}{
{testName: "should download test plugin from cloudquery registry", team: "cloudquery", plugin: "aws", version: "v22.18.0", typ: PluginSource},
}
c, err := cloudquery_api.NewClientWithResponses(APIBaseURL())
if err != nil {
t.Fatalf("failed to create Hub API client: %v", err)
}
for _, tc := range cases {
t.Run(tc.testName, func(t *testing.T) {
err := DownloadPluginFromHub(context.Background(), HubDownloadOptions{
err := DownloadPluginFromHub(context.Background(), c, HubDownloadOptions{
LocalPath: path.Join(tmp, tc.testName),
AuthToken: "",
TeamName: "",
Expand Down
39 changes: 39 additions & 0 deletions managedplugin/hub.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package managedplugin

import (
"context"
"fmt"
"net/http"

cloudquery_api "github.com/cloudquery/cloudquery-api-go"
"github.com/rs/zerolog"
)

func getHubClient(logger zerolog.Logger, ops HubDownloadOptions) (*cloudquery_api.ClientWithResponses, error) {
c, err := cloudquery_api.NewClientWithResponses(APIBaseURL(),
cloudquery_api.WithRequestEditorFn(func(ctx context.Context, req *http.Request) error {
if ops.AuthToken != "" {
req.Header.Set("Authorization", fmt.Sprintf("Bearer %s", ops.AuthToken))
}
logger.Debug().Interface("ops", ops).Msg(fmt.Sprintf("Requesting %s %s", req.Method, req.URL))
return nil
}))
if err != nil {
return nil, fmt.Errorf("failed to create Hub API client: %w", err)
}
return c, nil
}

func isDockerPlugin(ctx context.Context, c *cloudquery_api.ClientWithResponses, ops HubDownloadOptions) (bool, error) {
p, err := c.GetPluginVersionWithResponse(ctx, ops.PluginTeam, cloudquery_api.PluginKind(ops.PluginKind), ops.PluginName, ops.PluginVersion)
if err != nil {
return false, fmt.Errorf("failed to get plugin information: %w", err)
}
if p.StatusCode() != http.StatusOK {
return false, fmt.Errorf("failed to get plugin information: %s", p.Status())
}
if p.JSON200 == nil {
return false, fmt.Errorf("failed to get plugin information: response body is empty")
}
return p.JSON200.PackageType == "docker", nil
}
6 changes: 6 additions & 0 deletions managedplugin/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,3 +57,9 @@ func WithLicenseFile(licenseFile string) Option {
c.licenseFile = licenseFile
}
}

func WithCloudQueryDockerHost(dockerHost string) Option {
return func(c *Client) {
c.cqDockerHost = dockerHost
}
}
41 changes: 33 additions & 8 deletions managedplugin/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ const (
containerServerHealthyInitialRetryDelay = 100 * time.Millisecond

containerStopTimeoutSeconds = 10

DefaultCloudQueryDockerHost = "docker.cloudquery.io"
)

// PluginType specifies if a plugin is a source or a destination.
Expand Down Expand Up @@ -86,6 +88,7 @@ type Client struct {
config Config
noSentry bool
noExec bool
cqDockerHost string
otelEndpoint string
otelEndpointInsecure bool
metrics *Metrics
Expand Down Expand Up @@ -133,11 +136,12 @@ func (c Clients) Terminate() error {
// If registrySpec is Docker then client downloads the docker image, runs it and creates a gRPC connection.
func NewClient(ctx context.Context, typ PluginType, config Config, opts ...Option) (*Client, error) {
c := &Client{
directory: defaultDownloadDir,
wg: &sync.WaitGroup{},
config: config,
metrics: &Metrics{},
registry: config.Registry,
directory: defaultDownloadDir,
wg: &sync.WaitGroup{},
config: config,
metrics: &Metrics{},
registry: config.Registry,
cqDockerHost: DefaultCloudQueryDockerHost,
}
for _, opt := range opts {
opt(c)
Expand Down Expand Up @@ -168,7 +172,7 @@ func (c *Client) downloadPlugin(ctx context.Context, typ PluginType) error {
org, name := pathSplit[0], pathSplit[1]
c.LocalPath = filepath.Join(c.directory, "plugins", typ.String(), org, name, c.config.Version, "plugin")
c.LocalPath = WithBinarySuffix(c.LocalPath)
return DownloadPluginFromGithub(ctx, c.LocalPath, org, name, c.config.Version, typ)
return DownloadPluginFromGithub(ctx, c.logger, c.LocalPath, org, name, c.config.Version, typ)
case RegistryDocker:
if imageAvailable, err := isDockerImageAvailable(ctx, c.config.Path); err != nil {
return err
Expand All @@ -184,15 +188,36 @@ func (c *Client) downloadPlugin(ctx context.Context, typ PluginType) error {
org, name := pathSplit[0], pathSplit[1]
c.LocalPath = filepath.Join(c.directory, "plugins", typ.String(), org, name, c.config.Version, "plugin")
c.LocalPath = WithBinarySuffix(c.LocalPath)
return DownloadPluginFromHub(ctx, HubDownloadOptions{

ops := HubDownloadOptions{
AuthToken: c.authToken,
TeamName: c.teamName,
LocalPath: c.LocalPath,
PluginTeam: org,
PluginKind: typ.String(),
PluginName: name,
PluginVersion: c.config.Version,
})
}
hubClient, err := getHubClient(c.logger, ops)
if err != nil {
return err
}
isDocker, err := isDockerPlugin(ctx, hubClient, ops)
if err != nil {
return err
}
if isDocker {
path := fmt.Sprintf(c.cqDockerHost+"/%s/%s-%s:%s", ops.PluginTeam, ops.PluginKind, ops.PluginName, ops.PluginVersion)
c.config.Registry = RegistryDocker // will be used by exec step
c.config.Path = path
if imageAvailable, err := isDockerImageAvailable(ctx, path); err != nil {
return err
} else if !imageAvailable {
return pullDockerImage(ctx, path, c.authToken, c.teamName)
}
return nil
}
return DownloadPluginFromHub(ctx, hubClient, ops)
default:
return fmt.Errorf("unknown registry %s", c.config.Registry.String())
}
Expand Down
57 changes: 56 additions & 1 deletion managedplugin/plugin_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@ func TestManagedPluginGitHub(t *testing.T) {
Path: "cloudquery/hackernews",
Version: "v1.1.4",
}
clients, err := NewClients(ctx, PluginSource, []Config{cfg}, WithDirectory(tmpDir), WithNoSentry())
clients, err := NewClients(ctx, PluginSource, []Config{cfg},
WithDirectory(tmpDir), WithNoSentry())
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -94,6 +95,60 @@ func TestManagedPluginCloudQuery(t *testing.T) {
}
}

func TestManagedPluginCloudQueryDocker(t *testing.T) {
ctx := context.Background()
cli, err := client.NewClientWithOpts(client.FromEnv)
if err != nil {
t.Fatal(err)
}
_, err = cli.Ping(ctx)
if err != nil {
t.Skip("docker not running")
}
// note: this test requires an API key and team name to be set in the environment
if os.Getenv("CLOUDQUERY_TEAM_NAME") == "" {
t.Skip("CLOUDQUERY_TEAM_NAME not set")
}
if os.Getenv("CLOUDQUERY_API_KEY") == "" {
t.Skip("CLOUDQUERY_API_KEY not set")
}
if runtime.GOOS == "windows" {
// the docker image is not built for Windows, so would require enabling of experimental
// linux compatibility. We skip this test in CI for now.
t.Skip("this test is not supported on windows")
}

tmpDir := t.TempDir()
cfg := Config{
Name: "test",
Registry: RegistryCloudQuery,
Version: "v1.2.3",
Path: "cloudquery/typeform",
}
clients, err := NewClients(ctx, PluginSource, []Config{cfg},
WithDirectory(tmpDir), WithNoSentry(),
WithAuthToken(os.Getenv("CLOUDQUERY_API_KEY")),
WithTeamName(os.Getenv("CLOUDQUERY_TEAM_NAME")),
)
if err != nil {
t.Fatal(err)
}
testClient := clients.ClientByName("test")
if testClient == nil {
t.Fatal("test client not found")
}
v, err := testClient.Versions(ctx)
if err != nil {
t.Fatal(err)
}
if len(v) < 1 {
t.Fatal("expected at least 1 version, got 0")
}
if err := clients.Terminate(); err != nil {
t.Fatal(err)
}
}

func TestManagedPluginDocker(t *testing.T) {
ctx := context.Background()
cli, err := client.NewClientWithOpts(client.FromEnv)
Expand Down

0 comments on commit 58ec077

Please sign in to comment.