Skip to content

Commit

Permalink
Fail a transfer when globus task has faults (#136)
Browse files Browse the repository at this point in the history
* outgoing transfers failed

* queue item fails

* formatting

* test running again

* linting

* faults can be recoverable errors

* Cancel a task if a specific type of event occurs

* isorting

* fixing constant name

---------

Co-authored-by: Ioannis Paraskevakos <ip8725@pulibrarian2.princeton.edu>
  • Loading branch information
iparask and Ioannis Paraskevakos authored Mar 5, 2025
1 parent d5a03ae commit 657cff3
Show file tree
Hide file tree
Showing 8 changed files with 89 additions and 3 deletions.
7 changes: 7 additions & 0 deletions hera_librarian/async_transfers/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,3 +71,10 @@ def transfer_status(self, settings: "ServerSettings") -> TransferStatus:
to set (e.g.) internal flags as required.
"""
raise NotImplementedError

@abc.abstractmethod
def fail_transfer(self, settings: "ServerSettings") -> bool:
    """
    Abort the transfer tracked by this manager.

    This is the abstract declaration; the base implementation does no
    work and always raises.

    Parameters
    ----------
    settings : ServerSettings
        The server settings passed in by the caller.

    Returns
    -------
    bool
        Declared return type of the interface.

    Raises
    ------
    NotImplementedError
        Always, when the base implementation is invoked directly.
    """
    raise NotImplementedError()
51 changes: 50 additions & 1 deletion hera_librarian/async_transfers/globus.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import globus_sdk

from hera_librarian.transfer import TransferStatus
from hera_librarian.utils import GLOBUS_ERROR_EVENTS

from .core import CoreAsyncTransferManager

Expand Down Expand Up @@ -148,6 +149,8 @@ def _get_transfer_data(self, label: str, settings: "ServerSettings"):
verify_checksum=True, # We do this ourselves, but globus will auto-retry if it detects failed files
preserve_timestamp=True,
notify_on_succeeded=False,
skip_source_errors=False,
fail_on_quota_errors=True,
)

return transfer_data
Expand Down Expand Up @@ -273,7 +276,9 @@ def batch_transfer(
# Globus transfer.
relative_local_path = self._subtract_local_root(local_path, settings)
transfer_data.add_item(
str(relative_local_path), str(remote_path), recursive=local_path.is_dir()
str(relative_local_path),
str(remote_path),
recursive=local_path.is_dir(),
)

# submit the transfer
Expand Down Expand Up @@ -328,5 +333,49 @@ def transfer_status(self, settings: "ServerSettings") -> TransferStatus:
return TransferStatus.COMPLETED
elif task_doc["status"] == "FAILED":
return TransferStatus.FAILED
# When there are errors, better fail the task and try again. There is
# a different check for faults to make the state transition as clear as
# possible.
elif task_doc["faults"] > 0:
task_event_list = transfer_client.task_event_list(self.task_id)
for event in task_event_list:
if event["code"] in GLOBUS_ERROR_EVENTS and event["is_error"]:
return TransferStatus.FAILED
return TransferStatus.FAILED
else: # "status" == "ACTIVE"
return TransferStatus.INITIATED

def fail_transfer(self, settings: "ServerSettings") -> bool:
    """
    Cancel the Globus task backing this transfer because it has errors.

    Parameters
    ----------
    settings : ServerSettings object
        The settings for the Librarian server. These settings should include
        the Globus login information.

    Returns
    -------
    bool
        Whether we successfully cancelled the transfer (True) or not
        (False).
    """
    authorizer = self.authorize(settings=settings)
    if authorizer is None:
        # Without an authorizer we have no way of talking to Globus
        # (network issues, Globus outage, etc.), so the cancellation
        # request cannot be sent -- report that we did not cancel anything.
        return False

    transfer_client = globus_sdk.TransferClient(authorizer=authorizer)

    try:
        transfer_client.cancel_task(self.task_id)
    except (globus_sdk.TransferAPIError, globus_sdk.NetworkError):
        # Either Globus rejected the request or the service could not be
        # reached. In both cases the task was not cancelled by us; return
        # False rather than raising so the caller can still mark the
        # transfer as failed on its side.
        return False
    return True
3 changes: 3 additions & 0 deletions hera_librarian/async_transfers/local.py
Original file line number Diff line number Diff line change
Expand Up @@ -123,3 +123,6 @@ def transfer_status(self, settings: "ServerSettings") -> TransferStatus:
return TransferStatus.INITIATED
else:
return TransferStatus.FAILED

def fail_transfer(self, settings: "ServerSettings") -> bool:
    """
    Fail the current transfer.

    This implementation performs no action and ignores ``settings``.

    Returns
    -------
    bool
        Always True.
    """
    return True
3 changes: 3 additions & 0 deletions hera_librarian/async_transfers/rsync.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,3 +71,6 @@ def transfer_status(self, settings: "ServerSettings") -> TransferStatus:
return TransferStatus.INITIATED
else:
return TransferStatus.FAILED

def fail_transfer(self, settings: "ServerSettings") -> bool:
    """
    Fail the current transfer.

    Nothing is done here and ``settings`` is not consulted.

    Returns
    -------
    bool
        Always True.
    """
    return True
18 changes: 18 additions & 0 deletions hera_librarian/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -228,3 +228,21 @@ def get_size_from_path(path):
size += os.path.getsize(dirname + "/" + f)

return size


# --- Globus Events ---
# Globus task event codes that the Globus transfer manager looks for when a
# task reports faults: an event counts as a match when its ``code`` is in this
# list and its ``is_error`` flag is set.
GLOBUS_ERROR_EVENTS = [
"AMBIGUOUS_PATH",
"IS_A_DIRECTORY",
"EXPIRED",
"FILE_NOT_FOUND",
"FILE_SIZE_CHANGED",
"INVALID_PATH_NAME",
"INVALID_SERVICE_CREDENTIAL",
"INVALID_SYMLINK",
"LIMIT_EXCEEDED",
"NO_CREDENTIALS",
"NO_SPACE_LEFT",
"PERMISSION_DENIED",
"QUOTA_EXCEEDED",
]
4 changes: 2 additions & 2 deletions librarian_background/queues.py
Original file line number Diff line number Diff line change
Expand Up @@ -204,8 +204,8 @@ def check_on_consumed(
)
elif current_status == TransferStatus.FAILED:
logger.info("Transfer for {q.id} has failed", q=queue_item)
for transfer in queue_item.transfers:
transfer.fail_transfer(session=session, commit=False)
queue_item.async_transfer_manager.fail_transfer(server_settings)
queue_item.fail(session=session)
else:
logger.error(
"Incompatible return value for transfer status from "
Expand Down
3 changes: 3 additions & 0 deletions tests/background_unit_test/test_send_queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@ def valid(self, *args, **kwargs):
def transfer_status(self, *args, **kwargs):
    """Return the status stored in ``complete_transfer_status``; all arguments are ignored."""
    status = self.complete_transfer_status
    return status

def fail_transfer(self, settings):
    """Test stand-in for failing a transfer: does nothing and always returns True."""
    return True


def test_create_queue_item(test_server_with_valid_file, test_orm):
"""
Expand Down
3 changes: 3 additions & 0 deletions tests/integration_test/test_send_queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,9 @@ def valid(self, *args, **kwargs):
def transfer_status(self, *args, **kwargs):
    """Hand back whatever ``complete_transfer_status`` holds, ignoring every argument."""
    configured_status = self.complete_transfer_status
    return configured_status

def fail_transfer(self, settings):
    """Test stand-in for failing a transfer: takes no action and always returns True."""
    return True


def test_create_simple_queue_item_and_send(
test_server, test_orm, mocked_admin_client, server
Expand Down

0 comments on commit 657cff3

Please sign in to comment.