Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[2/n][structured config] Enable struct config resources, IO managers to depend on other resources #11645

Merged
merged 11 commits into from
Feb 15, 2023

Conversation

benpankow
Copy link
Member

@benpankow benpankow commented Jan 11, 2023

Summary

Allows resources to depend on other resources:

class AWSCredentials(Resource):
    username: str
    password: str

class S3Resource(Resource):
    credentials: AWSCredentials
    region: str
    bucket: str

class EC2Resource(Resource):
    credentials: AWSCredentials

credentials = AWSCredentials(
    username=EnvVar("AWS_USERNAME"),
    password=EnvVar("AWS_PASSWORD"),
)

s3 = S3Resource(credentials=credentials)
ec2 = EC2Resource(credentials=credentials)

defs = Definitions(
    assets=[...],
    resources={
        "aws_credentials": credentials,
        "s3": s3,
        "ec2": snowflake,
    }
)

When using raw-Python outputs, these must be categorized as resources using the ResourceDependency annotation:

@resource(config_schema={"username": str, "password": str})
def postgres_engine(context):
    return sqlalchemy.create_engine(...)

class DBResource:
    engine: ResourceOutput[sqlalchemy.engine.Engine]

engine = postgres_engine.configured({"username": "foo", "password": "bar"})

# engine input is typed to take any of
# sqlalchemy.engine.Engine, Resource[sqlalchemy.engine.Engine],
# PartialResource[sqlalchemy.engine.Engine],
# or ResourceDefintion (for old, function-based resources)
db = DBResource(engine=engine)

Implementation

The goal of this PR is to allow for the following pattern:

foo = FooResource.configure_at_runtime()
bar = BarResource(inner=foo)

defs = Definitions(
    ...
    resources={
        "my_foo": foo,
        "my_bar": bar,
    }
)

Here, bar has a pointer to a ResourceDefinition for foo, but has no knowledge that foo is tied to the resource key "my_foo" (this is only specified at Definitions time). However, any pipeline which depends on "my_bar" as a required resource key must also depend on "my_foo", so bar.required_resource_keys must return {"my_foo"}.

Getting this information (that foo has "my_foo" as a resource key) to required_resource_keys is tricky.

This PR adds a new wrapper ResourceDefinition called ResourceWithKeyMapping which pairs the wrapped resource with mapping that takes ResourceDefinition IDs to resource keys (e.g. allowing us to map foo->"my_foo"). It catches any calls to required_resource_keys and instead forwards them to _resolve_required_resource_keys, which takes the mapping.

See #12232 for more context.

Test Plan

Added unit tests for I/O managers, resources.

Test Plan

@vercel
Copy link

vercel bot commented Jan 11, 2023

The latest updates on your projects. Learn more about Vercel for Git ↗︎

2 Ignored Deployments
Name Status Preview Comments Updated
dagit-storybook ⬜️ Ignored (Inspect) Feb 15, 2023 at 9:13PM (UTC)
dagster ⬜️ Ignored (Inspect) Visit Preview Feb 15, 2023 at 9:13PM (UTC)

@benpankow benpankow force-pushed the benpankow/wip-partial-resources branch from 563d10e to 3051156 Compare January 18, 2023 00:28
@benpankow benpankow force-pushed the benpankow/wip-partial-resources branch from 3051156 to 6fbf9dc Compare January 18, 2023 18:46
@benpankow benpankow changed the base branch from master to benpankow/wip-partial-resources-0.5 January 18, 2023 19:05
@benpankow benpankow force-pushed the benpankow/wip-partial-resources branch from 6fbf9dc to 096796c Compare January 18, 2023 19:05
@benpankow benpankow changed the title [structured config][wip] resource-resource deps w/ partial resources [structured config] Enable struct config resources, IO managers to depend on other resources Jan 18, 2023
@benpankow benpankow marked this pull request as ready for review January 18, 2023 19:13
@benpankow benpankow force-pushed the benpankow/wip-partial-resources branch from 096796c to 6348623 Compare January 18, 2023 19:14
@benpankow benpankow force-pushed the benpankow/wip-partial-resources-0.5 branch from 672d984 to 50e613c Compare January 18, 2023 23:20
@benpankow benpankow force-pushed the benpankow/wip-partial-resources branch from 6348623 to a8fa3c1 Compare January 18, 2023 23:20
@benpankow benpankow force-pushed the benpankow/wip-partial-resources-0.5 branch from 50e613c to c2c6500 Compare January 20, 2023 18:04
@benpankow benpankow force-pushed the benpankow/wip-partial-resources branch from 04934d1 to a95f683 Compare January 20, 2023 18:05
Copy link
Member

@schrockn schrockn left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm definitely scared of this one. It is fiendishly clever, but I'm concerned about maintainability over time.

What alternatives did we consider? Is there an approach available to use where we use a type annotation for resource-to-resource dependencies.

class MyResource:
   inner_resource: DependsOnResource[resource_key, InnerResource]

Stepping back I think this is type of situation where we should explore imposing more burden on users here in order to simplify the implementation and make the behavior more predictable.

Comment on lines 132 to 172
@dataclass_transform()
class BaseResourceMeta(pydantic.main.ModelMetaclass):
def __new__(self, name, bases, namespaces, **kwargs):
annotations = namespaces.get("__annotations__", {})
for base in bases:
if hasattr(base, "__annotations__"):
annotations.update(base.__annotations__)
for field in annotations:
if not field.startswith("__"):
if get_origin(annotations[field]) == _ResourceDep:
arg = get_args(annotations[field])[0]
annotations[field] = Union[_PartialResource[arg], _Resource[arg]]
elif _safe_is_subclass(annotations[field], _Resource):
base = annotations[field]
annotations[field] = Union[_PartialResource[base], base]

namespaces["__annotations__"] = annotations
return super().__new__(self, name, bases, namespaces, **kwargs)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you explain, in prose, what this witchcraft is doing?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When a user defines a class extending Resource, Pydantic will use the type annotations to build a schema of valid values for each field.

For example, if the user defines the resources:

class MyCredentials(Resource):
    username: str
    password: str

class MyResource(Resource):
    creds: MyCredentials

Then the schema requires creds to be an instance MyCredentials when MyResource is constructed. However, we would like to enable a user to specify a runtime-configured resource, which is of the type Partial[MyCredentials].

To do so, we want to redefine the user's type, behind the scenes, to:

class MyResource(Resource):
    creds: MyCredentials | PartialResource[MyCredentials]

This is what this metaclass does - when the user defines a class extending Resource, it iterates over all the fields and remaps any Resource-annotated field with a union of Resource and PartialResource.

(I've added some more comments to describe this inline)

benpankow added a commit that referenced this pull request Feb 10, 2023
)

## Summary

Alternate approach stacked on #11645 which removes the `top_level_key`
shenanigans.

The goal of this PR is to allow for the following pattern:

```python
foo = FooResource.configure_at_runtime()
bar = BarResource(inner=foo)

defs = Definitions(
    ...
    resources={
        "my_foo": foo,
        "my_bar": bar,
    }
)
```

Here, `bar` has a pointer to a `ResourceDefinition` for `foo`, but has
no knowledge that `foo` is tied to the resource key `"my_foo"` (this is
only specified at `Definitions` time). However, any pipeline which
depends on `"my_bar"` as a required resource key must also depend on
`"my_foo"`, so `bar.required_resource_keys` must return `{"my_foo"}`.

Getting this information (that `foo` has `"my_foo"` as a resource key)
to `required_resource_keys` is tricky.

This PR adds a new wrapper `ResourceDefinition` called
`ResourceWithKeyMapping` which pairs the wrapped resource with mapping
that takes `ResourceDefinition` IDs to resource keys (e.g. allowing us
to map `foo`->`"my_foo"`). It catches any calls to
`required_resource_keys` and instead forwards them to
`_resolve_required_resource_keys`, which takes the mapping.

## Test Plan

Existing unit tests.
@benpankow benpankow force-pushed the benpankow/wip-partial-resources branch from 0987feb to 74a648c Compare February 10, 2023 17:52
@benpankow
Copy link
Member Author

Merged #12232 into this PR, avoiding the top_level_keys approach.

Copy link
Member

@schrockn schrockn left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This looks mostly good, except for the string mangling and jamming dynamically named properties on the resources object. Let's do something better than that.

Comment on lines 315 to 321
def _get_resource_by_id(context: InitResourceContext, resource_id: ResourceId) -> Any:
"""
Retrieves a resource's value from the resource context object from the ResourceDefinition's ID.
This is used to retrieve the value of nested resources which are not fully configured, since the
resource key is not known by the nested resource's parent.
"""
return getattr(context.resources, f"id_{resource_id}", None)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we should model this with a data structure somewhere, not mangle strings and monkeypatch

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Attached to a new InitResourceContext subclass, InitResourceContextWithKeyMapping, which includes the id-to-resource mapping.

Copy link
Member

@schrockn schrockn left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

One last turn.

We definitely don't want to slow down the core tests case suite this much with those mypy and pyright tests.

Comment on lines 963 to 1006
def test_type_signatures_config_at_launch():
with tempfile.TemporaryDirectory() as tempdir:
filename = os.path.join(tempdir, "test.py")

with open(filename, "w") as f:
f.write(
"""
from dagster._config.structured_config import Resource

class MyResource(Resource):
a_string: str

reveal_type(MyResource.configure_at_launch())
"""
)

pyright_out = get_pyright_reveal_type_output(filename)
mypy_out = get_mypy_type_output(filename)

# Ensure partial resource is correctly parameterized
assert pyright_out[0] == "PartialResource[MyResource]"
assert mypy_out[0].endswith("PartialResource[test.MyResource]")


def test_type_signatures_constructor_resource_dependency():
with tempfile.TemporaryDirectory() as tempdir:
filename = os.path.join(tempdir, "test.py")

with open(filename, "w") as f:
f.write(
"""
from dagster._config.structured_config import Resource, ResourceDependency

class StringDependentResource(Resource):
a_string: ResourceDependency[str]

reveal_type(StringDependentResource.__init__)

my_str_resource = StringDependentResource(a_string="foo")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These tests are supppper slow. I think we should disable them by default locally and additionally, run them in their own test suite to keep the core_test time reasonable.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Gated behind -m 'typesignature' mark. They'll run separately in CI now & only run locally with that flag.

import pydantic
from typing_extensions import dataclass_transform, get_origin

from .utils import safe_is_subclass

if TYPE_CHECKING:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Worthy of comment/explanation as to why this works while still having to do all the crazy shenanigans around temp classes.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

@schrockn
Copy link
Member

Also I think you should do something like #12273 to hide all the temp class stuff in a single scope. Feels a little java-y, but those top-level symbols aren't great.

@benpankow benpankow force-pushed the benpankow/wip-partial-resources branch from 5ef268e to 45d2669 Compare February 14, 2023 21:33
Copy link
Member

@schrockn schrockn left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok lfg

@@ -250,6 +506,23 @@ def resource_fn(context: InitResourceContext):


ResourceOrPartial: TypeAlias = Union[Resource[ResValue], PartialResource[ResValue]]
ResourceOrPartialOrBase: TypeAlias = Union[
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

or base? base what?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Changed to OrValue, since this can also hold the raw resource value (in the case of a bare Python object)

@benpankow benpankow merged commit 0d62593 into master Feb 15, 2023
@benpankow benpankow deleted the benpankow/wip-partial-resources branch February 15, 2023 21:33
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants