
Proposal: Uncommitted GC #4382

Merged
merged 27 commits into from
Nov 10, 2022

Conversation

@N-o-Z (Member) commented Oct 18, 2022

Closes #1933

@N-o-Z N-o-Z added proposal exclude-changelog PR description should not be included in next release changelog labels Oct 18, 2022
@N-o-Z N-o-Z self-assigned this Oct 18, 2022
@itaiad200 (Contributor) left a comment:

I think you're on to something, great work! See my comments inline.

#### Objects Path Conventions

1. Repository objects will be stored under the prefix `<bucket_name>/repos/<repo_uid>`
2. Branch objects will be stored under the repo prefix with the path `branches/<branch_id>/`
Contributor:

What is a Branch Object and what is a Repository Object? I think we need terminology section.

Member Author:

Rephrased. Let me know if this clarifies it, or whether I should add a terminology section.


1. Repository objects will be stored under the prefix `<bucket_name>/repos/<repo_uid>`
2. Branch objects will be stored under the repo prefix with the path `branches/<branch_id>/`
3. Each lakeFS instance will create a unique prefix partition (serialized) under the branch path to
Contributor:

Will a standalone lakeFS storageNamespace structure look different than a scaled lakeFS? What if the container restarts, should it use the same unique prefix?

@N-o-Z (Member Author), Oct 18, 2022:

Effectively it will look the same.
Each lakeFS instance will maintain its own partitions, appending a unique suffix to the partition name, in the following manner:
`<sortable_descending_serialized_uid>_<lakefs_instance_uid>`
Each instance manages an exclusive partition; on container restart, the lakeFS instance will create a new partition according to the above conventions.
The current idea is to track partition size in memory. This means a restarted instance has no context on its existing partitions, effectively leaving them partially allocated. If you think this is not sufficient, we might want to think of an additional way to track the partitions.
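As an illustration, a partition name with these properties could be derived by subtracting the current timestamp from a fixed maximum, so that newer partitions sort first in a plain lexicographic listing. This is a hypothetical sketch, not the actual lakeFS serialization; the constant and function names are assumptions:

```python
import time

MAX_NANOS = 2**63 - 1  # upper bound used to invert the timestamp (assumption)

def new_partition_name(instance_uid, now_nanos=None):
    """Build `<sortable_descending_serialized_uid>_<lakefs_instance_uid>`.

    Subtracting the timestamp from a fixed maximum makes newer partitions
    sort *before* older ones lexicographically, which is what lets the
    optimized GC stop scanning once it reaches the previous run's last
    read partition.
    """
    if now_nanos is None:
        now_nanos = time.time_ns()
    descending = MAX_NANOS - now_nanos
    # Zero-pad to a fixed width so string order matches numeric order.
    return f"{descending:020d}_{instance_uid}"

# A restarted instance simply calls new_partition_name() again with a fresh
# instance uid, abandoning its old (partially filled) partition.
```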

Member Author:

Updated document

3. Each lakeFS instance will create a unique prefix partition (serialized) under the branch path to
store the branch objects.
The serialized partition prefix will allow partial scans of the bucket when running the optimized GC
4. lakeFS will track the count of objects uploaded to the prefix and create a new one every < TBD > objects uploaded
Contributor:

Is it a hard bound? Might be hard to implement.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As mentioned above, this is an upper bound, which might not be reached in the case of an instance restart/shutdown.

Contributor:

It still requires serializing all uploads.

5. Subtract results `lakeFS DF` from `Branch DF`
6. Filter files newer than < TOKEN_EXPIRY_TIME > and special paths
7. The remainder is a list of files which can be safely removed
8. Finally, save the current run's `GC commit`, the last read partition, and the newest commit id
Contributor:

Per branch? Where?

Member Author:

Per branch - updated

2. Read addresses from branch's new commits (all commits up to the last GC run commit id) -> `lakeFS DF`
3. Read addresses from branch `GC commit` -> `lakeFS DF`
4. Subtract results `lakeFS DF` from previous run's `GC commit`
5. The result is a list of files that can be safely removed
Contributor:

Noice

##### Step 1. Analyze Data and Perform Cleanup for old entries (GC client)

1. Run PrepareUncommittedForGC
2. Read addresses from branch's new commits (all commits up to the last GC run commit id) -> `lakeFS DF`
Contributor:

Suggested change
2. Read addresses from branch's new commits (all commits up to the last GC run commit id) -> `lakeFS DF`
2. Read addresses from branch's new commits (all commits from to the last GC run commit id) -> `lakeFS DF`

Member Author:

Since log commits returns commits from newest to last, shouldn't it be "down to"?

>**Note:** This step handles cases of objects that were uncommitted during previous GC run and are now deleted

##### Step 2. Analyze Data and Perform Cleanup for new entries (GC client)
1. Read all objects on branch path up to the previous run's last read partition (can be done in parallel by 'partition') -> `Branch DF`
Contributor:

Suggested change
1. Read all objects on branch path up to the previous run's last read partition (can be done in parallel by 'partition') -> `Branch DF`
1. Read all objects on branch path up from the previous run's last read partition (can be done in parallel by 'partition') -> `Branch DF`

Member Author:

It's actually down to the previous run's last read partition, since we are utilizing the adapter's list property to retrieve the entries sorted alphabetically

@itaiad200 itaiad200 requested a review from talSofer October 18, 2022 14:10
@ozkatz (Collaborator) commented Oct 18, 2022:

Great ideas @N-o-Z! This is a great direction to explore.
I'm wondering:

  1. Why are we scoping each "partition" with a branch? Isn't it simpler to simply have all partitions live under the storage namespace directly?
  2. The concept of partitions is really nice! Perhaps we can even start with a simpler implementation that isn't incremental? because partitions are more-or-less bounded in size, you've also solved prior design's limitation with object listing that couldn't be parallelized! do a prefix listing once, then have the executors divvy up these prefixes (with little to no skew since they are all pretty much the same size!)

still O(n) but at least it doesn't have to execute serially..

#### Flow 1: Clean Run

1. Run PrepareUncommittedForGC
2. Read all addresses from branch commits -> `lakeFS DF`
Contributor:

What's DF?

Member Author:

Data Frame. Since this design is eventually meant to be implemented as a Spark job, I found this terminology the most sensible.

Contributor:

How about using `committedDF`? It is more self-explanatory.

3. Each lakeFS instance will create a unique prefix partition (serialized) under the branch path to
store the branch objects.
The serialized partition prefix will allow partial scans of the bucket when running the optimized GC
4. lakeFS will track the count of objects uploaded to the prefix and create a new one every < TBD > objects uploaded
Contributor:

Create a new what? Prefix (i.e. unique prefix?)

Member Author:

Rephrased - let me know if it's clearer

Contributor:

It is. Thanks

@N-o-Z (Member Author) commented Oct 18, 2022:

Great ideas @N-o-Z! This is a great direction to explore. I'm wondering:

  1. Why are we scoping each "partition" with a branch? Isn't it simpler to simply have all partitions live under the storage namespace directly?
  2. The concept of partitions is really nice! Perhaps we can even start with a simpler implementation that isn't incremental? because partitions are more-or-less bounded in size, you've also solved prior design's limitation with object listing that couldn't be parallelized! do a prefix listing once, then have the executors divvy up these prefixes (with little to no skew since they are all pretty much the same size!)

still O(n) but at least it doesn't have to execute serially..

Thank you @ozkatz

  1. Scoping the "partitions" to branches allows us to scope our views of the committed and uncommitted objects to the branch level as well. In fact, it allows us to run the GC process at the branch level and enables concurrency. If we created the partitions at the repository scope (namespace), we would have to go over all committed and uncommitted data for the entire namespace in order to ascertain which objects can be deleted.
  2. We can definitely start with the clean flow and add the optimized flow later, though we need to keep several things in mind:
    1. Even when parallelized, testing S3 listing of a path with ~500,000 objects in a Databricks notebook running on the given configuration took around 25 seconds:
      [screenshot: listing benchmark]
      Taking, for example, a repo with around 1 billion objects (not so far-fetched, as I understand), even if it is perfectly partitioned into bulks of 500,000 objects the entire listing might take an unreasonable amount of time.
    2. Relying only on the clean flow, this becomes a continually growing problem: both the bucket size and the number of commits are unbounded, and eventually we will reach an unmanageable size.


The following describes the GC process run flows in the branch scope:

#### Flow 1: Clean Run
Contributor:

If I understand correctly, it's part of our current GC, right? Deleted-uncommitted and expired-committed will be deleted in the same run?

Member Author:

I believe it should be incorporated into the same ecosystem. Whether it is the same job, or a different one is to be considered. I'm not sure deciding this now is a requirement for this proposal

Contributor:

@N-o-Z let's make sure we do discuss this detail before settling on a final design. I'm fine with separating the algorithmic discussion from job architecture but there are trade offs we need to consider here.


#### PrepareUncommittedForGC

A new API which will create meta-ranges and ranges using the given branch uncommitted data. These files
Contributor:

So, this is kind of a "fake" commit with all uncommitted-undeleted objects, correct?
Pretty cool

Member Author:

Credit to @itaiad200 😄

repositories
* This solution partitions the branch data into segments that allow parallel listing of branch objects. In cases where
the branch path is extremely large, it might still take a lot of time to perform a clean run.
* For GC to work optimally, it must be executed on a timely basis. Long periods between runs might result in excessively long runs.
Contributor:

Can we improve it by limiting GC cycles (time limit or number of objects to handle)?

Member Author:

This will be a bit difficult IMO. Since object creation date is loosely coupled with commit date it might be very complicated to achieve

Contributor:

This is an ops issue. I might prefer occasional long runs, or common short runs.

Member Author:

Rephrased

@itaidavid (Contributor) left a comment:

Thank you @N-o-Z. If I understood correctly, you suggest treating all uncommitted (undeleted) objects as a retained commit, and deleting all objects that are not part of this or any other retained commit, correct?
What I mainly miss in this document is a description of the approach before the dive into detail, some story of the solution behind the details. I think it would help in understanding the solution and make it an easier read.

@talSofer talSofer requested a review from johnnyaug October 19, 2022 06:56
@arielshaqed (Contributor) left a comment:

Thanks! It looks really exciting, but I didn't entirely understand what "partitions" are.


1. GetPhysicalAddress to return a validation token along with the address.
2. The token will be valid for a specified amount of time and for a single use.
3. LinkPhysicalAddress to verify token valid before creating an entry.
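The token scheme in items 1-3 could be sketched as follows. This is an in-memory illustration with assumed names and TTL, not the proposed lakeFS implementation (a real server would persist the tokens so validation survives restarts):

```python
import secrets
import time

class AddressTokens:
    """Single-use, time-limited validation tokens for physical addresses."""

    def __init__(self, ttl_seconds=6 * 3600):   # TTL value is an assumption
        self.ttl = ttl_seconds
        self._issued = {}                        # token -> expiry timestamp

    def get_physical_address(self, prefix):
        """Issue a fresh physical address together with its token."""
        token = secrets.token_hex(16)
        self._issued[token] = time.time() + self.ttl
        return f"{prefix}/{token}", token

    def link_physical_address(self, token):
        """Verify the token before creating an entry; pop() makes it
        single-use, so a second link attempt with the same token fails."""
        expiry = self._issued.pop(token, None)
        if expiry is None or time.time() > expiry:
            raise ValueError("token expired or already used")
```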
Contributor:

I don't understand why the race here (between token valid and entry creation) is not a problem: what if the instance verifies a valid token, then the token becomes invalid, and then the entry is created?

In particular, "time interval" is often a race -- particularly in difficult conditions when cluster members have poor time sync.

Contributor:

Yes, time sync introduces a challenge here. What if GC thinks a token has expired while the server thinks it's valid?


A new API which will create meta-ranges and ranges using the given branch uncommitted data. These files
will be saved in a designated path used by the GC client to list branch's uncommitted objects.
For the purpose of this document we'll call this the `GC commit` (feel free to suggest a better term :) )
Contributor:

It's not a commit, it's "just" a metarange.

Comment on lines 65 to 72
A new API which will create meta-ranges and ranges using the given branch uncommitted data. These files
will be saved in a designated path used by the GC client to list branch's uncommitted objects.
For the purpose of this document we'll call this the `GC commit` (feel free to suggest a better term :) )

#### Reading data from lakeFS

Reading data from lakeFS will be similar to the current GC process - using the SSTable reader to quickly list branch objects.
The above-mentioned API will enable doing so for all branch objects (committed and uncommitted).
Contributor:

What are the advantages of doing this over directly providing an API to list the branch objects? If the goal is "merely" to get a static listing, could we instead provide an API to push a new staging token and return the previous list of staging tokens? Then the GC Spark job could use another (new) API call list all objects on that list of staging tokens.

Otherwise we end up with a strange unused metarange. 🤷🏽

Member Author:

This metarange is used not only for the current GC run, but used later in the next run as a reference to the current state of the branch and enables an important optimization. Afterwards it can be safely removed.
Also I don't think the sealed tokens provide us with a static state - changes can still occur on the branch which can modify the sealed tokens state.

#### Flow 1: Clean Run

1. Run PrepareUncommittedForGC
2. Read all addresses from branch commits -> `lakeFS DF`
Contributor:

I don't think that that works: these are all addresses to keep, they might occur only on another branch (e.g. if copied from another branch by staging them to this one).


## Limitations

* Since this solution relies on the new repository structure, it is not backwards compatible. Therefore, another solution will be required for existing
Contributor:

Can we have a slower (one-time) run to cleanup old repositories, once?

Member Author:

I think we won't have any option but to do that :). I'm just not sure this should be a part of this proposal. WDYT?

Contributor:

In a separate proposal is great, just let's have it :-)

Member Author:

Added a discovery task:
#4382

##### Step 1. Analyze Data and Perform Cleanup for old entries (GC client)

1. Run PrepareUncommittedForGC
2. Read addresses from branch's new commits (all commits up to the last GC run commit id) -> `lakeFS DF`
Contributor:

Where do we store this "run ID"?

Member Author:

Added:

  1. Finally, save the current run's GC commit, the last read partition, and the newest commit id in a designated location on the branch path

3. Each lakeFS instance will create a unique prefix partition (serialized) under the branch path to
store the branch objects.
The serialized partition prefix will allow partial scans of the bucket when running the optimized GC
4. lakeFS will track the count of objects uploaded to the prefix and create a new one every < TBD > objects uploaded
Contributor:

It still requires serializing all uploads.


1. Repository objects will be stored under the prefix `<storage_namespace>/repos/<repo_uid>/`
2. Branch objects will be stored under the repo prefix with the path `branches/<branch_id>/`
3. Each lakeFS instance will create a unique prefix partition (serialized) under the branch path to
Contributor:

Not sure what these partitions are: Staging tokens? Are different instances allowed to scan and use other instances' partitions?

Can you add some motivation for this? It raises some questions.

  • How "ascending" does this need to be? Would a timestamp, which is not really ascending but generally satisfies this, be good enough?
  • How "unique" does this need to be?

#### StageObject

1. Allowed only for address outside the repo namespace
2. Prevent race between staging object and GC job
Contributor:

What kind of race? Is this the token thing?

@N-o-Z (Member Author), Oct 19, 2022:

By not allowing StageObject on addresses inside the repo namespace, we prevent a race where an un-referenced address is being staged using StageObject while it is being deleted by GC
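The constraint described above amounts to a simple prefix check at staging time. A hypothetical sketch (the function name and error message are assumptions):

```python
def validate_stage_address(repo_namespace, physical_address):
    """Reject StageObject calls whose physical address lies inside the
    repository's storage namespace.  GC only scans addresses under the
    namespace, so an externally staged object there could be staged at the
    same moment GC is deleting it - the race described above."""
    ns = repo_namespace.rstrip("/") + "/"
    if physical_address.startswith(ns):
        raise ValueError(
            "StageObject is only allowed for addresses outside the repository namespace"
        )
```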


1. GetPhysicalAddress to return a validation token along with the address.
2. The token will be valid for a specified amount of time and for a single use.
3. LinkPhysicalAddress to verify token valid before creating an entry.
Contributor:

Yes, time sync introduces a challenge here. What if GC thinks a token has expired while the server thinks it's valid?


#### Objects Path Conventions

1. Repository objects will be stored under the prefix `<storage_namespace>/repos/<repo_uid>/`
Contributor:

Note that today, the storage namespace is tied to a repository - and not the installation.
So partitioning according to repository is irrelevant.

Member Author:

You are right - but we need to take into consideration scenarios where a repository is deleted and created again with the same name. Although it has the same name, the unique identifier is different and we need to take that into consideration.
Perhaps you have a better suggestion on how to do this?

@johnnyaug (Contributor), Oct 19, 2022:

Are you referring to cleaning up the data of deleted repositories? If so, I think we can relax this requirement.

I don't necessarily object to a "global storage namespace": I think it's a great idea. It's just that we may want to avoid it because it's too big at this point. If we choose to go this way, it should be mentioned explicitly in this doc (and have a design of its own when the time comes).

Contributor:

Even today, you can't* create a repository in a storage namespace that serves as a storage namespace for some other repo. So it doesn't matter if the repo is deleted, created by an installation that is no longer active, etc, you can't recreate the repo in the same storage namespace unless it was cleaned.

  • Repo creation checks for the dummy object in the root of the namespace. You can argue that it's not safe enough, but I think we can rely on it for the time being.

Contributor:

I didn't understand what we are trying to mitigate here.

#### Objects Path Conventions

1. Repository objects will be stored under the prefix `<storage_namespace>/repos/<repo_uid>/`
2. For each repository branch, objects will be stored under the repo prefix with the path `branches/<branch_id>/`
Contributor:

I think partitioning according to branch is a brilliant idea, but I'm wondering whether it's premature optimization. It does introduce some complexity and I'm not sure we have evidence that it will benefit us in the real world.

@N-o-Z (Member Author), Oct 19, 2022:

This allows us to scope the GC process only to the commits and staging area that are related to the branch. Otherwise, we will have to read all the information from all commits and all staging areas to understand what we need to delete.

Contributor:

I don't understand if this is required for correctness or just an optimization.

1. Run PrepareUncommittedForGC
2. Read all addresses from branch commits -> `lakeFS DF`
3. Read all addresses from branch `GC commit` -> `lakeFS DF`
4. Read all objects on branch path directly from object store (can be done in parallel by 'partition') -> `Branch DF`
Contributor:

Is this a listing on the object store? In what sense is it parallel?

Member Author:

Since the objects on a given branch are divided into partition prefixes, we can use workers to list objects by partition and aggregate the results.
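The per-partition fan-out described here can be sketched with a thread pool; `list_prefix` is a hypothetical stand-in for the store adapter's list call (e.g. a paginated S3 ListObjectsV2 under that prefix):

```python
from concurrent.futures import ThreadPoolExecutor

def list_branch_objects(partitions, list_prefix, workers=8):
    """Fan out one object-store listing per partition prefix and merge the
    results.  Because partitions are roughly bounded in size, the work
    divides across workers with little skew."""
    with ThreadPoolExecutor(max_workers=workers) as pool:
        listings = pool.map(list_prefix, partitions)
    return [obj for listing in listings for obj in listing]
```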

lakeFS instance to store the branch's objects. This prefix will be composed of two parts:
1. Lexicographically sortable, descending time based serialization
2. A unique identifier for the lakeFS instance
`<sortable_descending_serialized_uid>_<lakefs_instance_uid>`
Contributor:

Why is the instance ID a part of the partition?

Member Author:

Each instance writes only to its own partition under the branch prefix. This allows the lakeFS instance to track the amount of files uploaded to this partition and decide when to create a new partition

@nopcoder (Contributor) left a comment:

Comments in the body. I think this is the way; we just need to extend the description to cover how we handle deletion of uncommitted data above the branch level as well (branch delete and repository delete).


## Design

### Required changes by lakeFS
Contributor:

Suggest explaining the idea before the required changes to implement it. Or, at least per section, explain the reasoning behind the suggested change so the implementer can understand the goal and not just the how.

Contributor:

+1, I'm missing context to understand the suggested changes. It would add clarity to explain the overall flow at a high level, possibly with a diagram, before diving into the separation between lakeFS server changes and Spark client changes.
Also, it would be useful to describe the idea behind the solution, e.g. the inputs to the algorithm are (1) committed objects, (2) uncommitted objects, and (3) objects ever written to lakeFS; the algorithm aims to find objects that are in (3) and not in (1) or (2).


#### StageObject

1. Allowed only for address outside the repo namespace
Contributor:

Need to explain the constraint and/or if we are going to remove/change any current functionality or just enforce existing one.

#### StageObject

1. Allowed only for address outside the repo namespace
2. Prevent race between staging object and GC job
Contributor:

Is this item a requirement from lakeFS, or just explaining the result of the previous requirement?

2. Objects that were uploaded to a physical address issued by the API and were not linked before the token expired will
eventually be deleted by the GC job.

#### CopyObject
Contributor:

Can you explain which CopyObject you are addressing - is it the option to do so using the S3 gateway, or a new OpenAPI endpoint we need to add to address the fact that we can't stage objects from staging?
If this is S3-only, we need to explain how we do the same using the OpenAPI.


## Motivation

Uncommitted data which is no longer referenced (due to branch deletion, reset branch etc.) is not being deleted by lakeFS.
Contributor:

Suggested change
Uncommitted data which is no longer referenced (due to branch deletion, reset branch etc.) is not being deleted by lakeFS.
Uncommitted data which is no longer referenced (due to any staged data deletion or override, reset branch etc.) is not being deleted by lakeFS.

Comment on lines 49 to 69
1. GetPhysicalAddress to return a validation token along with the address.
2. The token will be valid for a specified amount of time and for a single use.
Contributor:

Consider moving item two to be first, as it will require active tracking using storage; once we have storage, we can also enforce any validation when using the link, without embedding information in the path itself.

#### PrepareUncommittedForGC

A new API which will create meta-ranges and ranges using the given branch uncommitted data. These files
will be saved in a designated path used by the GC client to list branch's uncommitted objects.
Contributor

  • Prefer to create a new folder for each request in order to keep history, support parallel requests, and prevent data deletion or the accumulation of old files. And/or we should keep state for the current request: progress, completion and errors.
  • A risk here is that a single instance is responsible for extracting all the staging information. The request can take a long time, and since it is handled by a single instance that can go down, we would have to start from scratch.
  • From the work Yoni did, we should consider writing this data in Parquet format.

Contributor

Can we instead make this a paging call by passing the client some continuation token? I know that it will be slower for the client to read, but reducing load on the server may be more important. @lynnro314 has already done some research for how to speed up prepareGCCommits, I am not sure we want to open a new front with an API that may require similar work almost immediately.


#### PrepareUncommittedForGC

A new API which will create meta-ranges and ranges using the given branch uncommitted data. These files
Contributor

Just double-checking because the design talked about branch level - this is one set of files per repository?

4. The remainder is a list of files which can be safely removed
5. Finally, save the current run's `GC commit`, the last read partition, and the newest commit id in a designated location on the branch path

## Limitations
Contributor

I think that the solution should include - how we delete uncommitted data at the 3 levels:

  1. delete repository
  2. delete branch
  3. delete object

#### Objects Path Conventions

1. Repository objects will be stored under the prefix `<storage_namespace>/repos/<repo_uid>/`
2. For each repository branch, objects will be stored under the repo prefix with the path `branches/<branch_id>/`
Contributor

We should address create/delete/create of the same branch by using a unique id at the branch level as well.

@N-o-Z
Member Author

N-o-Z commented Oct 19, 2022

Thank you everyone for the most invaluable input! 🙏🏽
We've decided to try and modify this proposal to remove the use of branch prefixes and use a time-based partition only.
Please hold off on any new comments until I'm able to address the current ones and modify the proposal.
I'll update once I've made all the required changes - Thanks!

Contributor

@talSofer talSofer left a comment

@N-o-Z thanks for this great proposal!
I'm posting my comments since they are unrelated to the changes you are planning to the document.


## Design

### Required changes by lakeFS
Contributor

+1, I'm missing context to understand the suggested changes. It will add clarity if we explain the overall flow at a high level, possibly with a diagram, before diving into the separation between lakeFS server changes and Spark client changes.
Also, it will be useful to describe the idea behind the solution - e.g. the inputs to the algorithm are (1) committed objects, (2) uncommitted objects, and (3) all objects ever written to lakeFS; the algorithm aims to find objects that are in 3 but not in 1 or 2.

#### Flow 1: Clean Run

1. Run PrepareUncommittedForGC
2. Read all addresses from branch commits -> `lakeFS DF`
Contributor

How about using committedDF? It is more self-explanatory.


The following describe the GC process run flows in the branch scope:

#### Flow 1: Clean Run
Contributor

@N-o-Z let's make sure we do discuss this detail before settling on a final design. I'm fine with separating the algorithmic discussion from job architecture but there are trade offs we need to consider here.


The following describe the GC process run flows in the branch scope:

#### Flow 1: Clean Run
Contributor

here to add clarity I suggest adding the intent behind each step, i.e.

  1. mark uncommitted data
  2. get all committed addresses
  3. get all uncommitted addresses
  4. get all objects ever written to lakeFS on that path...
  5. subtract committed from all objects on the branch
    etc...

##### Step 1. Analyze Data and Perform Cleanup for old entries (GC client)

1. Run PrepareUncommittedForGC
2. Read addresses from branch's new commits (all new commits down to the last GC run commit id) -> `lakeFS DF`
Contributor

To do that, you may want to use the run ID concept the existing GC has. See https://github.com/treeverse/cloud-controlplane/blob/main/design/accepted/gc-with-run-id.md in this context. This is our plan for implementing incremental GC, and I suggest that we consider using the same concept.

N-o-Z and others added 3 commits October 23, 2022 13:09
@N-o-Z N-o-Z added team/versioning-engine Team versioning engine GC+ labels Oct 25, 2022
since the last GC run.

## Performance Requirements
Contributor

I'm missing the actual requirement below. It is sort of an estimation of a single task of the uncommitted GC. How long will the entire GC run take? What's the repo status (number of commits, branches, objects to delete, etc.)?

Member Author

Done

The GC process is composed of 3 main parts:
1. Listing namespace objects
2. Listing of lakeFS repository committed objects
3. Listing of lakeFS repository uncommitted objects
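The relationship between these three listings can be expressed as a simple set difference (a sketch using plain Python sets as stand-ins for the actual listings/DataFrames):

```python
def find_deletable(namespace_objects, committed, uncommitted):
    """Candidates for deletion are objects present in the storage
    namespace (part 1) but referenced by neither committed (part 2)
    nor uncommitted (part 3) lakeFS metadata."""
    return namespace_objects - committed - uncommitted

# Example: "c" exists in the bucket but no metadata references it.
print(sorted(find_deletable({"a", "b", "c"}, {"a"}, {"b"})))  # ['c']
```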
Contributor

Does an object in case 3 mean that there's an active branch that holds the uncommitted object, or is it any uncommitted object under that repository ("dangling" or not)?

Member Author

An active branch that holds the uncommitted object.
We read the committed + uncommitted data from lakeFS via metaranges, so it's essentially a static view of the repository (from lakeFS's point of view) at a specific point in time

Performing tests against an AWS S3 bucket using a Databricks notebook on a m4.large cluster, we've observed listing of
~500,000 objects takes approximately 25 seconds.
We can estimate that on a repository with ~1B objects, using ~500K object size slices, and using 10 workers - listing will take
around 1.5 hours.
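As a sanity check, the quoted estimate works out as follows (assuming the measured 25 seconds per 500K-object slice and perfectly parallel workers):

```python
objects_total = 1_000_000_000
slice_size = 500_000
seconds_per_slice = 25
workers = 10

slices = objects_total // slice_size                 # 2,000 slices
wall_clock_s = slices * seconds_per_slice / workers  # ideal parallelism
print(slices, round(wall_clock_s / 3600, 1))         # 2000 slices, ~1.4 hours
```

That is consistent with the quoted "around 1.5 hours" once some scheduling overhead is added.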
Contributor

How do we handle failures in such a long run - say, a crash after an hour of execution?
Does the entire process need to be rerun?
Maybe we can parallelize the GC itself, instead of just the listing?

Member Author

The GC job will be parallelized, but we can't rely on failed runs. To get a correct picture of the namespace, the process must run successfully and without errors.
Relying on partial data from failed jobs would increase the risk of deleting committed data - which is something that absolutely must not happen

Contributor

@itaidavid itaidavid left a comment

Thanks @N-o-Z
Added a question regarding the impact of an error in such a long process, but other than that LGTM.


For each GC run, save the following information using the GC run id as detailed in this [proposal](https://github.com/treeverse/cloud-controlplane/blob/main/design/accepted/gc-with-run-id.md):
1. Save metaranges in `_lakefs/gc/run_id/metadata/`
2. Save `Uncommitted DF` in `_lakefs/gc/run_id/uncommitted.parquet`
Contributor

Correct me if I'm wrong @treeverse/ecosystem, but Parquet outputs are multiple files (one per partition) and they're normally saved under a prefix of their own for better separation, right?
So a better path would be _lakefs/gc/run_id/uncommitted/entries.parquet

@N-o-Z
Member Author

N-o-Z commented Oct 26, 2022

Moved proposal to "accepted" folder.
Please take the time to give final notes before I merge this PR

#### [Get/Link]PhysicalAddress

1. GetPhysicalAddress to return a validation token along with the address (or embedded as part of the address).
2. The token will be valid for a specified amount of time and for a single use.
Contributor

Why does it matter that the token is valid for a single use?

Member Author

See: #4438

Contributor

This adds an operational requirement for time synchronization between lakeFS instances that belong to the same cluster. I am afraid of adding these because they can be hard to achieve.

As just one example, consider a physical machine in an on-prem cluster that has just booted after a long while down: if its clock is ahead of real time, NTP will take a while to slip it back to the correct value, and in the meantime lakeFS will be up and happily destroying our assumptions :-(

Contributor

Also on AWS awslabs/amazon-eks-ami#249 is a recurring issue in AMIs, and a linked issue has someone with a 7 minute time skew on EKS.

Clocks should be synced, but I think we will exact too strong a penalty for failure here. We should add:

  1. Some safety mechanism.

    For instance:

    • If when we scan we discover prefixes from the distant future (>1 minute?) then error out and mark some metric for alerting.
    • Create an additional marker object in every partition once an hour, and scan what the other cluster members put in theirs. This gives an estimate of time skew, which can be monitored.
  2. Operational guidance (strongly worded documentation).
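The marker-based skew estimate from the second bullet could look something like the following (a rough sketch; it ignores marker age within the hour, and all names are illustrative, not lakeFS APIs):

```python
def estimate_skews(marker_times, local_now):
    """marker_times: {instance_id: clock value the instance wrote in its
    latest marker object}. Returns per-instance skew (seconds) relative
    to the local clock; positive means the peer's clock runs ahead."""
    return {inst: ts - local_now for inst, ts in marker_times.items()}

def fail_shut(skews, threshold_s=60.0):
    """Refuse to GC (and raise an alert) if any peer appears to be more
    than threshold_s ahead - i.e. we would see prefixes 'from the
    future' while scanning."""
    return any(skew > threshold_s for skew in skews.values())

skews = estimate_skews({"lakefs-1": 1000.0, "lakefs-2": 1420.0}, 1000.0)
print(fail_shut(skews))  # True: lakefs-2 looks 7 minutes ahead
```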

Contributor

@itaiad200 itaiad200 left a comment

Found a bug :(

Comment on lines 90 to 94
#### PrepareUncommittedForGC

A new API which will create meta-ranges and ranges for a given branch using its uncommitted data. These files
will be saved in a designated path used by the GC client to list branch's uncommitted objects.
For the purpose of this document we'll call this the `BranchUncommittedMetarange`
Contributor

I think there's a bug.
I start preparing the uncommitted objects. During that time a Copy is happening on the same branch, making it a metadata operation. The copy copies from the path zz to the path aa. The preparation of the uncommitted objects started before aa was created, but by the time it reached zz it was already deleted. Now I see neither of them in the output file, and the physical object may be deleted by the GC.

Member Author

Thanks! Addressed in proposal

Contributor

@itaiad200 itaiad200 left a comment

Thanks for addressing the comments, I added a few more but wish not to further block.

1. Copy object in the same branch will work the same - creating a new staging entry using the existing entry information.
2. For objects that are not part of the branch, use the underlying adapter copy operation.
When performing a shallow copy - track copied objects in ref-store.
GC will read the copied objects information from the ref-store, and will add them to the list of uncommitted.
Contributor

The timing here is critical - it MUST read the new table after it has iterated over the staging area of all branches.

Member Author

Explained in PrepareUncommittedForGC - added additional note to emphasize


#### Move/RenameObject
Clients working through the S3 Gateway can use the CopyObject + DeleteObject to perform a Rename or Move operation.
For clients using the OpenAPI this could have been done using StageObject + DeleteObject.
To continue support of this operation, introduce a new API to rename an object which will be scoped to a single branch.
Rename will add copied objects in ref-store similarly to CopyObject
Contributor

So it needs to maintain that new table too, right?

Member Author

Added

#### Move/RenameObject
Clients working through the S3 Gateway can use the CopyObject + DeleteObject to perform a Rename or Move operation.
For clients using the OpenAPI this could have been done using StageObject + DeleteObject.
To continue support of this operation, introduce a new API to rename an object which will be scoped to a single branch.
Contributor

@N-o-Z Still relevant - If we have a way to logically do a rename with copy and delete, why do we need a non-atomic rename in the API?

Contributor

@arielshaqed arielshaqed left a comment

Thanks!

Really trying to avoid the namespace reorg, it sounds expensive :-(


The heaviest operation during the GC process is the namespace listing. And while we added the above optimizations to mitigate
this process, the fact remains - we still need to scan the entire namespace (in the Clean Run mode).
Performing tests against an AWS S3 bucket using a Databricks notebook on a m4.large cluster, we've observed listing of
Contributor

Can we use a larger instance, to see if we are blocked on network or on something else?

Member Author

Checked with an xlarge cluster and max workers 10.
Did some more tests and created a UDF which reads a specific partition and returns the objects' information.
For 1,000,000 objects divided equally into 100 partitions I was able to load the information into a DF within 30 seconds.
The results look pretty promising - and I'm sure we can also improve on that.
I will update the proposal with final results this week


### 3. Listing of lakeFS repository uncommitted objects

Expose a new API in lakeFS which writes repository uncommitted objects information into a parquet file in a dedicated path
Contributor

Suggest not fixing a format for now: it is far from clear which format is best to use, but luckily this looks like an implementation detail that is easy to change.

Member Author

I agree this is an implementation detail and will remove the format specific details from the proposal


lakeFS will track copy operations of uncommitted objects and store them in the ref-store for a limited duration.
GC will use this information as part of the uncommitted data to avoid a race between the GC job and rename operation.
lakeFS will periodically scan these entries and remove copy entries from the ref-store after such time that will
allow correct execution of the GC process.
Contributor

Not sure that I understand. How does lakeFS know that GC actually ran?

#### Track copied objects in ref-store

lakeFS will track copy operations of uncommitted objects and store them in the ref-store for a limited duration.
GC will use this information as part of the uncommitted data to avoid a race between the GC job and rename operation.
Contributor

Copy operation? Not sure lakeFS can support atomic rename.

Member Author

@N-o-Z N-o-Z Nov 8, 2022

We are tracking copy operations but the purpose is to avoid a race in rename.
Consider the following:

  1. GC starts reading branch staging area
  2. The rename operation creates a copy of the staged entry - which does not get listed due to listing order
  3. The rename operation deletes the original entry before GC is able to scan it as part of the listing of the staging area

As a result, the physical address will not be listed in uncommitted (or committed) and will be a candidate for deletion.
Reading from the copy table allows exempting copied objects from the delete list and avoids this race.

The copied entries' lifespan can be minutes to an hour, or they can be removed as part of a branch operation (commit/reset/delete branch)

In the next GC iteration - these addresses will either be in committed, still in staging or deleted and will be handled accordingly
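The fix can be restated with sets (a sketch; the copy table's entries are simply treated as additional uncommitted references):

```python
def gc_candidates(listed_staging, committed, copy_table, namespace):
    """Without copy_table, an address renamed mid-listing (copied to a
    new key the listing already passed, then deleted from the old key
    before the listing reached it) would appear unreferenced and be
    wrongly deleted. Exempting copy-table entries closes the race."""
    referenced = listed_staging | committed | copy_table
    return namespace - referenced

# "X" was renamed during the listing and missed entirely, but it is in
# the copy table, so only the truly dangling "Y" remains a candidate.
print(gc_candidates(set(), set(), {"X"}, {"X", "Y"}))  # {'Y'}
```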


When performing a shallow copy - track copied objects in ref-store.
GC will read the copied objects information from the ref-store, and will add them to the list of uncommitted.
lakeFS will periodically clear the copied list according to timestamp.
Contributor

How long is this? The shorter it is, the greater the requirement for time sync between lakeFS instances. So it seems like it needs to be fairly large.

Contributor

@arielshaqed arielshaqed left a comment

Neat!

I'm only worried about the operational aspect of maintaining synchronized times across the cluster of lakeFS servers. But I believe that we can observe the time skew automatically, even as part of time-based partitioning!

Given that we're talking about deleting files, we should be very sure that we help operators control time sync. Ideally fail shut and refuse to GC uncommitted data, or even to work at all, when time sync is not good enough. (I'm talking about minutes-level time sync here, of course. NTP gives you milliseconds-level time sync. But virtualization is notorious for occasionally screwing up time sync.)


@N-o-Z N-o-Z merged commit 0e15957 into master Nov 10, 2022
@N-o-Z N-o-Z deleted the proposal/offline-uncommitted-gc-2 branch November 10, 2022 16:49
Labels
exclude-changelog PR description should not be included in next release changelog GC+ proposal team/versioning-engine Team versioning engine

Successfully merging this pull request may close these issues.

Hard-delete objects that were never committed
9 participants