Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions .github/workflows/30_deploy_changes_to_production.yml
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,13 @@ jobs:

DBT_ARTIFACTS_BUCKET: "convexa-local"

# Needed for dbt-api
DATACOVES__API_ENDPOINT: ${{ vars.DATACOVES__API_ENDPOINT }}
DATACOVES__API_TOKEN: ${{ secrets.DATACOVES__API_TOKEN }}
DATACOVES__ACCOUNT_ID: ${{ vars.DATACOVES__ACCOUNT_ID }}
DATACOVES__PROJECT_SLUG: ${{ vars.DATACOVES__PROJECT_SLUG }}
DATACOVES__ENVIRONMENT_SLUG: ${{ vars.DATACOVES__ENVIRONMENT_SLUG }}

# This is used by datacoves to drop the staging database for blue/green
# deployments, most likely you don't want to set this, we use it for demos
DATACOVES__DROP_DB_ON_FAIL: ${{ vars.DATACOVES__DROP_DB_ON_FAIL }}
Expand Down Expand Up @@ -112,6 +119,9 @@ jobs:
- name: Upload dbt artifacts
run: "dbt run-operation upload_artifacts"

- name: Push dbt artifacts to dbt-api on Datacoves
run: "../automate/dbt/push_dbt_artifacts.py"

- name: Configure AWS credentials
uses: aws-actions/configure-aws-credentials@v4
with:
Expand Down
175 changes: 175 additions & 0 deletions automate/dbt/push_dbt_artifacts.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,175 @@
#!/usr/bin/env -S uv run
# /// script
# dependencies = [
# "requests",
# "python-dotenv",
# "rich",
# ]
# ///

import requests
import os
from dotenv import load_dotenv
import json
from rich import print_json
from rich.console import Console
from rich.table import Table


load_dotenv()
base_url = os.getenv("DATACOVES__API_ENDPOINT")
token = os.getenv("DATACOVES__API_TOKEN")
account_id = os.getenv("DATACOVES__ACCOUNT_ID")
project_slug = os.getenv("DATACOVES__PROJECT_SLUG")
environment_slug = os.getenv("DATACOVES__ENVIRONMENT_SLUG")
dbt_home = os.getenv("DATACOVES__DBT_HOME")


#######################################
# Utility for api interactions
#######################################
def print_responce(r):
print("STATUS:", r.status_code)

response_text = r.text

try:
parsed_json = json.loads(response_text)
print_json(data=parsed_json)
except json.JSONDecodeError:
print("RESPONSE:", response_text)

print("-----------------------")

def print_table(items, keys_to_show, title="Items"):
"""Print a table showing only specified keys from a list of dictionaries"""
console = Console()
table = Table(title=title)

# Define different colors for each column
colors = ["blue", "bright_green", "yellow", "green", "cyan", "magenta", "red", "bright_cyan", "bright_magenta", "bright_yellow"]

# Add columns for each key we want to show with different colors
for index, key in enumerate(keys_to_show):
color = colors[index % len(colors)] # Cycle through colors if more columns than colors
table.add_column(key.replace('_', ' ').title(), style=color)

# Add rows for each item in the list
for item in items:
row_values = []
for key in keys_to_show:
value = item.get(key, "N/A")
row_values.append(str(value))
table.add_row(*row_values)

console.print(table)

def get_endpoint(endpoint: str) -> str:
return f"{base_url}/{endpoint}"

def get_headers() -> dict:
return {
"Accept": "application/json",
"Authorization": f"Bearer {token}"
}

#######################################
# Get information
#######################################

def health_check():
print("Checking Health of api")

r = requests.get(
url=get_endpoint(endpoint="/api/v3/healthcheck"),
headers=get_headers(),
)

print_responce(r)

#######################################
# Working with files
#######################################

def list_project_files(account_id: int, project_slug: str):
print(f"Listing files for project: {project_slug}")

r = requests.get(
# url=get_endpoint(endpoint=f"/api/v3/datacoves/account/{account_id}/projects/{project_slug}/files"),

url=get_endpoint(endpoint=f"api/v3/accounts/{account_id}/projects/{project_slug}/files"),

headers=get_headers(),
)

return r.json().get("data", {})

def upload_env_file(account_id: int, project_slug: str, env_slug: str,
filename: str, is_manifest: bool = False,
dag_id: str = None, run_id: str = None, use_multipart: bool = False):

print(f"Uploading file {filename} to project: {project_slug} in environment: {env_slug}")

file = {"file": (filename, open(f"{dbt_home}/target/{filename}", "rb"))}

data = {
'filename': filename,
'is_manifest': str(is_manifest).lower()
}

r = requests.post(
url=get_endpoint(endpoint=f"api/v3/accounts/{account_id}/projects/{project_slug}/environments/{env_slug}/files"),
headers=get_headers(),
files=file,
data=data
)

print_responce(r)

def promote_env_file(account_id: int, project_slug: str, env_slug: str,
filename: str):

print(f"Promoting file {filename} in environment: {env_slug} to project level ({project_slug})")

r = requests.post(
url=get_endpoint(endpoint=f"api/v3/accounts/{account_id}/projects/{project_slug}/environments/{env_slug}/files/{filename}/promote"),
headers=get_headers()
)

print_responce(r)

def delete_project_file(account_id: int, project_slug: str, filename: str):

print(f"Deleting file {filename} from project: {project_slug}")

r = requests.delete(
url=get_endpoint(endpoint=f"api/v3/accounts/{account_id}/projects/{project_slug}/files/{filename}"),
headers=get_headers()
)

print_responce(r)

if __name__ == "__main__":
# Get infomration

health_check()

cols = ["environment_slug",'filename', 'metadata', 'inserted_at']

# UPLOAD FILES

filenames = ["graph.gpickle", "graph_summary.json", "partial_parse.msgpack"]
for filename in filenames:
upload_env_file(account_id, project_slug, environment_slug, filename)

for filename in filenames:
promote_env_file(account_id, project_slug, environment_slug, filename)

upload_env_file(account_id, project_slug, environment_slug, "manifest.json", is_manifest=True )
promote_env_file(account_id, project_slug, environment_slug, "manifest.json" )

# delete_project_file(account_id, project_slug, "manifest.json")

# SHOW FILE DETAILS
files = list_project_files(account_id, project_slug)
print_table(files, cols)
11 changes: 5 additions & 6 deletions orchestrate/dags/daily_loan_run.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,14 +63,13 @@ def extract_and_load_fivetran():
tooltip="dlt Extract and Load"
)
def extract_and_load_dlt():
@task.datacoves_bash
@task.datacoves_bash(
env = datacoves_utils.set_dlt_env_vars({"destinations": ["main_load_keypair"]}),
append_env=True
)
def load_loans_data():
from orchestrate.utils import datacoves_utils

env_vars = datacoves_utils.set_dlt_env_vars({"destinations": ["main_load_keypair"]})
env_exports = datacoves_utils.generate_env_exports(env_vars)

return f"{env_exports}; cd load/dlt && ./loans_data.py"
return "cd load/dlt && ./loans_data.py"

load_loans_data()

Expand Down
12 changes: 5 additions & 7 deletions orchestrate/dags/other_examples/load_dlt.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,12 @@
)
def load_with_dlt():

@task.datacoves_bash
@task.datacoves_bash(
env = datacoves_utils.set_dlt_env_vars({"destinations": ["main_load_keypair"]}),
append_env=True
)
def load_us_population():
from orchestrate.utils import datacoves_utils

env_vars = datacoves_utils.set_dlt_env_vars({"destinations": ["main_load_keypair"]})
env_exports = datacoves_utils.generate_env_exports(env_vars)

return f"{env_exports}; cd load/dlt && ./us_population.py"
return "cd load/dlt && ./us_population.py"

load_us_population()

Expand Down
33 changes: 13 additions & 20 deletions orchestrate/dags/other_examples/load_earthquake_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,32 +36,25 @@ def get_last_success_date(**context):
return str(success_date - datetime.timedelta(days=3))

# Load earthquake data from USGS
@task.datacoves_bash
@task.datacoves_bash(
env=datacoves_utils.set_dlt_env_vars({'destinations': ['main_load_keypair']}),
append_env=True
)
def load_usgs_data(**context):
from orchestrate.utils import datacoves_utils

# Get the start date directly from the upstream task
# Get the start date from the upstream task
task_instance = context['task_instance']
start_date = task_instance.xcom_pull(task_ids = 'get_last_success_date')

# Set up environment variables
env_vars = datacoves_utils.set_dlt_env_vars({'destinations': ['main_load_keypair']})
env_vars['DATACOVES__START_DATE'] = start_date

env_exports = datacoves_utils.generate_env_exports(env_vars)
start_date = task_instance.xcom_pull(task_ids='get_last_success_date')

return f"{env_exports}; cd load/dlt && ./usgs_earthquake.py --start-date $DATACOVES__START_DATE"
# Pass the start date directly to the command
return f"cd load/dlt && ./usgs_earthquake.py --start-date {start_date}"

# Load Country Polygon Data
@task.datacoves_bash
@task.datacoves_bash(
env=datacoves_utils.set_dlt_env_vars({'destinations': ['main_load_keypair']}),
append_env=True
)
def load_country_geography():
from orchestrate.utils import datacoves_utils

env_vars = datacoves_utils.set_dlt_env_vars({'destinations': ['main_load_keypair']})

env_exports = datacoves_utils.generate_env_exports(env_vars)

return f"{env_exports}; cd load/dlt && ./country_geo.py"
return "cd load/dlt && ./country_geo.py"

# Run the dbt transformations
@task.datacoves_dbt(
Expand Down
1 change: 0 additions & 1 deletion training_and_demos/dbt-api/dbt_api_files.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
from rich import print_json
from rich.console import Console
from rich.table import Table
from pathlib import Path

load_dotenv()
base_url = os.getenv("DATACOVES__API_ENDPOINT")
Expand Down