Skip to content

Commit

Permalink
ruff mypy
Browse files Browse the repository at this point in the history
  • Loading branch information
benpankow committed Jan 18, 2023
1 parent 3736a4c commit c50fdb0
Show file tree
Hide file tree
Showing 2 changed files with 14 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,6 @@
from typing import Union

import pandas
import pyspark
from pyspark.sql import DataFrame as PySparkDataFrame
from dagster import (
InputContext,
OutputContext,
Expand All @@ -12,6 +10,7 @@
from dagster._config.structured_config import ResourceDependency, StructuredConfigIOManager
from dagster._seven.temp_dir import get_system_temp_directory
from dagster_pyspark.resources import PySparkResource
from pyspark.sql import DataFrame as PySparkDataFrame


class PartitionedParquetIOManager(StructuredConfigIOManager):
Expand Down
15 changes: 13 additions & 2 deletions python_modules/dagster/dagster/_config/structured_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,11 @@ class cached_property: # type: ignore[no-redef]
config_dictionary_from_values,
convert_potential_field,
)
from dagster._core.definitions.resource_definition import ResourceDefinition, ResourceFunction
from dagster._core.definitions.resource_definition import (
ResourceDefinition,
ResourceFunction,
is_context_provided,
)
from dagster._core.storage.io_manager import IOManager, IOManagerDefinition

Self = Any
Expand Down Expand Up @@ -261,7 +265,7 @@ def initialize_and_run(self, context: InitResourceContext) -> T:

resources_to_update, _ = _separate_resource_params(self.__dict__)
resources_to_update = {
k: v.resource_fn(context.replace_config(v.config_schema.default_value))
k: _call_resource_fn(v, context.replace_config(v.config_schema.default_value))
for k, v in resources_to_update.items()
if not isinstance(v, PartialResource)
}
Expand Down Expand Up @@ -603,6 +607,13 @@ def _separate_resource_params(
)


def _call_resource_fn(obj: ResourceDefinition, context: InitResourceContext) -> Any:
if is_context_provided(obj.resource_fn):
return obj.resource_fn(context)
else:
return obj.resource_fn()


_Resource = Resource
_PartialResource = PartialResource
_ResourceDep = ResourceDependency

0 comments on commit c50fdb0

Please sign in to comment.