Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

In FlyteRemote.upload_file, pass the file object directly rather than the entire bytes buffer #2641

Merged
merged 19 commits into from
Aug 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion .github/workflows/pythonbuild.yml
Original file line number Diff line number Diff line change
Expand Up @@ -244,6 +244,7 @@ jobs:
matrix:
os: [ubuntu-latest]
python-version: ${{fromJson(needs.detect-python-versions.outputs.python-versions)}}
makefile-cmd: [integration_test_codecov, integration_test_lftransfers_codecov]
steps:
# As described in https://github.com/pypa/setuptools_scm/issues/414, SCM needs git history
# and tags to work.
Expand Down Expand Up @@ -297,7 +298,7 @@ jobs:
FLYTEKIT_CI: 1
PYTEST_OPTS: -n2
run: |
make integration_test_codecov
make ${{ matrix.makefile-cmd }}
- name: Codecov
uses: codecov/codecov-action@v3.1.0
with:
Expand Down
10 changes: 9 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,15 @@ integration_test_codecov:

.PHONY: integration_test
integration_test:
$(PYTEST_AND_OPTS) tests/flytekit/integration ${CODECOV_OPTS}
$(PYTEST_AND_OPTS) tests/flytekit/integration ${CODECOV_OPTS} -m "not lftransfers"

.PHONY: integration_test_lftransfers_codecov
integration_test_lftransfers_codecov:
$(MAKE) CODECOV_OPTS="--cov=./ --cov-report=xml --cov-append" integration_test_lftransfers

.PHONY: integration_test_lftransfers
integration_test_lftransfers:
$(PYTEST) tests/flytekit/integration ${CODECOV_OPTS} -m "lftransfers"

doc-requirements.txt: export CUSTOM_COMPILE_COMMAND := make doc-requirements.txt
doc-requirements.txt: doc-requirements.in install-piptools
Expand Down
8 changes: 4 additions & 4 deletions flytekit/remote/remote.py
Original file line number Diff line number Diff line change
Expand Up @@ -901,14 +901,14 @@
extra_headers = self.get_extra_headers_for_protocol(upload_location.native_url)
extra_headers.update(upload_location.headers)
encoded_md5 = b64encode(md5_bytes)
with open(str(to_upload), "+rb") as local_file:
content = local_file.read()
content_length = len(content)
local_file_path = str(to_upload)
content_length = os.stat(local_file_path).st_size

Check warning on line 905 in flytekit/remote/remote.py

View check run for this annotation

Codecov / codecov/patch

flytekit/remote/remote.py#L904-L905

Added lines #L904 - L905 were not covered by tests
with open(local_file_path, "+rb") as local_file:
headers = {"Content-Length": str(content_length), "Content-MD5": encoded_md5}
headers.update(extra_headers)
rsp = requests.put(
upload_location.signed_url,
data=content,
data=local_file, # NOTE: We pass the file object directly to stream our upload.
headers=headers,
verify=False
if self._config.platform.insecure_skip_verify is True
Expand Down
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ markers = [
"sandbox_test: fake integration tests", # unit tests that are really integration tests that run on a sandbox environment
"serial: tests to avoid using with pytest-xdist",
"hypothesis: tests that use the hypothesis library",
"lftransfers: integration tests which involve large file transfers"
]

[tool.coverage.report]
Expand Down
95 changes: 95 additions & 0 deletions tests/flytekit/integration/remote/test_remote.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,18 @@
import botocore.session
from contextlib import ExitStack, contextmanager
import datetime
import hashlib
import json
import os
import pathlib
import subprocess
import tempfile
import time
import typing

import joblib
from urllib.parse import urlparse
import uuid
import pytest

from flytekit import LaunchPlan, kwtypes
Expand Down Expand Up @@ -483,3 +489,92 @@ def test_execute_workflow_with_maptask(register):
wait=True,
)
assert execution.outputs["o0"] == [4, 5, 6]

@pytest.mark.lftransfers
class TestLargeFileTransfers:
    """A class to capture tests and helper functions for large file transfers."""

    @staticmethod
    def _get_minio_s3_client(remote):
        """Build a botocore S3 client pointed at the sandbox's Minio endpoint.

        Endpoint and credentials are read from the FlyteRemote's S3 data config.
        """
        minio_s3_config = remote.file_access.data_config.s3
        sess = botocore.session.get_session()
        return sess.create_client(
            "s3",
            endpoint_url=minio_s3_config.endpoint,
            aws_access_key_id=minio_s3_config.access_key_id,
            aws_secret_access_key=minio_s3_config.secret_access_key,
        )

    @staticmethod
    def _get_s3_file_md5_bytes(s3_client, bucket, key):
        """Return the raw MD5 digest of the S3 object at ``bucket``/``key``."""
        md5_hash = hashlib.md5()
        response = s3_client.get_object(Bucket=bucket, Key=key)
        body = response["Body"]
        # Read the object in chunks and update the hash (this keeps memory usage low)
        for chunk in iter(lambda: body.read(4096), b""):
            md5_hash.update(chunk)
        return md5_hash.digest()

    @staticmethod
    def _delete_s3_file(s3_client, bucket, key):
        """Delete the S3 object at ``bucket``/``key``, asserting the deletion succeeded."""
        # Delete the object
        response = s3_client.delete_object(Bucket=bucket, Key=key)
        # Ensure the object was deleted - for 'delete_object' 204 is the expected successful response code
        assert response["ResponseMetadata"]["HTTPStatusCode"] == 204

    @staticmethod
    @contextmanager
    def _ephemeral_minio_project_domain_filename_root(s3_client, project, domain):
        """An ephemeral minio S3 path which is wiped upon the context manager's exit.

        Yields ``((bucket, key), root)`` where ``key`` is ``<project>/<domain>/<root>/``
        inside the sandbox's single default bucket and ``root`` is a random UUID string.
        """
        # Generate a random path in our Minio s3 bucket, under <BUCKET>/<project>/<domain>/<UUID>
        buckets = s3_client.list_buckets()["Buckets"]
        assert len(buckets) == 1  # We expect just the default sandbox bucket
        bucket = buckets[0]["Name"]
        root = str(uuid.uuid4())
        # Fix: build the key from the `project`/`domain` parameters instead of the
        # module-level PROJECT/DOMAIN constants, which silently ignored the arguments.
        key = f"{project}/{domain}/{root}/"
        try:
            yield ((bucket, key), root)
        finally:
            # Teardown everything under bucket/key even if the body raised, so a
            # failing test cannot leave multi-GB objects behind in the sandbox.
            response = s3_client.list_objects_v2(Bucket=bucket, Prefix=key)
            if "Contents" in response:
                for obj in response["Contents"]:
                    TestLargeFileTransfers._delete_s3_file(s3_client, bucket, obj["Key"])

    @staticmethod
    @pytest.mark.parametrize("gigabytes", [2, 3])
    def test_flyteremote_uploads_large_file(gigabytes):
        """This test checks whether FlyteRemote can upload large files."""
        remote = FlyteRemote(Config.auto(config_file=CONFIG), PROJECT, DOMAIN)
        minio_s3_client = TestLargeFileTransfers._get_minio_s3_client(remote)
        with ExitStack() as stack:
            # Step 1 - Create a large local file
            tempdir = stack.enter_context(tempfile.TemporaryDirectory())
            file_path = pathlib.Path(tempdir) / "large_file"

            with open(file_path, "wb") as f:
                # Write in chunks of 500mb to keep memory usage low during tests
                for _ in range(gigabytes * 2):
                    f.write(os.urandom(int(1e9 // 2)))

            # Step 2 - Create an ephemeral S3 storage location. This will be wiped
            # on context exit to not overload the sandbox's storage
            _, ephemeral_filename_root = stack.enter_context(
                TestLargeFileTransfers._ephemeral_minio_project_domain_filename_root(
                    minio_s3_client,
                    PROJECT,
                    DOMAIN,
                )
            )

            # Step 3 - Upload our large file and check whether the uploaded file's md5 checksum matches our local file's
            md5_bytes, upload_location = remote.upload_file(
                to_upload=file_path,
                project=PROJECT,
                domain=DOMAIN,
                filename_root=ephemeral_filename_root,
            )

            url = urlparse(upload_location)
            bucket, key = url.netloc, url.path.lstrip("/")
            s3_md5_bytes = TestLargeFileTransfers._get_s3_file_md5_bytes(minio_s3_client, bucket, key)
            assert s3_md5_bytes == md5_bytes
Loading