Skip to content

Commit

Permalink
Challenge edits (#94)
Browse files Browse the repository at this point in the history
  • Loading branch information
dehume authored Dec 20, 2022
1 parent 4f14f6f commit 08ce7ca
Show file tree
Hide file tree
Showing 8 changed files with 16 additions and 9 deletions.
2 changes: 1 addition & 1 deletion week_2/workspaces/resources.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ def uri(self):
return f"postgresql://{self.user}:{self.password}@{self.host}/{self.database}"

def execute_query(self, query: str):
self._engine.execute(query)
return self._engine.execute(query)


class S3:
Expand Down
2 changes: 1 addition & 1 deletion week_3/workspaces/challenge/week_3_challenge.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ def load_input(self, context):
pass


@io_manager(required_resource_keys={"postgres"})
@io_manager(required_resource_keys={"database"})
def postgres_io_manager(init_context):
return PostgresIOManager()

Expand Down
2 changes: 1 addition & 1 deletion week_3/workspaces/project/week_3.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ def week_3_pipeline():
"bucket": "dagster",
"access_key": "test",
"secret_key": "test",
"endpoint_url": "http://localstack:4566",
"endpoint_url": "http://localhost:4566",
}
},
"redis": {
Expand Down
2 changes: 1 addition & 1 deletion week_3/workspaces/resources.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ def uri(self):
return f"postgresql://{self.user}:{self.password}@{self.host}/{self.database}"

def execute_query(self, query: str):
self._engine.execute(query)
return self._engine.execute(query)


class S3:
Expand Down
4 changes: 2 additions & 2 deletions week_4/workspaces/challenge/repo.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
create_dbt_table,
dbt_assets,
end,
insert_dbt_data,
dbt_table,
)
from workspaces.dbt_config import DBT_PROJECT_PATH
from workspaces.resources import postgres_resource
Expand All @@ -13,7 +13,7 @@
@repository
def repo():
return with_resources(
dbt_assets + [create_dbt_table, insert_dbt_data, end],
dbt_assets + [create_dbt_table, dbt_table, end],
resource_defs={
"dbt": dbt_cli_resource.configured(
{
Expand Down
9 changes: 8 additions & 1 deletion week_4/workspaces/challenge/week_4_challenge.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
@asset(
required_resource_keys={"database"},
op_tags={"kind": "postgres"},
key_prefix=["postgresql"],
)
def create_dbt_table(context):
sql = "CREATE SCHEMA IF NOT EXISTS analytics;"
Expand All @@ -21,8 +22,9 @@ def create_dbt_table(context):
@asset(
required_resource_keys={"database"},
op_tags={"kind": "postgres"},
key_prefix=["postgresql"],
)
def insert_dbt_data(context, create_dbt_table):
def dbt_table(context, create_dbt_table):
sql = f"INSERT INTO {SOURCE_TABLE} (column_1, column_2, column_3) VALUES ('A', 'B', 'C');"

number_of_rows = randint(1, 10)
Expand All @@ -31,3 +33,8 @@ def insert_dbt_data(context, create_dbt_table):
context.log.info("Inserted a row")

context.log.info("Batch inserted")


@asset
def end(context):
pass
2 changes: 1 addition & 1 deletion week_4/workspaces/dbt_config.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
DBT_PROJECT_PATH = "/opt/dagster/dagster_home/project/dbt_test_project"
DBT_PROJECT_PATH = "/opt/dagster/dagster_home/dbt_test_project"
2 changes: 1 addition & 1 deletion week_4/workspaces/resources.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ def uri(self):
return f"postgresql://{self.user}:{self.password}@{self.host}/{self.database}"

def execute_query(self, query: str):
self._engine.execute(query)
return self._engine.execute(query)


class S3:
Expand Down

0 comments on commit 08ce7ca

Please sign in to comment.