Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 18 additions & 6 deletions airflow/providers/google/suite/hooks/drive.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,9 +72,9 @@ def get_conn(self) -> Any:
self._conn = build("drive", self.api_version, http=http_authorized, cache_discovery=False)
return self._conn

def _ensure_folders_exists(self, path: str) -> str:
def _ensure_folders_exists(self, path: str, folder_id: str) -> str:
service = self.get_conn()
current_parent = "root"
current_parent = folder_id
folders = path.split("/")
depth = 0
# First tries to enter directories
Expand All @@ -87,7 +87,13 @@ def _ensure_folders_exists(self, path: str) -> str:
]
result = (
service.files()
.list(q=" and ".join(conditions), spaces="drive", fields="files(id, name)")
.list(
q=" and ".join(conditions),
spaces="drive",
fields="files(id, name)",
includeItemsFromAllDrives=True,
supportsAllDrives=True,
)
.execute(num_retries=self.num_retries)
)
files = result.get("files", [])
Expand All @@ -109,7 +115,11 @@ def _ensure_folders_exists(self, path: str) -> str:
}
file = (
service.files()
.create(body=file_metadata, fields="id")
.create(
body=file_metadata,
fields="id",
supportsAllDrives=True,
)
.execute(num_retries=self.num_retries)
)
self.log.info("Created %s directory", current_folder)
Expand Down Expand Up @@ -201,6 +211,7 @@ def upload_file(
remote_location: str,
chunk_size: int = 100 * 1024 * 1024,
resumable: bool = False,
folder_id: str = "root",
) -> str:
"""
Uploads a file that is available locally to a Google Drive service.
Expand All @@ -214,14 +225,15 @@ def upload_file(
or to -1.
:param resumable: True if this is a resumable upload. False means upload
in a single request.
:param folder_id: The base/root folder id for remote_location (part of the drive url of a folder).
:return: File ID
"""
service = self.get_conn()
directory_path, _, file_name = remote_location.rpartition("/")
if directory_path:
parent = self._ensure_folders_exists(directory_path)
parent = self._ensure_folders_exists(path=directory_path, folder_id=folder_id)
else:
parent = "root"
parent = folder_id

file_metadata = {"name": file_name, "parents": [parent]}
media = MediaFileUpload(local_location, chunksize=chunk_size, resumable=resumable)
Expand Down