Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Better cancellation of Zarr uploads #1041

Merged
merged 2 commits into from
Jul 5, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
69 changes: 69 additions & 0 deletions dandi/cli/cmd_service_scripts.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
from __future__ import annotations

from pathlib import Path
from time import sleep

import click

from .base import instance_option, lgr, map_to_click_exceptions
from ..consts import dandiset_metadata_file
from ..dandiapi import DandiAPIClient, RemoteZarrAsset
from ..dandiset import Dandiset
from ..exceptions import NotFoundError


# Click command group that collects the service-script subcommands defined in
# this module (e.g. ``cancel-zarr-upload``).  The group itself does no work; it
# is imported by ``command.py`` and listed among the top-level CLI commands.
@click.group()
def service_scripts() -> None:
    """Various utility operations"""
    pass


@service_scripts.command()
@instance_option()
@click.argument("paths", nargs=-1)
@map_to_click_exceptions
def cancel_zarr_upload(paths: tuple[str, ...], dandi_instance: str) -> None:
    """
    Cancel an in-progress Zarr upload operation on the server.

    If a process uploading a Zarr is suddenly interrupted or killed, the server
    might not be properly notified. If a later attempt is made to upload the
    same Zarr, the server will then report back that there is already an upload
    operation in progress and prohibit the new upload. Use this command in
    such a case to tell the server to cancel the old upload operations for the
    Zarrs at the given path(s).
    """

    # Locate the local Dandiset enclosing the current directory; the given
    # paths are interpreted relative to the current directory and converted to
    # asset paths relative to the Dandiset root.
    here = Path.cwd()
    local_dandiset = Dandiset.find(here)
    if local_dandiset is None:
        raise RuntimeError(
            f"Found no {dandiset_metadata_file} anywhere."
            " Use 'dandi download' or 'organize' first"
        )
    prefix = here.relative_to(local_dandiset.path)

    with DandiAPIClient.for_dandi_instance(dandi_instance, authenticate=True) as client:
        remote_dandiset = client.get_dandiset(local_dandiset.identifier)
        for rel_path in paths:
            asset_path = (prefix / rel_path).as_posix()

            # Skip (with a warning) anything that is missing or not a Zarr.
            try:
                asset = remote_dandiset.get_asset_by_path(asset_path)
            except NotFoundError:
                lgr.warning("No such asset: %s", asset_path)
                continue
            if not isinstance(asset, RemoteZarrAsset):
                lgr.warning("Not a Zarr: %s", asset_path)
                continue

            zarr_endpoint = f"/zarr/{asset.zarr}/"
            if not client.get(zarr_endpoint)["upload_in_progress"]:
                lgr.info("No upload in progress for Zarr at %s", asset_path)
                continue

            lgr.info("Cancelling in-progress upload for Zarr at %s ...", asset_path)
            client.delete(f"{zarr_endpoint}upload/")

            # Poll every half second until the server stops reporting the
            # upload as in progress.
            sleep(0.5)
            while client.get(zarr_endpoint)["upload_in_progress"]:
                sleep(0.5)
            lgr.info("Upload cancelled")
2 changes: 2 additions & 0 deletions dandi/cli/command.py
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,7 @@ def main(ctx, log_level, pdb=False):
from .cmd_ls import ls # noqa: E402
from .cmd_move import move # noqa: E402
from .cmd_organize import organize # noqa: E402
from .cmd_service_scripts import service_scripts # noqa: E402
from .cmd_shell_completion import shell_completion # noqa: E402
from .cmd_upload import upload # noqa: E402
from .cmd_validate import validate, validate_bids # noqa: E402
Expand All @@ -157,6 +158,7 @@ def main(ctx, log_level, pdb=False):
ls,
move,
organize,
service_scripts,
shell_completion,
upload,
validate,
Expand Down
43 changes: 43 additions & 0 deletions dandi/cli/tests/test_service_scripts.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
from click.testing import CliRunner
import pytest

from dandi.tests.fixtures import SampleDandiset

from ..command import service_scripts


def test_cancel_zarr_upload(
    monkeypatch: pytest.MonkeyPatch, new_dandiset: SampleDandiset
) -> None:
    """
    Start a Zarr upload directly through the API, run ``cancel-zarr-upload``
    from a subdirectory of the Dandiset, and check that the server no longer
    reports an upload in progress.
    """
    client = new_dandiset.client
    asset_path = "foo/bar/baz.zarr"

    # Create the Zarr and its asset on the server, then open an upload batch
    # that is deliberately never completed.
    created = client.post(
        "/zarr/", json={"name": asset_path, "dandiset": new_dandiset.dandiset_id}
    )
    zarr_id = created["zarr_id"]
    zarr_url = f"/zarr/{zarr_id}/"
    client.post(
        f"{new_dandiset.dandiset.version_api_path}assets/",
        json={"metadata": {"path": asset_path}, "zarr_id": zarr_id},
    )
    pending_entries = [
        {"path": "0.dat", "etag": "0" * 32},
        {"path": "1.dat", "etag": "1" * 32},
    ]
    client.post(f"{zarr_url}upload/", json=pending_entries)
    before = client.get(zarr_url)
    assert before["upload_in_progress"] is True

    # Run the command from inside "foo" so that the relative path argument has
    # to be resolved against the Dandiset root.
    workdir = new_dandiset.dspath / "foo"
    workdir.mkdir()
    monkeypatch.chdir(workdir)
    monkeypatch.setenv("DANDI_API_KEY", new_dandiset.api.api_key)

    cli_args = [
        "cancel-zarr-upload",
        "-i",
        new_dandiset.api.instance_id,
        "bar/baz.zarr",
    ]
    result = CliRunner().invoke(service_scripts, cli_args)
    assert result.exit_code == 0

    after = client.get(zarr_url)
    assert after["upload_in_progress"] is False
28 changes: 28 additions & 0 deletions dandi/files.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
from __future__ import annotations

from abc import ABC, abstractmethod
import atexit
from collections import deque
from concurrent.futures import Future, ThreadPoolExecutor, as_completed
from contextlib import closing
Expand All @@ -34,6 +35,7 @@
Optional,
Tuple,
Union,
cast,
)
from xml.etree.ElementTree import fromstring

Expand Down Expand Up @@ -926,6 +928,11 @@ def iter_upload(
yield {"status": "initiating upload", "size": total_size}
lgr.debug("%s: Beginning upload", asset_path)
bytes_uploaded = 0
upload_data = (
zarr_id,
client.get_url(f"/zarr/{zarr_id}/upload"),
cast(Optional[str], client.session.headers.get("Authorization")),
)
with RESTFullAPIClient(
"http://nil.nil",
headers={"X-Amz-ACL": "bucket-owner-full-control"},
Expand All @@ -940,6 +947,7 @@ def iter_upload(
pluralize(len(upload_body), "file"),
)
r = client.post(f"/zarr/{zarr_id}/upload/", json=upload_body)
ZARR_UPLOADS_IN_PROGRESS.add(upload_data)
with ThreadPoolExecutor(max_workers=jobs or 5) as executor:
futures = [
executor.submit(
Expand Down Expand Up @@ -972,6 +980,7 @@ def iter_upload(
}
lgr.debug("%s: Completing upload of batch #%d", asset_path, i)
client.post(f"/zarr/{zarr_id}/upload/complete/")
ZARR_UPLOADS_IN_PROGRESS.discard(upload_data)
lgr.debug("%s: All files uploaded", asset_path)
old_zarr_files = [e for e in old_zarr_entries.values() if e.is_file()]
if old_zarr_files:
Expand Down Expand Up @@ -1242,3 +1251,22 @@ def _cmp_digests(
else:
lgr.debug("%s: File %s already on server; skipping", asset_path, local_entry)
return None


# Collection of (zarr ID, upload endpoint URL, auth header value) tuples, one
# per Zarr upload batch that has been started on the server but not yet
# completed.  The auth value is whatever the client session carries in its
# "Authorization" header, or None if that header is absent.  ``iter_upload``
# adds an entry right after POSTing an upload batch and discards it after
# POSTing the batch's completion; ``cancel_zarr_uploads()`` iterates over
# whatever is still present when the interpreter exits.
ZARR_UPLOADS_IN_PROGRESS: set[tuple[str, str, Optional[str]]] = set()


@atexit.register
def cancel_zarr_uploads() -> None:
    """
    Ask the server to cancel every Zarr upload still marked as in progress.

    Registered with `atexit`, so it runs when the interpreter exits.  For each
    ``(zarr ID, upload endpoint URL, auth header value)`` entry remaining in
    `ZARR_UPLOADS_IN_PROGRESS`, a ``DELETE`` request is sent to the upload
    endpoint, carrying the stored ``Authorization`` header when there is one.

    Cancellation is best-effort: a non-OK response or a failed request is
    logged as a warning and the remaining entries are still processed.
    """
    for zarr_id, url, auth in ZARR_UPLOADS_IN_PROGRESS:
        lgr.debug("Cancelling upload for Zarr %s", zarr_id)
        headers = {"Authorization": auth} if auth is not None else {}
        try:
            # NOTE(review): no timeout is passed, so a stalled connection can
            # delay interpreter exit -- consider adding one.
            r = requests.delete(url, headers=headers)
        except requests.RequestException as e:
            # A connection-level failure for one Zarr must not abort the loop
            # and leave the other uploads uncancelled.
            lgr.warning("Upload cancellation for Zarr %s failed: %s", zarr_id, e)
            continue
        if not r.ok:
            lgr.warning(
                "Upload cancellation failed with %d: %s: %s",
                r.status_code,
                r.reason,
                r.text,
            )
32 changes: 32 additions & 0 deletions docs/source/cmdline/service-scripts.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
:program:`dandi service-scripts`
================================

::

dandi [<global options>] service-scripts [<command>]

:program:`dandi service-scripts` is a collection of subcommands for various
utility operations.

``cancel-zarr-upload``
----------------------

::

dandi [<global options>] service-scripts cancel-zarr-upload [<options>] <path> ...

Cancel an in-progress Zarr upload operation on the server.

If a process uploading a Zarr is suddenly interrupted or killed, the server
might not be properly notified. If a later attempt is made to upload the same
Zarr, the server will then report back that there is already an upload
operation in progress and prohibit the new upload. Use this command in such a
case to tell the server to cancel the old upload operations for the Zarrs at
the given path(s).

Options
^^^^^^^

.. option:: -i, --dandi-instance <instance-name>

DANDI instance on which to cancel uploads [default: ``dandi``]