-
Notifications
You must be signed in to change notification settings - Fork 130
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Persistence: purge unreferenced
Obj
s
This is an attempt to implement the algorithm mentioned in the PR #9401. The `Obj.referenced()` attribute contains the timestamp when the object was last "referenced" (aka: attempted to be written). It is ... * set when an object is first persisted via a `storeObj()` * updated in the database, when an object was not persisted via `storeObj()` * set/updated via `upsertObj()` * updated via `updateConditional()` Let's assume that there is a mechanism to identify the IDs of all referenced objects (it would be very similar to what the export functionality does). The algorithm to purge unreferenced objects must never delete an object that is referenced at any point of time, and must consider the case that an object that was unreferenced when a purge-unreferenced-objects routine started, but became referenced while it is running. An approach could work as follows: 1. Memoize the current timestamp (minus some wall-clock drift adjustment). 2. Identify the IDs of all referenced objects. We could leverage a bloom filter, if the set of IDs is big. 3. Then scan all objects in the repository. Objects can be purged, if ... * the ID is not in the set (or bloom filter) generated in step 2 ... * _AND_ have a `referenced` timestamp less than the memoized timestamp. Any deletion in the backing database would follow the meaning of this pseudo SQL: `DELETE FROM objs WHERE obj_id = :objId AND referenced < :memoizedTimestamp`. Noting, that the `referenced` attribute is rather incorrect when retrieved from the objects cache (aka: during normal operations), which is not a problem, because that `referenced` attribute is irrelevant for production accesses. There are two edge cases / race conditions: * (for some backends): A `storeObj()` operation detected that the object already exists - then the purge routine deletes that object - and then the `storeObj()` tries to upddate the `referenced` attribute. The result is the loss of that object. This race condition can only occur, if the object existed but was not referenced. * While the referenced objects are being identified, create a new named reference (branch / tag) pointing to commit(s) that would be identified as unreferenced and being later purged.
- Loading branch information
Showing
14 changed files
with
681 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,51 @@ | ||
/* | ||
* Copyright (C) 2022 Dremio | ||
* | ||
* Licensed under the Apache License, Version 2.0 (the "License"); | ||
* you may not use this file except in compliance with the License. | ||
* You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
plugins { id("nessie-conventions-server") } | ||
|
||
publishingHelper { mavenName = "Nessie - Storage - Garbage Collect" } | ||
|
||
description = "Identify and purge unreferenced objects." | ||
|
||
dependencies { | ||
implementation(project(":nessie-model")) | ||
implementation(project(":nessie-versioned-storage-common")) | ||
implementation(project(":nessie-versioned-spi")) | ||
implementation(project(":nessie-versioned-transfer-related")) | ||
|
||
compileOnly(libs.jakarta.validation.api) | ||
compileOnly(libs.jakarta.annotation.api) | ||
compileOnly(libs.microprofile.openapi) | ||
|
||
compileOnly(platform(libs.jackson.bom)) | ||
compileOnly("com.fasterxml.jackson.core:jackson-annotations") | ||
|
||
compileOnly(libs.errorprone.annotations) | ||
implementation(libs.guava) | ||
|
||
compileOnly(project(":nessie-versioned-storage-testextension")) | ||
|
||
compileOnly(libs.immutables.builder) | ||
compileOnly(libs.immutables.value.annotations) | ||
annotationProcessor(libs.immutables.value.processor) | ||
|
||
testImplementation(project(":nessie-versioned-storage-common-tests")) | ||
testImplementation(project(":nessie-versioned-storage-inmemory")) | ||
testImplementation(project(path = ":nessie-protobuf-relocated", configuration = "shadow")) | ||
testImplementation(platform(libs.junit.bom)) | ||
testImplementation(libs.bundles.junit.testing) | ||
testRuntimeOnly(libs.logback.classic) | ||
} |
23 changes: 23 additions & 0 deletions
23
...src/main/java/org/projectnessie/versioned/storage/garbagecollect/LiveObjectsResolver.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,23 @@ | ||
/* | ||
* Copyright (C) 2024 Dremio | ||
* | ||
* Licensed under the Apache License, Version 2.0 (the "License"); | ||
* you may not use this file except in compliance with the License. | ||
* You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
package org.projectnessie.versioned.storage.garbagecollect; | ||
|
||
import jakarta.validation.constraints.NotNull; | ||
|
||
public interface LiveObjectsResolver extends AutoCloseable { | ||
void resolve(@NotNull ObjectsResolverContext objectsResolverContext) | ||
throws MustRestartWithBiggerFilterException; | ||
} |
143 changes: 143 additions & 0 deletions
143
...main/java/org/projectnessie/versioned/storage/garbagecollect/LiveObjectsResolverImpl.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,143 @@ | ||
/* | ||
* Copyright (C) 2024 Dremio | ||
* | ||
* Licensed under the Apache License, Version 2.0 (the "License"); | ||
* you may not use this file except in compliance with the License. | ||
* You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
package org.projectnessie.versioned.storage.garbagecollect; | ||
|
||
import static com.google.common.base.Preconditions.checkState; | ||
import static org.projectnessie.versioned.storage.common.logic.CommitLogQuery.commitLogQuery; | ||
import static org.projectnessie.versioned.storage.common.logic.Logics.commitLogic; | ||
import static org.projectnessie.versioned.storage.common.logic.Logics.indexesLogic; | ||
import static org.projectnessie.versioned.storage.common.logic.Logics.referenceLogic; | ||
import static org.projectnessie.versioned.storage.common.logic.Logics.repositoryLogic; | ||
import static org.projectnessie.versioned.storage.common.logic.ReferencesQuery.referencesQuery; | ||
|
||
import org.projectnessie.model.Content; | ||
import org.projectnessie.versioned.storage.common.exceptions.ObjNotFoundException; | ||
import org.projectnessie.versioned.storage.common.indexes.StoreIndex; | ||
import org.projectnessie.versioned.storage.common.indexes.StoreIndexElement; | ||
import org.projectnessie.versioned.storage.common.logic.IndexesLogic; | ||
import org.projectnessie.versioned.storage.common.logic.ReferenceLogic; | ||
import org.projectnessie.versioned.storage.common.objtypes.CommitObj; | ||
import org.projectnessie.versioned.storage.common.objtypes.CommitOp; | ||
import org.projectnessie.versioned.storage.common.objtypes.ContentValueObj; | ||
import org.projectnessie.versioned.storage.common.objtypes.StandardObjType; | ||
import org.projectnessie.versioned.storage.common.persist.Obj; | ||
import org.projectnessie.versioned.storage.common.persist.ObjId; | ||
import org.projectnessie.versioned.storage.common.persist.ObjType; | ||
import org.projectnessie.versioned.storage.common.persist.Persist; | ||
import org.projectnessie.versioned.storage.common.persist.Reference; | ||
import org.projectnessie.versioned.store.DefaultStoreWorker; | ||
|
||
final class LiveObjectsResolverImpl implements LiveObjectsResolver { | ||
@Override | ||
public void resolve(ObjectsResolverContext objectsResolverContext) | ||
throws MustRestartWithBiggerFilterException { | ||
Persist persist = objectsResolverContext.persist(); | ||
checkState( | ||
repositoryLogic(persist).repositoryExists(), | ||
"The provided repository has not been initialized."); | ||
|
||
objectsResolverContext | ||
.relatedObjects() | ||
.repositoryRelatedObjects() | ||
.forEach( | ||
id -> { | ||
handleObj(id, objectsResolverContext); | ||
}); | ||
|
||
ReferenceLogic referenceLogic = referenceLogic(persist); | ||
referenceLogic | ||
.queryReferences(referencesQuery()) | ||
.forEachRemaining( | ||
reference -> { | ||
walkReference(reference, objectsResolverContext); | ||
}); | ||
} | ||
|
||
private void walkReference(Reference reference, ObjectsResolverContext objectsResolverContext) { | ||
objectsResolverContext | ||
.relatedObjects() | ||
.referenceRelatedObjects(reference) | ||
.forEach(id -> handleObj(id, objectsResolverContext)); | ||
|
||
commitLogic(objectsResolverContext.persist()) | ||
.commitLog(commitLogQuery(reference.pointer())) | ||
.forEachRemaining(commit -> walkCommit(commit, objectsResolverContext)); | ||
|
||
ObjId extendedInfo = reference.extendedInfoObj(); | ||
if (extendedInfo != null) { | ||
objectsResolverContext.filter().markReferenced(extendedInfo); | ||
} | ||
} | ||
|
||
private void walkCommit(CommitObj commit, ObjectsResolverContext objectsResolverContext) { | ||
objectsResolverContext.filter().markReferenced(commit.id()); | ||
|
||
objectsResolverContext | ||
.relatedObjects() | ||
.commitRelatedObjects(commit) | ||
.forEach(id -> handleObj(id, objectsResolverContext)); | ||
|
||
IndexesLogic indexesLogic = indexesLogic(objectsResolverContext.persist()); | ||
StoreIndex<CommitOp> index = indexesLogic.buildCompleteIndexOrEmpty(commit); | ||
for (StoreIndexElement<CommitOp> indexElement : index) { | ||
CommitOp content = indexElement.content(); | ||
if (content.action().exists()) { | ||
ObjId value = content.value(); | ||
handleObj(value, objectsResolverContext); | ||
} | ||
} | ||
} | ||
|
||
private void handleObj(ObjId value, ObjectsResolverContext objectsResolverContext) { | ||
// TODO fetch 'value's in batches | ||
|
||
Obj obj; | ||
try { | ||
obj = objectsResolverContext.persist().fetchObj(value); | ||
} catch (ObjNotFoundException e) { | ||
// ignore not-found situation - nothing what we could do here | ||
return; | ||
} | ||
|
||
objectsResolverContext.filter().markReferenced(value); | ||
|
||
ObjType type = obj.type(); | ||
|
||
if (StandardObjType.VALUE.equals(type)) { | ||
ContentValueObj contentValueObj = (ContentValueObj) obj; | ||
Content content = | ||
DefaultStoreWorker.instance() | ||
.valueFromStore(contentValueObj.payload(), contentValueObj.data()); | ||
|
||
handleContent(content, objectsResolverContext); | ||
} | ||
} | ||
|
||
private void handleContent(Content content, ObjectsResolverContext objectsResolverContext) { | ||
objectsResolverContext | ||
.relatedObjects() | ||
.contentRelatedObjects(content) | ||
.forEach( | ||
id -> { | ||
handleObj(id, objectsResolverContext); | ||
}); | ||
} | ||
|
||
@Override | ||
public void close() { | ||
// TODO handle remaining batched 'value's from handleValue(). | ||
} | ||
} |
27 changes: 27 additions & 0 deletions
27
.../projectnessie/versioned/storage/garbagecollect/MustRestartWithBiggerFilterException.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,27 @@ | ||
/* | ||
* Copyright (C) 2024 Dremio | ||
* | ||
* Licensed under the Apache License, Version 2.0 (the "License"); | ||
* you may not use this file except in compliance with the License. | ||
* You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
package org.projectnessie.versioned.storage.garbagecollect; | ||
|
||
/** | ||
* Thrown when the bloom filter's FPP is above the configured threshold when adding IDs. If this | ||
* exception is encountered, the current garbage-collection run <em>must</em> be aborted and | ||
* restarted with a bigger {@link ObjectsResolverParams#expectedObjects()} value. | ||
*/ | ||
public class MustRestartWithBiggerFilterException extends RuntimeException { | ||
public MustRestartWithBiggerFilterException(String msg) { | ||
super(msg); | ||
} | ||
} |
31 changes: 31 additions & 0 deletions
31
...collect/src/main/java/org/projectnessie/versioned/storage/garbagecollect/ObjIdFilter.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,31 @@ | ||
/* | ||
* Copyright (C) 2024 Dremio | ||
* | ||
* Licensed under the Apache License, Version 2.0 (the "License"); | ||
* you may not use this file except in compliance with the License. | ||
* You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
package org.projectnessie.versioned.storage.garbagecollect; | ||
|
||
import jakarta.validation.constraints.NotNull; | ||
import org.projectnessie.versioned.storage.common.persist.ObjId; | ||
|
||
public interface ObjIdFilter { | ||
boolean markReferenced(@NotNull ObjId objId) throws MustRestartWithBiggerFilterException; | ||
|
||
boolean isProbablyReferenced(@NotNull ObjId objId); | ||
|
||
boolean withinExpectedFpp(); | ||
|
||
long approximateElementCount(); | ||
|
||
double expectedFpp(); | ||
} |
83 changes: 83 additions & 0 deletions
83
...ect/src/main/java/org/projectnessie/versioned/storage/garbagecollect/ObjIdFilterImpl.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,83 @@ | ||
/* | ||
* Copyright (C) 2024 Dremio | ||
* | ||
* Licensed under the Apache License, Version 2.0 (the "License"); | ||
* you may not use this file except in compliance with the License. | ||
* You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
package org.projectnessie.versioned.storage.garbagecollect; | ||
|
||
import com.google.common.hash.BloomFilter; | ||
import com.google.common.hash.PrimitiveSink; | ||
import java.util.concurrent.atomic.AtomicLong; | ||
import org.projectnessie.versioned.storage.common.persist.ObjId; | ||
|
||
@SuppressWarnings("UnstableApiUsage") | ||
final class ObjIdFilterImpl implements ObjIdFilter { | ||
private final BloomFilter<ObjId> filter; | ||
private final double allowedFalsePositiveProbability; | ||
private final AtomicLong remainingElements; | ||
|
||
ObjIdFilterImpl(ObjectsResolverParams params) { | ||
this.filter = createBloomFilter(params); | ||
this.remainingElements = new AtomicLong(params.expectedObjects()); | ||
this.allowedFalsePositiveProbability = params.allowedFalsePositiveProbability(); | ||
} | ||
|
||
static BloomFilter<ObjId> createBloomFilter(ObjectsResolverParams params) { | ||
return BloomFilter.create( | ||
ObjIdFilterImpl::funnel, params.expectedObjects(), params.falsePositiveProbability()); | ||
} | ||
|
||
private static void funnel(ObjId id, PrimitiveSink primitiveSink) { | ||
int idSize = id.size(); | ||
int i = 0; | ||
for (; idSize >= 8; idSize -= 8) { | ||
primitiveSink.putLong(id.longAt(i++)); | ||
} | ||
i <<= 3; | ||
for (; idSize > 0; idSize--) { | ||
primitiveSink.putByte(id.byteAt(i++)); | ||
} | ||
} | ||
|
||
@Override | ||
public boolean markReferenced(ObjId objId) throws MustRestartWithBiggerFilterException { | ||
if (filter.put(objId)) { | ||
if (remainingElements.decrementAndGet() >= 0L || withinExpectedFpp()) { | ||
return true; | ||
} | ||
throw new MustRestartWithBiggerFilterException( | ||
"Bloom filter exceeded the configured expected FPP"); | ||
} | ||
return false; | ||
} | ||
|
||
@Override | ||
public boolean isProbablyReferenced(ObjId objId) { | ||
return filter.mightContain(objId); | ||
} | ||
|
||
@Override | ||
public boolean withinExpectedFpp() { | ||
return expectedFpp() <= allowedFalsePositiveProbability; | ||
} | ||
|
||
@Override | ||
public long approximateElementCount() { | ||
return filter.approximateElementCount(); | ||
} | ||
|
||
@Override | ||
public double expectedFpp() { | ||
return filter.expectedFpp(); | ||
} | ||
} |
Oops, something went wrong.