
[FEAT] Overwrite mode for write parquet/csv #3108

Merged 16 commits into main from colin/overwrite-writes on Nov 6, 2024

Conversation

colin-ho (Contributor) commented Oct 23, 2024

Addresses: #3112 and #1768

Implements overwrite mode for write_parquet and write_csv.

Upon finishing the write, we are left with a manifest of written file paths. We can use this manifest to delete all files that are not in it (a rough sketch of the flow follows the notes below), by:

  1. Do an `ls` to find all files currently in the root dir.
  2. Use Daft's built-in `is_in` expression to compute the file paths to delete.
  3. Delete them.

Notes:

  • Relies on fsspec for the `ls` and `rm` functionality. This is favored over the PyArrow filesystem because fsspec's `rm` is a bulk-delete method, i.e. the delete can be done in a single API call; the PyArrow filesystem does not have bulk deletes.
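
Below is a minimal sketch of the flow, assuming an fsspec filesystem and a hypothetical `written_file_paths` list produced by the write; it illustrates the approach rather than the exact code in this PR:

```python
import daft
import fsspec


def overwrite_stale_files(root_dir: str, written_file_paths: list[str]) -> None:
    # Infer an fsspec filesystem for the root directory.
    fs, _, _ = fsspec.get_fs_token_paths(root_dir)

    # 1. List all files currently under the root dir.
    current_files = fs.find(root_dir)

    # 2. Use Daft's is_in expression to keep only the paths NOT in the manifest.
    df = daft.from_pydict({"path": current_files})
    to_delete = df.where(~daft.col("path").is_in(written_file_paths)).to_pydict()["path"]

    # 3. Bulk-delete the stale files in a single rm call.
    if to_delete:
        fs.rm(to_delete)
```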

@github-actions github-actions bot added the enhancement New feature or request label Oct 23, 2024
codspeed-hq bot commented Oct 23, 2024

CodSpeed Performance Report

Merging #3108 will not alter performance

Comparing colin/overwrite-writes (5fdc9d9) with main (5b450fb)

Summary

✅ 17 untouched benchmarks

codecov bot commented Oct 23, 2024

Codecov Report

Attention: Patch coverage is 87.50000% with 3 lines in your changes missing coverage. Please review.

Project coverage is 78.63%. Comparing base (c69ee3f) to head (5fdc9d9).
Report is 41 commits behind head on main.

| Files with missing lines | Patch % | Lines |
|---|---|---|
| daft/dataframe/dataframe.py | 77.77% | 2 Missing ⚠️ |
| daft/filesystem.py | 93.33% | 1 Missing ⚠️ |
Additional details and impacted files


@@            Coverage Diff             @@
##             main    #3108      +/-   ##
==========================================
- Coverage   78.65%   78.63%   -0.02%     
==========================================
  Files         618      621       +3     
  Lines       73192    74150     +958     
==========================================
+ Hits        57568    58310     +742     
- Misses      15624    15840     +216     
| Files with missing lines | Coverage | Δ |
|---|---|---|
| daft/filesystem.py | 70.62% <93.33%> | +1.90% ⬆️ |
| daft/dataframe/dataframe.py | 86.48% <77.77%> | -0.09% ⬇️ |

... and 34 files with indirect coverage changes

@colin-ho colin-ho requested review from jaychia and samster25 October 24, 2024 19:58
@@ -513,6 +514,7 @@ def write_parquet(
self,
root_dir: Union[str, pathlib.Path],
compression: str = "snappy",
write_mode: str = "append",
Contributor

Can we use Union[Literal["append"], Literal["overwrite"]]?
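
As a small illustration of the suggestion, `Literal["append", "overwrite"]` is the shorthand equivalent of that Union; the stub function below is hypothetical and only demonstrates the annotation:

```python
from typing import Literal


def write_parquet_stub(write_mode: Literal["append", "overwrite"] = "append") -> None:
    # A type checker such as mypy will flag calls like
    # write_parquet_stub(write_mode="overwite"), which a plain `str` hint would not.
    ...
```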

else:
raise NotImplementedError(f"Cannot infer Fsspec filesystem for protocol {protocol}: please file an issue!")

return fs
Contributor

I would avoid fsspec if possible here so we can avoid taking a dependency on it.

I'm also not sure if the bulk delete API has any performance benefit over a serial delete call... I guess for the services (e.g. S3) we can parallelize DELETE requests over the wire.

Ideally we can support DELETE on our own IO clients, but in the absence of that shall we use PyArrow instead and just naively delete one-by-one?

Contributor Author

Yeah, we can use PyArrow; it should be easier to implement anyway since we already have the machinery to infer a PyArrow filesystem. I can also run some tests to see if parallelizing the deletes makes sense.
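
A minimal sketch of what PyArrow-based deletion could look like, assuming a `pyarrow.fs.FileSystem` has already been inferred; the `delete_paths` helper and the thread-pool parallelism are illustrative, not the code from this PR:

```python
from concurrent.futures import ThreadPoolExecutor

import pyarrow.fs as pafs


def delete_paths(fs: pafs.FileSystem, paths: list[str], max_workers: int = 8) -> None:
    # PyArrow filesystems have no bulk-delete API, so each path gets its own
    # delete_file call; a thread pool overlaps per-request latency for remote
    # stores such as S3.
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        list(pool.map(fs.delete_file, paths))
```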

@@ -353,3 +358,22 @@ def join_path(fs: pafs.FileSystem, base_path: str, *sub_paths: str) -> str:
return os.path.join(base_path, *sub_paths)
else:
return f"{base_path.rstrip('/')}/{'/'.join(sub_paths)}"


def overwrite_files(
Member

It doesn't look like we're supporting partition overwrite mode here, where we only delete files in a partition if we wrote a new file into that partition.

Could we leverage what @desmondcheongzx is working on for hive-style reads to be able to do this?

Contributor Author

Probably, but we could also just match on the directory paths and delete accordingly. Will make a separate PR for this.
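
A hedged sketch of the "match on the directory paths" idea: a file is treated as stale only if its partition directory received new data in this write. The helper name and inputs below are illustrative:

```python
import os


def stale_partition_files(existing_files: list[str], written_files: list[str]) -> list[str]:
    written = set(written_files)
    # Partitions touched by this write, identified by their directory paths.
    touched_partitions = {os.path.dirname(p) for p in written_files}
    # A file is stale only if its partition was rewritten and it is not part
    # of the new manifest.
    return [
        p for p in existing_files
        if os.path.dirname(p) in touched_partitions and p not in written
    ]
```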

@colin-ho colin-ho merged commit 0d669ca into main Nov 6, 2024
42 checks passed
@colin-ho colin-ho deleted the colin/overwrite-writes branch November 6, 2024 17:14