[FEAT] Overwrite mode for write parquet/csv #3108
Conversation
CodSpeed Performance Report: Merging #3108 will not alter performance.

Codecov Report
Attention: Patch coverage is
Additional details and impacted files:

@@ Coverage Diff @@
## main #3108 +/- ##
==========================================
- Coverage 78.65% 78.63% -0.02%
==========================================
Files 618 621 +3
Lines 73192 74150 +958
==========================================
+ Hits 57568 58310 +742
- Misses 15624 15840 +216
daft/dataframe/dataframe.py
Outdated
@@ -513,6 +514,7 @@ def write_parquet(
    self,
    root_dir: Union[str, pathlib.Path],
    compression: str = "snappy",
    write_mode: str = "append",
Can we use `Union[Literal["append"], Literal["overwrite"]]`?
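A minimal sketch of what the suggested annotation might look like, following the diff above; the `DataFrame` stub and the ellipsis body are placeholders, not the actual implementation:

```python
from typing import Literal, Union
import pathlib


class DataFrame:
    # Sketch only: constrains write_mode to the two accepted values via typing.Literal,
    # so type checkers flag any value other than "append" or "overwrite".
    def write_parquet(
        self,
        root_dir: Union[str, pathlib.Path],
        compression: str = "snappy",
        write_mode: Union[Literal["append"], Literal["overwrite"]] = "append",
    ) -> None:
        ...
```

Note that `Union[Literal["append"], Literal["overwrite"]]` is equivalent to the shorter `Literal["append", "overwrite"]`.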
daft/filesystem.py
Outdated
    else:
        raise NotImplementedError(f"Cannot infer Fsspec filesystem for protocol {protocol}: please file an issue!")

    return fs
I would avoid fsspec if possible here so we can avoid taking a dependency on it.

I'm also not sure if the bulk delete API has any performance benefit over a serial delete call... I guess for the services (e.g. S3) we can parallelize DELETE requests over the wire.

Ideally we can support DELETE on our own IO clients, but in the absence of that, shall we use PyArrow instead and just naively delete one-by-one?
Yeah, we can use PyArrow; it should be easier to implement anyway since we already have the machinery to infer a PyArrow filesystem. I can also run some tests to see if parallelizing the deletes makes sense.
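A rough sketch of what one-by-one deletes through a PyArrow filesystem could look like, with a thread pool to try parallelizing the per-file DELETE requests; the `delete_files` helper and its parameters are illustrative, not part of this PR:

```python
from concurrent.futures import ThreadPoolExecutor

import pyarrow.fs as pafs


def delete_files(fs: pafs.FileSystem, paths: list[str], max_workers: int = 8) -> None:
    """Illustrative helper: delete each path individually via a PyArrow filesystem.

    PyArrow has no bulk-delete call, so one request is issued per file; a thread
    pool keeps several deletes in flight at once, which may help for object
    stores like S3 where each delete is a round trip over the wire.
    """
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        futures = [pool.submit(fs.delete_file, p) for p in paths]
        for f in futures:
            # delete_file raises on failure, so errors surface here
            f.result()
```

Timing this against a plain serial loop over `fs.delete_file` would answer whether the parallelism is worth it.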
@@ -353,3 +358,22 @@ def join_path(fs: pafs.FileSystem, base_path: str, *sub_paths: str) -> str:
        return os.path.join(base_path, *sub_paths)
    else:
        return f"{base_path.rstrip('/')}/{'/'.join(sub_paths)}"


def overwrite_files(
It doesn't look like we're supporting partition overwrite mode here, where we only delete files in partitions that we wrote a new file into. Could we leverage what @desmondcheongzx is working on for hive-style reads to be able to do this?
Probably, but we could also just match on the directory paths and delete accordingly. Will make a separate PR for this.
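A rough sketch of the directory-matching idea, left for the follow-up PR; the function and argument names are illustrative, and plain string paths stand in for whatever representation the write manifest actually uses:

```python
import os


def partition_aware_deletes(existing_files: list[str], written_files: list[str]) -> list[str]:
    """Illustrative sketch of partition overwrite via directory matching.

    Only files that (a) live in a partition directory the new write touched and
    (b) are not themselves part of the new manifest are selected for deletion,
    leaving untouched partitions intact.
    """
    written = set(written_files)
    touched_partitions = {os.path.dirname(p) for p in written_files}
    return [
        p
        for p in existing_files
        if p not in written and os.path.dirname(p) in touched_partitions
    ]
```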
Addresses: #3112 and #1768

Implements overwrite mode for `write_parquet` and `write_csv`.

Upon finishing the write, we are left with a manifest of written file paths. We can use this to perform a "delete all files not in manifest" (sketched below), by:
- Using `ls` to figure out all the current files in the root dir.
- Using an `is_in` expression to get the file paths to delete.

Notes:
- Uses fsspec's `ls` and `rm` functionality. This is favored over the PyArrow filesystem because `rm` is a bulk delete method, i.e. we can do the delete in a single API call. The PyArrow filesystem does not have bulk deletes.
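A rough sketch of the overwrite step described above, under a few assumptions: `overwrite_root_dir` and its parameters are illustrative names, plain set membership stands in for the `is_in` expression used in the PR, and `fs.find` is used for a recursive listing of the root dir:

```python
import fsspec


def overwrite_root_dir(protocol: str, root_dir: str, manifest: list[str]) -> None:
    """Illustrative sketch: delete everything under root_dir that is not in the
    manifest of freshly written files, using a single bulk `rm` call."""
    fs = fsspec.filesystem(protocol)
    # Recursively list every file currently under the root dir.
    current_files = fs.find(root_dir)
    written = set(manifest)
    stale = [path for path in current_files if path not in written]
    if stale:
        # fsspec's rm accepts a list of paths, so the delete goes out as one bulk request.
        fs.rm(stale)
```

In practice the listed paths and the manifest paths need to be normalized to the same form (e.g. with or without the protocol prefix) before the membership check.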