Skip to content

Commit

Permalink
[#830] executor/docker: support network configuration (#835)
Browse files Browse the repository at this point in the history
  • Loading branch information
yohamta authored Feb 13, 2025
1 parent bbff142 commit 2810502
Show file tree
Hide file tree
Showing 3 changed files with 91 additions and 32 deletions.
12 changes: 10 additions & 2 deletions .vscode/launch.json
Original file line number Diff line number Diff line change
Expand Up @@ -28,15 +28,23 @@
]
},
{
"name": "Hello World",
"name": "Start",
"type": "go",
"request": "launch",
"mode": "auto",
"program": "${workspaceFolder}/cmd/",
"args": [
"start",
"${workspaceFolder}/examples/hello_world.yaml"
"${input:pathToSpec}"
]
}
],
"inputs": [
{
"id": "pathToSpec",
"type": "promptString",
"description": "Enter the path to the spec file",
"default": "${workspaceFolder}/examples/hello_world.yaml"
}
]
}
20 changes: 20 additions & 0 deletions docs/source/executors.rst
Original file line number Diff line number Diff line change
Expand Up @@ -71,10 +71,30 @@ For example:
autoRemove: true
command: echo "${FOO}"
The container's network can be configured as well.

.. code-block:: yaml
steps:
- name: hello
executor:
type: docker
config:
image: alpine
pull: false
network:
EndpointsConfig:
my-network:
Aliases:
- my-alias
autoRemove: true
command: echo "hello"
See the Docker's API documentation for all available options.

- For `container`, see `ContainerConfig <https://pkg.go.dev/github.com/docker/docker/api/types/container#Config>`_.
- For `host`, see `HostConfig <https://pkg.go.dev/github.com/docker/docker/api/types/container#HostConfig>`_.
- For `network`, see `NetworkingConfig <https://pkg.go.dev/github.com/docker/docker/api/types/network#NetworkingConfig>`_.

Execute Commands in Existing Containers
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
Expand Down
91 changes: 61 additions & 30 deletions internal/digraph/executor/docker.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,13 @@ import (
"fmt"
"io"
"os"
"sync"

"github.com/dagu-org/dagu/internal/digraph"
"github.com/dagu-org/dagu/internal/logger"
"github.com/docker/docker/api/types/container"
"github.com/docker/docker/api/types/image"
"github.com/docker/docker/api/types/network"
"github.com/docker/docker/client"
"github.com/docker/docker/pkg/stdcopy"
"github.com/go-viper/mapstructure/v2"
Expand Down Expand Up @@ -60,9 +62,12 @@ type docker struct {
// hostConfig is configuration for the container host
// See https://pkg.go.dev/github.com/docker/docker/api/types/container#HostConfig
hostConfig *container.HostConfig
// networkConfig is configuration for the container network
// See https://pkg.go.dev/github.com/docker/docker@v27.5.1+incompatible/api/types/network#NetworkingConfig
networkConfig *network.NetworkingConfig
// execConfig is configuration for exec in existing container
// See https://pkg.go.dev/github.com/docker/docker/api/types/container#ExecOptions
execConfig container.ExecOptions
execConfig *container.ExecOptions
}

func (e *docker) SetStdout(out io.Writer) {
Expand Down Expand Up @@ -93,9 +98,20 @@ func (e *docker) Run(ctx context.Context) error {
}
defer cli.Close()

// Evaluate args
stepContext := digraph.GetStepContext(ctx)
var args []string
for _, arg := range e.step.Args {
val, err := stepContext.EvalString(arg)
if err != nil {
return fmt.Errorf("failed to evaluate arg %s: %w", arg, err)
}
args = append(args, val)
}

// If containerName is set, use exec instead of creating a new container
if e.containerName != "" {
return e.execInContainer(ctx, cli)
return e.execInContainer(ctx, cli, args)
}

// New container creation logic
Expand All @@ -110,43 +126,38 @@ func (e *docker) Run(ctx context.Context) error {
}
}

containerConfig := *e.containerConfig
containerConfig.Cmd = append([]string{e.step.Command}, args...)
if e.image != "" {
e.containerConfig.Image = e.image
containerConfig.Image = e.image
}

// Evaluate args
stepContext := digraph.GetStepContext(ctx)
var args []string
for _, arg := range e.step.Args {
val, err := stepContext.EvalString(arg)
env := make([]string, len(containerConfig.Env))
for i, e := range containerConfig.Env {
env[i], err = stepContext.EvalString(e)
if err != nil {
return fmt.Errorf("failed to evaluate arg %s: %w", arg, err)
return fmt.Errorf("failed to evaluate env %s: %w", e, err)
}
args = append(args, val)
}

e.containerConfig.Cmd = append([]string{e.step.Command}, args...)
containerConfig.Env = env

resp, err := cli.ContainerCreate(
ctx, e.containerConfig, e.hostConfig, nil, nil, "",
ctx, &containerConfig, e.hostConfig, e.networkConfig, nil, "",
)
if err != nil {
return err
}

removing := false
var once sync.Once
removeContainer := func() {
if !e.autoRemove || removing {
if !e.autoRemove {
return
}
removing = true
if err := cli.ContainerRemove(
ctx, resp.ID, container.RemoveOptions{
Force: true,
},
); err != nil {
logger.Error(ctx, "docker executor: remove container", "err", err)
}
once.Do(func() {
if err := cli.ContainerRemove(ctx, resp.ID, container.RemoveOptions{Force: true}); err != nil {
logger.Error(ctx, "docker executor: remove container", "err", err)
}
})
}

defer removeContainer()
Expand All @@ -164,7 +175,7 @@ func (e *docker) Run(ctx context.Context) error {
return e.attachAndWait(ctx, cli, resp.ID)
}

func (e *docker) execInContainer(ctx context.Context, cli *client.Client) error {
func (e *docker) execInContainer(ctx context.Context, cli *client.Client, args []string) error {
// Check if containerInfo exists and is running
containerInfo, err := cli.ContainerInspect(ctx, e.containerName)
if err != nil {
Expand All @@ -183,7 +194,7 @@ func (e *docker) execInContainer(ctx context.Context, cli *client.Client) error
AttachStdin: false,
AttachStdout: true,
AttachStderr: true,
Cmd: append([]string{e.step.Command}, e.step.Args...),
Cmd: append([]string{e.step.Command}, args...),
Env: e.execConfig.Env,
WorkingDir: e.execConfig.WorkingDir,
}
Expand Down Expand Up @@ -273,7 +284,9 @@ func newDocker(
) (Executor, error) {
containerConfig := &container.Config{}
hostConfig := &container.HostConfig{}
execConfig := container.ExecOptions{}
execConfig := &container.ExecOptions{}
networkConfig := &network.NetworkingConfig{}

execCfg := step.ExecutorConfig
stepContext := digraph.GetStepContext(ctx)

Expand All @@ -292,7 +305,7 @@ func newDocker(
if err != nil {
return nil, fmt.Errorf("failed to evaluate string fields: %w", err)
}
*containerConfig = replaced
containerConfig = &replaced
}

if cfg, ok := execCfg.Config["host"]; ok {
Expand All @@ -309,7 +322,24 @@ func newDocker(
if err != nil {
return nil, fmt.Errorf("failed to evaluate string fields: %w", err)
}
*hostConfig = replaced
hostConfig = &replaced
}

if cfg, ok := execCfg.Config["network"]; ok {
md, err := mapstructure.NewDecoder(&mapstructure.DecoderConfig{
Result: networkConfig,
})
if err != nil {
return nil, fmt.Errorf("failed to create decoder: %w", err)
}
if err := md.Decode(cfg); err != nil {
return nil, fmt.Errorf("failed to decode config: %w", err)
}
replaced, err := digraph.EvalStringFields(stepContext, *networkConfig)
if err != nil {
return nil, fmt.Errorf("failed to evaluate string fields: %w", err)
}
networkConfig = &replaced
}

if cfg, ok := execCfg.Config["exec"]; ok {
Expand All @@ -322,11 +352,11 @@ func newDocker(
if err := md.Decode(cfg); err != nil {
return nil, fmt.Errorf("failed to decode config: %w", err)
}
replaced, err := digraph.EvalStringFields(stepContext, execConfig)
replaced, err := digraph.EvalStringFields(stepContext, *execConfig)
if err != nil {
return nil, fmt.Errorf("failed to evaluate string fields: %w", err)
}
execConfig = replaced
execConfig = &replaced
}

autoRemove := false
Expand Down Expand Up @@ -358,6 +388,7 @@ func newDocker(
stdout: os.Stdout,
containerConfig: containerConfig,
hostConfig: hostConfig,
networkConfig: networkConfig,
execConfig: execConfig,
autoRemove: autoRemove,
}
Expand Down

0 comments on commit 2810502

Please sign in to comment.