Add basic dagster integration and example to peerdb for job scheduling #243

Merged: 5 commits, Jul 22, 2023
2 changes: 1 addition & 1 deletion .gitignore
@@ -1,3 +1,3 @@
.vscode
.env

tmp/
39 changes: 39 additions & 0 deletions dagster/dagster_peerdb/.gitignore
@@ -0,0 +1,39 @@
*.pyc

# Packages
*.egg
!/tests/**/*.egg
/*.egg-info
/dist/*
build
_build
.cache
*.so

# Installer logs
pip-log.txt

# Unit test / coverage reports
.coverage
.pytest_cache

.DS_Store
.idea/*
.python-version
.vscode/*

/test.py
/test_*.*

/setup.cfg
MANIFEST.in
/setup.py
/docs/site/*
/tests/fixtures/simple_project/setup.py
/tests/fixtures/project_with_extras/setup.py
.mypy_cache

.venv
/releases/*
pip-wheel-metadata
/poetry.toml
Empty file.
3 changes: 3 additions & 0 deletions dagster/dagster_peerdb/dagster_peerdb/__init__.py
@@ -0,0 +1,3 @@
from .resources import PeerDBResource
from .ops import peerdb_execute_mirror
from .types import PeerDBMirrorOutput
31 changes: 31 additions & 0 deletions dagster/dagster_peerdb/dagster_peerdb/ops.py
@@ -0,0 +1,31 @@
from dagster import Config, In, Nothing, Out, Output, op, get_dagster_logger
from pydantic import Field

from .resources import PeerDBResource
from .types import PeerDBMirrorOutput


class PeerDBMirrorConfig(Config):
mirror_name: str = Field(
...,
description="The name of the mirror job to execute."
"This job must already have been created by using the "
"`CREATE MIRROR` commad with `disabled = true`.",
)


@op(
ins={"start_after": In(Nothing)},
out=Out(
PeerDBMirrorOutput,
description=("The output of the peerdb mirror operation."),
),
)
def peerdb_execute_mirror(context, config: PeerDBMirrorConfig, peerdb: PeerDBResource):
log = get_dagster_logger()
workflow_id = peerdb.execute_mirror(config.mirror_name)
log.info(f"Executed PeerDB workflow: {workflow_id}")
output = PeerDBMirrorOutput(
workflow_id=workflow_id,
)
return Output(output)
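The op above reads its mirror name from `PeerDBMirrorConfig`, so it can also be configured per run instead of being pre-bound with `.configured(...)` as the example package further down does. A minimal sketch of that pattern using Dagster's Pythonic `RunConfig`; the connection values, job name, and mirror name are illustrative and not part of this PR:

from dagster import RunConfig, job
from dagster_peerdb import PeerDBResource, peerdb_execute_mirror
from dagster_peerdb.ops import PeerDBMirrorConfig

# Illustrative connection settings for a locally running PeerDB nexus and Temporal.
peerdb = PeerDBResource(
    peerdb_server_jdbc_url="postgresql://peerdb:peerdb@localhost:9900/postgres",
    temporal_host_port="localhost:7233",
)


@job(resource_defs={"peerdb": peerdb})
def mirror_job():
    peerdb_execute_mirror()


# The mirror name is supplied at launch time rather than baked into the job definition.
result = mirror_job.execute_in_process(
    run_config=RunConfig(
        ops={"peerdb_execute_mirror": PeerDBMirrorConfig(mirror_name="simple_mirror_2")}
    )
)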
35 changes: 35 additions & 0 deletions dagster/dagster_peerdb/dagster_peerdb/resources.py
@@ -0,0 +1,35 @@
import asyncio
import psycopg
from dagster import ConfigurableResource, get_dagster_logger
from temporalio.client import Client


class PeerDBResource(ConfigurableResource):
peerdb_server_jdbc_url: str
temporal_host_port: str

def execute_mirror(
self,
mirror_name: str,
) -> str:
log = get_dagster_logger()
workflow_id = ""
with psycopg.connect(self.peerdb_server_jdbc_url) as conn:
with conn.cursor() as cur:
cur.execute(f"EXECUTE MIRROR {mirror_name}")
cur.execute(
f"SELECT workflow_id FROM flows WHERE name = '{mirror_name}'"
)
workflow_id = cur.fetchone()[0]
log.info(f"started PeerDB workflow: {workflow_id}")
asyncio.run(self.wait_for_workflow_completion(workflow_id))
return workflow_id

async def wait_for_workflow_completion(self, workflow_id: str) -> None:
# sleep for 2 seconds to give the workflow time to start
await asyncio.sleep(2)
log = get_dagster_logger()
client = await Client.connect(self.temporal_host_port, namespace="default")
log.info(f"waiting for PeerDB workflow: {workflow_id}")
handle = client.get_workflow_handle(workflow_id)
await handle.result()
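For ad-hoc testing outside of a Dagster run, the resource can be constructed directly, since Dagster's `ConfigurableResource` is a Pydantic model; a minimal sketch with illustrative connection values and mirror name (not part of this PR):

from dagster_peerdb import PeerDBResource

peerdb = PeerDBResource(
    peerdb_server_jdbc_url="postgresql://peerdb:peerdb@localhost:9900/postgres",  # illustrative
    temporal_host_port="localhost:7233",  # illustrative
)
# Kicks off the pre-created mirror and blocks until the Temporal workflow completes.
workflow_id = peerdb.execute_mirror("simple_mirror_2")
print(f"mirror finished, workflow id: {workflow_id}")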
12 changes: 12 additions & 0 deletions dagster/dagster_peerdb/dagster_peerdb/types.py
@@ -0,0 +1,12 @@
from typing import Any, Mapping, NamedTuple, Optional


class PeerDBMirrorOutput(
NamedTuple(
"_PeerDBMirrorOutput",
[
("workflow_id", str),
],
)
):
pass
30 changes: 30 additions & 0 deletions dagster/dagster_peerdb/example/__init__.py
@@ -0,0 +1,30 @@
from dagster import Definitions, load_assets_from_modules, job

from . import assets
from dagster_peerdb import PeerDBResource, peerdb_execute_mirror

all_assets = load_assets_from_modules([assets])

peerdb_resource = PeerDBResource.configure_at_launch()


simple_mirror_op = peerdb_execute_mirror.configured(
{
"mirror_name": "simple_mirror_2",
},
name="simple_mirror",
)


@job(
resource_defs={"peerdb": peerdb_resource},
)
def simple_mirror_job():
simple_mirror_op()


defs = Definitions(
assets=all_assets,
jobs=[simple_mirror_job],
resources={"peerdb": peerdb_resource},
)
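Because `peerdb_resource` uses `configure_at_launch()`, its connection settings are provided when a run is launched. A minimal sketch of launching `simple_mirror_job` in-process with dict-style run config; the connection values are illustrative:

from example import simple_mirror_job

result = simple_mirror_job.execute_in_process(
    run_config={
        "resources": {
            "peerdb": {
                "config": {
                    "peerdb_server_jdbc_url": "postgresql://peerdb:peerdb@localhost:9900/postgres",
                    "temporal_host_port": "localhost:7233",
                }
            }
        }
    }
)
assert result.success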
Empty file.
1,774 changes: 1,774 additions & 0 deletions dagster/dagster_peerdb/poetry.lock

Large diffs are not rendered by default.

23 changes: 23 additions & 0 deletions dagster/dagster_peerdb/pyproject.toml
@@ -0,0 +1,23 @@
[tool.poetry]
name = "dagster-peerdb"
version = "0.1.0"
description = ""
authors = ["Kaushik Iska <iska.kaushik@gmail.com>"]
readme = "README.md"
packages = [{include = "dagster_peerdb"}]

[tool.poetry.dependencies]
python = "^3.11"
dagster = "^1.4.2"
temporalio = "^1.2.0"
asyncio = "^3.4.3"
psycopg = "^3.1.9"
dagster-webserver = "^1.4.2"


[build-system]
requires = ["poetry-core"]
build-backend = "poetry.core.masonry.api"

[tool.dagster]
module_name = "example"
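With the `[tool.dagster]` section above, running `dagster dev` from this package's root should pick up the `example` module as the code location, so `simple_mirror_job` can be launched from the Dagster UI, assuming the PeerDB nexus and Temporal endpoints supplied at launch are reachable.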
16 changes: 16 additions & 0 deletions nexus/analyzer/src/lib.rs
@@ -103,6 +103,9 @@ pub enum PeerDDL {
CreateMirrorForSelect {
qrep_flow_job: QRepFlowJob,
},
ExecuteMirrorForSelect {
flow_job_name: String,
},
DropMirror {
if_exists: bool,
flow_job_name: String,
@@ -166,6 +169,15 @@ impl StatementAnalyzer for PeerDDLAnalyzer {
raw_options.insert(&option.name.value as &str, &option.value);
}

// `disabled` is treated as a special option and is not passed to the
// flow server; it is primarily used for external orchestration.
let mut disabled = false;
if let Some(sqlparser::ast::Value::Boolean(b)) =
raw_options.remove("disabled")
{
disabled = *b;
}

let processed_options = process_options(raw_options)?;

let qrep_flow_job = QRepFlowJob {
@@ -175,12 +187,16 @@ impl StatementAnalyzer for PeerDDLAnalyzer {
query_string: select.query_string.to_string(),
flow_options: processed_options,
description: "".to_string(), // TODO: add description
disabled,
};

Ok(Some(PeerDDL::CreateMirrorForSelect { qrep_flow_job }))
}
}
}
Statement::ExecuteMirror { mirror_name } => Ok(Some(PeerDDL::ExecuteMirrorForSelect {
flow_job_name: mirror_name.to_string().to_lowercase(),
})),
Statement::DropMirror {
if_exists,
mirror_name,
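The analyzer change above exposes `EXECUTE MIRROR` as a nexus statement, so a pre-created, disabled mirror can also be triggered by any Postgres client, not only the Dagster resource. A minimal sketch using psycopg; the connection string and mirror name are illustrative:

import psycopg

# Point this at the PeerDB nexus server; values are illustrative.
with psycopg.connect("postgresql://peerdb:peerdb@localhost:9900/postgres") as conn:
    with conn.cursor() as cur:
        # The mirror must already exist, created with `disabled = true`.
        cur.execute("EXECUTE MIRROR simple_mirror_2")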
30 changes: 30 additions & 0 deletions nexus/catalog/src/lib.rs
@@ -372,6 +372,36 @@ impl Catalog {
Ok(())
}

pub async fn get_qrep_flow_job_by_name(
&self,
job_name: &str,
) -> anyhow::Result<Option<QRepFlowJob>> {
let stmt = self
.pg
.prepare_typed("SELECT f.*, sp.name as source_peer_name, dp.name as destination_peer_name FROM flows as f
INNER JOIN peers as sp ON f.source_peer = sp.id
INNER JOIN peers as dp ON f.destination_peer = dp.id
WHERE f.name = $1", &[types::Type::TEXT])
.await?;

let job = self.pg.query_opt(&stmt, &[&job_name]).await?.map(|row| {
QRepFlowJob {
name: row.get("name"),
source_peer: row.get("source_peer_name"),
target_peer: row.get("destination_peer_name"),
description: row.get("description"),
query_string: row.get("query_string"),
flow_options: serde_json::from_value(row.get("flow_metadata"))
.context("unable to deserialize flow options")
.unwrap_or_default(),
// we set the disabled flag to false by default
disabled: false,
}
});

Ok(job)
}

pub async fn create_qrep_flow_job_entry(&self, job: &QRepFlowJob) -> anyhow::Result<()> {
let source_peer_id = self
.get_peer_id_i32(&job.source_peer)
1 change: 1 addition & 0 deletions nexus/pt/src/flow_model.rs
@@ -26,4 +26,5 @@ pub struct QRepFlowJob {
pub query_string: String,
pub flow_options: HashMap<String, Value>,
pub description: String,
pub disabled: bool,
}