Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 6 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,12 @@ An experiment to provide ETCD layer on top of FoundationDB, built with [Record-L
* Integrations test using a real FDB spawned with testcontainers and official Java etcd client,
* Tests are backported from jetcd test cases
* Supported operations:
* put
* get
* scan
* delete
* put,
* get,
* scan,
* delete,
* compact,
* ETCD MVCC simulated using FDB's read version

For TODO's, please have a look to the [Github issues](https://github.com/pierrez/fdb-etcd/issues).

Expand Down
16 changes: 16 additions & 0 deletions src/main/java/fr/pierrezemb/fdb/layer/etcd/service/KVService.java
Original file line number Diff line number Diff line change
Expand Up @@ -139,4 +139,20 @@ public void deleteRange(EtcdIoRpcProto.DeleteRangeRequest request, Promise<EtcdI
request.getRangeEnd().isEmpty() ? request.getKey().toByteArray() : request.getRangeEnd().toByteArray());
response.complete(EtcdIoRpcProto.DeleteRangeResponse.newBuilder().setDeleted(count.longValue()).build());
}

/**
* <pre>
* Compact compacts the event history in the etcd key-value store. The key-value
* store should be periodically compacted or the event history will continue to grow
* indefinitely.
* </pre>
*
* @param request
* @param response
*/
@Override
public void compact(EtcdIoRpcProto.CompactionRequest request, Promise<EtcdIoRpcProto.CompactionResponse> response) {
this.recordStore.compact(request.getRevision());
response.complete();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -212,4 +212,30 @@ public Integer delete(byte[] start, byte[] end) {
log.trace("deleted {} records", count);
return count;
}

public void compact(long revision) {
Integer count = this.db.run(context -> {
log.warn("compacting any record before {}", revision);
FDBRecordStore recordStore = recordStoreProvider.apply(context);

RecordQuery query = RecordQuery.newBuilder()
.setRecordType("KeyValue")
.setFilter(Query.field("mod_revision").lessThanOrEquals(revision)).build();

return recordStoreProvider
.apply(context)
// this returns an asynchronous cursor over the results of our query
.executeQuery(query)
.map(queriedRecord -> EtcdRecord.KeyValue.newBuilder()
.mergeFrom(queriedRecord.getRecord()).build())
.map(r -> {
log.trace("found a record to delete: {}", r);
return r;
})
.map(record -> recordStore.deleteRecord(Tuple.from(record.getKey().toByteArray(), record.getVersion())))
.getCount()
.join();
});
log.trace("deleted {} records", count);
}
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package fr.pierrezemb.fdb.layer.etcd.service;

import static fr.pierrezemb.fdb.layer.etcd.TestUtil.bytesOf;
import static fr.pierrezemb.fdb.layer.etcd.TestUtil.randomByteSequence;
import static java.nio.charset.StandardCharsets.UTF_8;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
Expand Down Expand Up @@ -245,6 +246,25 @@ public void testNestedTxn() throws Exception {
assertEquals(oneTwoThree.toString(UTF_8), getResp2.getKvs().get(0).getValue().toString(UTF_8));
}

@Test
public void compact() throws Exception {
final ByteSequence key = randomByteSequence();
final ByteSequence value = randomByteSequence();

// Insert key twice to ensure we have at least two revisions
final PutResponse oldResponse = kvClient.put(key, value).get();
final PutResponse putResponse = kvClient.put(key, value).get();

final GetResponse getBeforeCompact = kvClient.get(key, GetOption.newBuilder().withRevision(oldResponse.getHeader().getRevision()).build()).get();
assertEquals("should not be empty", 1, getBeforeCompact.getKvs().size());

// Compact until latest revision
client.getKVClient().compact(putResponse.getHeader().getRevision()).get();

final GetResponse getAfterCompact = kvClient.get(key, GetOption.newBuilder().withRevision(oldResponse.getHeader().getRevision()).build()).get();
assertEquals("should be empty", 0, getAfterCompact.getKvs().size());
}

@AfterAll
void tearDown() {
container.stop();
Expand Down