Skip to content
33 changes: 29 additions & 4 deletions airflow/container.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,8 +116,16 @@ func normalizeName(s string) string {
return strings.TrimLeft(s, "_-")
}

// PortOverrides allows callers to override the default ports used in the
// generated compose config. When nil, ports are read from config as usual.
type PortOverrides struct {
PostgresPort string
WebserverPort string
APIServerPort string
}

// generateConfig generates the docker-compose config
func generateConfig(projectName, airflowHome, envFile, buildImage, settingsFile string, imageLabels map[string]string) (string, error) {
func generateConfig(projectName, airflowHome, envFile, buildImage, settingsFile string, imageLabels map[string]string, portOverrides ...*PortOverrides) (string, error) {
runtimeVersion, ok := imageLabels[runtimeVersionLabelName]
if !ok {
return "", errors.New("runtime version label not found")
Expand Down Expand Up @@ -169,18 +177,35 @@ func generateConfig(projectName, airflowHome, envFile, buildImage, settingsFile
logger.Debug(err)
}

// Determine ports: use overrides if provided, otherwise read from config
pgPort := config.CFG.PostgresPort.GetString()
wsPort := config.CFG.WebserverPort.GetString()
apiPort := config.CFG.APIServerPort.GetString()
if len(portOverrides) > 0 && portOverrides[0] != nil {
po := portOverrides[0]
if po.PostgresPort != "" {
pgPort = po.PostgresPort
}
if po.WebserverPort != "" {
wsPort = po.WebserverPort
}
if po.APIServerPort != "" {
apiPort = po.APIServerPort
}
}

cfg := ComposeConfig{
PostgresUser: config.CFG.PostgresUser.GetString(),
PostgresPassword: config.CFG.PostgresPassword.GetString(),
PostgresHost: config.CFG.PostgresHost.GetString(),
PostgresPort: config.CFG.PostgresPort.GetString(),
PostgresPort: pgPort,
PostgresRepository: config.CFG.PostgresRepository.GetString(),
PostgresTag: config.CFG.PostgresTag.GetString(),
AirflowImage: airflowImage,
AirflowHome: airflowHome,
AirflowUser: "astro",
AirflowWebserverPort: config.CFG.WebserverPort.GetString(),
AirflowAPIServerPort: config.CFG.APIServerPort.GetString(),
AirflowWebserverPort: wsPort,
AirflowAPIServerPort: apiPort,
AirflowEnvFile: envFile,
AirflowExposePort: config.CFG.AirflowExposePort.GetBool(),
MountLabel: "z",
Expand Down
233 changes: 225 additions & 8 deletions airflow/docker.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"time"

"github.com/Masterminds/semver/v3"
"github.com/astronomer/astro-cli/airflow/proxy"
"github.com/astronomer/astro-cli/airflow/runtimes"
airflowTypes "github.com/astronomer/astro-cli/airflow/types"
airflowversions "github.com/astronomer/astro-cli/airflow_versions"
Expand Down Expand Up @@ -212,9 +213,27 @@ func DockerComposeInit(airflowHome, envFile, dockerfile, imageName string) (*Doc
}, nil
}

// removeProxyRoute deregisters the proxy route for this project and
// stops the proxy daemon if no routes remain.
func (d *DockerCompose) removeProxyRoute() {
hostname, err := proxy.DeriveHostname(d.airflowHome)
if err != nil {
logger.Debugf("could not derive proxy hostname: %s", err)
return
}
remaining, err := proxy.RemoveRoute(hostname)
if err != nil {
logger.Debugf("could not remove proxy route for %s: %s", hostname, err)
return
}
if remaining == 0 {
proxy.StopIfEmpty()
}
}

// Start starts a local airflow development cluster
//
//nolint:gocognit
//nolint:gocognit,gocyclo
func (d *DockerCompose) Start(opts *airflowTypes.StartOptions) error {
imageName := opts.ImageName
settingsFile := opts.SettingsFile
Expand All @@ -224,6 +243,7 @@ func (d *DockerCompose) Start(opts *airflowTypes.StartOptions) error {
noBrowser := opts.NoBrowser
waitTime := opts.WaitTime
envConns := opts.EnvConns
useProxy := !opts.NoProxy

// Build this project image
if imageName == "" {
Expand Down Expand Up @@ -258,14 +278,65 @@ func (d *DockerCompose) Start(opts *airflowTypes.StartOptions) error {
return err
}

// Determine ports: allocate random ports when proxy is enabled, use config defaults otherwise
var portOvr *PortOverrides
var proxyHostname, proxyPort string
if useProxy {
proxyPort = config.CFG.ProxyPort.GetString()
if proxyPort == "" {
proxyPort = proxy.DefaultPort
}

hostname, hErr := proxy.DeriveHostname(d.airflowHome)
if hErr != nil {
// Fall back to non-proxy mode if hostname derivation fails
useProxy = false
} else {
proxyHostname = hostname

// Try default ports first; allocate random only if taken.
// NOTE: there is an inherent TOCTOU race between checking port availability
// here and Docker Compose binding the port later. If another process grabs the
// port in between, compose up will fail with a clear "port already in use" error.
// This is acceptable — the window is small and the failure mode is obvious.
webPort := config.CFG.APIServerPort.GetString()
if !proxy.IsPortAvailable(webPort) {
var aErr error
webPort, aErr = proxy.AllocatePort()
if aErr != nil {
return fmt.Errorf("error allocating webserver port: %w", aErr)
}
}
pgPort := config.CFG.PostgresPort.GetString()
if !proxy.IsPortAvailable(pgPort) {
var aErr error
pgPort, aErr = proxy.AllocatePort()
if aErr != nil {
return fmt.Errorf("error allocating postgres port: %w", aErr)
}
}

portOvr = &PortOverrides{
PostgresPort: pgPort,
WebserverPort: webPort,
APIServerPort: webPort,
}
}
}

s := spinner.NewSpinner("Project is starting up…")
if !logger.IsLevelEnabled(logrus.DebugLevel) {
s.Start()
defer s.Stop()
}

// Create a compose project
project, err := createDockerProject(d.projectName, d.airflowHome, d.envFile, "", settingsFile, composeFile, imageLabels)
// Create a compose project (with port overrides if proxy is enabled)
var project *composetypes.Project
if useProxy && composeFile == "" {
project, err = createDockerProjectWithPorts(d.projectName, d.airflowHome, d.envFile, "", settingsFile, imageLabels, portOvr)
} else {
project, err = createDockerProject(d.projectName, d.airflowHome, d.envFile, "", settingsFile, composeFile, imageLabels)
}
if err != nil {
return errors.Wrap(err, composeCreateErrMsg)
}
Expand All @@ -292,28 +363,63 @@ func (d *DockerCompose) Start(opts *airflowTypes.StartOptions) error {
s.Start()
defer s.Stop()

// Build health check URL using actual ports (overrides or config)
airflowMajorVersion := airflowversions.AirflowMajorVersionForRuntimeVersion(imageLabels[runtimeVersionLabelName])
var healthURL, healthComponent string
switch airflowMajorVersion {
case "3":
healthURL = fmt.Sprintf("http://localhost:%s/api/v2/monitor/health", config.CFG.APIServerPort.GetString())
apiPort := config.CFG.APIServerPort.GetString()
if portOvr != nil && portOvr.APIServerPort != "" {
apiPort = portOvr.APIServerPort
}
healthURL = fmt.Sprintf("http://localhost:%s/api/v2/monitor/health", apiPort)
healthComponent = "api-server"
case "2":
healthURL = fmt.Sprintf("http://localhost:%s/health", config.CFG.WebserverPort.GetString())
wsPort := config.CFG.WebserverPort.GetString()
if portOvr != nil && portOvr.WebserverPort != "" {
wsPort = portOvr.WebserverPort
}
healthURL = fmt.Sprintf("http://localhost:%s/health", wsPort)
healthComponent = "webserver"
}

// Check the health of the webserver, up to the timeout.
// If we fail to get a 200 status code, we'll return an error message.
err = checkWebserverHealth(healthURL, waitTime, healthComponent)
if err != nil {
return err
}

spinner.StopWithCheckmark(s, "Project started")

// If we've successfully gotten a healthcheck response, print the status.
err = printStatus(settingsFile, envConns, project, d.composeService, airflowDockerVersion, noBrowser)
// Register route with the proxy and start the daemon
if useProxy && proxyHostname != "" {
if _, ensureErr := proxy.EnsureRunning(proxyPort); ensureErr != nil {
fmt.Printf("Warning: could not start proxy: %s\n", ensureErr.Error())
} else {
services := map[string]string{}
if portOvr != nil && portOvr.PostgresPort != "" {
services["postgres"] = portOvr.PostgresPort
}
route := proxy.Route{
Hostname: proxyHostname,
Port: portOvr.WebserverPort,
ProjectDir: d.airflowHome,
PID: 0, // Docker routes don't track PID — CLI exits after start
Services: services,
Mode: "docker",
}
if addErr := proxy.AddRoute(&route); addErr != nil {
fmt.Printf("Warning: could not register proxy route: %s\n", addErr.Error())
}
}
}

// Print the status
if useProxy && proxyHostname != "" {
err = printProxyStatus(settingsFile, envConns, project, d.composeService, airflowDockerVersion, noBrowser, proxyHostname, proxyPort, portOvr)
} else {
err = printStatus(settingsFile, envConns, project, d.composeService, airflowDockerVersion, noBrowser)
}
if err != nil {
return err
}
Expand Down Expand Up @@ -354,6 +460,9 @@ func (d *DockerCompose) ComposeExport(settingsFile, composeFile string) error {

// Stop a running docker project
func (d *DockerCompose) Stop(waitForExit bool) error {
// Deregister proxy route before stopping containers
d.removeProxyRoute()

s := spinner.NewSpinner("Stopping project…")
if !logger.IsLevelEnabled(logrus.DebugLevel) {
s.Start()
Expand Down Expand Up @@ -444,6 +553,9 @@ func (d *DockerCompose) PS() error {

// Kill stops a local airflow development cluster
func (d *DockerCompose) Kill() error {
// Deregister proxy route before killing containers
d.removeProxyRoute()

s := spinner.NewSpinner("Killing project…")
if !logger.IsLevelEnabled(logrus.DebugLevel) {
s.Start()
Expand Down Expand Up @@ -1529,6 +1641,111 @@ var createDockerProject = func(projectName, airflowHome, envFile, buildImage, se
return project, nil
}

// createDockerProjectWithPorts creates a Docker Compose project with port overrides for proxy mode.
var createDockerProjectWithPorts = func(projectName, airflowHome, envFile, buildImage, settingsFile string, imageLabels map[string]string, portOvr *PortOverrides) (*composetypes.Project, error) {
yaml, err := generateConfig(projectName, airflowHome, envFile, buildImage, settingsFile, imageLabels, portOvr)
if err != nil {
return nil, errors.Wrap(err, "failed to create project")
}

var configs []composetypes.ConfigFile
configs = append(configs, composetypes.ConfigFile{
Filename: "compose.yaml",
Content: []byte(yaml),
})

composeBytes, err := os.ReadFile(composeOverrideFilename)
if err != nil && !os.IsNotExist(err) {
return nil, errors.Wrapf(err, "Failed to open the compose file: %s", composeOverrideFilename)
}
if err == nil {
configs = append(configs, composetypes.ConfigFile{
Filename: "docker-compose.override.yml",
Content: composeBytes,
})
}

var loadOptions []func(*loader.Options)
nameLoadOpt := func(opts *loader.Options) {
opts.SetProjectName(normalizeName(projectName), true)
opts.Interpolate = &interpolation.Options{
LookupValue: os.LookupEnv,
}
}
loadOptions = append(loadOptions, nameLoadOpt)

project, err := loader.LoadWithContext(context.Background(), composetypes.ConfigDetails{
ConfigFiles: configs,
WorkingDir: airflowHome,
}, loadOptions...)
if err != nil {
return nil, errors.Wrap(err, "failed to load project")
}

for name, s := range project.Services {
s.CustomLabels = map[string]string{
api.ProjectLabel: project.Name,
api.ServiceLabel: name,
api.VersionLabel: api.ComposeVersion,
api.WorkingDirLabel: project.WorkingDir,
api.ConfigFilesLabel: strings.Join(project.ComposeFiles, ","),
api.OneoffLabel: "False",
}
project.Services[name] = s
}
return project, nil
}

// printProxyStatus prints status information when the proxy is active.
func printProxyStatus(settingsFile string, envConns map[string]astrocore.EnvironmentObjectConnection, project *composetypes.Project, composeService api.Service, airflowMajorVersion uint64, noBrowser bool, hostname, proxyPort string, portOvr *PortOverrides) error {
containers, err := composeService.Ps(context.Background(), project.Name, api.PsOptions{
All: true,
})
if err != nil {
return errors.Wrap(err, composeStatusCheckErrMsg)
}

settingsFileExists, err := fileutil.Exists(settingsFile, nil)
if err != nil {
return errors.Wrap(err, errSettingsPath)
}
if settingsFileExists || len(envConns) > 0 {
for _, container := range containers { //nolint:gocritic
if strings.Contains(container.Name, project.Name) &&
(strings.Contains(container.Name, WebserverDockerContainerName) ||
strings.Contains(container.Name, APIServerDockerContainerName)) {
err = initSettings(container.ID, settingsFile, envConns, airflowMajorVersion, true, true, true)
if err != nil {
return err
}
}
}
}

uiURL := fmt.Sprintf("http://%s:%s", hostname, proxyPort)
bullet := ansi.Cyan("\u27A4") + " "
fmt.Printf(bullet+composeLinkUIMsg+"\n", ansi.Bold(uiURL))

pgPort := config.CFG.PostgresPort.GetString()
if portOvr != nil && portOvr.PostgresPort != "" {
pgPort = portOvr.PostgresPort
}
fmt.Printf(bullet+composeLinkPostgresMsg+"\n", ansi.Bold("postgresql://localhost:"+pgPort+"/postgres"))

if airflowMajorVersion == airflowMajorVersion2 {
fmt.Printf(bullet+composeUserPasswordMsg+"\n", ansi.Bold("admin:admin"))
}
fmt.Printf(bullet+postgresUserPasswordMsg+"\n", ansi.Bold("postgres:postgres"))

if !(noBrowser || util.CheckEnvBool(os.Getenv("ASTRONOMER_NO_BROWSER"))) {
err = openURL(uiURL)
if err != nil {
fmt.Println("\nUnable to open the Airflow UI, please visit the following link: " + uiURL)
}
}
return nil
}

func printStatus(settingsFile string, envConns map[string]astrocore.EnvironmentObjectConnection, project *composetypes.Project, composeService api.Service, airflowMajorVersion uint64, noBrowser bool) error {
containers, err := composeService.Ps(context.Background(), project.Name, api.PsOptions{
All: true,
Expand Down
Loading