2 changes: 1 addition & 1 deletion airflow/providers/google/suite/transfers/gcs_to_gdrive.py
@@ -95,7 +95,7 @@ def __init__(
source_bucket: str,
source_object: str,
destination_object: str | None = None,
destination_folder_id: str | None = None,
destination_folder_id: str = "root",
move_object: bool = False,
gcp_conn_id: str = "google_cloud_default",
impersonation_chain: str | Sequence[str] | None = None,
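This change flips the operator's default `destination_folder_id` from `None` to `"root"`, so copies that do not name a folder land in the Drive root of the authenticated account. A minimal, hedged usage sketch under that assumption (the DAG id, bucket, object paths, and folder ID below are illustrative placeholders, not values from this PR):

```python
from datetime import datetime

from airflow.models.dag import DAG
from airflow.providers.google.suite.transfers.gcs_to_gdrive import GCSToGoogleDriveOperator

with DAG("gcs_to_gdrive_sketch", schedule="@once", start_date=datetime(2021, 1, 1), catchup=False):
    # No destination_folder_id: with the new default the file is created under Drive "root".
    copy_to_root = GCSToGoogleDriveOperator(
        task_id="copy_to_root",
        source_bucket="example-bucket",
        source_object="tmp/example_upload.txt",
        destination_object="copied/example_upload.txt",
    )

    # Explicit folder ID: the file is created inside that Drive folder instead.
    copy_to_folder = GCSToGoogleDriveOperator(
        task_id="copy_to_folder",
        source_bucket="example-bucket",
        source_object="tmp/example_upload.txt",
        destination_object="copied/example_upload.txt",
        destination_folder_id="1A2b3C4d5E6f",  # placeholder Drive folder ID
    )
```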
14 changes: 7 additions & 7 deletions tests/providers/google/suite/transfers/test_gcs_to_gdrive.py
@@ -66,7 +66,7 @@ def test_should_copy_single_file(self, mock_named_temporary_file, mock_gdrive, m
mock.call().upload_file(
local_location="TMP1",
remote_location="copied_sales/2017/january-backup.avro",
folder_id=None,
folder_id="root",
),
]
)
@@ -156,13 +156,13 @@ def test_should_copy_files(self, mock_named_temporary_file, mock_gdrive, mock_gc
impersonation_chain=IMPERSONATION_CHAIN,
),
mock.call().upload_file(
local_location="TMP1", remote_location="sales/A.avro", folder_id=None
local_location="TMP1", remote_location="sales/A.avro", folder_id="root"
),
mock.call().upload_file(
local_location="TMP2", remote_location="sales/B.avro", folder_id=None
local_location="TMP2", remote_location="sales/B.avro", folder_id="root"
),
mock.call().upload_file(
local_location="TMP3", remote_location="sales/C.avro", folder_id=None
local_location="TMP3", remote_location="sales/C.avro", folder_id="root"
),
]
)
@@ -210,13 +210,13 @@ def test_should_move_files(self, mock_named_temporary_file, mock_gdrive, mock_gc
impersonation_chain=IMPERSONATION_CHAIN,
),
mock.call().upload_file(
local_location="TMP1", remote_location="sales/A.avro", folder_id=None
local_location="TMP1", remote_location="sales/A.avro", folder_id="root"
),
mock.call().upload_file(
local_location="TMP2", remote_location="sales/B.avro", folder_id=None
local_location="TMP2", remote_location="sales/B.avro", folder_id="root"
),
mock.call().upload_file(
local_location="TMP3", remote_location="sales/C.avro", folder_id=None
local_location="TMP3", remote_location="sales/C.avro", folder_id="root"
),
]
)
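These assertions track the new default: the mocked GoogleDriveHook must now record `folder_id="root"` rather than `folder_id=None`. A hedged, minimal sketch of the assertion pattern (the mock object and values below are illustrative, not the actual test fixtures):

```python
from unittest import mock

mock_gdrive_hook = mock.MagicMock()

# What the operator is expected to do after the default change:
mock_gdrive_hook.upload_file(
    local_location="TMP1",
    remote_location="copied_sales/2017/january-backup.avro",
    folder_id="root",
)

mock_gdrive_hook.assert_has_calls(
    [
        mock.call.upload_file(
            local_location="TMP1",
            remote_location="copied_sales/2017/january-backup.avro",
            folder_id="root",
        )
    ]
)
```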
93 changes: 78 additions & 15 deletions tests/system/providers/google/cloud/gcs/example_gcs_to_gdrive.py
@@ -23,109 +23,172 @@
"""
from __future__ import annotations

import json
import logging
import os
from datetime import datetime
from pathlib import Path

from airflow.decorators import task
from airflow.models import Connection
from airflow.models.dag import DAG
from airflow.operators.bash import BashOperator
from airflow.providers.google.cloud.operators.gcs import GCSCreateBucketOperator, GCSDeleteBucketOperator
from airflow.providers.google.cloud.transfers.local_to_gcs import LocalFilesystemToGCSOperator
from airflow.providers.google.suite.hooks.drive import GoogleDriveHook
from airflow.providers.google.suite.transfers.gcs_to_gdrive import GCSToGoogleDriveOperator
from airflow.settings import Session
from airflow.utils.trigger_rule import TriggerRule

ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID")
PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT")
FOLDER_ID = os.environ.get("GCP_GDRIVE_FOLDER_ID", "abcd1234")
FOLDER_ID = os.environ.get("GCP_GDRIVE_FOLDER_ID", None)

DAG_ID = "example_gcs_to_gdrive"

BUCKET_NAME = f"bucket_{DAG_ID}_{ENV_ID}"
CONNECTION_ID = f"connection_{DAG_ID}_{ENV_ID}"

TMP_PATH = "tmp"

WORK_DIR = f"folder_{DAG_ID}_{ENV_ID}".replace("-", "_")
CURRENT_FOLDER = Path(__file__).parent
LOCAL_PATH = str(Path(CURRENT_FOLDER) / "resources")

FILE_LOCAL_PATH = str(Path(LOCAL_PATH))
FILE_NAME = "example_upload.txt"

log = logging.getLogger(__name__)


with DAG(
DAG_ID,
schedule="@once",
start_date=datetime(2021, 1, 1),
catchup=False,
tags=["example", "gcs"],
tags=["example", "gcs", "gdrive"],
) as dag:

@task
def create_temp_gcp_connection():
conn = Connection(
conn_id=CONNECTION_ID,
conn_type="google_cloud_platform",
)
conn_extra_json = json.dumps(
{
"scope": "https://www.googleapis.com/auth/drive,"
"https://www.googleapis.com/auth/cloud-platform"
}
)
conn.set_extra(conn_extra_json)

session: Session = Session()
if session.query(Connection).filter(Connection.conn_id == CONNECTION_ID).first():
log.warning("Connection %s already exists", CONNECTION_ID)
return None
session.add(conn)
session.commit()

create_temp_gcp_connection_task = create_temp_gcp_connection()

create_bucket = GCSCreateBucketOperator(
task_id="create_bucket", bucket_name=BUCKET_NAME, project_id=PROJECT_ID
)

upload_file = LocalFilesystemToGCSOperator(
task_id="upload_file",
upload_file_1 = LocalFilesystemToGCSOperator(
task_id="upload_file_1",
src=f"{FILE_LOCAL_PATH}/{FILE_NAME}",
dst=f"{TMP_PATH}/{FILE_NAME}",
bucket=BUCKET_NAME,
)

upload_file_2 = LocalFilesystemToGCSOperator(
task_id="upload_fil_2",
task_id="upload_file_2",
src=f"{FILE_LOCAL_PATH}/{FILE_NAME}",
dst=f"{TMP_PATH}/2_{FILE_NAME}",
bucket=BUCKET_NAME,
)
# [START howto_operator_gcs_to_gdrive_copy_single_file]
copy_single_file = GCSToGoogleDriveOperator(
task_id="copy_single_file",
gcp_conn_id=CONNECTION_ID,
source_bucket=BUCKET_NAME,
source_object=f"{TMP_PATH}/{FILE_NAME}",
destination_object=f"copied_tmp/copied_{FILE_NAME}",
destination_object=f"{WORK_DIR}/copied_{FILE_NAME}",
)
# [END howto_operator_gcs_to_gdrive_copy_single_file]

# [START howto_operator_gcs_to_gdrive_copy_single_file_into_folder]
copy_single_file_into_folder = GCSToGoogleDriveOperator(
task_id="copy_single_file_into_folder",
gcp_conn_id=CONNECTION_ID,
source_bucket=BUCKET_NAME,
source_object=f"{TMP_PATH}/{FILE_NAME}",
destination_object=f"copied_tmp/copied_{FILE_NAME}",
destination_object=f"{WORK_DIR}/copied_{FILE_NAME}",
destination_folder_id=FOLDER_ID,
)
# [END howto_operator_gcs_to_gdrive_copy_single_file_into_folder]

# [START howto_operator_gcs_to_gdrive_copy_files]
copy_files = GCSToGoogleDriveOperator(
task_id="copy_files",
gcp_conn_id=CONNECTION_ID,
source_bucket=BUCKET_NAME,
source_object=f"{TMP_PATH}/*",
destination_object="copied_tmp/",
destination_object=f"{WORK_DIR}/",
)
# [END howto_operator_gcs_to_gdrive_copy_files]

# [START howto_operator_gcs_to_gdrive_move_files]
move_files = GCSToGoogleDriveOperator(
task_id="move_files",
gcp_conn_id=CONNECTION_ID,
source_bucket=BUCKET_NAME,
source_object=f"{TMP_PATH}/*.txt",
destination_object=f"{WORK_DIR}/",
move_object=True,
)
# [END howto_operator_gcs_to_gdrive_move_files]

@task(trigger_rule=TriggerRule.ALL_DONE)
def remove_files_from_drive():
service = GoogleDriveHook(gcp_conn_id=CONNECTION_ID).get_conn()
root_path = (
service.files()
.list(q=f"name = '{WORK_DIR}' and mimeType = 'application/vnd.google-apps.folder'")
.execute()
)
if files := root_path["files"]:
batch = service.new_batch_http_request()
for file in files:
log.info("Preparing to remove file: {}", file)
batch.add(service.files().delete(fileId=file["id"]))
batch.execute()
log.info("Selected files removed.")

remove_files_from_drive_task = remove_files_from_drive()

delete_bucket = GCSDeleteBucketOperator(
task_id="delete_bucket", bucket_name=BUCKET_NAME, trigger_rule=TriggerRule.ALL_DONE
)

delete_temp_gcp_connection_task = BashOperator(
task_id="delete_temp_gcp_connection",
bash_command=f"airflow connections delete {CONNECTION_ID}",
trigger_rule=TriggerRule.ALL_DONE,
)

# TEST SETUP
create_bucket >> [upload_file_1, upload_file_2]
(
# TEST SETUP
create_bucket
>> upload_file
>> upload_file_2
[upload_file_1, upload_file_2, create_temp_gcp_connection_task]
# TEST BODY
>> copy_single_file
>> copy_single_file_into_folder
>> copy_files
>> move_files
# TEST TEARDOWN
>> delete_bucket
>> remove_files_from_drive_task
>> [delete_bucket, delete_temp_gcp_connection_task]
)

from tests.system.utils.watcher import watcher
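The example now fans dependencies in rather than running everything in a single chain: both uploads and the temporary connection task must finish before the first copy starts. A hedged sketch of that pattern with `EmptyOperator` stand-ins (task names mirror the example; the DAG id below is illustrative):

```python
from datetime import datetime

from airflow.models.dag import DAG
from airflow.operators.empty import EmptyOperator

with DAG("fan_in_sketch", schedule="@once", start_date=datetime(2021, 1, 1), catchup=False):
    create_bucket = EmptyOperator(task_id="create_bucket")
    upload_file_1 = EmptyOperator(task_id="upload_file_1")
    upload_file_2 = EmptyOperator(task_id="upload_file_2")
    create_temp_gcp_connection = EmptyOperator(task_id="create_temp_gcp_connection")
    copy_single_file = EmptyOperator(task_id="copy_single_file")

    # create_bucket fans out to both uploads ...
    create_bucket >> [upload_file_1, upload_file_2]
    # ... and copy_single_file waits on the uploads plus the connection task.
    [upload_file_1, upload_file_2, create_temp_gcp_connection] >> copy_single_file
```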
74 changes: 65 additions & 9 deletions tests/system/providers/google/cloud/gcs/example_gdrive_to_gcs.py
@@ -17,16 +17,23 @@
# under the License.
from __future__ import annotations

import json
import logging
import os
from datetime import datetime
from pathlib import Path

from airflow.decorators import task
from airflow.models import Connection
from airflow.models.dag import DAG
from airflow.operators.bash import BashOperator
from airflow.providers.google.cloud.operators.gcs import GCSCreateBucketOperator, GCSDeleteBucketOperator
from airflow.providers.google.cloud.transfers.gdrive_to_gcs import GoogleDriveToGCSOperator
from airflow.providers.google.cloud.transfers.local_to_gcs import LocalFilesystemToGCSOperator
from airflow.providers.google.suite.hooks.drive import GoogleDriveHook
from airflow.providers.google.suite.sensors.drive import GoogleDriveFileExistenceSensor
from airflow.providers.google.suite.transfers.gcs_to_gdrive import GCSToGoogleDriveOperator
from airflow.settings import Session
from airflow.utils.trigger_rule import TriggerRule

ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID")
@@ -35,20 +35,48 @@
DAG_ID = "example_gdrive_to_gcs_with_gdrive_sensor"

BUCKET_NAME = f"bucket_{DAG_ID}_{ENV_ID}"
CONNECTION_ID = f"connection_{DAG_ID}_{ENV_ID}"

OBJECT = "abc123xyz"
FOLDER_ID = ""
FILE_NAME = "example_upload.txt"
DRIVE_FILE_NAME = f"example_upload_{DAG_ID}_{ENV_ID}.txt"
LOCAL_PATH = str(Path(__file__).parent / "resources" / FILE_NAME)

log = logging.getLogger(__name__)


with DAG(
DAG_ID,
schedule="@once",
start_date=datetime(2021, 1, 1),
catchup=False,
tags=["example"],
tags=["example", "gcs", "gdrive"],
) as dag:

@task
def create_temp_gcp_connection():
conn = Connection(
conn_id=CONNECTION_ID,
conn_type="google_cloud_platform",
)
conn_extra_json = json.dumps(
{
"scope": "https://www.googleapis.com/auth/drive,"
"https://www.googleapis.com/auth/cloud-platform"
}
)
conn.set_extra(conn_extra_json)

session: Session = Session()
if session.query(Connection).filter(Connection.conn_id == CONNECTION_ID).first():
log.warning("Connection %s already exists", CONNECTION_ID)
return None
session.add(conn)
session.commit()

create_temp_gcp_connection_task = create_temp_gcp_connection()

create_bucket = GCSCreateBucketOperator(
task_id="create_bucket", bucket_name=BUCKET_NAME, project_id=PROJECT_ID
)
@@ -62,41 +62,62 @@

copy_single_file = GCSToGoogleDriveOperator(
task_id="copy_single_file",
gcp_conn_id=CONNECTION_ID,
source_bucket=BUCKET_NAME,
source_object=FILE_NAME,
destination_object=FILE_NAME,
destination_object=DRIVE_FILE_NAME,
)

# [START detect_file]
detect_file = GoogleDriveFileExistenceSensor(
task_id="detect_file", folder_id=FOLDER_ID, file_name=FILE_NAME
task_id="detect_file",
folder_id=FOLDER_ID,
file_name=DRIVE_FILE_NAME,
gcp_conn_id=CONNECTION_ID,
)
# [END detect_file]

# [START upload_gdrive_to_gcs]
upload_gdrive_to_gcs = GoogleDriveToGCSOperator(
task_id="upload_gdrive_object_to_gcs",
gcp_conn_id=CONNECTION_ID,
folder_id=FOLDER_ID,
file_name=FILE_NAME,
file_name=DRIVE_FILE_NAME,
bucket_name=BUCKET_NAME,
object_name=OBJECT,
)
# [END upload_gdrive_to_gcs]

@task(trigger_rule=TriggerRule.ALL_DONE)
def remove_files_from_drive():
service = GoogleDriveHook(gcp_conn_id=CONNECTION_ID).get_conn()
response = service.files().list(q=f"name = '{DRIVE_FILE_NAME}'").execute()
if files := response["files"]:
file = files[0]
log.info("Deleting file {}...", file)
service.files().delete(fileId=file["id"])
log.info("Done.")

remove_files_from_drive_task = remove_files_from_drive()

delete_bucket = GCSDeleteBucketOperator(
task_id="delete_bucket", bucket_name=BUCKET_NAME, trigger_rule=TriggerRule.ALL_DONE
)

delete_temp_gcp_connection_task = BashOperator(
task_id="delete_temp_gcp_connection",
bash_command=f"airflow connections delete {CONNECTION_ID}",
trigger_rule=TriggerRule.ALL_DONE,
)

(
# TEST SETUP
create_bucket
>> upload_file
>> copy_single_file
[create_bucket >> upload_file >> copy_single_file, create_temp_gcp_connection_task]
# TEST BODY
>> detect_file
>> upload_gdrive_to_gcs
# TEST TEARDOWN
>> delete_bucket
>> remove_files_from_drive_task
>> [delete_bucket, delete_temp_gcp_connection_task]
)

from tests.system.utils.watcher import watcher