-
Notifications
You must be signed in to change notification settings - Fork 906
/
context.py
339 lines (279 loc) · 11.8 KB
/
context.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
"""This module provides context for Kedro project."""
from __future__ import annotations
import logging
from copy import deepcopy
from pathlib import Path, PurePosixPath, PureWindowsPath
from typing import Any
from urllib.parse import urlparse
from warnings import warn
from pluggy import PluginManager
from kedro.config import ConfigLoader, MissingConfigException
from kedro.framework.project import settings
from kedro.io import DataCatalog
from kedro.pipeline.pipeline import _transcode_split
def _is_relative_path(path_string: str) -> bool:
"""Checks whether a path string is a relative path.
Example:
::
>>> _is_relative_path("data/01_raw") == True
>>> _is_relative_path("info.log") == True
>>> _is_relative_path("/tmp/data/01_raw") == False
>>> _is_relative_path(r"C:\\info.log") == False
>>> _is_relative_path(r"\\'info.log") == False
>>> _is_relative_path("c:/info.log") == False
>>> _is_relative_path("s3://info.log") == False
Args:
path_string: The path string to check.
Returns:
Whether the string is a relative path.
"""
# os.path.splitdrive does not reliably work on non-Windows systems
# breaking the coverage, using PureWindowsPath instead
is_full_windows_path_with_drive = bool(PureWindowsPath(path_string).drive)
if is_full_windows_path_with_drive:
return False
is_remote_path = bool(urlparse(path_string).scheme)
if is_remote_path:
return False
is_absolute_path = PurePosixPath(path_string).is_absolute()
if is_absolute_path:
return False
return True
def _convert_paths_to_absolute_posix(
    project_path: Path, conf_dictionary: dict[str, Any]
) -> dict[str, Any]:
    """Turn all relative paths inside ``conf_dictionary`` into absolute paths by appending them
    to ``project_path`` and convert absolute Windows paths to POSIX format. This is a hack to
    make sure that we don't have to change user's working directory for logging and datasets to
    work. It is important for non-standard workflows such as IPython notebook where users don't go
    through `kedro run` or `__main__.py` entrypoints.

    Example:
    ::
        >>> conf = _convert_paths_to_absolute_posix(
        >>>     project_path=Path("/path/to/my/project"),
        >>>     conf_dictionary={
        >>>         "handlers": {
        >>>             "info_file_handler": {
        >>>                 "filename": "info.log"
        >>>             }
        >>>         }
        >>>     }
        >>> )
        >>> print(conf['handlers']['info_file_handler']['filename'])
        "/path/to/my/project/info.log"

    Args:
        project_path: The root directory to prepend to relative path to make absolute path.
        conf_dictionary: The configuration containing paths to expand.

    Returns:
        A dictionary containing only absolute paths.

    Raises:
        ValueError: If the provided ``project_path`` is not an absolute path.
    """
    if not project_path.is_absolute():
        raise ValueError(
            f"project_path must be an absolute path. Received: {project_path}"
        )

    # Only these keys are known to hold a path string as their value.
    path_like_keys = ("filename", "filepath", "path")

    for key, value in conf_dictionary.items():
        if isinstance(value, dict):
            # Nested config section: absolutify its paths recursively.
            conf_dictionary[key] = _convert_paths_to_absolute_posix(
                project_path, value
            )
        elif isinstance(value, str) and key in path_like_keys:
            if _is_relative_path(value):
                # Anchor the relative path at the project root, POSIX-formatted.
                conf_dictionary[key] = (project_path / value).as_posix()
            elif PureWindowsPath(value).drive:
                # Normalise absolute Windows paths to POSIX format.
                conf_dictionary[key] = PureWindowsPath(value).as_posix()
    return conf_dictionary
def _validate_transcoded_datasets(catalog: DataCatalog):
    """Validates transcoded datasets are correctly named

    Args:
        catalog (DataCatalog): The catalog object containing the
            datasets to be validated.

    Raises:
        ValueError: If a dataset name does not conform to the expected
            transcoding naming conventions,a ValueError is raised by the
            `_transcode_split` function.
    """
    # noqa: protected-access
    # _transcode_split raises ValueError on a malformed name; iterating the
    # dict directly is equivalent to iterating .keys().
    for name in catalog._data_sets:
        _transcode_split(name)
def _update_nested_dict(old_dict: dict[Any, Any], new_dict: dict[Any, Any]) -> None:
"""Update a nested dict with values of new_dict.
Args:
old_dict: dict to be updated
new_dict: dict to use for updating old_dict
"""
for key, value in new_dict.items():
if key not in old_dict:
old_dict[key] = value
elif isinstance(old_dict[key], dict) and isinstance(value, dict):
_update_nested_dict(old_dict[key], value)
else:
old_dict[key] = value
class KedroContext:
    """``KedroContext`` is the base class which holds the configuration and
    Kedro's main functionality.
    """

    def __init__(  # noqa: PLR0913
        self,
        package_name: str,
        project_path: Path | str,
        config_loader: ConfigLoader,
        hook_manager: PluginManager,
        env: str | None = None,
        extra_params: dict[str, Any] | None = None,
    ):
        """Create a context object by providing the root of a Kedro project and
        the environment configuration subfolders
        (see ``kedro.config.ConfigLoader``)

        Raises:
            KedroContextError: If there is a mismatch
                between Kedro project version and package version.

        Args:
            package_name: Package name for the Kedro project the context is
                created for.
            project_path: Project path to define the context for.
            config_loader: The ``ConfigLoader`` used to read run configuration.
            hook_manager: The ``PluginManager`` to activate hooks, supplied by the session.
            env: Optional argument for configuration default environment to be used
                for running the pipeline. If not specified, it defaults to "local".
            extra_params: Optional dictionary containing extra project parameters.
                If specified, will update (and therefore take precedence over)
                the parameters retrieved from the project configuration.
        """
        self._project_path = Path(project_path).expanduser().resolve()
        self._package_name = package_name
        self._config_loader = config_loader
        self._env = env
        # Deep copy so later caller-side mutation cannot leak into the context.
        self._extra_params = deepcopy(extra_params)
        self._hook_manager = hook_manager

    @property  # type: ignore
    def env(self) -> str | None:
        """Property for the current Kedro environment.

        Returns:
            Name of the current Kedro environment.
        """
        return self._env

    @property
    def project_path(self) -> Path:
        """Read-only property containing Kedro's root project directory.

        Returns:
            Project directory.
        """
        return self._project_path

    @property
    def catalog(self) -> DataCatalog:
        """Read-only property referring to Kedro's ``DataCatalog`` for this context.

        Returns:
            DataCatalog defined in `catalog.yml`.

        Raises:
            KedroContextError: Incorrect ``DataCatalog`` registered for the project.
        """
        return self._get_catalog()

    @property
    def params(self) -> dict[str, Any]:
        """Read-only property referring to Kedro's parameters for this context.

        Returns:
            Parameters defined in `parameters.yml` with the addition of any
                extra parameters passed at initialization.
        """
        try:
            params = self.config_loader["parameters"]
        except MissingConfigException as exc:
            warn(f"Parameters not found in your Kedro project config.\n{str(exc)}")
            params = {}
        # Extra params passed at init take precedence over file-based config.
        _update_nested_dict(params, self._extra_params or {})
        return params

    @property
    def config_loader(self) -> ConfigLoader:
        """Read-only property referring to Kedro's ``ConfigLoader`` for this
        context.

        Returns:
            Instance of `ConfigLoader`.

        Raises:
            KedroContextError: Incorrect ``ConfigLoader`` registered for the project.
        """
        return self._config_loader

    def _get_catalog(
        self,
        save_version: str | None = None,
        load_versions: dict[str, str] | None = None,
    ) -> DataCatalog:
        """A hook for changing the creation of a DataCatalog instance.

        Args:
            save_version: Version string used when saving versioned datasets.
            load_versions: Mapping of dataset name to the version to load.

        Returns:
            DataCatalog defined in `catalog.yml`.

        Raises:
            KedroContextError: Incorrect ``DataCatalog`` registered for the project.
        """
        # '**/catalog*' reads modular pipeline configs
        conf_catalog = self.config_loader["catalog"]
        # turn relative paths in conf_catalog into absolute paths
        # before initializing the catalog
        conf_catalog = _convert_paths_to_absolute_posix(
            project_path=self.project_path, conf_dictionary=conf_catalog
        )
        conf_creds = self._get_config_credentials()

        catalog = settings.DATA_CATALOG_CLASS.from_config(
            catalog=conf_catalog,
            credentials=conf_creds,
            load_versions=load_versions,
            save_version=save_version,
        )

        feed_dict = self._get_feed_dict()
        catalog.add_feed_dict(feed_dict)
        _validate_transcoded_datasets(catalog)
        # Give plugins a chance to inspect/modify the freshly built catalog.
        self._hook_manager.hook.after_catalog_created(
            catalog=catalog,
            conf_catalog=conf_catalog,
            conf_creds=conf_creds,
            feed_dict=feed_dict,
            save_version=save_version,
            load_versions=load_versions,
        )
        return catalog

    def _get_feed_dict(self) -> dict[str, Any]:
        """Get parameters and return the feed dictionary."""
        params = self.params
        feed_dict = {"parameters": params}

        def _add_param_to_feed_dict(param_name, param_value):
            """This recursively adds parameter paths to the `feed_dict`,
            whenever `param_value` is a dictionary itself, so that users can
            specify specific nested parameters in their node inputs.

            Example:

                >>> param_name = "a"
                >>> param_value = {"b": 1}
                >>> _add_param_to_feed_dict(param_name, param_value)
                >>> assert feed_dict["params:a"] == {"b": 1}
                >>> assert feed_dict["params:a.b"] == 1
            """
            key = f"params:{param_name}"
            feed_dict[key] = param_value
            if isinstance(param_value, dict):
                for key, val in param_value.items():
                    _add_param_to_feed_dict(f"{param_name}.{key}", val)

        for param_name, param_value in params.items():
            _add_param_to_feed_dict(param_name, param_value)

        return feed_dict

    def _get_config_credentials(self) -> dict[str, Any]:
        """Getter for credentials specified in credentials directory."""
        try:
            conf_creds = self.config_loader["credentials"]
        except MissingConfigException as exc:
            # Missing credentials are tolerated: log at debug level and
            # fall back to an empty mapping.
            logging.getLogger(__name__).debug(
                "Credentials not found in your Kedro project config.\n %s", str(exc)
            )
            conf_creds = {}
        return conf_creds
class KedroContextError(Exception):
    """Raised when the project context cannot be loaded or the context
    pipeline fails to run.
    """