Context

I'm working on a pipeline where I need to load an external asset (geospatial data stored in S3) as the input to the pipeline. As each node processes its respective inputs, the external assets (like the geospatial data) are passed between nodes as needed. Finally, the last node in the pipeline handles the materialization of the final output.

How would you approach structuring this in Dagster? Is this even possible?

Approach

Based on https://www.youtube.com/watch?v=KVqyarPbCeU&t=795s, I want to use `AssetSpec` to declare the burnable-area data as an external asset:

```python
import xarray as xr

from dagster import (
    AssetKey,
    AssetSpec,
    Definitions,
    EnvVar,
    OpExecutionContext,
    graph,
    op,
)

# AWSSessionResource is assumed to be a custom resource defined elsewhere in the project.

# Define the burnable area as an external asset
burnable_4km_asset = AssetSpec(
    key=AssetKey(
        [EnvVar("BUCKET").get_value(), "wildfire", "burnable_4km"]
    ),
    metadata={
        "aws_account": EnvVar("AWS_ACCOUNT").get_value(),
        "s3_location": f"s3://{EnvVar('BUCKET').get_value()}/burnable_area.tif",
    },
    description="Wildfire location data of burnable 4km",
    group_name="external_assets",
)


# Op that should read the external asset from S3
@op
def read_burnable_area(
    context: OpExecutionContext,
    aws_session_resource: AWSSessionResource,
    burnable_4km,  # I believe this should be resolved by Dagster
) -> xr.DataArray:
    s3_path = context.upstream_output.metadata["s3_location"]
    context.log.info(
        f"Reading burnable area from: {s3_path} for lat: , lon: "
    )
    # computation logic


# graph
@graph
def wildfire_damage_calculation():
    return read_burnable_area()


# definitions
defs = Definitions(
    assets=[burnable_4km_asset],
    resources={
        "aws_session_resource": AWSSessionResource(
            aws_access_key_id=EnvVar("aws_access_key_id"),
            aws_secret_access_key=EnvVar("aws_secret_access_key"),
            region_name=EnvVar("aws_region"),
        ),
    },
    jobs=[wildfire_damage_calculation.to_job()],
)
```

Issue

In the Dagster UI I'm seeing an error. What am I missing?
I have found the answer here: #17718
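For anyone landing here later, a minimal sketch of one pattern that avoids treating the external asset as an op input: declare it with `AssetSpec`, register the spec in `Definitions`, and make the downstream computation an `@asset` that depends on it via `deps` and reads the file itself. This is an illustration under those assumptions, not necessarily the exact fix from the linked discussion; the bucket path and the `wildfire_damage` asset are placeholders.

```python
from dagster import AssetExecutionContext, AssetSpec, Definitions, asset

# Hypothetical S3 location; in the real project this would come from EnvVar("BUCKET").
S3_LOCATION = "s3://my-bucket/burnable_area.tif"

# External asset: Dagster only tracks it; something outside this code produces the file.
burnable_4km = AssetSpec(
    key="burnable_4km",
    metadata={"s3_location": S3_LOCATION},
    group_name="external_assets",
)


# Downstream asset: `deps` declares a data dependency without a loaded input,
# so the function fetches the data itself instead of receiving it as an argument.
@asset(deps=[burnable_4km])
def wildfire_damage(context: AssetExecutionContext) -> None:
    context.log.info(f"Reading burnable area from {S3_LOCATION}")
    # read the GeoTIFF and compute the damage here


defs = Definitions(assets=[burnable_4km, wildfire_damage])
```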