Skip to content

Commit

Permalink
Dataform operators, links, update system tests and docs (#27144)
Browse files Browse the repository at this point in the history
  • Loading branch information
MrGeorgeOwl authored Nov 11, 2022
1 parent 0a059ee commit 1059de6
Show file tree
Hide file tree
Showing 10 changed files with 2,183 additions and 32 deletions.
378 changes: 377 additions & 1 deletion airflow/providers/google/cloud/hooks/dataform.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,14 @@
from google.api_core.gapic_v1.method import DEFAULT, _MethodDefault
from google.api_core.retry import Retry
from google.cloud.dataform_v1beta1 import DataformClient
from google.cloud.dataform_v1beta1.types import CompilationResult, WorkflowInvocation
from google.cloud.dataform_v1beta1.types import (
CompilationResult,
InstallNpmPackagesResponse,
Repository,
WorkflowInvocation,
Workspace,
WriteFileResponse,
)

from airflow import AirflowException
from airflow.providers.google.common.hooks.base_google import GoogleBaseHook
Expand Down Expand Up @@ -249,3 +256,372 @@ def cancel_workflow_invocation(
client.cancel_workflow_invocation(
request={"name": name}, retry=retry, timeout=timeout, metadata=metadata
)

@GoogleBaseHook.fallback_to_default_project_id
def create_repository(
self,
*,
project_id: str,
region: str,
repository_id: str,
retry: Retry | _MethodDefault = DEFAULT,
timeout: float | None = None,
metadata: Sequence[tuple[str, str]] = (),
) -> Repository:
"""
Creates repository
:param project_id: Required. The ID of the Google Cloud project where repository should be.
:param region: Required. The ID of the Google Cloud region where repository should be.
:param repository_id: Required. The ID of the new Dataform repository.
:param retry: Designation of what errors, if any, should be retried.
:param timeout: The timeout for this request.
:param metadata: Strings which should be sent along with the request as metadata.
"""
client = self.get_dataform_client()
parent = f"projects/{project_id}/locations/{region}"
request = {
"parent": parent,
"repository_id": repository_id,
}

repository = client.create_repository(
request=request,
retry=retry,
timeout=timeout,
metadata=metadata,
)

return repository

@GoogleBaseHook.fallback_to_default_project_id
def delete_repository(
self,
*,
project_id: str,
region: str,
repository_id: str,
force: bool = True,
retry: Retry | _MethodDefault = DEFAULT,
timeout: float | None = None,
metadata: Sequence[tuple[str, str]] = (),
) -> None:
"""
Deletes repository.
:param project_id: Required. The ID of the Google Cloud project where repository located.
:param region: Required. The ID of the Google Cloud region where repository located.
:param repository_id: Required. The ID of the Dataform repository that should be deleted.
:param force: If set to true, any child resources of this repository will also be deleted.
:param retry: Designation of what errors, if any, should be retried.
:param timeout: The timeout for this request.
:param metadata: Strings which should be sent along with the request as metadata.
"""
client = self.get_dataform_client()
name = f"projects/{project_id}/locations/{region}/repositories/{repository_id}"
request = {
"name": name,
"force": force,
}

client.delete_repository(
request=request,
retry=retry,
timeout=timeout,
metadata=metadata,
)

@GoogleBaseHook.fallback_to_default_project_id
def create_workspace(
self,
*,
project_id: str,
region: str,
repository_id: str,
workspace_id: str,
retry: Retry | _MethodDefault = DEFAULT,
timeout: float | None = None,
metadata: Sequence[tuple[str, str]] = (),
) -> Workspace:
"""
Creates workspace.
:param project_id: Required. The ID of the Google Cloud project where workspace should be.
:param region: Required. The ID of the Google Cloud region where workspace should be.
:param repository_id: Required. The ID of the Dataform repository where workspace should be.
:param workspace_id: Required. The ID of the new Dataform workspace.
:param retry: Designation of what errors, if any, should be retried.
:param timeout: The timeout for this request.
:param metadata: Strings which should be sent along with the request as metadata.
"""
client = self.get_dataform_client()
parent = f"projects/{project_id}/locations/{region}/repositories/{repository_id}"

request = {"parent": parent, "workspace_id": workspace_id}

workspace = client.create_workspace(
request=request,
retry=retry,
timeout=timeout,
metadata=metadata,
)

return workspace

@GoogleBaseHook.fallback_to_default_project_id
def delete_workspace(
self,
*,
project_id: str,
region: str,
repository_id: str,
workspace_id: str,
retry: Retry | _MethodDefault = DEFAULT,
timeout: float | None = None,
metadata: Sequence[tuple[str, str]] = (),
):
"""
Deletes workspace.
:param project_id: Required. The ID of the Google Cloud project where workspace located.
:param region: Required. The ID of the Google Cloud region where workspace located.
:param repository_id: Required. The ID of the Dataform repository where workspace located.
:param workspace_id: Required. The ID of the Dataform workspace that should be deleted.
:param retry: Designation of what errors, if any, should be retried.
:param timeout: The timeout for this request.
:param metadata: Strings which should be sent along with the request as metadata.
"""
client = self.get_dataform_client()
workspace_path = (
f"projects/{project_id}/locations/{region}/"
f"repositories/{repository_id}/workspaces/{workspace_id}"
)
request = {
"name": workspace_path,
}

client.delete_workspace(
request=request,
retry=retry,
timeout=timeout,
metadata=metadata,
)

@GoogleBaseHook.fallback_to_default_project_id
def write_file(
self,
*,
project_id: str,
region: str,
repository_id: str,
workspace_id: str,
filepath: str,
contents: bytes,
retry: Retry | _MethodDefault = DEFAULT,
timeout: float | None = None,
metadata: Sequence[tuple[str, str]] = (),
) -> WriteFileResponse:
"""
Writes a new file to the specified workspace.
:param project_id: Required. The ID of the Google Cloud project where workspace located.
:param region: Required. The ID of the Google Cloud region where workspace located.
:param repository_id: Required. The ID of the Dataform repository where workspace located.
:param workspace_id: Required. The ID of the Dataform workspace where files should be created.
:param filepath: Required. Path to file including name of the file relative to workspace root.
:param contents: Required. Content of the file to be written.
:param retry: Designation of what errors, if any, should be retried.
:param timeout: The timeout for this request.
:param metadata: Strings which should be sent along with the request as metadata.
"""
client = self.get_dataform_client()
workspace_path = (
f"projects/{project_id}/locations/{region}/"
f"repositories/{repository_id}/workspaces/{workspace_id}"
)
request = {
"workspace": workspace_path,
"path": filepath,
"contents": contents,
}

response = client.write_file(
request=request,
retry=retry,
timeout=timeout,
metadata=metadata,
)

return response

@GoogleBaseHook.fallback_to_default_project_id
def make_directory(
self,
*,
project_id: str,
region: str,
repository_id: str,
workspace_id: str,
path: str,
retry: Retry | _MethodDefault = DEFAULT,
timeout: float | None = None,
metadata: Sequence[tuple[str, str]] = (),
) -> dict:
"""
Makes new directory in specified workspace.
:param project_id: Required. The ID of the Google Cloud project where workspace located.
:param region: Required. The ID of the Google Cloud region where workspace located.
:param repository_id: Required. The ID of the Dataform repository where workspace located.
:param workspace_id: Required. The ID of the Dataform workspace where directory should be created.
:param path: Required. The directory's full path including new directory name,
relative to the workspace root.
:param retry: Designation of what errors, if any, should be retried.
:param timeout: The timeout for this request.
:param metadata: Strings which should be sent along with the request as metadata.
"""
client = self.get_dataform_client()
workspace_path = (
f"projects/{project_id}/locations/{region}/"
f"repositories/{repository_id}/workspaces/{workspace_id}"
)
request = {
"workspace": workspace_path,
"path": path,
}

response = client.make_directory(
request=request,
retry=retry,
timeout=timeout,
metadata=metadata,
)

return response

@GoogleBaseHook.fallback_to_default_project_id
def remove_directory(
self,
*,
project_id: str,
region: str,
repository_id: str,
workspace_id: str,
path: str,
retry: Retry | _MethodDefault = DEFAULT,
timeout: float | None = None,
metadata: Sequence[tuple[str, str]] = (),
):
"""
Removes directory in specified workspace.
:param project_id: Required. The ID of the Google Cloud project where workspace located.
:param region: Required. The ID of the Google Cloud region where workspace located.
:param repository_id: Required. The ID of the Dataform repository where workspace located.
:param workspace_id: Required. The ID of the Dataform workspace where directory located.
:param path: Required. The directory's full path including directory name,
relative to the workspace root.
:param retry: Designation of what errors, if any, should be retried.
:param timeout: The timeout for this request.
:param metadata: Strings which should be sent along with the request as metadata.
"""
client = self.get_dataform_client()
workspace_path = (
f"projects/{project_id}/locations/{region}/"
f"repositories/{repository_id}/workspaces/{workspace_id}"
)
request = {
"workspace": workspace_path,
"path": path,
}

client.remove_directory(
request=request,
retry=retry,
timeout=timeout,
metadata=metadata,
)

@GoogleBaseHook.fallback_to_default_project_id
def remove_file(
self,
*,
project_id: str,
region: str,
repository_id: str,
workspace_id: str,
filepath: str,
retry: Retry | _MethodDefault = DEFAULT,
timeout: float | None = None,
metadata: Sequence[tuple[str, str]] = (),
):
"""
Removes file in specified workspace.
:param project_id: Required. The ID of the Google Cloud project where workspace located.
:param region: Required. The ID of the Google Cloud region where workspace located.
:param repository_id: Required. The ID of the Dataform repository where workspace located.
:param workspace_id: Required. The ID of the Dataform workspace where directory located.
:param filepath: Required. The full path including name of the file, relative to the workspace root.
:param retry: Designation of what errors, if any, should be retried.
:param timeout: The timeout for this request.
:param metadata: Strings which should be sent along with the request as metadata.
"""
client = self.get_dataform_client()
workspace_path = (
f"projects/{project_id}/locations/{region}/"
f"repositories/{repository_id}/workspaces/{workspace_id}"
)
request = {
"workspace": workspace_path,
"path": filepath,
}

client.remove_file(
request=request,
retry=retry,
timeout=timeout,
metadata=metadata,
)

@GoogleBaseHook.fallback_to_default_project_id
def install_npm_packages(
self,
*,
project_id: str,
region: str,
repository_id: str,
workspace_id: str,
retry: Retry | _MethodDefault = DEFAULT,
timeout: float | None = None,
metadata: Sequence[tuple[str, str]] = (),
) -> InstallNpmPackagesResponse:
"""
Installs npm dependencies in the provided workspace. Requires "package.json"
to be created in workspace
:param project_id: Required. The ID of the Google Cloud project where workspace located.
:param region: Required. The ID of the Google Cloud region where workspace located.
:param repository_id: Required. The ID of the Dataform repository where workspace located.
:param workspace_id: Required. The ID of the Dataform workspace.
:param retry: Designation of what errors, if any, should be retried.
:param timeout: The timeout for this request.
:param metadata: Strings which should be sent along with the request as metadata.
"""
client = self.get_dataform_client()
workspace_path = (
f"projects/{project_id}/locations/{region}/"
f"repositories/{repository_id}/workspaces/{workspace_id}"
)
request = {
"workspace": workspace_path,
}

response = client.install_npm_packages(
request=request,
retry=retry,
timeout=timeout,
metadata=metadata,
)

return response
Loading

0 comments on commit 1059de6

Please sign in to comment.