Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
61 changes: 53 additions & 8 deletions tests/system/providers/google/cloud/gcs/example_gcs_to_sheets.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,22 +17,32 @@
# under the License.
from __future__ import annotations

import json
import os
from datetime import datetime

from airflow.decorators import task
from airflow.models import Connection
from airflow.models.dag import DAG
from airflow.operators.bash import BashOperator
from airflow.providers.google.cloud.operators.gcs import GCSCreateBucketOperator, GCSDeleteBucketOperator
from airflow.providers.google.cloud.transfers.sheets_to_gcs import GoogleSheetsToGCSOperator
from airflow.providers.google.suite.operators.sheets import GoogleSheetsCreateSpreadsheetOperator
from airflow.providers.google.suite.transfers.gcs_to_sheets import GCSToGoogleSheetsOperator
from airflow.settings import Session
from airflow.utils.trigger_rule import TriggerRule

ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID")
PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT")
ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID", "default")
PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT", "default")
DAG_ID = "example_gcs_to_sheets"

BUCKET_NAME = f"bucket_{DAG_ID}_{ENV_ID}"
SPREADSHEET_ID = os.environ.get("SPREADSHEET_ID", "example-spreadsheetID")
NEW_SPREADSHEET_ID = os.environ.get("NEW_SPREADSHEET_ID", "1234567890qwerty")
SPREADSHEET = {
"properties": {"title": "Test1"},
"sheets": [{"properties": {"title": "Sheet1"}}],
}
CONNECTION_ID = f"connection_{DAG_ID}_{ENV_ID}"


with DAG(
DAG_ID,
Expand All @@ -45,33 +55,68 @@
task_id="create_bucket", bucket_name=BUCKET_NAME, project_id=PROJECT_ID
)

@task
def create_temp_sheets_connection():
conn = Connection(
conn_id=CONNECTION_ID,
conn_type="google_cloud_platform",
)
conn_extra = {
"scope": "https://www.googleapis.com/auth/spreadsheets,https://www.googleapis.com/auth/cloud-platform",
"project": PROJECT_ID,
"keyfile_dict": "", # Override to match your needs
}
conn_extra_json = json.dumps(conn_extra)
conn.set_extra(conn_extra_json)

session: Session = Session()
session.add(conn)
session.commit()

create_temp_sheets_connection_task = create_temp_sheets_connection()

create_spreadsheet = GoogleSheetsCreateSpreadsheetOperator(
task_id="create_spreadsheet", spreadsheet=SPREADSHEET, gcp_conn_id=CONNECTION_ID
)

upload_sheet_to_gcs = GoogleSheetsToGCSOperator(
task_id="upload_sheet_to_gcs",
destination_bucket=BUCKET_NAME,
spreadsheet_id=SPREADSHEET_ID,
spreadsheet_id="{{ task_instance.xcom_pull(task_ids='create_spreadsheet', "
"key='spreadsheet_id') }}",
gcp_conn_id=CONNECTION_ID,
)

# [START upload_gcs_to_sheets]
upload_gcs_to_sheet = GCSToGoogleSheetsOperator(
task_id="upload_gcs_to_sheet",
bucket_name=BUCKET_NAME,
object_name="{{ task_instance.xcom_pull('upload_sheet_to_gcs')[0] }}",
spreadsheet_id=NEW_SPREADSHEET_ID,
spreadsheet_id="{{ task_instance.xcom_pull(task_ids='create_spreadsheet', "
"key='spreadsheet_id') }}",
gcp_conn_id=CONNECTION_ID,
)
# [END upload_gcs_to_sheets]

delete_temp_sheets_connection_task = BashOperator(
task_id="delete_temp_sheets_connection",
bash_command=f"airflow connections delete {CONNECTION_ID}",
trigger_rule=TriggerRule.ALL_DONE,
)

delete_bucket = GCSDeleteBucketOperator(
task_id="delete_bucket", bucket_name=BUCKET_NAME, trigger_rule=TriggerRule.ALL_DONE
)

(
# TEST SETUP
create_bucket
[create_bucket, create_temp_sheets_connection_task]
>> create_spreadsheet
>> upload_sheet_to_gcs
# TEST BODY
>> upload_gcs_to_sheet
# TEST TEARDOWN
>> delete_bucket
>> [delete_bucket, delete_temp_sheets_connection_task]
)

from tests.system.utils.watcher import watcher
Expand Down
53 changes: 43 additions & 10 deletions tests/system/providers/google/cloud/gcs/example_sheets.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,30 +17,33 @@
# under the License.
from __future__ import annotations

import json
import os
from datetime import datetime

from airflow.decorators import task
from airflow.models import Connection
from airflow.models.dag import DAG
from airflow.models.xcom_arg import XComArg
from airflow.operators.bash import BashOperator
from airflow.providers.google.cloud.operators.gcs import GCSCreateBucketOperator, GCSDeleteBucketOperator
from airflow.providers.google.cloud.transfers.sheets_to_gcs import GoogleSheetsToGCSOperator
from airflow.providers.google.suite.operators.sheets import GoogleSheetsCreateSpreadsheetOperator
from airflow.providers.google.suite.transfers.gcs_to_sheets import GCSToGoogleSheetsOperator
from airflow.settings import Session
from airflow.utils.trigger_rule import TriggerRule

ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID")
PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT")
ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID", "default")
PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT", "default")
DAG_ID = "example_sheets_gcs"

BUCKET_NAME = f"bucket_{DAG_ID}_{ENV_ID}"
SPREADSHEET_ID = os.environ.get("SPREADSHEET_ID", "1234567890qwerty")
NEW_SPREADSHEET_ID = os.environ.get("NEW_SPREADSHEET_ID", "1234567890qwerty")

SPREADSHEET = {
"properties": {"title": "Test1"},
"sheets": [{"properties": {"title": "Sheet1"}}],
}
CONNECTION_ID = f"connection_{DAG_ID}_{ENV_ID}"


with DAG(
DAG_ID,
Expand All @@ -53,17 +56,39 @@
task_id="create_bucket", bucket_name=BUCKET_NAME, project_id=PROJECT_ID
)

@task
def create_temp_sheets_connection():
conn = Connection(
conn_id=CONNECTION_ID,
conn_type="google_cloud_platform",
)
conn_extra = {
"scope": "https://www.googleapis.com/auth/spreadsheets,https://www.googleapis.com/auth/cloud-platform",
"project": PROJECT_ID,
"keyfile_dict": "", # Override to match your needs
}
conn_extra_json = json.dumps(conn_extra)
conn.set_extra(conn_extra_json)

session: Session = Session()
session.add(conn)
session.commit()

create_temp_sheets_connection_task = create_temp_sheets_connection()

# [START upload_sheet_to_gcs]
upload_sheet_to_gcs = GoogleSheetsToGCSOperator(
task_id="upload_sheet_to_gcs",
destination_bucket=BUCKET_NAME,
spreadsheet_id=SPREADSHEET_ID,
spreadsheet_id="{{ task_instance.xcom_pull(task_ids='create_spreadsheet', "
"key='spreadsheet_id') }}",
gcp_conn_id=CONNECTION_ID,
)
# [END upload_sheet_to_gcs]

# [START create_spreadsheet]
create_spreadsheet = GoogleSheetsCreateSpreadsheetOperator(
task_id="create_spreadsheet", spreadsheet=SPREADSHEET
task_id="create_spreadsheet", spreadsheet=SPREADSHEET, gcp_conn_id=CONNECTION_ID
)
# [END create_spreadsheet]

Expand All @@ -79,24 +104,32 @@
task_id="upload_gcs_to_sheet",
bucket_name=BUCKET_NAME,
object_name="{{ task_instance.xcom_pull('upload_sheet_to_gcs')[0] }}",
spreadsheet_id=NEW_SPREADSHEET_ID,
spreadsheet_id="{{ task_instance.xcom_pull(task_ids='create_spreadsheet', "
"key='spreadsheet_id') }}",
gcp_conn_id=CONNECTION_ID,
)
# [END upload_gcs_to_sheet]

delete_temp_sheets_connection_task = BashOperator(
task_id="delete_temp_sheets_connection",
bash_command=f"airflow connections delete {CONNECTION_ID}",
trigger_rule=TriggerRule.ALL_DONE,
)

delete_bucket = GCSDeleteBucketOperator(
task_id="delete_bucket", bucket_name=BUCKET_NAME, trigger_rule=TriggerRule.ALL_DONE
)

(
# TEST SETUP
create_bucket
[create_bucket, create_temp_sheets_connection_task]
# TEST BODY
>> create_spreadsheet
>> print_spreadsheet_url
>> upload_sheet_to_gcs
>> upload_gcs_to_sheet
# TEST TEARDOWN
>> delete_bucket
>> [delete_bucket, delete_temp_sheets_connection_task]
)

from tests.system.utils.watcher import watcher
Expand Down
56 changes: 50 additions & 6 deletions tests/system/providers/google/cloud/gcs/example_sheets_to_gcs.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,20 +17,31 @@
# under the License.
from __future__ import annotations

import json
import os
from datetime import datetime

from airflow.decorators import task
from airflow.models import Connection
from airflow.models.dag import DAG
from airflow.operators.bash import BashOperator
from airflow.providers.google.cloud.operators.gcs import GCSCreateBucketOperator, GCSDeleteBucketOperator
from airflow.providers.google.cloud.transfers.sheets_to_gcs import GoogleSheetsToGCSOperator
from airflow.providers.google.suite.operators.sheets import GoogleSheetsCreateSpreadsheetOperator
from airflow.settings import Session
from airflow.utils.trigger_rule import TriggerRule

ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID")
PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT")
ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID", "default")
PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT", "default")
DAG_ID = "example_sheets_to_gcs"

BUCKET_NAME = f"bucket_{DAG_ID}_{ENV_ID}"
SPREADSHEET_ID = os.environ.get("SPREADSHEET_ID", "1234567890qwerty")
SPREADSHEET = {
"properties": {"title": "Test1"},
"sheets": [{"properties": {"title": "Sheet1"}}],
}
CONNECTION_ID = f"connection_{DAG_ID}_{ENV_ID}"


with DAG(
DAG_ID,
Expand All @@ -43,25 +54,58 @@
task_id="create_bucket", bucket_name=BUCKET_NAME, project_id=PROJECT_ID
)

@task
def create_temp_sheets_connection():
conn = Connection(
conn_id=CONNECTION_ID,
conn_type="google_cloud_platform",
)
conn_extra = {
"scope": "https://www.googleapis.com/auth/spreadsheets,https://www.googleapis.com/auth/cloud-platform",
"project": PROJECT_ID,
"keyfile_dict": "", # Override to match your needs
}
conn_extra_json = json.dumps(conn_extra)
conn.set_extra(conn_extra_json)

session: Session = Session()
session.add(conn)
session.commit()

create_temp_sheets_connection_task = create_temp_sheets_connection()

create_spreadsheet = GoogleSheetsCreateSpreadsheetOperator(
task_id="create_spreadsheet", spreadsheet=SPREADSHEET, gcp_conn_id=CONNECTION_ID
)

# [START upload_sheet_to_gcs]
upload_sheet_to_gcs = GoogleSheetsToGCSOperator(
task_id="upload_sheet_to_gcs",
destination_bucket=BUCKET_NAME,
spreadsheet_id=SPREADSHEET_ID,
spreadsheet_id="{{ task_instance.xcom_pull(task_ids='create_spreadsheet', "
"key='spreadsheet_id') }}",
gcp_conn_id=CONNECTION_ID,
)
# [END upload_sheet_to_gcs]

delete_temp_sheets_connection_task = BashOperator(
task_id="delete_temp_sheets_connection",
bash_command=f"airflow connections delete {CONNECTION_ID}",
trigger_rule=TriggerRule.ALL_DONE,
)

delete_bucket = GCSDeleteBucketOperator(
task_id="delete_bucket", bucket_name=BUCKET_NAME, trigger_rule=TriggerRule.ALL_DONE
)

(
# TEST SETUP
create_bucket
[create_bucket, create_temp_sheets_connection_task]
>> create_spreadsheet
# TEST BODY
>> upload_sheet_to_gcs
# TEST TEARDOWN
>> delete_bucket
>> [delete_bucket, delete_temp_sheets_connection_task]
)

from tests.system.utils.watcher import watcher
Expand Down
Loading