Skip to content

Commit

Permalink
[structured config] Allow structured config resources, IO managers to…
Browse files Browse the repository at this point in the history
… depend on other resources
  • Loading branch information
benpankow committed Jan 20, 2023
1 parent c2c6500 commit 80099ee
Show file tree
Hide file tree
Showing 4 changed files with 598 additions and 24 deletions.
230 changes: 207 additions & 23 deletions python_modules/dagster/dagster/_config/structured_config.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import inspect
from typing import Generic, TypeVar, Union
from typing import Generic, Mapping, TypeVar, Union, get_origin

from typing_extensions import TypeAlias
from typing_extensions import TypeAlias, dataclass_transform, get_args

from dagster._config.config_type import ConfigType
from dagster._config.source import BoolSource, IntSource, StringSource
Expand All @@ -17,7 +17,7 @@ class cached_property: # type: ignore[no-redef]


from abc import ABC, abstractmethod
from typing import Any, Dict, Optional, Type, cast
from typing import AbstractSet, Any, Dict, Optional, Set, Tuple, Type, cast

from pydantic import BaseModel, Extra
from pydantic.fields import SHAPE_SINGLETON, ModelField
Expand All @@ -31,7 +31,10 @@ class cached_property: # type: ignore[no-redef]
convert_potential_field,
)
from dagster._core.definitions.resource_definition import ResourceDefinition, ResourceFunction
from dagster._core.storage.io_manager import IOManager, IOManagerDefinition
from dagster._core.storage.io_manager import (
IOManager,
IOManagerDefinition,
)

Self = Any

Expand Down Expand Up @@ -114,11 +117,92 @@ def _curry_config_schema(schema_field: Field, data: Any) -> IDefinitionConfigSch

T = TypeVar("T")

import pydantic


# BaseResourceMeta is invoked while the Resource class statement itself is being executed, i.e.
# before Resource or PartialResource exist. The names below therefore start out bound to a
# stand-in class for that first metaclass invocation; by the time the metaclass runs for a
# Resource subclass they refer to the real (non-placeholder) classes.
class _Temp(Generic[T]):
    """Stand-in generic class used as the initial value of the placeholder names below."""


_ResourceDep: Type = _Temp
_Resource: Type = _Temp
_PartialResource: Type = _Temp


@dataclass_transform()
class BaseResourceMeta(pydantic.main.ModelMetaclass):
    """
    Metaclass which rewrites resource-typed field annotations before the class is built.

    A field annotated as ``ResourceDependency[X]`` is re-annotated as
    ``Union[PartialResource[X], Resource[X]]``, and a field annotated with a ``Resource``
    subclass ``R`` is re-annotated as ``Union[PartialResource[R], R]``, so that either a
    fully configured or a partially configured resource may be supplied for that field.
    """

    def __new__(mcs, name, bases, namespaces, **kwargs):
        annotations = namespaces.get("__annotations__", {})

        # Collect the annotations declared on the base classes. They are only used to fill in
        # names the class body does not annotate itself, so that an annotation re-declared on
        # the subclass is never overwritten by the one inherited from a base.
        inherited = {}
        for base in bases:
            inherited.update(getattr(base, "__annotations__", {}))
        for field_name, annotation in inherited.items():
            annotations.setdefault(field_name, annotation)

        # Iterate over a snapshot since the values are replaced while looping.
        for field_name, annotation in list(annotations.items()):
            if field_name.startswith("__"):
                continue
            if get_origin(annotation) == _ResourceDep:
                arg = get_args(annotation)[0]
                annotations[field_name] = Union[_PartialResource[arg], _Resource[arg]]
            elif _safe_is_subclass(annotation, _Resource):
                annotations[field_name] = Union[_PartialResource[annotation], annotation]

        namespaces["__annotations__"] = annotations
        return super().__new__(mcs, name, bases, namespaces, **kwargs)


class AllowDelayedDependencies:
    """
    Mixin for resource-like objects which may depend on other resources that are only
    resolved at runtime.

    It stores the key under which the object was passed to the Definitions object and can
    compute the set of top-level resource keys the object (transitively) requires.
    """

    # Key assigned via set_top_level_key; None until one has been set.
    _top_level_key: Optional[str] = None
    # Field name -> nested resource which has to be looked up by its top-level key.
    _resource_pointers: Mapping[str, "AllowDelayedDependencies"] = {}

    def set_top_level_key(self, key: str):
        """
        Sets the top-level resource key for this resource, when passed
        into the Definitions object.
        """
        self._top_level_key = key

    def get_top_level_key(self) -> Optional[str]:
        """
        Gets the top-level resource key for this resource which was associated with it
        in the Definitions object.
        """
        return self._top_level_key

    def _resolve_required_resource_keys(self) -> AbstractSet[str]:
        """
        Returns the top-level keys of every resource pointer of this object, plus the keys
        required (recursively) by those pointers and by any Resource stored on this instance.

        Fails the invariant check if any resource pointer has no top-level key.
        """
        # All dependent resources which are not fully configured
        # must be specified to the Definitions object so that the
        # resource can be configured at runtime by the user
        top_level_keys = {
            field_name: pointer.get_top_level_key()
            for field_name, pointer in self._resource_pointers.items()
        }
        any_key_missing = any(key is None for key in top_level_keys.values())
        check.invariant(
            not any_key_missing,
            (
                "Any partially configured, nested resources must be specified to Definitions"
                f" object: {top_level_keys}"
            ),
        )

        required: Set[str] = set(cast(Set[str], top_level_keys.values()))

        # Keys needed by the pointed-to resources themselves
        for pointer in self._resource_pointers.values():
            required.update(pointer._resolve_required_resource_keys())

        # Keys needed by Resource instances held as attributes of this object
        nested_resources, _ = _separate_resource_params(self.__dict__)
        for nested in nested_resources.values():
            if isinstance(nested, Resource):
                required.update(nested._resolve_required_resource_keys())

        return required


class Resource(
Generic[T],
ResourceDefinition,
Config,
AllowDelayedDependencies,
metaclass=BaseResourceMeta,
):
"""
Base class for Dagster resources that utilize structured config.
Expand All @@ -138,16 +222,72 @@ def output(self, text: str) -> None:
"""

def __init__(self, **data: Any):
schema = infer_schema_from_config_class(self.__class__)
resource_pointers, data_without_resources = _separate_resource_params(data)

schema = infer_schema_from_config_class(
self.__class__, ignore_resource_fields=set(resource_pointers.keys())
)

schema = _curry_config_schema(schema, data_without_resources)

Config.__init__(self, **data)

# We keep track of any resources we depend on which are not fully configured
# so that we can retrieve them at runtime
self._resource_pointers: Mapping[str, PartialResource] = {
k: v for k, v in resource_pointers.items() if isinstance(v, PartialResource)
}

ResourceDefinition.__init__(
self,
resource_fn=self.create_object_to_pass_to_user_code,
config_schema=_curry_config_schema(schema, data),
resource_fn=self.initialize_and_run,
config_schema=schema,
description=self.__doc__,
)

def create_object_to_pass_to_user_code(self, context) -> Any: # pylint: disable=unused-argument
@classmethod
def partial(cls, **kwargs) -> "PartialResource[Self]":
"""
Returns a partially initialized copy of the resource, with remaining config fields
set at runtime.
"""
return PartialResource(cls, data=kwargs)

@property
def required_resource_keys(self) -> AbstractSet[str]:
return self._resolve_required_resource_keys()

def initialize_and_run(self, context: InitResourceContext) -> T:
# If we have any partially configured resources, we need to update them
# with the fully configured resources from the context

_, config_to_update = _separate_resource_params(context.resource_config)

resources_to_update, _ = _separate_resource_params(self.__dict__)
resources_to_update = {
k: v.initialize_and_run(context)
for k, v in resources_to_update.items()
if isinstance(v, Resource)
}

partial_resources_to_update = {
k: getattr(context.resources, cast(str, v.get_top_level_key()))
for k, v in self._resource_pointers.items()
}

to_update = {**resources_to_update, **partial_resources_to_update, **config_to_update}

for k, v in to_update.items():
object.__setattr__(self, k, v)

return self._create_object_fn(context)

def _create_object_fn(self, context: InitResourceContext) -> T:
return self.create_object_to_pass_to_user_code(context)

def create_object_to_pass_to_user_code(
self, context: InitResourceContext
) -> T: # pylint: disable=unused-argument
"""
Returns the object that this resource hands to user code, accessible by ops or assets
through the context or resource parameters. This works like the function decorated
Expand All @@ -158,14 +298,6 @@ def create_object_to_pass_to_user_code(self, context) -> Any: # pylint: disable
"""
return cast(T, self)

@classmethod
def partial(cls, **kwargs) -> "PartialResource[Self]":
"""
Returns a partially initialized copy of the resource, with remaining config fields
set at runtime.
"""
return PartialResource(cls, data=kwargs)

# Python descriptor
# https://docs.python.org/3/howto/descriptor.html
# Used to adjust the types of resource inputs and outputs, e.g. resource dependencies can be passed in
Expand All @@ -177,24 +309,36 @@ def __set__(self, obj: Optional[object], value: Union[T, "PartialResource[T]"])
...


class PartialResource(Generic[T], ResourceDefinition, MakeConfigCacheable):
class PartialResource(
Generic[T], ResourceDefinition, AllowDelayedDependencies, MakeConfigCacheable
):
data: Dict[str, Any]
resource_cls: Type[Resource[T]]

def __init__(self, resource_cls: Type[Resource[T]], data: Dict[str, Any]):
check.invariant(data == {}, "PartialResource currently does not support config fields")
resource_pointers, data_without_resources = _separate_resource_params(data)

Config.__init__(self, data=data, resource_cls=resource_cls)
check.invariant(
data_without_resources == {}, "PartialResource currently does not support config fields"
)

Config.__init__(self, data=data_without_resources, resource_cls=resource_cls)

MakeConfigCacheable.__init__(self, data=data, resource_cls=resource_cls)

# We keep track of any resources we depend on which are not fully configured
# so that we can retrieve them at runtime
self._resource_pointers: Dict[str, ResourceOrPartial] = {
k: v for k, v in resource_pointers.items() if isinstance(v, PartialResource)
}

schema = infer_schema_from_config_class(
resource_cls,
resource_cls, ignore_resource_fields=set(resource_pointers.keys())
)

def resource_fn(context: InitResourceContext):
instantiated = resource_cls(**context.resource_config, **data)
return instantiated.create_object_to_pass_to_user_code(context)
return instantiated.initialize_and_run(context)

ResourceDefinition.__init__(
self,
Expand All @@ -203,10 +347,28 @@ def resource_fn(context: InitResourceContext):
description=resource_cls.__doc__,
)

@property
def required_resource_keys(self) -> AbstractSet[str]:
return self._resolve_required_resource_keys()


ResourceOrPartial: TypeAlias = Union[Resource[T], PartialResource[T]]


V = TypeVar("V")


class ResourceDependency(Generic[V]):
    """
    Descriptor-style generic used to annotate a field whose value is a resource dependency.

    Reading yields a ``V``; assignment accepts either a ``Resource[V]`` or a
    ``PartialResource[V]``. Both operations delegate to the attribute named when the
    descriptor was bound to its owner class.
    """

    def __set_name__(self, _owner, name):
        # Remember the attribute name this descriptor was assigned to.
        self._name = name

    def __get__(self, obj: "Resource", __owner: Any) -> V:
        attr_name = self._name
        return getattr(obj, attr_name)

    def __set__(self, obj: Optional[object], value: ResourceOrPartial[V]) -> None:
        attr_name = self._name
        setattr(obj, attr_name, value)


class StructuredResourceAdapter(Resource, ABC):
"""
Adapter base class for wrapping a decorated, function-style resource
Expand Down Expand Up @@ -263,7 +425,7 @@ def __init__(self, **data: Any):
Resource.__init__(self, **data)
IOManagerDefinition.__init__(
self,
resource_fn=self.create_io_manager_to_pass_to_user_code,
resource_fn=self.initialize_and_run,
config_schema=self._config_schema,
description=self.__doc__,
)
Expand Down Expand Up @@ -400,20 +562,42 @@ def infer_schema_from_config_annotation(model_cls: Any, config_arg_default: Any)
def infer_schema_from_config_class(
    model_cls: Type[Config],
    description: Optional[str] = None,
    ignore_resource_fields: Optional[Set[str]] = None,
) -> Field:
    """
    Parses a structured config class and returns a corresponding Dagster config Field.

    Args:
        model_cls: The structured config class whose pydantic fields are converted.
        description: Description for the returned Field; when not given, the stripped
            docstring of ``model_cls`` is used if there is one.
        ignore_resource_fields: Names of fields to leave out of the resulting schema.

    Returns:
        A Field wrapping a Permissive shape when the model allows extra values, and a
        Shape otherwise.
    """
    ignore_resource_fields = ignore_resource_fields or set()

    check.param_invariant(
        issubclass(model_cls, Config),
        "Config type annotation must inherit from dagster._config.structured_config.Config",
    )

    # Only fields that are not explicitly ignored take part in the schema; the filter is
    # on the field's name while the schema is keyed by its alias.
    fields = {
        pydantic_field.alias: _convert_pydantic_field(pydantic_field)
        for pydantic_field in model_cls.__fields__.values()
        if pydantic_field.name not in ignore_resource_fields
    }

    shape_cls = Permissive if model_cls.__config__.extra == Extra.allow else Shape

    docstring = model_cls.__doc__.strip() if model_cls.__doc__ else None
    return Field(config=shape_cls(fields), description=description or docstring)


def _separate_resource_params(
    data: Dict[str, Any]
) -> Tuple[Dict[str, Union[Resource, PartialResource]], Dict[str, Any]]:
    """
    Partitions the key/value inputs of a structured config Resource class into those whose
    value is a Resource or PartialResource and all remaining entries.

    Returns a ``(resource_params, other_params)`` tuple; each dict keeps the relative order
    of ``data``.
    """
    resource_params: Dict[str, Union[Resource, PartialResource]] = {}
    other_params: Dict[str, Any] = {}
    for key, value in data.items():
        if isinstance(value, (Resource, PartialResource)):
            resource_params[key] = value
        else:
            other_params[key] = value
    return resource_params, other_params


# Rebind the module-level placeholder names (initially bound to _Temp) to the real classes;
# BaseResourceMeta reads these names when it rewrites resource-typed annotations.
_Resource = Resource
_PartialResource = PartialResource
_ResourceDep = ResourceDependency
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,12 @@ def _create_repository_using_definitions_args(

if loggers:
check.mapping_param(loggers, "loggers", key_type=str, value_type=LoggerDefinition)
from dagster._config.structured_config import AllowDelayedDependencies

if resources:
for rkey, resource in resources.items():
if isinstance(resource, AllowDelayedDependencies):
resource.set_top_level_key(rkey)

resource_defs = wrap_resources_for_execution(resources or {})

Expand Down
Loading

0 comments on commit 80099ee

Please sign in to comment.