Skip to content

Commit

Permalink
feat: Expose if plugin was downloaded or cached (#341)
Browse files Browse the repository at this point in the history
* feat: Expose if plugin was downloaded or cached

* Implement my own review comments.

* Implement review comments.

* Fix some outdated naming.

---------

Co-authored-by: Mariano Gappa <spinetta@gmail.com>
  • Loading branch information
erezrokah and marianogappa authored Jun 14, 2024
1 parent a8a3850 commit 1064243
Show file tree
Hide file tree
Showing 4 changed files with 92 additions and 31 deletions.
22 changes: 14 additions & 8 deletions managedplugin/download.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,12 +131,15 @@ type DownloaderOptions struct {
NoProgress bool
}

func DownloadPluginFromHub(ctx context.Context, c *cloudquery_api.ClientWithResponses, ops HubDownloadOptions, dops DownloaderOptions) error {
downloadDir := filepath.Dir(ops.LocalPath)
func DownloadPluginFromHub(ctx context.Context, c *cloudquery_api.ClientWithResponses, ops HubDownloadOptions, dops DownloaderOptions) (AssetSource, error) {
if _, err := os.Stat(ops.LocalPath); err == nil {
return nil
return AssetSourceCached, nil
}
return AssetSourceRemote, doDownloadPluginFromHub(ctx, c, ops, dops)
}

func doDownloadPluginFromHub(ctx context.Context, c *cloudquery_api.ClientWithResponses, ops HubDownloadOptions, dops DownloaderOptions) error {
downloadDir := filepath.Dir(ops.LocalPath)
if err := os.MkdirAll(downloadDir, 0755); err != nil {
return fmt.Errorf("failed to create plugin directory %s: %w", downloadDir, err)
}
Expand Down Expand Up @@ -239,13 +242,16 @@ func downloadPluginAssetFromHub(ctx context.Context, c *cloudquery_api.ClientWit
}
}

func DownloadPluginFromGithub(ctx context.Context, logger zerolog.Logger, localPath string, org string, name string, version string, typ PluginType, dops DownloaderOptions) error {
downloadDir := filepath.Dir(localPath)
pluginZipPath := localPath + ".zip"

func DownloadPluginFromGithub(ctx context.Context, logger zerolog.Logger, localPath string, org string, name string, version string, typ PluginType, dops DownloaderOptions) (AssetSource, error) {
if _, err := os.Stat(localPath); err == nil {
return nil
return AssetSourceCached, nil
}
return AssetSourceRemote, doDownloadPluginFromGithub(ctx, logger, localPath, org, name, version, typ, dops)
}

func doDownloadPluginFromGithub(ctx context.Context, logger zerolog.Logger, localPath string, org string, name string, version string, typ PluginType, dops DownloaderOptions) error {
downloadDir := filepath.Dir(localPath)
pluginZipPath := localPath + ".zip"

if err := os.MkdirAll(downloadDir, 0755); err != nil {
return fmt.Errorf("failed to create plugin directory %s: %w", downloadDir, err)
Expand Down
10 changes: 8 additions & 2 deletions managedplugin/download_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,10 @@ func TestDownloadPluginFromGithubIntegration(t *testing.T) {
logger := zerolog.Logger{}
for _, tc := range cases {
t.Run(tc.name, func(t *testing.T) {
err := DownloadPluginFromGithub(context.Background(), logger, path.Join(tmp, tc.name), tc.org, tc.plugin, tc.version, tc.typ, DownloaderOptions{})
assetSource, err := DownloadPluginFromGithub(context.Background(), logger, path.Join(tmp, tc.name), tc.org, tc.plugin, tc.version, tc.typ, DownloaderOptions{})
if assetSource != AssetSourceRemote {
t.Errorf("DownloadPluginFromGithub() got = %v, want %v", assetSource, AssetSourceRemote)
}
if (err != nil) != tc.wantErr {
t.Errorf("DownloadPluginFromGithub() error = %v, wantErr %v", err, tc.wantErr)
return
Expand All @@ -56,7 +59,7 @@ func TestDownloadPluginFromCloudQueryHub(t *testing.T) {
}
for _, tc := range cases {
t.Run(tc.testName, func(t *testing.T) {
err := DownloadPluginFromHub(context.Background(), c, HubDownloadOptions{
assetSource, err := DownloadPluginFromHub(context.Background(), c, HubDownloadOptions{
LocalPath: path.Join(tmp, tc.testName),
AuthToken: "",
TeamName: "",
Expand All @@ -67,6 +70,9 @@ func TestDownloadPluginFromCloudQueryHub(t *testing.T) {
},
DownloaderOptions{},
)
if assetSource != AssetSourceRemote {
t.Errorf("TestDownloadPluginFromCloudQueryIntegration() got = %v, want %v", assetSource, AssetSourceRemote)
}
if (err != nil) != tc.wantErr {
t.Errorf("TestDownloadPluginFromCloudQueryIntegration() error = %v, wantErr %v", err, tc.wantErr)
return
Expand Down
49 changes: 46 additions & 3 deletions managedplugin/metrics.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,53 @@
package managedplugin

import "sync/atomic"
import (
"encoding/json"
"fmt"
"sync/atomic"
)

type AssetSource int

const (
AssetSourceUnknown AssetSource = iota
AssetSourceCached
AssetSourceRemote
)

func (r AssetSource) String() string {
return [...]string{"unknown", "cached", "remote"}[r]
}

func (r AssetSource) MarshalJSON() ([]byte, error) {
return []byte(fmt.Sprintf(`"%s"`, r.String())), nil
}

func (r *AssetSource) UnmarshalJSON(data []byte) (err error) {
var mode string
if err := json.Unmarshal(data, &mode); err != nil {
return err
}
if *r, err = AssetSourceFromString(mode); err != nil {
return err
}
return nil
}

func AssetSourceFromString(s string) (AssetSource, error) {
switch s {
case "cached":
return AssetSourceCached, nil
case "remote":
return AssetSourceRemote, nil
default:
return AssetSourceUnknown, fmt.Errorf("unknown mode %s", s)
}
}

type Metrics struct {
Errors uint64
Warnings uint64
Errors uint64
Warnings uint64
AssetSource AssetSource
}

func (m *Metrics) incrementErrors() {
Expand Down
42 changes: 24 additions & 18 deletions managedplugin/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,9 +151,13 @@ func NewClient(ctx context.Context, typ PluginType, config Config, opts ...Optio
for _, opt := range opts {
opt(c)
}
if err := c.downloadPlugin(ctx, typ); err != nil {
assetSource, err := c.downloadPlugin(ctx, typ)
if err != nil {
return nil, err
}
if assetSource != AssetSourceUnknown {
c.metrics.AssetSource = assetSource
}
if !c.noExec {
if err := c.execPlugin(ctx); err != nil {
return nil, err
Expand All @@ -163,35 +167,36 @@ func NewClient(ctx context.Context, typ PluginType, config Config, opts ...Optio
return c, nil
}

func (c *Client) downloadPlugin(ctx context.Context, typ PluginType) error {
func (c *Client) downloadPlugin(ctx context.Context, typ PluginType) (AssetSource, error) {
dops := DownloaderOptions{
NoProgress: c.noProgress,
}
switch c.config.Registry {
case RegistryGrpc:
return nil // GRPC plugins are not downloaded
return AssetSourceUnknown, nil // GRPC plugins are not downloaded
case RegistryLocal:
return validateLocalExecPath(c.config.Path)
return AssetSourceUnknown, validateLocalExecPath(c.config.Path)
case RegistryGithub:
pathSplit := strings.Split(c.config.Path, "/")
if len(pathSplit) != 2 {
return fmt.Errorf("invalid github plugin path: %s. format should be owner/repo", c.config.Path)
return AssetSourceUnknown, fmt.Errorf("invalid github plugin path: %s. format should be owner/repo", c.config.Path)
}
org, name := pathSplit[0], pathSplit[1]
c.LocalPath = filepath.Join(c.directory, "plugins", typ.String(), org, name, c.config.Version, "plugin")
c.LocalPath = WithBinarySuffix(c.LocalPath)
return DownloadPluginFromGithub(ctx, c.logger, c.LocalPath, org, name, c.config.Version, typ, dops)
assetSource, err := DownloadPluginFromGithub(ctx, c.logger, c.LocalPath, org, name, c.config.Version, typ, dops)
return assetSource, err
case RegistryDocker:
if imageAvailable, err := isDockerImageAvailable(ctx, c.config.Path); err != nil {
return err
return AssetSourceUnknown, err
} else if !imageAvailable {
return pullDockerImage(ctx, c.config.Path, c.authToken, c.teamName, c.dockerAuth, dops)
return AssetSourceRemote, pullDockerImage(ctx, c.config.Path, c.authToken, c.teamName, c.dockerAuth, dops)
}
return nil
return AssetSourceCached, nil
case RegistryCloudQuery:
pathSplit := strings.Split(c.config.Path, "/")
if len(pathSplit) != 2 {
return fmt.Errorf("invalid cloudquery plugin path: %s. format should be team/name", c.config.Path)
return AssetSourceUnknown, fmt.Errorf("invalid cloudquery plugin path: %s. format should be team/name", c.config.Path)
}
org, name := pathSplit[0], pathSplit[1]
c.LocalPath = filepath.Join(c.directory, "plugins", typ.String(), org, name, c.config.Version, "plugin")
Expand All @@ -208,26 +213,26 @@ func (c *Client) downloadPlugin(ctx context.Context, typ PluginType) error {
}
hubClient, err := getHubClient(c.logger, ops)
if err != nil {
return err
return AssetSourceUnknown, err
}
isDocker, err := isDockerPlugin(ctx, hubClient, ops)
if err != nil {
return err
return AssetSourceUnknown, err
}
if isDocker {
path := fmt.Sprintf(c.cqDockerHost+"/%s/%s-%s:%s", ops.PluginTeam, ops.PluginKind, ops.PluginName, ops.PluginVersion)
c.config.Registry = RegistryDocker // will be used by exec step
c.config.Path = path
if imageAvailable, err := isDockerImageAvailable(ctx, path); err != nil {
return err
return AssetSourceUnknown, err
} else if !imageAvailable {
return pullDockerImage(ctx, path, c.authToken, c.teamName, "", dops)
return AssetSourceRemote, pullDockerImage(ctx, path, c.authToken, c.teamName, "", dops)
}
return nil
return AssetSourceCached, nil
}
return DownloadPluginFromHub(ctx, hubClient, ops, dops)
default:
return fmt.Errorf("unknown registry %s", c.config.Registry.String())
return AssetSourceUnknown, fmt.Errorf("unknown registry %s", c.config.Registry.String())
}
}

Expand Down Expand Up @@ -265,8 +270,9 @@ func (c *Client) ConnectionString() string {

func (c *Client) Metrics() Metrics {
return Metrics{
Errors: atomic.LoadUint64(&c.metrics.Errors),
Warnings: atomic.LoadUint64(&c.metrics.Warnings),
Errors: atomic.LoadUint64(&c.metrics.Errors),
Warnings: atomic.LoadUint64(&c.metrics.Warnings),
AssetSource: c.metrics.AssetSource,
}
}

Expand Down

0 comments on commit 1064243

Please sign in to comment.