Add black formatting (#10)
From the black website:

> Black is the uncompromising Python code formatter. By using it, you agree to cede control over minutiae of hand-formatting. In return, Black gives you speed, determinism, and freedom from pycodestyle nagging about formatting. You will save time and mental energy for more important matters.
>
> Blackened code looks the same regardless of the project you're reading. Formatting becomes transparent after a while and you can focus on the content instead.
>
> Black makes code review faster by producing the smallest diffs possible.

We've been using it on Connexion for about a year and it's a pleasure to
work with once its usage is automated by pre-commit.
RobbeSneyders authored Mar 9, 2023
1 parent 6293f1c commit 5feeb7f
Showing 7 changed files with 234 additions and 133 deletions.
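Two mechanical transformations account for almost all of the changes below: quote characters are normalized and over-long lines are re-wrapped. A small illustrative before/after (hypothetical code, not taken from this commit):

```python
from typing import Optional

# Before black, hand-formatted (single quotes, manually wrapped signature):
#
#     greeting = 'hello'
#
#     def describe(name: str, role: Optional[str] = None, team: Optional[str] = None,
#                  location: Optional[str] = None) -> str:
#         ...
#
# After black: quotes become double quotes, and a signature over the
# 88-character line limit is exploded one parameter per line, with a
# trailing comma.
greeting = "hello"


def describe(
    name: str,
    role: Optional[str] = None,
    team: Optional[str] = None,
    location: Optional[str] = None,
) -> str:
    return f"{greeting}, {name} ({role}, {team}, {location})"
```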
.pre-commit-config.yaml: 8 additions & 0 deletions
```diff
@@ -20,6 +20,7 @@ repos:
           "-sn", # Don't display the score
           "--disable=fixme"
         ]
+
   - repo: https://github.com/PyCQA/bandit
     rev: 1.7.4
     hooks:
@@ -28,3 +29,10 @@ repos:
       types: [python]
       files: "^express/"
       args: ["-f", "custom", "-q"]
+
+  - repo: https://github.com/psf/black
+    rev: 23.1.0
+    hooks:
+      - id: black
+        name: black
+        files: "^express/"
```
express/components/common.py: 117 additions & 72 deletions
```diff
@@ -15,11 +15,13 @@
 from express.storage_interface import StorageHandlerModule
 
 # pylint: disable=no-member
-STORAGE_MODULE_PATH = StorageHandlerModule().to_dict()[os.environ.get('CLOUD_ENV', 'GCP')]
+STORAGE_MODULE_PATH = StorageHandlerModule().to_dict()[
+    os.environ.get("CLOUD_ENV", "GCP")
+]
 STORAGE_HANDLER = importlib.import_module(STORAGE_MODULE_PATH).StorageHandler()
 
-IndexT = TypeVar('IndexT')
-DataT = TypeVar('DataT')
+IndexT = TypeVar("IndexT")
+DataT = TypeVar("DataT")
 
 
 class ExpressDataset(ABC, Generic[IndexT, DataT]):
```
```diff
@@ -36,7 +38,7 @@ def __init__(self, manifest: DataManifest):
         self.manifest = manifest
         self._index_data = self.load_index()
 
-    def extend(self) -> 'ExpressDatasetDraft[IndexT, DataT]':
+    def extend(self) -> "ExpressDatasetDraft[IndexT, DataT]":
         """
         Create an `ExpressDatasetDraft` that extends this dataset.
         """
```
```diff
@@ -62,15 +64,21 @@ def load(self, data_source: str, for_index: Optional[IndexT] = None) -> DataT:
             TData: Data of type TData
         """
         if data_source not in self.manifest.data_sources:
-            raise ValueError(f"Named source {data_source} not part of recognised data sources: "
-                             f"{self.manifest.data_sources.keys()}.")
+            raise ValueError(
+                f"Named source {data_source} not part of recognised data sources: "
+                f"{self.manifest.data_sources.keys()}."
+            )
         if for_index is None:
             for_index = self._index_data
-        return self._load_data_source(self.manifest.data_sources[data_source], for_index)
+        return self._load_data_source(
+            self.manifest.data_sources[data_source], for_index
+        )
 
     @staticmethod
     @abstractmethod
-    def _load_data_source(data_source: DataSource, index_filter: Optional[IndexT]) -> DataT:
+    def _load_data_source(
+        data_source: DataSource, index_filter: Optional[IndexT]
+    ) -> DataT:
         """
         Load data from a (possibly remote) path.
         This method can be subclassed to present data in a specific way. For example, as a local
```
```diff
@@ -102,36 +110,42 @@ class ExpressDatasetDraft(ABC, Generic[IndexT, DataT]):
             take over both its index and all data sources. Needs to be present if no `index` is set.
     """
 
-    def __init__(self,
-                 index: Optional[Union[DataSource, IndexT]] = None,
-                 data_sources: Dict[str, Union[DataSource, DataT]] = None,
-                 extending_dataset: Optional[ExpressDataset[IndexT, DataT]] = None):
+    def __init__(
+        self,
+        index: Optional[Union[DataSource, IndexT]] = None,
+        data_sources: Dict[str, Union[DataSource, DataT]] = None,
+        extending_dataset: Optional[ExpressDataset[IndexT, DataT]] = None,
+    ):
         self.index = index
         self.data_sources = data_sources or {}
         if not (extending_dataset is None) ^ (index is None):
             raise ValueError(
                 "A dataset draft needs to have a single valid index. Either pass an index "
                 "or a pre-existing dataset to extend. Additional data sources can be added "
-                "to an extending dataset draft after it's been constructed.")
+                "to an extending dataset draft after it's been constructed."
+            )
         if extending_dataset is not None:
             if index is not None:
                 raise ValueError(
                     "A dataset draft needs to have a valid index. Either pass an index or a "
                     "pre-existing dataset to extend. Not both. Additional data sources can be "
-                    "added to an extending dataset draft after it's been constructed.")
+                    "added to an extending dataset draft after it's been constructed."
+                )
             self.index = extending_dataset.manifest.index
             for name, dataset in extending_dataset.manifest.associated_data.items():
                 self.with_data_source(name, dataset, replace_ok=False)
 
     @classmethod
-    def extend(cls, dataset: ExpressDataset[IndexT, DataT]) -> 'ExpressDatasetDraft[IndexT, DataT]':
+    def extend(
+        cls, dataset: ExpressDataset[IndexT, DataT]
+    ) -> "ExpressDatasetDraft[IndexT, DataT]":
         """
         Creates a new Express Dataset draft extending the given dataset, which will take over both
         its index and all data sources.
         """
         return cls(extending_dataset=dataset)
 
-    def with_index(self, index: DataT) -> 'ExpressDatasetDraft[IndexT, DataT]':
+    def with_index(self, index: DataT) -> "ExpressDatasetDraft[IndexT, DataT]":
         """
         Replaces the current index with the given index.
```
```diff
@@ -141,8 +155,9 @@ def with_index(self, index: DataT) -> 'ExpressDatasetDraft[IndexT, DataT]':
         self.index = index
         return self
 
-    def with_data_source(self, name: str, data: Union[DataT, DataSource], replace_ok=False) \
-            -> 'ExpressDatasetDraft[IndexT, DataT]':
+    def with_data_source(
+        self, name: str, data: Union[DataT, DataSource], replace_ok=False
+    ) -> "ExpressDatasetDraft[IndexT, DataT]":
         """
         Adds a new data source or replaces a preexisting data source with the same name.
```
```diff
@@ -157,9 +172,11 @@ def with_data_source(self, name: str, data: Union[DataT, DataSource], replace_ok
             ExpressDatasetDraft[TIndex, TData]: self, for easier chaining
         """
         if (name in self.data_sources) and (not replace_ok):
-            raise ValueError(f"A conflicting data source with identifier {name} is already set "
-                             f"in this draft. Data sources on a dataset draft can be replaced "
-                             f"after it's been constructed.")
+            raise ValueError(
+                f"A conflicting data source with identifier {name} is already set "
+                f"in this draft. Data sources on a dataset draft can be replaced "
+                f"after it's been constructed."
+            )
         # TODO: verify same namespace?
         self.data_sources[name] = data
         return self
```
```diff
@@ -184,15 +201,19 @@ def _path_for_upload(metadata: Metadata, name: str) -> str:
         Returns:
             str: the destination blob path (indicating a folder) where to upload the file/folder.
         """
-        artifact_bucket_blob_path = \
+        artifact_bucket_blob_path = (
             f"custom_artifact/{metadata.run_id}/{metadata.component_name}/{name}"
+        )
         destination_path = STORAGE_HANDLER.construct_blob_path(
-            metadata.artifact_bucket, artifact_bucket_blob_path)
+            metadata.artifact_bucket, artifact_bucket_blob_path
+        )
         return destination_path
 
     @classmethod
     @abstractmethod
-    def _load_dataset(cls, input_manifest: DataManifest) -> ExpressDataset[IndexT, DataT]:
+    def _load_dataset(
+        cls, input_manifest: DataManifest
+    ) -> ExpressDataset[IndexT, DataT]:
         """
         Parses a manifest to an ExpressDataset of a specific type, for downstream use by transform
         components.
```
```diff
@@ -214,7 +235,9 @@ def _upload_index(cls, index: IndexT, remote_path: str) -> DataSource:
 
     @classmethod
     @abstractmethod
-    def _upload_data_source(cls, name: str, data: DataT, remote_path: str) -> DataSource:
+    def _upload_data_source(
+        cls, name: str, data: DataT, remote_path: str
+    ) -> DataSource:
         """
         Uploads data of a certain type as parquet and creates a new DataSource.
```
```diff
@@ -242,10 +265,11 @@ def _create_metadata(cls, metadata_args: dict) -> Metadata:
         return cls._update_metadata(initial_metadata, metadata_args)
 
     @classmethod
-    def _update_metadata(cls,
-                         metadata: Metadata,
-                         metadata_args: Optional[Dict[str, Union[str, int, float, bool]]]) \
-            -> Metadata:
+    def _update_metadata(
+        cls,
+        metadata: Metadata,
+        metadata_args: Optional[Dict[str, Union[str, int, float, bool]]],
+    ) -> Metadata:
         """
         Update the manifest metadata
         Args:
```
```diff
@@ -265,8 +289,12 @@ def _update_metadata(cls,
         return Metadata.from_dict(metadata_dict)
 
     @classmethod
-    def _create_output_dataset(cls, draft: ExpressDatasetDraft[IndexT, DataT],
-                               metadata: Metadata, save_path: str) -> DataManifest:
+    def _create_output_dataset(
+        cls,
+        draft: ExpressDatasetDraft[IndexT, DataT],
+        metadata: Metadata,
+        save_path: str,
+    ) -> DataManifest:
         """
         Processes a dataset draft of a specific type, uploading all local data to storage and
         composing the output manifest.
```
```diff
@@ -285,9 +313,7 @@ def _create_output_dataset(cls, draft: ExpressDatasetDraft[IndexT, DataT],
             remote_path = cls._path_for_upload(metadata, name)
             data_sources[name] = cls._upload_data_source(name, dataset, remote_path)
         manifest = DataManifest(
-            index=index,
-            data_sources=data_sources,
-            metadata=metadata
+            index=index, data_sources=data_sources, metadata=metadata
         )
         Path(save_path).parent.mkdir(parents=True, exist_ok=True)
         Path(save_path).write_text(manifest.to_json(), encoding="utf-8")
```
```diff
@@ -311,13 +337,15 @@ def run(cls) -> DataManifest:
         """
         args = cls._parse_args()
         input_dataset = cls._load_dataset(
-            input_manifest=DataManifest.from_path(args.input_manifest))
-        output_dataset_draft = cls.transform(data=input_dataset,
-                                             extra_args=json.loads(args.extra_args))
+            input_manifest=DataManifest.from_path(args.input_manifest)
+        )
+        output_dataset_draft = cls.transform(
+            data=input_dataset, extra_args=json.loads(args.extra_args)
+        )
         output_manifest = cls._create_output_dataset(
             draft=output_dataset_draft,
             metadata=json.loads(args.metadata),
-            save_path=args.output_manifest
+            save_path=args.output_manifest,
         )
         return output_manifest
 
```
```diff
@@ -327,29 +355,39 @@ def _parse_args(cls):
         Parse component arguments
         """
         parser = argparse.ArgumentParser()
-        parser.add_argument('--input-manifest',
-                            type=str,
-                            required=True,
-                            help='The input data manifest artifact')
-        parser.add_argument('--metadata',
-                            type=str,
-                            required=True,
-                            help='The metadata associated with this pipeline run')
-        parser.add_argument('--extra-args',
-                            type=str,
-                            required=False,
-                            help='Extra arguments for the component, passed as a json dict string')
-        parser.add_argument('--output-manifest',
-                            type=str,
-                            required=True,
-                            help='The output data manifest artifact')
+        parser.add_argument(
+            "--input-manifest",
+            type=str,
+            required=True,
+            help="The input data manifest artifact",
+        )
+        parser.add_argument(
+            "--metadata",
+            type=str,
+            required=True,
+            help="The metadata associated with this pipeline run",
+        )
+        parser.add_argument(
+            "--extra-args",
+            type=str,
+            required=False,
+            help="Extra arguments for the component, passed as a json dict string",
+        )
+        parser.add_argument(
+            "--output-manifest",
+            type=str,
+            required=True,
+            help="The output data manifest artifact",
+        )
         return parser.parse_args()
 
     @classmethod
     @abstractmethod
-    def transform(cls, data: ExpressDataset[IndexT, DataT],
-                  extra_args: Optional[Dict[str, Union[str, int, float, bool]]] = None) \
-            -> ExpressDatasetDraft[IndexT, DataT]:
+    def transform(
+        cls,
+        data: ExpressDataset[IndexT, DataT],
+        extra_args: Optional[Dict[str, Union[str, int, float, bool]]] = None,
+    ) -> ExpressDatasetDraft[IndexT, DataT]:
         """
         Applies transformations to the input dataset and creates a draft for a new dataset.
         The recommended pattern for a transform is to extend the input dataset with a filtered index
```
```diff
@@ -394,7 +432,7 @@ def run(cls) -> DataManifest:
         output_manifest = cls._create_output_dataset(
             draft=output_dataset_draft,
             save_path=args.output_manifest,
-            metadata=metadata
+            metadata=metadata,
         )
         return output_manifest
 
```
```diff
@@ -404,24 +442,31 @@ def _parse_args(cls):
         Parse component arguments
         """
         parser = argparse.ArgumentParser()
-        parser.add_argument('--metadata-args',
-                            type=str,
-                            required=False,
-                            help='Metadata arguments, passed as a json dict string')
-        parser.add_argument('--extra-args',
-                            type=str,
-                            required=False,
-                            help='Extra arguments, passed as a json dict string')
-        parser.add_argument('--output-manifest',
-                            type=str,
-                            required=True,
-                            help='The output data manifest artifact')
+        parser.add_argument(
+            "--metadata-args",
+            type=str,
+            required=False,
+            help="Metadata arguments, passed as a json dict string",
+        )
+        parser.add_argument(
+            "--extra-args",
+            type=str,
+            required=False,
+            help="Extra arguments, passed as a json dict string",
+        )
+        parser.add_argument(
+            "--output-manifest",
+            type=str,
+            required=True,
+            help="The output data manifest artifact",
+        )
         return parser.parse_args()
 
     @classmethod
     @abstractmethod
-    def load(cls, extra_args: Optional[Dict[str, Union[str, int, float, bool]]] = None) -> \
-            ExpressDatasetDraft[IndexT, DataT]:
+    def load(
+        cls, extra_args: Optional[Dict[str, Union[str, int, float, bool]]] = None
+    ) -> ExpressDatasetDraft[IndexT, DataT]:
         """
         Loads data from an arbitrary source to create a draft for a new dataset.
```
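One note for readers of the common.py diff above: the `transform` docstring recommends extending the input dataset with a filtered index or new data sources rather than rebuilding it. A minimal sketch of that pattern, using only the draft API visible in the diff (`extend`, `load`, `with_data_source`); the `dataset` argument and the source names here are hypothetical, not part of the commit:

```python
# Hypothetical sketch of the recommended transform pattern; `dataset` is
# assumed to be an ExpressDataset instance, and the source names are made up.
def build_draft(dataset):
    draft = dataset.extend()              # take over the index and all sources
    captions = dataset.load("captions")   # materialize one named data source
    cleaned = captions                    # real filtering/cleaning goes here
    return draft.with_data_source("clean_captions", cleaned, replace_ok=False)
```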