Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 45 additions & 0 deletions astro-airflow-mcp/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -285,6 +285,14 @@ af config version
af config connections
af config variables
af config pools

# Direct API access (any endpoint)
af api --endpoints # List all available endpoints
af api --endpoints --filter variable # Filter endpoints by pattern
af api dags # GET /api/v{1,2}/dags
af api dags -F limit=10 # With query parameters
af api variables -X POST -F key=x -f value=y # Create variable
af api variables/x -X DELETE # Delete variable
```

### Instance Management
Expand All @@ -309,6 +317,43 @@ af --instance staging dags list

Config file location: `~/.af/config.yaml` (override with `--config` or `AF_CONFIG` env var)

### Direct API Access

The `af api` command provides direct access to any Airflow REST API endpoint, similar to `gh api` for GitHub:

```bash
# Discover available endpoints
af api --endpoints
af api --endpoints --filter variable

# GET requests (default)
af api dags
af api dags -F limit=10 -F only_active=true
af api dags/my_dag

# POST/PATCH/DELETE requests
af api variables -X POST -F key=my_var -f value="my value"
af api dags/my_dag -X PATCH -F is_paused=false
af api variables/old_var -X DELETE

# With JSON body
af api connections -X POST --body '{"connection_id": "x", "conn_type": "postgres"}'

# Include response headers
af api dags -i

# Access non-versioned endpoints
af api health --raw

# Get full OpenAPI spec
af api --spec
```

**Field syntax:**
- `-F key=value`: Auto-converts types (numbers, booleans, null)
- `-f key=value`: Keeps value as raw string
- `--body '{}'`: Raw JSON body for complex objects

```yaml
instances:
- name: local
Expand Down
104 changes: 104 additions & 0 deletions astro-airflow-mcp/src/astro_airflow_mcp/adapters/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,110 @@ def _patch(
response.raise_for_status()
return response.json()

def _delete(
self,
endpoint: str,
) -> dict[str, Any]:
"""Make HTTP DELETE call to Airflow API.

Args:
endpoint: API endpoint path (without base path)

Returns:
Parsed JSON response (or empty dict if no content)

Raises:
NotFoundError: If endpoint returns 404
Exception: For other HTTP errors
"""
headers, auth = self._setup_auth()
headers["Accept"] = "application/json"
url = f"{self.airflow_url}{self.api_base_path}/{endpoint}"

with httpx.Client(timeout=30.0) as client:
response = client.delete(url, headers=headers, auth=auth)

if response.status_code == 404:
raise NotFoundError(endpoint)

response.raise_for_status()
# DELETE often returns 204 No Content
if response.status_code == 204 or not response.text:
return {}
return response.json()

def raw_request(
self,
method: str,
endpoint: str,
params: dict[str, Any] | None = None,
json_data: dict[str, Any] | None = None,
headers: dict[str, str] | None = None,
raw_endpoint: bool = False,
) -> dict[str, Any]:
"""Make raw HTTP request to Airflow API.

This method provides direct access to any Airflow REST API endpoint,
similar to `gh api` for GitHub. It automatically handles authentication
and API version prefixes based on the Airflow version.

Args:
method: HTTP method (GET, POST, PATCH, PUT, DELETE)
endpoint: API endpoint path (e.g., "dags" or "/dags")
params: Query parameters
json_data: JSON body for POST/PATCH/PUT requests
headers: Additional headers to include
raw_endpoint: If True, use endpoint path as-is without API version prefix.
Useful for endpoints like /health that don't have version prefix.

Returns:
Dict with 'status_code', 'headers', 'body' keys containing the raw response.

Example:
# GET /api/v1/dags (AF2) or /api/v2/dags (AF3)
adapter.raw_request("GET", "dags")

# GET /health (no version prefix)
adapter.raw_request("GET", "health", raw_endpoint=True)

# POST with JSON body
adapter.raw_request("POST", "variables", json_data={"key": "x", "value": "y"})
"""
auth_headers, auth = self._setup_auth()
all_headers = {**auth_headers, **(headers or {})}

# Build URL: with or without version prefix
endpoint = endpoint.lstrip("/")
if raw_endpoint:
url = f"{self.airflow_url}/{endpoint}"
else:
url = f"{self.airflow_url}{self.api_base_path}/{endpoint}"

with httpx.Client(timeout=30.0) as client:
response = client.request(
method=method.upper(),
url=url,
params=params,
json=json_data,
headers=all_headers,
auth=auth,
)

# Parse body - handle empty responses and non-JSON
body: Any = None
if response.text:
try:
body = response.json()
except ValueError:
# Not JSON, return as text
body = response.text

return {
"status_code": response.status_code,
"headers": dict(response.headers),
"body": body,
}

def _handle_not_found(self, endpoint: str, alternative: str | None = None) -> dict[str, Any]:
"""Create a structured response for unavailable endpoints.

Expand Down
Loading