Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 18 additions & 6 deletions airflow/providers/google/suite/hooks/drive.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,9 +72,9 @@ def get_conn(self) -> Any:
self._conn = build("drive", self.api_version, http=http_authorized, cache_discovery=False)
return self._conn

def _ensure_folders_exists(self, path: str) -> str:
def _ensure_folders_exists(self, path: str, folder_id: str) -> str:
service = self.get_conn()
current_parent = "root"
current_parent = folder_id
folders = path.split("/")
depth = 0
# First tries to enter directories
Expand All @@ -88,7 +88,13 @@ def _ensure_folders_exists(self, path: str) -> str:
]
result = (
service.files()
.list(q=" and ".join(conditions), spaces="drive", fields="files(id, name)")
.list(
q=" and ".join(conditions),
spaces="drive",
fields="files(id, name)",
includeItemsFromAllDrives=True,
supportsAllDrives=True,
)
.execute(num_retries=self.num_retries)
)
files = result.get("files", [])
Expand All @@ -110,7 +116,11 @@ def _ensure_folders_exists(self, path: str) -> str:
}
file = (
service.files()
.create(body=file_metadata, fields="id")
.create(
body=file_metadata,
fields="id",
supportsAllDrives=True,
)
.execute(num_retries=self.num_retries)
)
self.log.info("Created %s directory", current_folder)
Expand Down Expand Up @@ -202,6 +212,7 @@ def upload_file(
remote_location: str,
chunk_size: int = 100 * 1024 * 1024,
resumable: bool = False,
folder_id: str = "root",
) -> str:
"""
Uploads a file that is available locally to a Google Drive service.
Expand All @@ -215,14 +226,15 @@ def upload_file(
or to -1.
:param resumable: True if this is a resumable upload. False means upload
in a single request.
:param folder_id: The ID of the Drive folder that remote_location is resolved relative to (the ID is part of that folder's Drive URL). Defaults to "root".
:return: File ID
"""
service = self.get_conn()
directory_path, _, file_name = remote_location.rpartition("/")
if directory_path:
parent = self._ensure_folders_exists(directory_path)
parent = self._ensure_folders_exists(path=directory_path, folder_id=folder_id)
else:
parent = "root"
parent = folder_id

file_metadata = {"name": file_name, "parents": [parent]}
media = MediaFileUpload(local_location, chunksize=chunk_size, resumable=resumable)
Expand Down
4 changes: 4 additions & 0 deletions airflow/providers/google/suite/transfers/local_to_drive.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ class LocalFilesystemToGoogleDriveOperator(BaseOperator):
If set as a sequence, the identities from the list must grant
Service Account Token Creator IAM role to the directly preceding identity, with first
account from the list granting this role to the originating account
:param folder_id: The ID of the Drive folder used as the base/root under which drive_folder is resolved for every uploaded file. Defaults to "root".
:return: Remote file ids after upload
"""

Expand All @@ -82,6 +83,7 @@ def __init__(
resumable: bool = False,
delegate_to: str | None = None,
impersonation_chain: str | Sequence[str] | None = None,
folder_id: str = "root",
**kwargs,
) -> None:
super().__init__(**kwargs)
Expand All @@ -94,6 +96,7 @@ def __init__(
self.resumable = resumable
self.delegate_to = delegate_to
self.impersonation_chain = impersonation_chain
self.folder_id = folder_id

def execute(self, context: Context) -> list[str]:
hook = GoogleDriveHook(
Expand All @@ -113,6 +116,7 @@ def execute(self, context: Context) -> list[str]:
remote_location=str(Path(self.drive_folder) / Path(local_path).name),
chunk_size=self.chunk_size,
resumable=self.resumable,
folder_id=self.folder_id,
)

remote_file_ids.append(remote_file_id)
Expand Down
26 changes: 19 additions & 7 deletions tests/providers/google/suite/hooks/test_drive.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,19 +53,21 @@ def test_ensure_folders_exists_when_no_folder_exists(self, mock_get_conn):
{"id": "ID_4"},
]

result_value = self.gdrive_hook._ensure_folders_exists("AAA/BBB/CCC/DDD")
result_value = self.gdrive_hook._ensure_folders_exists(path="AAA/BBB/CCC/DDD", folder_id="root")

mock_get_conn.assert_has_calls(
[
mock.call()
.files()
.list(
fields="files(id, name)",
includeItemsFromAllDrives=True,
q=(
"trashed=false and mimeType='application/vnd.google-apps.folder' "
"and name='AAA' and 'root' in parents"
),
spaces="drive",
fields="files(id, name)",
supportsAllDrives=True,
),
mock.call()
.files()
Expand All @@ -76,6 +78,7 @@ def test_ensure_folders_exists_when_no_folder_exists(self, mock_get_conn):
"parents": ["root"],
},
fields="id",
supportsAllDrives=True,
),
mock.call()
.files()
Expand All @@ -86,6 +89,7 @@ def test_ensure_folders_exists_when_no_folder_exists(self, mock_get_conn):
"parents": ["ID_1"],
},
fields="id",
supportsAllDrives=True,
),
mock.call()
.files()
Expand All @@ -96,6 +100,7 @@ def test_ensure_folders_exists_when_no_folder_exists(self, mock_get_conn):
"parents": ["ID_2"],
},
fields="id",
supportsAllDrives=True,
),
mock.call()
.files()
Expand All @@ -106,6 +111,7 @@ def test_ensure_folders_exists_when_no_folder_exists(self, mock_get_conn):
"parents": ["ID_3"],
},
fields="id",
supportsAllDrives=True,
),
],
any_order=True,
Expand All @@ -125,20 +131,22 @@ def test_ensure_folders_exists_when_some_folders_exists(self, mock_get_conn):
{"id": "ID_4"},
]

result_value = self.gdrive_hook._ensure_folders_exists("AAA/BBB/CCC/DDD")
result_value = self.gdrive_hook._ensure_folders_exists(path="AAA/BBB/CCC/DDD", folder_id="root")

mock_get_conn.assert_has_calls(
[
*[
mock.call()
.files()
.list(
fields="files(id, name)",
includeItemsFromAllDrives=True,
q=(
"trashed=false and mimeType='application/vnd.google-apps.folder' "
f"and name='{d}' and '{key}' in parents"
),
spaces="drive",
fields="files(id, name)",
supportsAllDrives=True,
)
for d, key in [("AAA", "root"), ("BBB", "ID_1"), ("CCC", "ID_2")]
],
Expand All @@ -151,6 +159,7 @@ def test_ensure_folders_exists_when_some_folders_exists(self, mock_get_conn):
"parents": ["ID_2"],
},
fields="id",
supportsAllDrives=True,
),
mock.call()
.files()
Expand All @@ -161,6 +170,7 @@ def test_ensure_folders_exists_when_some_folders_exists(self, mock_get_conn):
"parents": ["ID_3"],
},
fields="id",
supportsAllDrives=True,
),
],
any_order=True,
Expand All @@ -177,20 +187,22 @@ def test_ensure_folders_exists_when_all_folders_exists(self, mock_get_conn):
{"files": [{"id": "ID_4"}]},
]

result_value = self.gdrive_hook._ensure_folders_exists("AAA/BBB/CCC/DDD")
result_value = self.gdrive_hook._ensure_folders_exists(path="AAA/BBB/CCC/DDD", folder_id="root")

mock_get_conn.assert_has_calls(
[
*[
mock.call()
.files()
.list(
fields="files(id, name)",
includeItemsFromAllDrives=True,
q=(
"trashed=false and mimeType='application/vnd.google-apps.folder' "
f"and name='{d}' and '{key}' in parents"
),
spaces="drive",
fields="files(id, name)",
supportsAllDrives=True,
)
for d, key in [("AAA", "root"), ("BBB", "ID_1"), ("CCC", "ID_2"), ("DDD", "ID_3")]
],
Expand Down Expand Up @@ -327,7 +339,7 @@ def test_upload_file_to_subdirectory(

return_value = self.gdrive_hook.upload_file("local_path", "AA/BB/CC/remote_path")

mock_ensure_folders_exists.assert_called_once_with("AA/BB/CC")
mock_ensure_folders_exists.assert_called_once_with(path="AA/BB/CC", folder_id="root")
mock_get_conn.assert_has_calls(
[
mock.call()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,11 @@ def test_execute(self, mock_hook):
context = {}
mock_hook.return_value.upload_file.return_value = REMOTE_FILE_IDS
op = LocalFilesystemToGoogleDriveOperator(
task_id="test_task", local_paths=LOCAL_PATHS, drive_folder=DRIVE_FOLDER, gcp_conn_id=GCP_CONN_ID
task_id="test_task",
local_paths=LOCAL_PATHS,
drive_folder=DRIVE_FOLDER,
gcp_conn_id=GCP_CONN_ID,
folder_id="some_folder_id",
)
op.execute(context)

Expand All @@ -43,12 +47,14 @@ def test_execute(self, mock_hook):
remote_location="test_folder/test1",
chunk_size=100 * 1024 * 1024,
resumable=False,
folder_id="some_folder_id",
),
mock.call(
local_location="test2",
remote_location="test_folder/test2",
chunk_size=100 * 1024 * 1024,
resumable=False,
folder_id="some_folder_id",
),
]
mock_hook.return_value.upload_file.assert_has_calls(calls)