Skip to content

Commit

Permalink
Implement working fix for docker-based syncs.
Browse files Browse the repository at this point in the history
  • Loading branch information
marianogappa committed Aug 8, 2024
1 parent d0415a5 commit 451c847
Show file tree
Hide file tree
Showing 2 changed files with 70 additions and 10 deletions.
6 changes: 6 additions & 0 deletions managedplugin/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,3 +69,9 @@ func WithCloudQueryDockerHost(dockerHost string) Option {
c.cqDockerHost = dockerHost
}
}

func WithUseTCP() Option {
return func(c *Client) {
c.useTCP = true
}
}
74 changes: 64 additions & 10 deletions managedplugin/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import (
pbDiscoveryV1 "github.com/cloudquery/plugin-pb-go/pb/discovery/v1"
pbSource "github.com/cloudquery/plugin-pb-go/pb/source/v0"
"github.com/docker/docker/api/types/container"
"github.com/docker/docker/api/types/mount"
"github.com/docker/docker/api/types/network"
dockerClient "github.com/docker/docker/client"
"github.com/docker/go-connections/nat"
Expand Down Expand Up @@ -102,6 +101,8 @@ type Client struct {
teamName string
licenseFile string
dockerAuth string
useTCP bool
tcpAddr string
}

// typ will be deprecated soon but now required for a transition period
Expand Down Expand Up @@ -263,6 +264,9 @@ func (c *Client) ConnectionString() string {
case RegistryLocal,
RegistryGithub,
RegistryCloudQuery:
if c.useTCP {
return tgt
}
return "unix://" + tgt
case RegistryDocker:
return tgt
Expand Down Expand Up @@ -295,6 +299,7 @@ func (c *Client) startDockerPlugin(ctx context.Context, configPath string) error
Env: c.config.Environment,
}
hostConfig := &container.HostConfig{
ExtraHosts: []string{"host.docker.internal:host-gateway"},
PortBindings: map[nat.Port][]nat.PortBinding{
"7777/tcp": {
{
Expand All @@ -303,13 +308,6 @@ func (c *Client) startDockerPlugin(ctx context.Context, configPath string) error
},
},
},
Mounts: []mount.Mount{
{
Type: mount.TypeBind,
Source: unixSocketDir,
Target: unixSocketDir,
},
},
}

networkingConfig := &network.NetworkingConfig{}
Expand Down Expand Up @@ -408,7 +406,60 @@ func waitForContainerRunning(ctx context.Context, cli *dockerClient.Client, cont
return err
}

func getFreeTCPAddr() (string, error) {
addr, err := net.ResolveTCPAddr("tcp", "localhost:0")
if err != nil {
return "", err
}

l, err := net.ListenTCP("tcp", addr)
if err != nil {
return "", err
}
defer l.Close()

return l.Addr().String(), nil
}

func (c *Client) startLocal(ctx context.Context, path string) error {
if c.useTCP {
tcpAddr, err := getFreeTCPAddr()
if err != nil {
return fmt.Errorf("failed to get free port: %w", err)
}
c.tcpAddr = tcpAddr
return c.startLocalTCP(ctx, path)
}
return c.startLocalUnixSocket(ctx, path)
}

func (c *Client) startLocalTCP(ctx context.Context, path string) error {
// spawn the plugin first and then connect
args := c.getPluginArgs()
cmd := exec.CommandContext(ctx, path, args...)
reader, err := cmd.StdoutPipe()
if err != nil {
return fmt.Errorf("failed to get stdout pipe: %w", err)
}
cmd.Stderr = os.Stderr
if c.config.Environment != nil {
cmd.Env = c.config.Environment
}
cmd.SysProcAttr = getSysProcAttr()
if err := cmd.Start(); err != nil {
return fmt.Errorf("failed to start plugin %s: %w", path, err)
}

c.cmd = cmd

c.logReader = reader
c.wg.Add(1)
go c.readLogLines(reader)

return c.connectUsingTCP(ctx, c.tcpAddr)
}

func (c *Client) startLocalUnixSocket(ctx context.Context, path string) error {
c.grpcSocketName = GenerateRandomUnixSocketName()
// spawn the plugin first and then connect
args := c.getPluginArgs()
Expand Down Expand Up @@ -450,9 +501,12 @@ func (c *Client) startLocal(ctx context.Context, path string) error {

func (c *Client) getPluginArgs() []string {
args := []string{"serve", "--log-level", c.logger.GetLevel().String(), "--log-format", "json"}
if c.grpcSocketName != "" {
switch {
case c.grpcSocketName != "":
args = append(args, "--network", "unix", "--address", c.grpcSocketName)
} else {
case c.useTCP:
args = append(args, "--network", "tcp", "--address", c.tcpAddr)
default:
args = append(args, "--network", "tcp", "--address", "0.0.0.0:7777")
}
if c.noSentry {
Expand Down

0 comments on commit 451c847

Please sign in to comment.