Skip to content

[CDF-24214] 👮Standardize Transformation Authentication #1565

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 9 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -411,7 +411,13 @@ def load_resource_file(
# is used to compare with the CDF resource.
for resource in resources:
identifier = self.get_id(resource)
credentials = read_auth(identifier, resource, self.client, "function schedule", self.console)
credentials = read_auth(
resource.get("authentication"),
self.client.config,
identifier,
"function schedule",
console=self.console,
)
self.authentication_by_id[identifier] = credentials
auth_hash = calculate_secure_hash(credentials.dump(camel_case=True), shorten=True)
extra_str = f" {self._hash_key}: {auth_hash}"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,13 @@
import warnings
from collections import defaultdict
from collections.abc import Hashable, Iterable, Sequence
from copy import deepcopy
from functools import lru_cache
from pathlib import Path
from typing import Any, cast, final
from typing import Any, Literal, cast, final

from cognite.client.data_classes import (
ClientCredentials,
OidcCredentials,
Transformation,
TransformationList,
Expand All @@ -54,26 +56,29 @@
DataModelId,
ViewId,
)
from cognite.client.data_classes.transformations import NonceCredentials
from cognite.client.data_classes.transformations.notifications import (
TransformationNotificationWrite,
TransformationNotificationWriteList,
)
from cognite.client.exceptions import CogniteAPIError, CogniteAuthError, CogniteDuplicatedError, CogniteNotFoundError
from cognite.client.utils.useful_types import SequenceNotStr
from rich import print
from rich.console import Console

from cognite_toolkit._cdf_tk._parameters import ANY_INT, ParameterSpec, ParameterSpecSet
from cognite_toolkit._cdf_tk.client import ToolkitClient
from cognite_toolkit._cdf_tk.client.data_classes.raw import RawDatabase, RawTable
from cognite_toolkit._cdf_tk.constants import BUILD_FOLDER_ENCODING
from cognite_toolkit._cdf_tk.exceptions import (
ResourceCreationError,
ToolkitFileNotFoundError,
ToolkitInvalidParameterNameError,
ToolkitRequiredValueError,
ToolkitTypeError,
ToolkitYAMLFormatError,
)
from cognite_toolkit._cdf_tk.loaders._base_loaders import ResourceLoader
from cognite_toolkit._cdf_tk.tk_warnings import HighSeverityWarning
from cognite_toolkit._cdf_tk.utils import (
calculate_secure_hash,
humanize_collection,
Expand All @@ -82,7 +87,7 @@
quote_int_value_by_key_in_yaml,
safe_read,
)
from cognite_toolkit._cdf_tk.utils.cdf import try_find_error
from cognite_toolkit._cdf_tk.utils.cdf import read_auth, try_find_error
from cognite_toolkit._cdf_tk.utils.diff_list import diff_list_hashable

from .auth_loaders import GroupAllScopedLoader
Expand Down Expand Up @@ -122,6 +127,13 @@ class TransformationLoader(
_doc_url = "Transformations/operation/createTransformations"
_hash_key = "-- cdf-auth"

def __init__(self, client: ToolkitClient, build_dir: Path | None, console: Console | None = None):
super().__init__(client, build_dir, console)
self._authentication_by_id_operation: dict[
tuple[str, Literal["read", "write"]], OidcCredentials | ClientCredentials
] = {}
self._nonce_cache: dict[str, NonceCredentials] = {}

@property
def display_name(self) -> str:
return "transformations"
Expand Down Expand Up @@ -238,8 +250,6 @@ def load_resource_file(
"authentication",
"sourceOidcCredentials",
"destinationOidcCredentials",
"sourceNonce",
"destinationNonce",
]:
if key in item:
auth_dict[key] = item[key]
Expand All @@ -249,6 +259,50 @@ def load_resource_file(
hash_str = f"{self._hash_key}: {auth_hash}"
if not item["query"].startswith(self._hash_key):
item["query"] = f"{hash_str}\n{item['query']}"

if "sourceOidcCredentials" in item:
HighSeverityWarning(
"The property 'sourceOidcCredentials' is deprecated. Use 'authentication.read' instead."
).print_warning(console=self.console)
item.pop("sourceOidcCredentials")

if "destinationOidcCredentials" in item:
HighSeverityWarning(
"The property 'destinationOidcCredentials' is deprecated. Use 'authentication.write' instead."
).print_warning(console=self.console)
item.pop("destinationOidcCredentials")

if "sourceNonce" in item:
HighSeverityWarning(
"The property 'sourceNonce' is not used by Toolkit. Use 'authentication.read' instead,"
"then Toolkit will dynamically set the nonce."
).print_warning(console=self.console)
item.pop("sourceNonce")

if "destinationNonce" in item:
HighSeverityWarning(
"The property 'destinationNonce' is not used by Toolkit. Use 'authentication.write' instead,"
"then Toolkit will dynamically set the nonce."
).print_warning(console=self.console)
item.pop("destinationNonce")
auth = item.pop("authentication", None)
if isinstance(auth, dict):
self._authentication_by_id_operation[(external_id, "read")] = read_auth(
auth["read"] if "read" in auth else auth,
self.client.config,
external_id,
"transformation",
allow_oidc=True,
console=self.console,
)
self._authentication_by_id_operation[(external_id, "write")] = read_auth(
auth["write"] if "write" in auth else auth,
self.client.config,
external_id,
"transformation",
allow_oidc=True,
console=self.console,
)
return raw_list

def load_resource(self, resource: dict[str, Any], is_dry_run: bool = False) -> TransformationWrite:
Expand All @@ -270,29 +324,7 @@ def load_resource(self, resource: dict[str, Any], is_dry_run: bool = False) -> T
if "conflictMode" not in resource:
# Todo; Bug SDK missing default value
resource["conflictMode"] = "upsert"

source_oidc_credentials = (
resource.get("authentication", {}).get("read") or resource.get("authentication") or None
)
destination_oidc_credentials = (
resource.get("authentication", {}).get("write") or resource.get("authentication") or None
)
transformation = TransformationWrite._load(resource)
try:
if transformation.source_oidc_credentials is None:
transformation.source_oidc_credentials = source_oidc_credentials and OidcCredentials.load(
source_oidc_credentials
)
if transformation.destination_oidc_credentials is None:
transformation.destination_oidc_credentials = destination_oidc_credentials and OidcCredentials.load(
destination_oidc_credentials
)
except KeyError as e:
item_id = self.get_id(resource)
raise ToolkitTypeError(
f"Ill-formed Transformation {item_id}: Authentication property is missing required fields"
) from e
return transformation
return TransformationWrite._load(resource)

def dump_resource(self, resource: Transformation, local: dict[str, Any] | None = None) -> dict[str, Any]:
dumped = resource.as_write().dump()
Expand Down Expand Up @@ -327,12 +359,13 @@ def create(self, items: Sequence[TransformationWrite]) -> TransformationList:
warnings.simplefilter("ignore")
# Ignoring warnings from SDK about session unauthorized. Motivation is CDF is not fast enough to
# handle first a group that authorizes the session and then the transformation.
try:
return self.client.transformations.create(items)
except CogniteAuthError as e:
if error := self._create_auth_creation_error(items):
raise error from e
raise e
self._update_nonce(items)
try:
return self.client.transformations.create(items)
except CogniteAuthError as e:
if error := self._create_auth_creation_error(items):
raise error from e
raise e

def retrieve(self, ids: SequenceNotStr[str | int]) -> TransformationList:
internal_ids, external_ids = self._split_ids(ids)
Expand All @@ -345,6 +378,7 @@ def update(self, items: Sequence[TransformationWrite]) -> TransformationList:
warnings.simplefilter("ignore")
# Ignoring warnings from SDK about session unauthorized. Motivation is CDF is not fast enough to
# handle first a group that authorizes the session and then the transformation.
self._update_nonce(items)
try:
return self.client.transformations.update(items, mode="replace")
except CogniteAuthError as e:
Expand Down Expand Up @@ -377,6 +411,31 @@ def delete(self, ids: SequenceNotStr[str | int]) -> int:
self.client.transformations.delete(id=existing, ignore_unknown_ids=True)
return len(existing)

def _update_nonce(self, items: Sequence[TransformationWrite]) -> None:
for item in items:
if item.external_id:
if read_credentials := self._authentication_by_id_operation.get((item.external_id, "read")):
item.source_nonce = self._create_nonce(read_credentials)
if write_credentials := self._authentication_by_id_operation.get((item.external_id, "write")):
item.destination_nonce = self._create_nonce(write_credentials)

def _create_nonce(self, credentials: OidcCredentials | ClientCredentials) -> NonceCredentials:
key = calculate_secure_hash(credentials.dump(), shorten=True)
if key in self._nonce_cache:
return self._nonce_cache[key]
if isinstance(credentials, ClientCredentials):
session = self.client.iam.sessions.create(credentials)
return NonceCredentials(session.id, session.nonce, self.client.config.project)
elif isinstance(credentials, OidcCredentials):
config = deepcopy(self.client.config)
config.project = credentials.cdf_project_name
config.credentials = credentials.as_credential_provider()
other_client = ToolkitClient(config)
session = other_client.iam.sessions.create(credentials.as_client_credentials())
return NonceCredentials(session.id, session.nonce, credentials.cdf_project_name)
else:
raise ValueError(f"Error in TransformationLoader: {type(credentials)} is not a valid credentials type")

def _iterate(
self,
data_set_external_id: str | None = None,
Expand Down Expand Up @@ -430,6 +489,12 @@ def sensitive_strings(self, item: TransformationWrite) -> Iterable[str]:
if item.destination_oidc_credentials:
yield item.destination_oidc_credentials.client_secret

if item.external_id:
if read_credentials := self._authentication_by_id_operation.get((item.external_id, "read")):
yield read_credentials.client_secret
if write_credentials := self._authentication_by_id_operation.get((item.external_id, "write")):
yield write_credentials.client_secret


@final
class TransformationScheduleLoader(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -664,7 +664,9 @@ def load_resource_file(
# is used to compare with the CDF resource.
for resource in resources:
identifier = self.get_id(resource)
credentials = read_auth(identifier, resource, self.client, "workflow trigger", self.console)
credentials = read_auth(
resource.get("authentication"), self.client.config, identifier, "workflow trigger", console=self.console
)
self._authentication_by_id[identifier] = credentials
if "metadata" not in resource:
resource["metadata"] = {}
Expand Down
47 changes: 35 additions & 12 deletions cognite_toolkit/_cdf_tk/utils/cdf.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
from cognite.client.exceptions import CogniteAPIError
from rich.console import Console

from cognite_toolkit._cdf_tk.client import ToolkitClient
from cognite_toolkit._cdf_tk.client import ToolkitClient, ToolkitClientConfig
from cognite_toolkit._cdf_tk.constants import ENV_VAR_PATTERN
from cognite_toolkit._cdf_tk.exceptions import (
ToolkitRequiredValueError,
Expand Down Expand Up @@ -115,28 +115,51 @@ def iterate_instances(
body["cursor"] = next_cursor


@overload
def read_auth(
authentication: object,
client_config: ToolkitClientConfig,
identifier: Hashable,
resource_name: str,
allow_oidc: Literal[False] = False,
console: Console | None = None,
) -> ClientCredentials: ...


@overload
def read_auth(
authentication: object,
client_config: ToolkitClientConfig,
identifier: Hashable,
resource_name: str,
allow_oidc: Literal[True],
console: Console | None = None,
) -> OidcCredentials: ...


def read_auth(
authentication: object,
client_config: ToolkitClientConfig,
identifier: Hashable,
resource: dict[str, Any],
client: ToolkitClient,
resource_name: str,
allow_oidc: bool = False,
console: Console | None = None,
) -> ClientCredentials:
auth = resource.get("authentication")
if auth is None:
if client.config.is_strict_validation or not isinstance(client.config.credentials, OAuthClientCredentials):
) -> ClientCredentials | OidcCredentials:
if authentication is None:
if client_config.is_strict_validation or not isinstance(client_config.credentials, OAuthClientCredentials):
raise ToolkitRequiredValueError(f"Authentication is missing for {resource_name} {identifier!r}.")
else:
HighSeverityWarning(
f"Authentication is missing for {resource_name} {identifier!r}. Falling back to the Toolkit credentials"
).print_warning(console=console)
credentials = ClientCredentials(client.config.credentials.client_id, client.config.credentials.client_secret)
elif not isinstance(auth, dict):
return ClientCredentials(client_config.credentials.client_id, client_config.credentials.client_secret)
elif not isinstance(authentication, dict):
raise ToolkitTypeError(f"Authentication must be a dictionary for {resource_name} {identifier!r}")
elif "clientId" not in auth or "clientSecret" not in auth:
elif "clientId" not in authentication or "clientSecret" not in authentication:
raise ToolkitRequiredValueError(
f"Authentication must contain clientId and clientSecret for {resource_name} {identifier!r}"
)
elif allow_oidc and "tokenUri" in authentication and "cdfProjectName" in authentication:
return OidcCredentials.load(authentication)
else:
credentials = ClientCredentials(auth["clientId"], auth["clientSecret"])
return credentials
return ClientCredentials(authentication["clientId"], authentication["clientSecret"])
32 changes: 32 additions & 0 deletions tests/test_integration/test_loaders/test_resource_loaders.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import os
from asyncio import sleep
from contextlib import suppress
from pathlib import Path
Expand Down Expand Up @@ -58,6 +59,7 @@
ResourceWorker,
RobotCapabilityLoader,
RoboticsDataPostProcessingLoader,
TransformationLoader,
WorkflowVersionLoader,
)
from cognite_toolkit._cdf_tk.tk_warnings import EnvironmentVariableMissingWarning, catch_warnings
Expand Down Expand Up @@ -719,3 +721,33 @@ def test_load_workflow_without_defaults_not_redeployed(self, toolkit_client: Too
"delete": len(to_delete),
"unchanged": len(unchanged),
} == {"create": 0, "change": 0, "delete": 0, "unchanged": 1}


class TestTransformationLoader:
def test_create_transformation_auth_without_scope(self, toolkit_client: ToolkitClient) -> None:
transformation_text = """externalId: transformation_without_scope
name: This is a test transformation
destination:
type: assets
ignoreNullFields: true
isPublic: true
conflictMode: upsert
query: Select * from assets
# Reusing the credentials from the Toolkit principal
authentication:
clientId: ${IDP_CLIENT_ID}
clientSecret: ${IDP_CLIENT_SECRET}
"""
loader = TransformationLoader.create_loader(toolkit_client)
filepath = MagicMock(spec=Path)
filepath.read_text.return_value = transformation_text

loaded = loader.load_resource_file(filepath, dict(os.environ))
assert len(loaded) == 1
transformation = loader.load_resource(loaded[0])

try:
created = loader.create([transformation])
assert len(created) == 1
finally:
toolkit_client.transformations.delete(external_id="transformation_without_scope", ignore_unknown_ids=True)
Loading