This repository contains various data processing workflows implemented using Dagster, designed to run in a Kubernetes environment.
my_dagster_project/
├── Dockerfile
├── pyproject.toml
└── my_dagster_project/
    ├── __init__.py
    ├── assets/
    │   ├── __init__.py
    │   ├── pandas_workflow.py
    │   ├── polars_workflow.py
    │   ├── duckdb_workflow.py
    │   ├── postgres_workflow.py
    │   └── sales_workflow.py
    └── resources/
        ├── __init__.py
        └── database.py
- Pandas Workflow: Stock market data analysis
- Polars Workflow: Weather data analysis
- DuckDB Workflow: Analytics using DuckDB
- Postgres Workflow: Database operations
- Sales Workflow: Synthetic data analysis
# my_dagster_project/assets/new_workflow.py
from dagster import asset, Output, MetadataValue


@asset
def my_new_asset():
    """
    Description of what this asset does
    """
    # Your asset logic here
    return Output(
        result,
        metadata={
            "rows": len(result),
            "preview": MetadataValue.md(result.head().to_markdown()),
        },
    )

# my_dagster_project/resources/new_resource.py
from dagster import resource


@resource(config_schema={"param": str})
def my_new_resource(context):
    return MyResourceClass(context.resource_config["param"])

Update my_dagster_project/__init__.py:
from dagster import Definitions, load_assets_from_package_module

from . import assets
from .resources.new_resource import my_new_resource

defs = Definitions(
    # Discover every asset defined in the assets package
    assets=load_assets_from_package_module(assets),
    resources={
        "my_new_resource": my_new_resource,
        # ... other resources
    },
)

The CI/CD pipeline automatically builds and pushes Docker images in response to Git events:
- Branch Builds:
  - Format: your-username/my-dagster-project:branch-name
  - Example: your-username/my-dagster-project:feature-new-workflow
- Pull Request Builds:
  - Format: your-username/my-dagster-project:pr-{number}
  - Example: your-username/my-dagster-project:pr-123
- Release Tags:
  - Format: your-username/my-dagster-project:{version}
  - Example: your-username/my-dagster-project:1.0.0
- Main Branch:
  - Tag: your-username/my-dagster-project:latest
- Commit SHA:
  - Format: your-username/my-dagster-project:sha-{hash}
  - Example: your-username/my-dagster-project:sha-a1b2c3d
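The tagging rules above can be summarized as a small function. This is only a sketch of the mapping in the list, not the pipeline's actual configuration. The name `image_tags`, the `IMAGE` constant, the 7-character short SHA, the removal of a leading `v` from release tags, the replacement of `/` with `-` in branch names, and the choice to add the SHA tag to every build are all assumptions made for illustration.

```python
from typing import List, Optional

# Placeholder repository name used throughout this README.
IMAGE = "your-username/my-dagster-project"


def image_tags(ref: str, sha: str, pr_number: Optional[int] = None) -> List[str]:
    """Return the image tags a build might receive for a given Git ref.

    Hypothetical helper: it mirrors the rules listed above and is not
    taken from the CI/CD configuration itself.
    """
    tags = []
    if pr_number is not None:
        # Pull request builds: pr-{number}
        tags.append(f"{IMAGE}:pr-{pr_number}")
    elif ref.startswith("refs/tags/"):
        # Release tags: {version}. Assumption: v1.0.0 becomes 1.0.0.
        version = ref[len("refs/tags/"):]
        if version.startswith("v"):
            version = version[1:]
        tags.append(f"{IMAGE}:{version}")
    elif ref.startswith("refs/heads/"):
        branch = ref[len("refs/heads/"):]
        if branch == "main":
            # Main branch: latest
            tags.append(f"{IMAGE}:latest")
        else:
            # Branch builds. Assumption: "/" is not kept in the tag.
            tags.append(f"{IMAGE}:{branch.replace('/', '-')}")
    # Commit SHA: sha-{hash}. Assumption: added to every build.
    tags.append(f"{IMAGE}:sha-{sha[:7]}")
    return tags
```

For example, `image_tags("refs/tags/v1.0.0", "a1b2c3d4e5f6")` yields the `1.0.0` tag followed by `sha-a1b2c3d`.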
# kubernetes/my-workflow-deployment.yaml
apiVersion: dagster.io/v1alpha1
kind: UserDeployment
metadata:
  name: my-new-workflow
spec:
  deployment:
    image:
      repository: your-username/my-dagster-project
      tag: latest # or specific version
    dagsterApiGrpcArgs:
      - "--module-name"
      - "my_dagster_project"

# Deploy new version
kubectl apply -f kubernetes/my-workflow-deployment.yaml
# Check deployment status
kubectl get userdeployments

# Update image version
kubectl patch userdeployment my-new-workflow --type=json \
  -p='[{"op": "replace", "path": "/spec/deployment/image/tag", "value": "1.0.1"}]'

- Use Black for code formatting
- Add type hints where possible
- Include docstrings for all assets and resources
# Run tests
pytest tests/
# Format code
black .
isort .

- Create feature branches from main
- Use conventional commits:
  - feat: New feature
  - fix: Bug fix
  - docs: Documentation
  - chore: Maintenance
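A commit message can be checked against the four types listed above before it is pushed. The following is a minimal sketch, not a hook shipped with this repository. The function name `is_conventional` and the exact pattern (an optional scope in parentheses and an optional `!` marker) are assumptions, and the Conventional Commits convention defines more types than the four used here.

```python
import re

# The four types listed above; other projects often allow more.
ALLOWED_TYPES = ("feat", "fix", "docs", "chore")

# type, optional (scope), optional "!", then ": " and a non-empty subject
_PATTERN = re.compile(
    r"^(?P<type>[a-z]+)(\((?P<scope>[^()\s]+)\))?(?P<breaking>!)?: (?P<subject>\S.*)$"
)


def is_conventional(message: str) -> bool:
    """Check the first line of a commit message against the listed types."""
    lines = message.splitlines()
    if not lines:
        return False
    match = _PATTERN.match(lines[0])
    return bool(match) and match.group("type") in ALLOWED_TYPES
```

With this pattern, `feat: add weather asset` and `fix(database): close connection` pass, while `Update stuff` and `feature: add thing` are rejected.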
# Tag new version
git tag -a v1.0.0 -m "Release version 1.0.0"
git push origin v1.0.0

- Asset Not Found:
  - Check asset file naming
  - Verify module is in correct directory
- Resource Configuration:
  - Verify resource config in deployment
  - Check Kubernetes secrets if needed
- Docker Build Fails:
  - Check dependency versions
  - Verify system requirements
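For the "Asset Not Found" case, one quick check is to list the modules Python can actually see in the assets package. The sketch below uses only the standard library and does not import Dagster or the asset modules themselves. The function name `list_asset_modules` is hypothetical, and the directory passed to it is assumed to match the layout shown at the top of this README.

```python
import pkgutil
from pathlib import Path
from typing import List


def list_asset_modules(assets_dir: str) -> List[str]:
    """Return the names of modules found directly inside assets_dir.

    Raises FileNotFoundError when the directory has no __init__.py,
    since it would then not be a regular package.
    """
    path = Path(assets_dir)
    if not (path / "__init__.py").is_file():
        raise FileNotFoundError(f"{path} is not a package (no __init__.py)")
    # iter_modules reports .py modules and subpackages, but not __init__ itself.
    return sorted(info.name for info in pkgutil.iter_modules([str(path)]))


# Example call from the repository root (path assumed from the layout above):
#   list_asset_modules("my_dagster_project/assets")
```

If a workflow file is missing from the returned list, its name or location is the likely cause rather than the asset definition.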
# Get dagster logs
kubectl logs -l app=dagster-user-deployments
# Check deployment status
kubectl describe userdeployment my-new-workflow

MIT License - See LICENSE file for details