
Split dataframe to subsets #58

Merged 16 commits into main on May 4, 2023

Conversation

PhilippeMoussalli (Contributor):

This PR optimizes the execution of Dask transformations and writes by creating delayed tasks that are executed in parallel. The main goal is to speed up writing many different subsets at the same time.

I also added more documentation for the `Dataset` class. The changes required some refactoring.
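A minimal sketch of the delayed-write pattern (the dataframe contents, path, and subset mapping are illustrative assumptions, not the actual fondant code):

```python
import dask
import dask.dataframe as dd
import pandas as pd

df = dd.from_pandas(
    pd.DataFrame({"id": [1, 2], "images_data": ["a", "b"], "captions_text": ["c", "d"]}),
    npartitions=1,
)
base_path = "/tmp/dataset"  # hypothetical output location
subsets = {"images": ["images_data"], "captions": ["captions_text"]}

# One lazy write task per subset; with compute=False, to_parquet returns
# a delayed task instead of executing immediately.
write_tasks = []
for subset_name, columns in subsets.items():
    task = dd.to_parquet(df[columns], f"{base_path}/{subset_name}", compute=False)
    write_tasks.append(task)

# Execute all writes in parallel on a single Dask graph.
dask.compute(*write_tasks)
```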

RobbeSneyders linked an issue on May 2, 2023 that may be closed by this pull request.
RobbeSneyders added this to the Alpha milestone on May 2, 2023.
Comment on lines 227 to 231
# Add the output subset to the manifest
manifest_fields = [
(field.name, Type[field.type.name]) for field in subset.fields.values()
]
self.manifest.add_subset(subset_name, fields=manifest_fields)
Contributor:
The `Dataset` class should be a wrapper around an immutable `Manifest` (see the earlier discussion).

The `evolve` method currently takes care of updating the manifest (adding subsets based on the output subsets of the component spec). cc @RobbeSneyders
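A rough sketch of the immutable-manifest pattern being suggested (the field layout and the `output_subsets` attribute are assumptions, not the actual fondant API):

```python
from dataclasses import dataclass, field, replace

@dataclass(frozen=True)
class Manifest:
    # Hypothetical layout: subset name -> list of (field name, type) pairs.
    subsets: dict = field(default_factory=dict)

    def add_subset(self, name: str, fields: list) -> "Manifest":
        # Return a new Manifest rather than mutating self.
        return replace(self, subsets={**self.subsets, name: fields})

    def evolve(self, component_spec) -> "Manifest":
        # Derive the next manifest from the component spec's output subsets.
        manifest = self
        for name, subset_fields in component_spec.output_subsets.items():
            manifest = manifest.add_subset(name, subset_fields)
        return manifest
```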

Contributor (Author):

Good call, I think I'll resolve this when merging with #56. Let's keep the focus of this PR on the splitting logic with Dask.

dataset.add_index(df)
dataset.add_subsets(df, self.spec)
index_task = dataset.get_upload_index_task(df)
subset_tasks = dataset.get_upload_subsets_task(df, self.spec)
Member:

I would prefer to keep these tasks within the Dataset class. That way we only need to implement different Dataset classes if we want to support different frameworks.
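A minimal sketch of the encapsulation being proposed, where only the `Dataset` subclass varies per framework (interface names are illustrative, not the actual fondant code):

```python
from abc import ABC, abstractmethod

class Dataset(ABC):
    """Framework-agnostic interface for writing a dataset."""

    @abstractmethod
    def upload_index_task(self, df):
        """Return a lazy task that writes the index."""

    @abstractmethod
    def upload_subsets_tasks(self, df, spec):
        """Return lazy tasks that write each output subset."""

class DaskDataset(Dataset):
    def upload_index_task(self, df):
        ...  # build a delayed to_parquet task for the index columns

    def upload_subsets_tasks(self, df, spec):
        ...  # build one delayed write task per subset in the spec
```

The component then only calls these methods; supporting another framework means implementing a new `Dataset` subclass rather than changing component code.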

Collaborator:

Do we need to turn the merging of subsets into a dataframe into a task as well? That way we can include it in the task list and let Dask figure out how to handle it.

Contributor (Author):

Makes sense, I've updated it accordingly. The only small downside is that we're now handling the writing of the index and all of the subsets separately, but that shouldn't introduce a major performance hit.

@GeorgesLorre (Collaborator):

For testing, you can use https://docs.pytest.org/en/7.1.x/how-to/tmp_path.html to write out the subsets temporarily.
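As an illustration, a small test using the `tmp_path` fixture (the dataframe contents are made up, not the actual fondant test data):

```python
import dask.dataframe as dd
import pandas as pd

def test_write_subset(tmp_path):
    # tmp_path is a unique temporary pathlib.Path provided per test by pytest.
    df = dd.from_pandas(
        pd.DataFrame({"id": ["a", "b"], "source": ["s1", "s2"]}),
        npartitions=1,
    )
    out_dir = tmp_path / "index"
    dd.to_parquet(df, str(out_dir))

    # Read back and verify the roundtrip.
    result = dd.read_parquet(str(out_dir)).compute()
    assert list(result["id"]) == ["a", "b"]
```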

@RobbeSneyders (Member):

#56 has been merged. Can you rebase or merge so the conflicts are resolved? That will make it easier to review.

@RobbeSneyders (Member) left a comment:

Thanks @PhilippeMoussalli! General approach looks good to me. Left some comments :)


Collaborator:

What exactly changed in the parquet files?

Contributor (Author):

I think the operation you defined returns a lazy dataframe, so there's no need to define a task there.

I changed the source data type from integer to string, since that's what we currently expect.

Args:
    df: The output Dask dataframe returned by the user.
"""
remote_path = self.manifest.index.location
index_columns = list(self.manifest.index.fields.keys())

# load index dataframe
index_df = df[index_columns]
Collaborator:

Does this `index_df` have an index? Maybe we need to call `.set_index()`?

@PhilippeMoussalli (Contributor, Author) commented on May 4, 2023:

We're using both the id and the source as the index. It doesn't seem like Dask supports a multi-index, though: https://dask.discourse.group/t/everything-about-multiindex-in-dask/593/2. We might want to reconsider having the index be a single string that contains both the source and the id.

Collaborator:

We could just index on id, since that's better than no index (this is also what happens in the test data; see split.py). I'll create a ticket to solve multi-index.
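A small sketch of the two options discussed, indexing on `id` alone versus a single combined string key (column names assumed from the snippets above):

```python
import dask.dataframe as dd
import pandas as pd

df = dd.from_pandas(
    pd.DataFrame({"id": [1, 2], "source": ["s1", "s2"], "data": ["x", "y"]}),
    npartitions=1,
)

# Option 1: index on id only; better than no index at all.
indexed = df.set_index("id")

# Option 2: work around the missing multi-index by combining source and id
# into one string key.
df["key"] = df["source"] + "_" + df["id"].astype(str)
combined = df.set_index("key")
```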


Contributor (Author):

Alright, added it to both the index and the subsets.

@GeorgesLorre (Collaborator) left a comment:

Nice work!

PhilippeMoussalli merged commit 47a6b97 into main on May 4, 2023.
RobbeSneyders deleted the split-dataframe-to-subsets branch on May 15, 2023.
Hakimovich99 pushed a commit that referenced this pull request on Oct 16, 2023.
Successfully merging this pull request may close these issues.

Split dataframe to write different subsets
4 participants