From 59cec883f8e3af49b151e052544fe4e16ca4b1f9 Mon Sep 17 00:00:00 2001 From: Robert Stupp Date: Wed, 2 Oct 2024 14:10:59 +0200 Subject: [PATCH] Persistence: purge unreferenced `Obj`s MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This is an attempt to implement the algorithm mentioned in the PR #9401. The `Obj.referenced()` attribute contains the timestamp when the object was last "referenced" (aka: attempted to be written). It is ... * set when an object is first persisted via a `storeObj()` * updated in the database, when an object was not persisted via `storeObj()` * set/updated via `upsertObj()` * updated via `updateConditional()` Let's assume that there is a mechanism to identify the IDs of all referenced objects (it would be very similar to what the export functionality does). The algorithm to purge unreferenced objects must never delete an object that is referenced at any point of time, and must consider the case that an object that was unreferenced when a purge-unreferenced-objects routine started, but became referenced while it is running. An approach could work as follows: 1. Memoize the current timestamp (minus some wall-clock drift adjustment). 2. Identify the IDs of all referenced objects. We could leverage a bloom filter, if the set of IDs is big. 3. Then scan all objects in the repository. Objects can be purged, if ...     * the ID is not in the set (or bloom filter) generated in step 2 ...     * _AND_ have a `referenced` timestamp less than the memoized timestamp. Any deletion in the backing database would follow the meaning of this pseudo SQL: `DELETE FROM objs WHERE obj_id = :objId AND referenced < :memoizedTimestamp`. Noting, that the `referenced` attribute is rather incorrect when retrieved from the objects cache (aka: during normal operations), which is not a problem, because that `referenced` attribute is irrelevant for production accesses. There are two edge cases / race conditions: * (for some backends): A `storeObj()` operation detected that the object already exists - then the purge routine deletes that object - and then the `storeObj()` tries to upddate the `referenced` attribute. The result is the loss of that object. This race condition can only occur, if the object existed but was not referenced. * While the referenced objects are being identified, create a new named reference (branch / tag) pointing to commit(s) that would be identified as unreferenced and being later purged. --- bom/build.gradle.kts | 1 + gradle/projects.main.properties | 1 + .../storage/garbage-collect/build.gradle.kts | 51 +++++++ .../garbagecollect/LiveObjectsResolver.java | 23 +++ .../LiveObjectsResolverImpl.java | 143 ++++++++++++++++++ .../MustRestartWithBiggerFilterException.java | 27 ++++ .../storage/garbagecollect/ObjIdFilter.java | 31 ++++ .../garbagecollect/ObjIdFilterImpl.java | 83 ++++++++++ .../ObjectsResolverContext.java | 35 +++++ .../ObjectsResolverContextImpl.java | 54 +++++++ .../garbagecollect/ObjectsResolverParams.java | 51 +++++++ .../storage/garbagecollect/PurgeObjects.java | 22 +++ .../garbagecollect/PurgeObjectsImpl.java | 55 +++++++ .../garbagecollect/TestObjIdFilterImpl.java | 104 +++++++++++++ 14 files changed, 681 insertions(+) create mode 100644 versioned/storage/garbage-collect/build.gradle.kts create mode 100644 versioned/storage/garbage-collect/src/main/java/org/projectnessie/versioned/storage/garbagecollect/LiveObjectsResolver.java create mode 100644 versioned/storage/garbage-collect/src/main/java/org/projectnessie/versioned/storage/garbagecollect/LiveObjectsResolverImpl.java create mode 100644 versioned/storage/garbage-collect/src/main/java/org/projectnessie/versioned/storage/garbagecollect/MustRestartWithBiggerFilterException.java create mode 100644 versioned/storage/garbage-collect/src/main/java/org/projectnessie/versioned/storage/garbagecollect/ObjIdFilter.java create mode 100644 versioned/storage/garbage-collect/src/main/java/org/projectnessie/versioned/storage/garbagecollect/ObjIdFilterImpl.java create mode 100644 versioned/storage/garbage-collect/src/main/java/org/projectnessie/versioned/storage/garbagecollect/ObjectsResolverContext.java create mode 100644 versioned/storage/garbage-collect/src/main/java/org/projectnessie/versioned/storage/garbagecollect/ObjectsResolverContextImpl.java create mode 100644 versioned/storage/garbage-collect/src/main/java/org/projectnessie/versioned/storage/garbagecollect/ObjectsResolverParams.java create mode 100644 versioned/storage/garbage-collect/src/main/java/org/projectnessie/versioned/storage/garbagecollect/PurgeObjects.java create mode 100644 versioned/storage/garbage-collect/src/main/java/org/projectnessie/versioned/storage/garbagecollect/PurgeObjectsImpl.java create mode 100644 versioned/storage/garbage-collect/src/test/java/org/projectnessie/versioned/storage/garbagecollect/TestObjIdFilterImpl.java diff --git a/bom/build.gradle.kts b/bom/build.gradle.kts index 7560a54b459..ac0fa0b3a76 100644 --- a/bom/build.gradle.kts +++ b/bom/build.gradle.kts @@ -96,6 +96,7 @@ dependencies { api(project(":nessie-versioned-storage-dynamodb-tests")) api(project(":nessie-versioned-storage-dynamodb2")) api(project(":nessie-versioned-storage-dynamodb2-tests")) + api(project(":nessie-versioned-storage-garbage-collect")) api(project(":nessie-versioned-storage-inmemory")) api(project(":nessie-versioned-storage-inmemory-tests")) api(project(":nessie-versioned-storage-jdbc")) diff --git a/gradle/projects.main.properties b/gradle/projects.main.properties index 558a5a261b1..f6f332e8da2 100644 --- a/gradle/projects.main.properties +++ b/gradle/projects.main.properties @@ -79,6 +79,7 @@ nessie-versioned-storage-dynamodb=versioned/storage/dynamodb nessie-versioned-storage-dynamodb-tests=versioned/storage/dynamodb-tests nessie-versioned-storage-dynamodb2=versioned/storage/dynamodb2 nessie-versioned-storage-dynamodb2-tests=versioned/storage/dynamodb2-tests +nessie-versioned-storage-garbage-collect=versioned/storage/garbage-collect nessie-versioned-storage-inmemory=versioned/storage/inmemory nessie-versioned-storage-inmemory-tests=versioned/storage/inmemory-tests nessie-versioned-storage-jdbc=versioned/storage/jdbc diff --git a/versioned/storage/garbage-collect/build.gradle.kts b/versioned/storage/garbage-collect/build.gradle.kts new file mode 100644 index 00000000000..25f68c658ad --- /dev/null +++ b/versioned/storage/garbage-collect/build.gradle.kts @@ -0,0 +1,51 @@ +/* + * Copyright (C) 2022 Dremio + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +plugins { id("nessie-conventions-server") } + +publishingHelper { mavenName = "Nessie - Storage - Garbage Collect" } + +description = "Identify and purge unreferenced objects." + +dependencies { + implementation(project(":nessie-model")) + implementation(project(":nessie-versioned-storage-common")) + implementation(project(":nessie-versioned-spi")) + implementation(project(":nessie-versioned-transfer-related")) + + compileOnly(libs.jakarta.validation.api) + compileOnly(libs.jakarta.annotation.api) + compileOnly(libs.microprofile.openapi) + + compileOnly(platform(libs.jackson.bom)) + compileOnly("com.fasterxml.jackson.core:jackson-annotations") + + compileOnly(libs.errorprone.annotations) + implementation(libs.guava) + + compileOnly(project(":nessie-versioned-storage-testextension")) + + compileOnly(libs.immutables.builder) + compileOnly(libs.immutables.value.annotations) + annotationProcessor(libs.immutables.value.processor) + + testImplementation(project(":nessie-versioned-storage-common-tests")) + testImplementation(project(":nessie-versioned-storage-inmemory")) + testImplementation(project(path = ":nessie-protobuf-relocated", configuration = "shadow")) + testImplementation(platform(libs.junit.bom)) + testImplementation(libs.bundles.junit.testing) + testRuntimeOnly(libs.logback.classic) +} diff --git a/versioned/storage/garbage-collect/src/main/java/org/projectnessie/versioned/storage/garbagecollect/LiveObjectsResolver.java b/versioned/storage/garbage-collect/src/main/java/org/projectnessie/versioned/storage/garbagecollect/LiveObjectsResolver.java new file mode 100644 index 00000000000..d523f34bb8e --- /dev/null +++ b/versioned/storage/garbage-collect/src/main/java/org/projectnessie/versioned/storage/garbagecollect/LiveObjectsResolver.java @@ -0,0 +1,23 @@ +/* + * Copyright (C) 2024 Dremio + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.projectnessie.versioned.storage.garbagecollect; + +import jakarta.validation.constraints.NotNull; + +public interface LiveObjectsResolver extends AutoCloseable { + void resolve(@NotNull ObjectsResolverContext objectsResolverContext) + throws MustRestartWithBiggerFilterException; +} diff --git a/versioned/storage/garbage-collect/src/main/java/org/projectnessie/versioned/storage/garbagecollect/LiveObjectsResolverImpl.java b/versioned/storage/garbage-collect/src/main/java/org/projectnessie/versioned/storage/garbagecollect/LiveObjectsResolverImpl.java new file mode 100644 index 00000000000..578ed316cda --- /dev/null +++ b/versioned/storage/garbage-collect/src/main/java/org/projectnessie/versioned/storage/garbagecollect/LiveObjectsResolverImpl.java @@ -0,0 +1,143 @@ +/* + * Copyright (C) 2024 Dremio + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.projectnessie.versioned.storage.garbagecollect; + +import static com.google.common.base.Preconditions.checkState; +import static org.projectnessie.versioned.storage.common.logic.CommitLogQuery.commitLogQuery; +import static org.projectnessie.versioned.storage.common.logic.Logics.commitLogic; +import static org.projectnessie.versioned.storage.common.logic.Logics.indexesLogic; +import static org.projectnessie.versioned.storage.common.logic.Logics.referenceLogic; +import static org.projectnessie.versioned.storage.common.logic.Logics.repositoryLogic; +import static org.projectnessie.versioned.storage.common.logic.ReferencesQuery.referencesQuery; + +import org.projectnessie.model.Content; +import org.projectnessie.versioned.storage.common.exceptions.ObjNotFoundException; +import org.projectnessie.versioned.storage.common.indexes.StoreIndex; +import org.projectnessie.versioned.storage.common.indexes.StoreIndexElement; +import org.projectnessie.versioned.storage.common.logic.IndexesLogic; +import org.projectnessie.versioned.storage.common.logic.ReferenceLogic; +import org.projectnessie.versioned.storage.common.objtypes.CommitObj; +import org.projectnessie.versioned.storage.common.objtypes.CommitOp; +import org.projectnessie.versioned.storage.common.objtypes.ContentValueObj; +import org.projectnessie.versioned.storage.common.objtypes.StandardObjType; +import org.projectnessie.versioned.storage.common.persist.Obj; +import org.projectnessie.versioned.storage.common.persist.ObjId; +import org.projectnessie.versioned.storage.common.persist.ObjType; +import org.projectnessie.versioned.storage.common.persist.Persist; +import org.projectnessie.versioned.storage.common.persist.Reference; +import org.projectnessie.versioned.store.DefaultStoreWorker; + +final class LiveObjectsResolverImpl implements LiveObjectsResolver { + @Override + public void resolve(ObjectsResolverContext objectsResolverContext) + throws MustRestartWithBiggerFilterException { + Persist persist = objectsResolverContext.persist(); + checkState( + repositoryLogic(persist).repositoryExists(), + "The provided repository has not been initialized."); + + objectsResolverContext + .relatedObjects() + .repositoryRelatedObjects() + .forEach( + id -> { + handleObj(id, objectsResolverContext); + }); + + ReferenceLogic referenceLogic = referenceLogic(persist); + referenceLogic + .queryReferences(referencesQuery()) + .forEachRemaining( + reference -> { + walkReference(reference, objectsResolverContext); + }); + } + + private void walkReference(Reference reference, ObjectsResolverContext objectsResolverContext) { + objectsResolverContext + .relatedObjects() + .referenceRelatedObjects(reference) + .forEach(id -> handleObj(id, objectsResolverContext)); + + commitLogic(objectsResolverContext.persist()) + .commitLog(commitLogQuery(reference.pointer())) + .forEachRemaining(commit -> walkCommit(commit, objectsResolverContext)); + + ObjId extendedInfo = reference.extendedInfoObj(); + if (extendedInfo != null) { + objectsResolverContext.filter().markReferenced(extendedInfo); + } + } + + private void walkCommit(CommitObj commit, ObjectsResolverContext objectsResolverContext) { + objectsResolverContext.filter().markReferenced(commit.id()); + + objectsResolverContext + .relatedObjects() + .commitRelatedObjects(commit) + .forEach(id -> handleObj(id, objectsResolverContext)); + + IndexesLogic indexesLogic = indexesLogic(objectsResolverContext.persist()); + StoreIndex index = indexesLogic.buildCompleteIndexOrEmpty(commit); + for (StoreIndexElement indexElement : index) { + CommitOp content = indexElement.content(); + if (content.action().exists()) { + ObjId value = content.value(); + handleObj(value, objectsResolverContext); + } + } + } + + private void handleObj(ObjId value, ObjectsResolverContext objectsResolverContext) { + // TODO fetch 'value's in batches + + Obj obj; + try { + obj = objectsResolverContext.persist().fetchObj(value); + } catch (ObjNotFoundException e) { + // ignore not-found situation - nothing what we could do here + return; + } + + objectsResolverContext.filter().markReferenced(value); + + ObjType type = obj.type(); + + if (StandardObjType.VALUE.equals(type)) { + ContentValueObj contentValueObj = (ContentValueObj) obj; + Content content = + DefaultStoreWorker.instance() + .valueFromStore(contentValueObj.payload(), contentValueObj.data()); + + handleContent(content, objectsResolverContext); + } + } + + private void handleContent(Content content, ObjectsResolverContext objectsResolverContext) { + objectsResolverContext + .relatedObjects() + .contentRelatedObjects(content) + .forEach( + id -> { + handleObj(id, objectsResolverContext); + }); + } + + @Override + public void close() { + // TODO handle remaining batched 'value's from handleValue(). + } +} diff --git a/versioned/storage/garbage-collect/src/main/java/org/projectnessie/versioned/storage/garbagecollect/MustRestartWithBiggerFilterException.java b/versioned/storage/garbage-collect/src/main/java/org/projectnessie/versioned/storage/garbagecollect/MustRestartWithBiggerFilterException.java new file mode 100644 index 00000000000..a902fce5be2 --- /dev/null +++ b/versioned/storage/garbage-collect/src/main/java/org/projectnessie/versioned/storage/garbagecollect/MustRestartWithBiggerFilterException.java @@ -0,0 +1,27 @@ +/* + * Copyright (C) 2024 Dremio + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.projectnessie.versioned.storage.garbagecollect; + +/** + * Thrown when the bloom filter's FPP is above the configured threshold when adding IDs. If this + * exception is encountered, the current garbage-collection run must be aborted and + * restarted with a bigger {@link ObjectsResolverParams#expectedObjects()} value. + */ +public class MustRestartWithBiggerFilterException extends RuntimeException { + public MustRestartWithBiggerFilterException(String msg) { + super(msg); + } +} diff --git a/versioned/storage/garbage-collect/src/main/java/org/projectnessie/versioned/storage/garbagecollect/ObjIdFilter.java b/versioned/storage/garbage-collect/src/main/java/org/projectnessie/versioned/storage/garbagecollect/ObjIdFilter.java new file mode 100644 index 00000000000..5a9decbf6be --- /dev/null +++ b/versioned/storage/garbage-collect/src/main/java/org/projectnessie/versioned/storage/garbagecollect/ObjIdFilter.java @@ -0,0 +1,31 @@ +/* + * Copyright (C) 2024 Dremio + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.projectnessie.versioned.storage.garbagecollect; + +import jakarta.validation.constraints.NotNull; +import org.projectnessie.versioned.storage.common.persist.ObjId; + +public interface ObjIdFilter { + boolean markReferenced(@NotNull ObjId objId) throws MustRestartWithBiggerFilterException; + + boolean isProbablyReferenced(@NotNull ObjId objId); + + boolean withinExpectedFpp(); + + long approximateElementCount(); + + double expectedFpp(); +} diff --git a/versioned/storage/garbage-collect/src/main/java/org/projectnessie/versioned/storage/garbagecollect/ObjIdFilterImpl.java b/versioned/storage/garbage-collect/src/main/java/org/projectnessie/versioned/storage/garbagecollect/ObjIdFilterImpl.java new file mode 100644 index 00000000000..d3a8e275a69 --- /dev/null +++ b/versioned/storage/garbage-collect/src/main/java/org/projectnessie/versioned/storage/garbagecollect/ObjIdFilterImpl.java @@ -0,0 +1,83 @@ +/* + * Copyright (C) 2024 Dremio + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.projectnessie.versioned.storage.garbagecollect; + +import com.google.common.hash.BloomFilter; +import com.google.common.hash.PrimitiveSink; +import java.util.concurrent.atomic.AtomicLong; +import org.projectnessie.versioned.storage.common.persist.ObjId; + +@SuppressWarnings("UnstableApiUsage") +final class ObjIdFilterImpl implements ObjIdFilter { + private final BloomFilter filter; + private final double allowedFalsePositiveProbability; + private final AtomicLong remainingElements; + + ObjIdFilterImpl(ObjectsResolverParams params) { + this.filter = createBloomFilter(params); + this.remainingElements = new AtomicLong(params.expectedObjects()); + this.allowedFalsePositiveProbability = params.allowedFalsePositiveProbability(); + } + + static BloomFilter createBloomFilter(ObjectsResolverParams params) { + return BloomFilter.create( + ObjIdFilterImpl::funnel, params.expectedObjects(), params.falsePositiveProbability()); + } + + private static void funnel(ObjId id, PrimitiveSink primitiveSink) { + int idSize = id.size(); + int i = 0; + for (; idSize >= 8; idSize -= 8) { + primitiveSink.putLong(id.longAt(i++)); + } + i <<= 3; + for (; idSize > 0; idSize--) { + primitiveSink.putByte(id.byteAt(i++)); + } + } + + @Override + public boolean markReferenced(ObjId objId) throws MustRestartWithBiggerFilterException { + if (filter.put(objId)) { + if (remainingElements.decrementAndGet() >= 0L || withinExpectedFpp()) { + return true; + } + throw new MustRestartWithBiggerFilterException( + "Bloom filter exceeded the configured expected FPP"); + } + return false; + } + + @Override + public boolean isProbablyReferenced(ObjId objId) { + return filter.mightContain(objId); + } + + @Override + public boolean withinExpectedFpp() { + return expectedFpp() <= allowedFalsePositiveProbability; + } + + @Override + public long approximateElementCount() { + return filter.approximateElementCount(); + } + + @Override + public double expectedFpp() { + return filter.expectedFpp(); + } +} diff --git a/versioned/storage/garbage-collect/src/main/java/org/projectnessie/versioned/storage/garbagecollect/ObjectsResolverContext.java b/versioned/storage/garbage-collect/src/main/java/org/projectnessie/versioned/storage/garbagecollect/ObjectsResolverContext.java new file mode 100644 index 00000000000..69085c148e2 --- /dev/null +++ b/versioned/storage/garbage-collect/src/main/java/org/projectnessie/versioned/storage/garbagecollect/ObjectsResolverContext.java @@ -0,0 +1,35 @@ +/* + * Copyright (C) 2024 Dremio + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.projectnessie.versioned.storage.garbagecollect; + +import jakarta.validation.constraints.NotNull; +import java.time.Instant; +import org.projectnessie.versioned.storage.common.persist.Persist; +import org.projectnessie.versioned.transfer.related.TransferRelatedObjects; + +public interface ObjectsResolverContext { + @NotNull + Persist persist(); + + @NotNull + ObjIdFilter filter(); + + @NotNull + Instant started(); + + @NotNull + TransferRelatedObjects relatedObjects(); +} diff --git a/versioned/storage/garbage-collect/src/main/java/org/projectnessie/versioned/storage/garbagecollect/ObjectsResolverContextImpl.java b/versioned/storage/garbage-collect/src/main/java/org/projectnessie/versioned/storage/garbagecollect/ObjectsResolverContextImpl.java new file mode 100644 index 00000000000..c3625d0fdd8 --- /dev/null +++ b/versioned/storage/garbage-collect/src/main/java/org/projectnessie/versioned/storage/garbagecollect/ObjectsResolverContextImpl.java @@ -0,0 +1,54 @@ +/* + * Copyright (C) 2024 Dremio + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.projectnessie.versioned.storage.garbagecollect; + +import java.time.Instant; +import org.projectnessie.versioned.storage.common.persist.Persist; +import org.projectnessie.versioned.transfer.related.TransferRelatedObjects; + +final class ObjectsResolverContextImpl implements ObjectsResolverContext { + private final Persist persist; + private final ObjIdFilter filter; + private final Instant started; + private final TransferRelatedObjects relatedObjects; + + ObjectsResolverContextImpl(Persist persist, ObjectsResolverParams params, Instant started) { + this.persist = persist; + this.filter = new ObjIdFilterImpl(params); + this.started = started; + this.relatedObjects = params.relatedObjects(); + } + + @Override + public Persist persist() { + return persist; + } + + @Override + public ObjIdFilter filter() { + return filter; + } + + @Override + public Instant started() { + return started; + } + + @Override + public TransferRelatedObjects relatedObjects() { + return relatedObjects; + } +} diff --git a/versioned/storage/garbage-collect/src/main/java/org/projectnessie/versioned/storage/garbagecollect/ObjectsResolverParams.java b/versioned/storage/garbage-collect/src/main/java/org/projectnessie/versioned/storage/garbagecollect/ObjectsResolverParams.java new file mode 100644 index 00000000000..e0f76ffb8ed --- /dev/null +++ b/versioned/storage/garbage-collect/src/main/java/org/projectnessie/versioned/storage/garbagecollect/ObjectsResolverParams.java @@ -0,0 +1,51 @@ +/* + * Copyright (C) 2024 Dremio + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.projectnessie.versioned.storage.garbagecollect; + +import org.immutables.value.Value; +import org.projectnessie.versioned.transfer.related.TransferRelatedObjects; + +@Value.Immutable +public interface ObjectsResolverParams { + // Following defaults result in a serialized bloom filter size of about 3000000 bytes. + long DEFAULT_EXPECTED_FILE_COUNT = 1_000_000L; + double DEFAULT_FALSE_POSITIVE_PROBABILITY = 0.00001d; + double DEFAULT_ALLOWED_FALSE_POSITIVE_PROBABILITY = 0.0001d; + + static ImmutableObjectsResolverParams.Builder builder() { + return ImmutableObjectsResolverParams.builder(); + } + + @Value.Default + default long expectedObjects() { + return DEFAULT_EXPECTED_FILE_COUNT; + } + + @Value.Default + default double falsePositiveProbability() { + return DEFAULT_FALSE_POSITIVE_PROBABILITY; + } + + @Value.Default + default double allowedFalsePositiveProbability() { + return DEFAULT_ALLOWED_FALSE_POSITIVE_PROBABILITY; + } + + @Value.Default + default TransferRelatedObjects relatedObjects() { + return new TransferRelatedObjects() {}; + } +} diff --git a/versioned/storage/garbage-collect/src/main/java/org/projectnessie/versioned/storage/garbagecollect/PurgeObjects.java b/versioned/storage/garbage-collect/src/main/java/org/projectnessie/versioned/storage/garbagecollect/PurgeObjects.java new file mode 100644 index 00000000000..777b8b1f02b --- /dev/null +++ b/versioned/storage/garbage-collect/src/main/java/org/projectnessie/versioned/storage/garbagecollect/PurgeObjects.java @@ -0,0 +1,22 @@ +/* + * Copyright (C) 2024 Dremio + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.projectnessie.versioned.storage.garbagecollect; + +import jakarta.validation.constraints.NotNull; + +public interface PurgeObjects { + void purge(@NotNull ObjectsResolverContext objectsResolverContext); +} diff --git a/versioned/storage/garbage-collect/src/main/java/org/projectnessie/versioned/storage/garbagecollect/PurgeObjectsImpl.java b/versioned/storage/garbage-collect/src/main/java/org/projectnessie/versioned/storage/garbagecollect/PurgeObjectsImpl.java new file mode 100644 index 00000000000..ba467d123bf --- /dev/null +++ b/versioned/storage/garbage-collect/src/main/java/org/projectnessie/versioned/storage/garbagecollect/PurgeObjectsImpl.java @@ -0,0 +1,55 @@ +/* + * Copyright (C) 2024 Dremio + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.projectnessie.versioned.storage.garbagecollect; + +import static java.util.concurrent.TimeUnit.MILLISECONDS; + +import java.util.Set; +import java.util.function.Predicate; +import org.projectnessie.versioned.storage.common.persist.CloseableIterator; +import org.projectnessie.versioned.storage.common.persist.Obj; +import org.projectnessie.versioned.storage.common.persist.ObjId; + +final class PurgeObjectsImpl implements PurgeObjects { + @Override + public void purge(ObjectsResolverContext objectsResolverContext) { + Predicate predicate = objectsResolverContext.filter()::isProbablyReferenced; + long newObject = MILLISECONDS.toMicros(objectsResolverContext.started().toEpochMilli()); + + try (CloseableIterator iter = objectsResolverContext.persist().scanAllObjects(Set.of())) { + while (iter.hasNext()) { + Obj obj = iter.next(); + if (predicate.test(obj.id()) || obj.referenced() > newObject) { + // Either found as referenced or recently created + continue; + } + + purgeObj(obj, objectsResolverContext); + } + } + } + + private void purgeObj(Obj obj, ObjectsResolverContext objectsResolverContext) { + ObjId id = obj.id(); + // TODO include 'referenced' for a CAS deletion, requires changes to Persist + long referenced = obj.referenced(); + + // TODO delete in batches or (with the CAS) delete in parallel (multiple threads) + objectsResolverContext.persist().deleteObj(id); + + // TODO increment a deleted objs counter + } +} diff --git a/versioned/storage/garbage-collect/src/test/java/org/projectnessie/versioned/storage/garbagecollect/TestObjIdFilterImpl.java b/versioned/storage/garbage-collect/src/test/java/org/projectnessie/versioned/storage/garbagecollect/TestObjIdFilterImpl.java new file mode 100644 index 00000000000..41c69f995ea --- /dev/null +++ b/versioned/storage/garbage-collect/src/test/java/org/projectnessie/versioned/storage/garbagecollect/TestObjIdFilterImpl.java @@ -0,0 +1,104 @@ +/* + * Copyright (C) 2024 Dremio + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.projectnessie.versioned.storage.garbagecollect; + +import static org.projectnessie.versioned.storage.common.persist.ObjId.objIdFromByteArray; +import static org.projectnessie.versioned.storage.common.persist.ObjId.randomObjId; + +import java.util.HashSet; +import java.util.Set; +import java.util.concurrent.ThreadLocalRandom; +import org.assertj.core.api.SoftAssertions; +import org.assertj.core.api.junit.jupiter.InjectSoftAssertions; +import org.assertj.core.api.junit.jupiter.SoftAssertionsExtension; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; +import org.projectnessie.versioned.storage.common.persist.ObjId; + +@ExtendWith(SoftAssertionsExtension.class) +public class TestObjIdFilterImpl { + @InjectSoftAssertions SoftAssertions soft; + + @Test + public void emptyFilterContainsNothing() { + ObjIdFilterImpl filter = new ObjIdFilterImpl(ObjectsResolverParams.builder().build()); + soft.assertThat(filter.isProbablyReferenced(ObjId.EMPTY_OBJ_ID)).isFalse(); + for (int i = 0; i < 100; i++) { + ObjId id = randomObjId(); + soft.assertThat(filter.isProbablyReferenced(id)).describedAs("id = %s", id).isFalse(); + } + } + + @Test + public void filterContainsAdded() { + ObjIdFilterImpl filter = new ObjIdFilterImpl(ObjectsResolverParams.builder().build()); + + soft.assertThat(filter.markReferenced(ObjId.EMPTY_OBJ_ID)).isTrue(); + + Set ids = new HashSet<>(3000); + for (int i = 0; i < 1000; i++) { + ids.add(randomObjId()); + } + + for (int i = 0; i < 1000; i++) { + byte[] bytes = new byte[4 + ThreadLocalRandom.current().nextInt(33)]; + ThreadLocalRandom.current().nextBytes(bytes); + ids.add(objIdFromByteArray(bytes)); + } + + for (ObjId id : ids) { + // There is a theoretical chance that this assertion fails, but that change is extremely low. + // (We're adding 2000 object IDs to a bloom filter with an expected object count of 1M and a + // low FPP.) + soft.assertThat(filter.markReferenced(id)).isTrue(); + } + + soft.assertThat(filter.isProbablyReferenced(ObjId.EMPTY_OBJ_ID)).isTrue(); + for (ObjId id : ids) { + soft.assertThat(filter.isProbablyReferenced(id)).describedAs("id = %s", id).isTrue(); + } + } + + @ParameterizedTest + @ValueSource(ints = {100, 1_000, 10_000}) + public void withinExpectedFpp(int expected) { + ObjIdFilterImpl filter = + new ObjIdFilterImpl(ObjectsResolverParams.builder().expectedObjects(expected).build()); + + for (int i = 0; i < expected; i++) { + ObjId id = randomObjId(); + soft.assertThatCode(() -> filter.markReferenced(id)).doesNotThrowAnyException(); + soft.assertThat(filter.withinExpectedFpp()).isTrue(); + } + + // "withinExpectedFpp" should trigger at some point + boolean thrown = false; + for (int i = 0; i < expected / 2; i++) { + ObjId id = randomObjId(); + try { + filter.markReferenced(id); + soft.assertThat(filter.withinExpectedFpp()).isTrue(); + } catch (MustRestartWithBiggerFilterException e) { + soft.assertThat(filter.withinExpectedFpp()).isFalse(); + thrown = true; + break; + } + } + soft.assertThat(thrown).isTrue(); + } +}