Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
268 changes: 256 additions & 12 deletions scripts/in_container/install_airflow_and_providers.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,33 @@

import os
import re
import shutil
import sys
from functools import cache
from pathlib import Path
from typing import NamedTuple

sys.path.insert(0, str(Path(__file__).parent.resolve()))
from in_container_utils import AIRFLOW_CORE_SOURCES_PATH, AIRFLOW_DIST_PATH, click, console, run_command
from in_container_utils import (
AIRFLOW_CORE_SOURCES_PATH,
AIRFLOW_DIST_PATH,
AIRFLOW_ROOT_PATH,
click,
console,
run_command,
)

# Locations used by download_airflow_source_tarball() and compile_ui_assets().
# File the GitHub source tarball is downloaded to.
SOURCE_TARBALL = AIRFLOW_ROOT_PATH / ".build" / "airflow.tar.gz"
# Directory the unpacked tarball content is renamed to after extraction.
EXTRACTED_SOURCE_DIR = AIRFLOW_ROOT_PATH / ".build" / "airflow_source"
# Core UI: location of compiled assets, joined onto the installed ``airflow`` package path.
CORE_UI_DIST_PREFIX = "ui/dist"
# Core UI: location of the UI sources, joined onto EXTRACTED_SOURCE_DIR.
CORE_SOURCE_UI_PREFIX = "airflow-core/src/airflow/ui"
# Core UI: local directory the UI sources are copied into and built in.
CORE_SOURCE_UI_DIRECTORY = AIRFLOW_CORE_SOURCES_PATH / "airflow" / "ui"
# Simple auth manager UI: the same three locations as for the core UI.
SIMPLE_AUTH_MANAGER_UI_DIST_PREFIX = "api_fastapi/auth/managers/simple/ui/dist"
SIMPLE_AUTH_MANAGER_SOURCE_UI_PREFIX = "airflow-core/src/airflow/api_fastapi/auth/managers/simple/ui"
SIMPLE_AUTH_MANAGER_SOURCE_UI_DIRECTORY = (
AIRFLOW_CORE_SOURCES_PATH / "airflow" / "api_fastapi" / "auth" / "managers" / "simple" / "ui"
)
# NOTE(review): not referenced in the visible part of this file - confirm where it is used.
INTERNAL_SERVER_ERROR = "500 Internal Server Error"


def get_provider_name(package_name: str) -> str:
Expand Down Expand Up @@ -208,13 +229,30 @@ def get_providers_constraints_location(
)


@cache
def get_airflow_installation_path() -> Path:
    """Return the directory of the importable ``airflow`` package.

    The result is the parent directory of the module spec origin, e.g. something like
    ``/usr/python/lib/python3.10/site-packages/airflow``. The lookup is memoized with
    ``functools.cache``.

    Prints an error and exits the process with status 1 when ``airflow`` cannot be
    found or its spec has no origin.
    """
    from importlib.util import find_spec

    airflow_spec = find_spec("airflow")
    origin = None if airflow_spec is None else airflow_spec.origin
    if origin is None:
        console.print("[red]Airflow not found - cannot mount sources")
        sys.exit(1)

    return Path(origin).parent


class InstallationSpec(NamedTuple):
airflow_distribution: str | None
airflow_core_distribution: str | None
airflow_constraints_location: str | None
airflow_task_sdk_distribution: str | None
airflow_ctl_distribution: str | None
airflow_ctl_constraints_location: str | None
compile_ui_assets: bool | None
provider_distributions: list[str]
provider_constraints_location: str | None
pre_release: bool = os.environ.get("ALLOW_PRE_RELEASES", "false").lower() == "true"
Expand Down Expand Up @@ -313,6 +351,7 @@ def find_installation_spec(
else:
airflow_ctl_constraints_location = None
airflow_ctl_distribution = airflow_ctl_spec
compile_ui_assets = False
elif use_airflow_version == "none" or use_airflow_version == "":
console.print("\n[bright_blue]Skipping airflow package installation\n")
airflow_distribution_spec = None
Expand All @@ -321,6 +360,7 @@ def find_installation_spec(
airflow_task_sdk_distribution = None
airflow_ctl_distribution = None
airflow_ctl_constraints_location = None
compile_ui_assets = False
elif repo_match := re.match(GITHUB_REPO_BRANCH_PATTERN, use_airflow_version):
owner, repo, branch = repo_match.groups()
console.print(f"\nInstalling airflow from GitHub: {use_airflow_version}\n")
Expand All @@ -341,6 +381,7 @@ def find_installation_spec(
github_repository=github_repository,
python_version=python_version,
)
compile_ui_assets = True
console.print(f"\nInstalling airflow task-sdk from GitHub {use_airflow_version}\n")
airflow_task_sdk_distribution = f"apache-airflow-task-sdk @ {vcs_url}#subdirectory=task-sdk"
airflow_constraints_location = get_airflow_constraints_location(
Expand All @@ -365,16 +406,13 @@ def find_installation_spec(
github_repository=github_repository,
python_version=python_version,
)
console.print(
"[yellow]Note that installing airflow from branch has no assets compiled, so you will"
"not be able to run UI (we might add asset compilation for this case later if needed)."
)
elif use_airflow_version in ["wheel", "sdist"] and not use_distributions_from_dist:
console.print(
"[red]USE_AIRFLOW_VERSION cannot be 'wheel' or 'sdist' without --use-distributions-from-dist"
)
sys.exit(1)
else:
compile_ui_assets = False
console.print(f"\nInstalling airflow via apache-airflow=={use_airflow_version}")
airflow_distribution_spec = f"apache-airflow{airflow_extras}=={use_airflow_version}"
airflow_core_distribution_spec = (
Expand Down Expand Up @@ -427,6 +465,7 @@ def find_installation_spec(
airflow_task_sdk_distribution=airflow_task_sdk_distribution,
airflow_ctl_distribution=airflow_ctl_distribution,
airflow_ctl_constraints_location=airflow_ctl_constraints_location,
compile_ui_assets=compile_ui_assets,
provider_distributions=provider_distributions_list,
provider_constraints_location=get_providers_constraints_location(
providers_constraints_mode=providers_constraints_mode,
Expand All @@ -443,6 +482,208 @@ def find_installation_spec(
return installation_spec


def download_airflow_source_tarball(installation_spec: InstallationSpec) -> None:
    """Download the Airflow source tarball from GitHub and unpack it.

    The tarball is stored at ``SOURCE_TARBALL`` and its content ends up in
    ``EXTRACTED_SOURCE_DIR``. The function returns without doing anything when:

    * UI assets compilation is disabled in ``installation_spec``,
    * no airflow distribution is specified,
    * both the tarball and the extracted directory already exist,
    * the distribution spec contains no
      ``git+https://github.com/<owner>/<repo>.git@<ref>`` URL.

    This is best-effort: failures are reported on the console and the function returns
    instead of raising. A tarball whose download or extraction failed is removed so that
    the next run downloads it again instead of reusing a broken file.

    :param installation_spec: only ``compile_ui_assets`` and ``airflow_distribution`` are read.
    """
    if not installation_spec.compile_ui_assets:
        console.print(
            "[bright_blue]Skipping downloading Airflow source tarball since UI assets compilation is disabled."
        )
        return

    if not installation_spec.airflow_distribution:
        console.print("[yellow]No airflow distribution specified, cannot download source tarball.")
        return

    if SOURCE_TARBALL.exists() and EXTRACTED_SOURCE_DIR.exists():
        console.print(
            "[bright_blue]Source tarball and extracted source directory already exist. Skipping download."
        )
        return

    # Extract GitHub repository information from airflow_distribution
    # Expected format: "apache-airflow @ git+https://github.com/owner/repo.git@branch"
    airflow_dist = installation_spec.airflow_distribution
    git_url_match = re.search(r"git\+https://github\.com/([^/]+)/([^/]+)\.git@([^#\s]+)", airflow_dist)
    if not git_url_match:
        console.print(f"[yellow]Cannot extract GitHub repository info from: {airflow_dist}")
        return

    owner, repo, ref = git_url_match.groups()
    console.print(f"[bright_blue]Downloading source tarball from GitHub: {owner}/{repo}@{ref}")

    # Create build directory if it doesn't exist
    SOURCE_TARBALL.parent.mkdir(parents=True, exist_ok=True)

    if not SOURCE_TARBALL.exists():
        tarball_url = f"https://api.github.com/repos/{owner}/{repo}/tarball/{ref}"
        console.print(f"[bright_blue]Downloading from: {tarball_url}")

        downloaded = False
        try:
            # --fail makes curl exit non-zero on HTTP errors instead of saving the
            # error response body as if it were the tarball.
            result = run_command(
                ["curl", "--fail", "-L", tarball_url, "-o", str(SOURCE_TARBALL)],
                github_actions=False,
                shell=False,
                check=True,
            )
            downloaded = result.returncode == 0
            if not downloaded:
                console.print(f"[red]Failed to download tarball: {result.stderr}")
        except Exception as e:
            console.print(f"[red]Error downloading source tarball: {e}")

        if not downloaded:
            # Do not leave a partial file behind: it would be taken for a valid,
            # already-downloaded tarball on the next run.
            _discard_source_tarball()
            return
    else:
        console.print(f"[bright_blue]Source tarball already exists at: {SOURCE_TARBALL}")

    try:
        # Start from a clean extraction target
        if EXTRACTED_SOURCE_DIR.exists():
            shutil.rmtree(EXTRACTED_SOURCE_DIR)
        # make sure .build exists
        EXTRACTED_SOURCE_DIR.parent.mkdir(parents=True, exist_ok=True)

        console.print(f"[bright_blue]Extracting tarball to: {EXTRACTED_SOURCE_DIR}")
        try:
            result = run_command(
                ["tar", "-xzf", str(SOURCE_TARBALL), "-C", str(EXTRACTED_SOURCE_DIR.parent)],
                github_actions=False,
                shell=False,
                check=True,
            )
        except Exception:
            # tar could not unpack the file - drop it so it is downloaded again next time.
            _discard_source_tarball()
            raise

        if result.returncode != 0:
            console.print(f"[red]Failed to extract tarball: {result.stderr}")
            _discard_source_tarball()
            return

        # The unpacked top-level directory is expected to match "<owner>-<repo>-*";
        # rename it to a known name.
        extracted_dirs = list(EXTRACTED_SOURCE_DIR.parent.glob(f"{owner}-{repo}-*"))
        if not extracted_dirs:
            console.print("[red]No extracted directory found after tarball extraction.")
            return
        extracted_dirs[0].rename(EXTRACTED_SOURCE_DIR)
        console.print("[bright_blue]Source tarball downloaded and extracted successfully")
    except Exception as e:
        console.print(f"[red]Error extracting source tarball: {e}")
        return


def _discard_source_tarball() -> None:
    """Remove ``SOURCE_TARBALL`` if it exists; a failure to remove it is only reported."""
    try:
        SOURCE_TARBALL.unlink(missing_ok=True)
    except OSError as e:
        console.print(f"[yellow]Could not remove {SOURCE_TARBALL}: {e}")


def compile_ui_assets(
    installation_spec: InstallationSpec,
    source_prefix: str,
    source_ui_directory: Path,
    dist_prefix: str,
) -> None:
    """Build one UI with pnpm and copy the result into the installed airflow package.

    Steps, in order:

    1. If ``EXTRACTED_SOURCE_DIR / source_prefix`` exists, replace ``source_ui_directory``
       with a copy of it.
    2. Return if ``source_ui_directory`` does not exist, or if compiled assets are already
       present at ``get_airflow_installation_path() / dist_prefix``.
    3. Install Node.js and pnpm when ``pnpm`` is not on ``PATH``.
    4. Run ``pnpm cache delete``, remove ``node_modules``, run ``pnpm install`` and
       ``pnpm run build`` inside ``source_ui_directory``.
    5. Copy ``source_ui_directory / "dist"`` to the installation directory.

    Nothing is done when ``installation_spec.compile_ui_assets`` is falsy.

    :param installation_spec: only ``compile_ui_assets`` is read.
    :param source_prefix: path of the UI sources relative to ``EXTRACTED_SOURCE_DIR``.
    :param source_ui_directory: local directory the UI sources are copied to and built in.
    :param dist_prefix: path of the compiled assets relative to the installed airflow package.
    """
    if not installation_spec.compile_ui_assets:
        console.print("[bright_blue]Skipping UI assets compilation")
        return

    # Copy UI directories from extracted tarball source to core source directory
    extracted_ui_directory = EXTRACTED_SOURCE_DIR / source_prefix
    if extracted_ui_directory.exists():
        console.print(
            f"[bright_blue]Copying UI source from: {extracted_ui_directory} to: {source_ui_directory}"
        )
        if source_ui_directory.exists():
            shutil.rmtree(source_ui_directory)
        source_ui_directory.parent.mkdir(parents=True, exist_ok=True)
        shutil.copytree(extracted_ui_directory, source_ui_directory)
    else:
        console.print(f"[yellow]Main UI directory not found at: {extracted_ui_directory}")

    if not source_ui_directory.exists():
        console.print(
            f"[bright_blue]UI directory '{source_ui_directory}' still does not exist. "
            "Skipping UI assets compilation."
        )
        return

    # check if UI assets need to be recompiled
    dist_directory = get_airflow_installation_path() / dist_prefix
    if dist_directory.exists():
        console.print(f"[bright_blue]Already compiled UI assets found in '{dist_directory}'")
        return
    console.print(f"[bright_blue]No compiled UI assets found in '{dist_directory}'")

    # ensure dependencies for UI assets compilation
    _ensure_pnpm_available()

    # To avoid "ELIFECYCLE Command failed." errors, clear the pnpm cache and node_modules first
    _run_in_ui_directory("pnpm cache delete", source_ui_directory)
    shutil.rmtree(source_ui_directory / "node_modules", ignore_errors=True)

    # install dependencies
    # NOTE(review): the single leading dash of "-config.confirmModulesPurge" is kept exactly as
    # it was; pnpm documents settings overrides as "--config.<key>=<value>" - confirm.
    _run_in_ui_directory(
        "pnpm install --frozen-lockfile -config.confirmModulesPurge=false", source_ui_directory
    )
    # compile UI assets
    _run_in_ui_directory("pnpm run build", source_ui_directory)

    # copy compiled assets to installation directory
    dist_source_directory = source_ui_directory / "dist"
    console.print(
        f"[bright_blue]Copying compiled UI assets from '{dist_source_directory}' to '{dist_directory}'"
    )
    shutil.copytree(dist_source_directory, dist_directory)
    console.print("[bright_blue]UI assets compiled successfully")


def _ensure_pnpm_available() -> None:
    """Install Node.js (NodeSource LTS setup script + apt-get) and pnpm (npm) unless pnpm is on PATH."""
    if shutil.which("pnpm") is not None:
        console.print("[bright_blue]pnpm already installed")
        return

    console.print("[bright_blue]Installing pnpm directly from official setup script")
    run_command(
        [
            "bash",
            "-c",
            "curl -fsSL https://deb.nodesource.com/setup_lts.x | bash - && apt-get install -y nodejs",
        ],
        github_actions=False,
        shell=False,
        check=True,
    )
    run_command(["npm", "install", "-g", "pnpm"], github_actions=False, shell=False, check=True)


def _run_in_ui_directory(shell_command: str, ui_directory: Path) -> None:
    """Run ``shell_command`` via ``bash -c`` with ``ui_directory`` as the working directory."""
    run_command(
        ["bash", "-c", shell_command],
        github_actions=False,
        shell=False,
        check=True,
        cwd=os.fspath(ui_directory),
    )


ALLOWED_DISTRIBUTION_FORMAT = ["wheel", "sdist", "both"]
ALLOWED_CONSTRAINTS_MODE = ["constraints-source-providers", "constraints", "constraints-no-providers"]
ALLOWED_MOUNT_SOURCES = ["remove", "tests", "providers-and-tests", "selected"]
Expand Down Expand Up @@ -660,12 +901,6 @@ def install_airflow_and_providers(
shell=True,
check=False,
)
import importlib.util

spec = importlib.util.find_spec("airflow")
if spec is None or spec.origin is None:
console.print("[red]Airflow not found - cannot mount sources")
sys.exit(1)
from packaging.version import Version

from airflow import __version__
Expand All @@ -676,7 +911,7 @@ def install_airflow_and_providers(
"[yellow]Patching airflow 2 installation "
"in order to load providers from separate distributions.\n"
)
airflow_path = Path(spec.origin).parent
airflow_path = get_airflow_installation_path()
# Make sure old Airflow will include providers including common subfolder allow to extend loading
# providers from the installed separate source packages
console.print("[yellow]Uninstalling Airflow-3 only providers\n")
Expand Down Expand Up @@ -708,6 +943,15 @@ def install_airflow_and_providers(
airflow_providers_common_init_py.parent.mkdir(exist_ok=True)
airflow_providers_common_init_py.write_text(INIT_CONTENT + "\n")

# compile ui assets
download_airflow_source_tarball(installation_spec)
compile_ui_assets(installation_spec, CORE_SOURCE_UI_PREFIX, CORE_SOURCE_UI_DIRECTORY, CORE_UI_DIST_PREFIX)
compile_ui_assets(
installation_spec,
SIMPLE_AUTH_MANAGER_SOURCE_UI_PREFIX,
SIMPLE_AUTH_MANAGER_SOURCE_UI_DIRECTORY,
SIMPLE_AUTH_MANAGER_UI_DIST_PREFIX,
)
console.print("\n[green]Done!")


Expand Down