-
Notifications
You must be signed in to change notification settings - Fork 1.5k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[2/n][structured config] Enable struct config resources, IO managers to depend on other resources #11645
Conversation
The latest updates on your projects. Learn more about Vercel for Git ↗︎ 2 Ignored Deployments
|
563d10e
to
3051156
Compare
3051156
to
6fbf9dc
Compare
6fbf9dc
to
096796c
Compare
096796c
to
6348623
Compare
672d984
to
50e613c
Compare
6348623
to
a8fa3c1
Compare
50e613c
to
c2c6500
Compare
04934d1
to
a95f683
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm definitely scared of this one. It is fiendishly clever, but I'm concerned about maintainability over time.
What alternatives did we consider? Is there an approach available to use where we use a type annotation for resource-to-resource dependencies.
class MyResource:
inner_resource: DependsOnResource[resource_key, InnerResource]
Stepping back I think this is type of situation where we should explore imposing more burden on users here in order to simplify the implementation and make the behavior more predictable.
@dataclass_transform() | ||
class BaseResourceMeta(pydantic.main.ModelMetaclass): | ||
def __new__(self, name, bases, namespaces, **kwargs): | ||
annotations = namespaces.get("__annotations__", {}) | ||
for base in bases: | ||
if hasattr(base, "__annotations__"): | ||
annotations.update(base.__annotations__) | ||
for field in annotations: | ||
if not field.startswith("__"): | ||
if get_origin(annotations[field]) == _ResourceDep: | ||
arg = get_args(annotations[field])[0] | ||
annotations[field] = Union[_PartialResource[arg], _Resource[arg]] | ||
elif _safe_is_subclass(annotations[field], _Resource): | ||
base = annotations[field] | ||
annotations[field] = Union[_PartialResource[base], base] | ||
|
||
namespaces["__annotations__"] = annotations | ||
return super().__new__(self, name, bases, namespaces, **kwargs) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you explain, in prose, what this witchcraft is doing?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
When a user defines a class extending Resource
, Pydantic will use the type annotations to build a schema of valid values for each field.
For example, if the user defines the resources:
class MyCredentials(Resource):
username: str
password: str
class MyResource(Resource):
creds: MyCredentials
Then the schema requires creds
to be an instance MyCredentials
when MyResource
is constructed. However, we would like to enable a user to specify a runtime-configured resource, which is of the type Partial[MyCredentials]
.
To do so, we want to redefine the user's type, behind the scenes, to:
class MyResource(Resource):
creds: MyCredentials | PartialResource[MyCredentials]
This is what this metaclass does - when the user defines a class extending Resource
, it iterates over all the fields and remaps any Resource
-annotated field with a union of Resource
and PartialResource
.
(I've added some more comments to describe this inline)
python_modules/dagster/dagster_tests/core_tests/resource_tests/test_struct_config_resources.py
Outdated
Show resolved
Hide resolved
e13da59
to
e8fac79
Compare
9e595b2
to
c8fcefe
Compare
d34ab46
to
886dfd9
Compare
c8fcefe
to
ebd0178
Compare
1638875
to
1b4d8e3
Compare
) ## Summary Alternate approach stacked on #11645 which removes the `top_level_key` shenanigans. The goal of this PR is to allow for the following pattern: ```python foo = FooResource.configure_at_runtime() bar = BarResource(inner=foo) defs = Definitions( ... resources={ "my_foo": foo, "my_bar": bar, } ) ``` Here, `bar` has a pointer to a `ResourceDefinition` for `foo`, but has no knowledge that `foo` is tied to the resource key `"my_foo"` (this is only specified at `Definitions` time). However, any pipeline which depends on `"my_bar"` as a required resource key must also depend on `"my_foo"`, so `bar.required_resource_keys` must return `{"my_foo"}`. Getting this information (that `foo` has `"my_foo"` as a resource key) to `required_resource_keys` is tricky. This PR adds a new wrapper `ResourceDefinition` called `ResourceWithKeyMapping` which pairs the wrapped resource with mapping that takes `ResourceDefinition` IDs to resource keys (e.g. allowing us to map `foo`->`"my_foo"`). It catches any calls to `required_resource_keys` and instead forwards them to `_resolve_required_resource_keys`, which takes the mapping. ## Test Plan Existing unit tests.
0987feb
to
74a648c
Compare
Merged #12232 into this PR, avoiding the |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This looks mostly good, except for the string mangling and jamming dynamically named properties on the resources object. Let's do something better than that.
python_modules/dagster/dagster/_config/structured_config/__init__.py
Outdated
Show resolved
Hide resolved
def _get_resource_by_id(context: InitResourceContext, resource_id: ResourceId) -> Any: | ||
""" | ||
Retrieves a resource's value from the resource context object from the ResourceDefinition's ID. | ||
This is used to retrieve the value of nested resources which are not fully configured, since the | ||
resource key is not known by the nested resource's parent. | ||
""" | ||
return getattr(context.resources, f"id_{resource_id}", None) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we should model this with a data structure somewhere, not mangle strings and monkeypatch
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Attached to a new InitResourceContext
subclass, InitResourceContextWithKeyMapping
, which includes the id-to-resource mapping.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
One last turn.
We definitely don't want to slow down the core tests case suite this much with those mypy and pyright tests.
python_modules/dagster/dagster/_config/structured_config/__init__.py
Outdated
Show resolved
Hide resolved
python_modules/dagster/dagster/_config/structured_config/__init__.py
Outdated
Show resolved
Hide resolved
python_modules/dagster/dagster/_config/structured_config/__init__.py
Outdated
Show resolved
Hide resolved
def test_type_signatures_config_at_launch(): | ||
with tempfile.TemporaryDirectory() as tempdir: | ||
filename = os.path.join(tempdir, "test.py") | ||
|
||
with open(filename, "w") as f: | ||
f.write( | ||
""" | ||
from dagster._config.structured_config import Resource | ||
|
||
class MyResource(Resource): | ||
a_string: str | ||
|
||
reveal_type(MyResource.configure_at_launch()) | ||
""" | ||
) | ||
|
||
pyright_out = get_pyright_reveal_type_output(filename) | ||
mypy_out = get_mypy_type_output(filename) | ||
|
||
# Ensure partial resource is correctly parameterized | ||
assert pyright_out[0] == "PartialResource[MyResource]" | ||
assert mypy_out[0].endswith("PartialResource[test.MyResource]") | ||
|
||
|
||
def test_type_signatures_constructor_resource_dependency(): | ||
with tempfile.TemporaryDirectory() as tempdir: | ||
filename = os.path.join(tempdir, "test.py") | ||
|
||
with open(filename, "w") as f: | ||
f.write( | ||
""" | ||
from dagster._config.structured_config import Resource, ResourceDependency | ||
|
||
class StringDependentResource(Resource): | ||
a_string: ResourceDependency[str] | ||
|
||
reveal_type(StringDependentResource.__init__) | ||
|
||
my_str_resource = StringDependentResource(a_string="foo") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
These tests are supppper slow. I think we should disable them by default locally and additionally, run them in their own test suite to keep the core_test
time reasonable.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Gated behind -m 'typesignature'
mark. They'll run separately in CI now & only run locally with that flag.
import pydantic | ||
from typing_extensions import dataclass_transform, get_origin | ||
|
||
from .utils import safe_is_subclass | ||
|
||
if TYPE_CHECKING: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Worthy of comment/explanation as to why this works while still having to do all the crazy shenanigans around temp classes.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
👍
Also I think you should do something like #12273 to hide all the temp class stuff in a single scope. Feels a little java-y, but those top-level symbols aren't great. |
beaf683
to
46be9e3
Compare
4c7dfda
to
5fc053b
Compare
… depend on other resources
5ef268e
to
45d2669
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ok lfg
python_modules/dagster/dagster/_config/structured_config/__init__.py
Outdated
Show resolved
Hide resolved
@@ -250,6 +506,23 @@ def resource_fn(context: InitResourceContext): | |||
|
|||
|
|||
ResourceOrPartial: TypeAlias = Union[Resource[ResValue], PartialResource[ResValue]] | |||
ResourceOrPartialOrBase: TypeAlias = Union[ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
or base? base what?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Changed to OrValue
, since this can also hold the raw resource value (in the case of a bare Python object)
Summary
Allows resources to depend on other resources:
When using raw-Python outputs, these must be categorized as resources using the
ResourceDependency
annotation:Implementation
The goal of this PR is to allow for the following pattern:
Here,
bar
has a pointer to aResourceDefinition
forfoo
, but has no knowledge thatfoo
is tied to the resource key"my_foo"
(this is only specified atDefinitions
time). However, any pipeline which depends on"my_bar"
as a required resource key must also depend on"my_foo"
, sobar.required_resource_keys
must return{"my_foo"}
.Getting this information (that
foo
has"my_foo"
as a resource key) torequired_resource_keys
is tricky.This PR adds a new wrapper
ResourceDefinition
calledResourceWithKeyMapping
which pairs the wrapped resource with mapping that takesResourceDefinition
IDs to resource keys (e.g. allowing us to mapfoo
->"my_foo"
). It catches any calls torequired_resource_keys
and instead forwards them to_resolve_required_resource_keys
, which takes the mapping.See #12232 for more context.
Test Plan
Added unit tests for I/O managers, resources.
Test Plan