4 changes: 3 additions & 1 deletion metaflow/plugins/datatools/s3/s3.py
@@ -630,7 +630,9 @@ def _url(self, key_value):
                 "Don't use absolute S3 URLs when the S3 client is "
                 "initialized with a prefix. URL: %s" % key
             )
-        return os.path.join(self._s3root, key)
+        # Strip leading slashes to ensure os.path.join works correctly:
+        # os.path.join discards the first argument if the second starts with '/'
+        return os.path.join(self._s3root, key.lstrip("/"))
     else:
         return self._s3root
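
For context, a minimal sketch of the os.path.join behavior the new comment refers to (POSIX path semantics; the bucket and key values are illustrative):

import os

# os.path.join drops every component before an "absolute" one, so a key
# with a leading slash silently discards the s3root prefix:
os.path.join("s3://bucket/prefix", "key")   # 's3://bucket/prefix/key'
os.path.join("s3://bucket/prefix", "/key")  # '/key' -- the prefix is lost

# Stripping the leading slash restores the intended join:
os.path.join("s3://bucket/prefix", "/key".lstrip("/"))  # 's3://bucket/prefix/key'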

3 changes: 2 additions & 1 deletion metaflow/plugins/datatools/s3/s3op.py
@@ -614,7 +614,8 @@ def list_prefix(self, prefix_url, delimiter=""):
             key_path = key["Key"]
             # filter out sibling directories that share the same prefix
             if delimiter == "":  # recursive mode
-                normalized_prefix = prefix_url.path
+                # S3 keys never start with /, so strip any leading / from prefix
+                normalized_prefix = prefix_url.path.lstrip("/")
                 if not normalized_prefix.endswith("/"):
                     normalized_prefix += "/"
                 # Only include keys that are actually under our directory
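
A small sketch of why both normalizations matter (bucket and key names are illustrative): urlparse yields a path with a leading slash, while keys in S3 list responses never carry one, and the trailing slash keeps siblings that merely share the prefix string out of the results:

from urllib.parse import urlparse

prefix_url = urlparse("s3://bucket/data/run1")
prefix_url.path                       # '/data/run1' -- leading slash

normalized_prefix = prefix_url.path.lstrip("/")
if not normalized_prefix.endswith("/"):
    normalized_prefix += "/"          # 'data/run1/'

"data/run1/part.csv".startswith(normalized_prefix)   # True: under the directory
"data/run10/part.csv".startswith(normalized_prefix)  # False: sibling filtered out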
60 changes: 60 additions & 0 deletions test/data/s3/conftest.py
@@ -0,0 +1,60 @@
import pytest
from metaflow import current
from .. import S3ROOT

# S3ROOT variants for testing both with and without trailing slash
S3ROOT_VARIANTS = [
    S3ROOT.rstrip("/"),
    S3ROOT if S3ROOT.endswith("/") else S3ROOT + "/",
]


@pytest.fixture(params=S3ROOT_VARIANTS, ids=["no_slash", "with_slash"])
def s3root(request):
    """
    Fixture that provides S3ROOT with and without trailing slash.

    This ensures tests work correctly regardless of whether the s3root
    has a trailing slash or not.
    """
    return request.param


@pytest.fixture(autouse=True)
def reset_current_env():
    """
    Fixture to ensure the metaflow current environment is clean between tests.

    This prevents test pollution when tests manipulate the global current state.
    The fixture runs automatically for all tests in this directory.
    """
    # Setup: save all private attributes that might be set by current._set_env
    saved_state = {
        attr: getattr(current, attr, None)
        for attr in dir(current)
        if attr.startswith("_") and not attr.startswith("__")
    }

    yield

    # Teardown: clear all current environment attributes.
    # First, remove any new attributes that were added
    for attr in dir(current):
        if (
            attr.startswith("_")
            and not attr.startswith("__")
            and attr not in saved_state
        ):
            try:
                delattr(current, attr)
            except AttributeError:
                pass  # Some attributes may be read-only

    # Then restore original values
    for attr, value in saved_state.items():
        try:
            if value is None:
                # Remove the attribute if it didn't exist before
                if hasattr(current, attr):
                    delattr(current, attr)
            else:
                setattr(current, attr, value)
        except AttributeError:
            pass  # Some attributes may be read-only
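
A hypothetical consumer of these fixtures, to illustrate the intended usage (the test name and body are illustrative, not part of this diff). The parametrized s3root fixture makes each such test run twice, once per trailing-slash variant, while reset_current_env cleans up global state between runs:

from metaflow import S3


def test_list_paths_has_clean_keys(s3root):
    # Runs once with a trailing slash on s3root and once without
    with S3(s3root=s3root) as s3:
        for obj in s3.list_paths():
            assert not obj.key.startswith("/")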
64 changes: 32 additions & 32 deletions test/data/s3/s3_data.py
@@ -197,6 +197,7 @@ def data(self):

     @property
     def url(self):
+        """Returns the full S3 URL including the S3ROOT prefix."""
         return os.path.join(S3ROOT, self.key)


@@ -205,13 +206,13 @@ def _format_test_cases(dataset, meta=None, ranges=None):
     ids = []
     for prefix, filespecs in dataset:
         objs = [RandomFile(prefix, fname, size) for fname, size in filespecs.items()]
-        objs = {obj.url: (obj, None, None) for obj in objs}
+        objs = {obj.key: (obj, None, None) for obj in objs}
         if meta:
             # We generate one per meta info
             for metaname, (content_type, usermeta) in meta.items():
                 objs.update(
                     {
-                        "%s_%s" % (obj.url, metaname): (obj, content_type, usermeta)
+                        "%s_%s" % (obj.key, metaname): (obj, content_type, usermeta)
                         for (obj, _, _) in objs.values()
                     }
                 )
@@ -250,7 +251,7 @@ def _format_test_cases(dataset, meta=None, ranges=None):
         )

         ids.append(prefix)
-        cases.append((S3ROOT, [prefix], files))
+        cases.append(([prefix], files))
     return cases, ids
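
The shape change, sketched with illustrative values: cases no longer carry S3ROOT, so the parametrized s3root fixture from conftest.py above can supply either trailing-slash variant at test time:

# Before: the root was baked into every case
old_case = ("s3://bucket/prefix", ["basic_data"], {"basic_data/a": "..."})
# After: cases are root-agnostic
new_case = (["basic_data"], {"basic_data/a": "..."})
[prefix], files = new_case  # consumers unpack without the root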


@@ -315,12 +316,11 @@ def pytest_benchmark_many_case():
         id_name = "%ds_%dm_%dl" % (small_count, medium_count, large_count)
         cases.append(
             (
-                S3ROOT,
                 [],
                 [
-                    (small_count, small_case[2]),
-                    (medium_count, medium_case[2]),
-                    (large_count, large_case[2]),
+                    (small_count, small_case[1]),
+                    (medium_count, medium_case[1]),
+                    (large_count, large_case[1]),
                 ],
             )
         )
@@ -329,7 +329,6 @@ def pytest_benchmark_many_case():


 def pytest_benchmark_put_case():
-    put_prefix = os.path.join(S3ROOT, PUT_PREFIX)
     cases = []
     ids = []
     for prefix, filespecs in [
@@ -340,18 +339,17 @@ def pytest_benchmark_put_case():
         blobs = []
         for fname, size in filespecs.items():
             blobs.append((prefix, fname, size))
-        cases.append((put_prefix, blobs, None))
+        cases.append((blobs, None))
     ids = ["5gb", "1mb", "1b"]
     return {"argvalues": cases, "ids": ids}


 def pytest_benchmark_put_many_case():
     single_cases_and_ids = pytest_benchmark_put_case()
     single_cases = single_cases_and_ids["argvalues"]
-    large_blob = single_cases[0][1][0]
-    medium_blob = single_cases[1][1][0]
-    small_blob = single_cases[2][1][0]
-    put_prefix = os.path.join(S3ROOT, PUT_PREFIX)
+    large_blob = single_cases[0][0][0]
+    medium_blob = single_cases[1][0][0]
+    small_blob = single_cases[2][0][0]
     # Configuration: we will form groups of up to BENCHMARK_*_ITER_MAX items
     # (count taken from iteration_count). We will also form groups taking from
     # all three sets
@@ -376,7 +374,7 @@ def pytest_benchmark_put_many_case():
             (medium_count, medium_blob),
             (large_count, large_blob),
         ]
-        cases.append((put_prefix, blobs, None))
+        cases.append((blobs, None))
         ids.append(id_name)
     return {"argvalues": cases, "ids": ids}

@@ -385,17 +383,16 @@ def pytest_many_prefixes_case():
     cases, ids = _format_test_cases(BASIC_DATA, meta=BASIC_METADATA)
     many_prefixes = []
     many_prefixes_expected = {}
-    for s3root, [prefix], files in cases:
+    for [prefix], files in cases:
         many_prefixes.append(prefix)
         many_prefixes_expected.update(files)
     # add many prefixes cases
     ids.append("many_prefixes")
-    cases.append((S3ROOT, many_prefixes, many_prefixes_expected))
+    cases.append((many_prefixes, many_prefixes_expected))
     return {"argvalues": cases, "ids": ids}


 def pytest_put_strings_case(meta=None):
-    put_prefix = os.path.join(S3ROOT, PUT_PREFIX)
     data = [
         "unicode: \u523a\u8eab means sashimi",
         b"bytes: \x00\x01\x02",
@@ -406,8 +403,9 @@ def pytest_put_strings_case(meta=None):
     for text in data:
         blob = to_bytes(text)
         checksum = sha1(blob).hexdigest()
-        key = str(uuid4())
-        expected[os.path.join(put_prefix, key)] = {
+        # Include PUT_PREFIX in the key so tests don't need to handle it
+        key = os.path.join(PUT_PREFIX, str(uuid4()))
+        expected[key] = {
             None: ExpectedResult(
                 size=len(blob),
                 checksum=checksum,
@@ -419,8 +417,8 @@ def pytest_put_strings_case(meta=None):
         objs.append((key, text))
     if meta is not None:
         for content_type, usermeta in meta.values():
-            key = str(uuid4())
-            expected[os.path.join(put_prefix, key)] = {
+            key = os.path.join(PUT_PREFIX, str(uuid4()))
+            expected[key] = {
                 None: ExpectedResult(
                     size=len(blob),
                     checksum=checksum,
@@ -437,11 +435,10 @@ def pytest_put_strings_case(meta=None):
                     metadata=usermeta,
                 )
             )
-    return {"argvalues": [(put_prefix, objs, expected)], "ids": ["put_strings"]}
+    return {"argvalues": [(objs, expected)], "ids": ["put_strings"]}


 def pytest_put_blobs_case(meta=None):
-    put_prefix = os.path.join(S3ROOT, PUT_PREFIX)
     cases = []
     ids = []
     for prefix, filespecs in BIG_DATA:
@@ -450,8 +447,9 @@ def pytest_put_blobs_case(meta=None):
         for fname, size in filespecs.items():
             blob = RandomFile(prefix, fname, size)
             checksum = blob.checksum()
-            key = str(uuid4())
-            expected[os.path.join(put_prefix, key)] = {
+            # Include PUT_PREFIX in the key so tests don't need to handle it
+            key = os.path.join(PUT_PREFIX, str(uuid4()))
+            expected[key] = {
                 None: ExpectedResult(
                     size=blob.size,
                     checksum=checksum,
@@ -463,8 +461,8 @@ def pytest_put_blobs_case(meta=None):
             blobs.append((key, blob.data))
         if meta is not None:
             for content_type, usermeta in meta.values():
-                key = str(uuid4())
-                expected[os.path.join(put_prefix, key)] = {
+                key = os.path.join(PUT_PREFIX, str(uuid4()))
+                expected[key] = {
                     None: ExpectedResult(
                         size=len(blob),
                         checksum=checksum,
@@ -482,7 +480,7 @@ def pytest_put_blobs_case(meta=None):
                 )
             )
         ids.append(prefix)
-        cases.append((put_prefix, blobs, expected))
+        cases.append((blobs, expected))
     return {"argvalues": cases, "ids": ids}


@@ -522,11 +520,12 @@ def _do_upload(prefix, filespecs, meta=None):
         print("Test data case %s: uploaded to %s" % (prefix, f.url))
         if meta is not None:
             for metaname, metainfo in meta.items():
-                new_url = "%s_%s" % (f.url, metaname)
-                url = urlparse(new_url)
+                new_key = "%s_%s" % (f.key, metaname)
+                full_new_url = os.path.join(S3ROOT, new_key)
+                url = urlparse(full_new_url)
                 print(
                     "Test data case %s: upload to %s started"
-                    % (prefix, new_url)
+                    % (prefix, full_new_url)
                 )
                 extra = {}
                 content_type, user_meta = metainfo
@@ -544,7 +543,8 @@ def _do_upload(prefix, filespecs, meta=None):
                     ExtraArgs=extra,
                 )
                 print(
-                    "Test data case %s: uploaded to %s" % (prefix, new_url)
+                    "Test data case %s: uploaded to %s"
+                    % (prefix, full_new_url)
                 )

     for prefix, filespecs in BIG_DATA + FAKE_RUN_DATA:
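
Taken together, the s3_data.py changes adopt a single convention: test data is tracked by bucket-relative key, and the full URL is derived only where an actual S3 operation happens. A sketch with illustrative values:

import os

S3ROOT = "s3://bucket/test-prefix"  # illustrative root
key = "put_tests/file1_meta1"       # relative key, no leading slash
os.path.join(S3ROOT, key)           # 's3://bucket/test-prefix/put_tests/file1_meta1'
# A trailing slash on the root does not double the separator:
os.path.join(S3ROOT + "/", key)     # same result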