HDDS-1986. Fix listkeys API. #1588

Merged 4 commits on Oct 10, 2019
@@ -23,6 +23,9 @@
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.stream.Collectors;

import org.apache.hadoop.hdds.client.BlockID;
@@ -653,7 +656,12 @@ public List<OmBucketInfo> listBuckets(final String volumeName,
@Override
public List<OmKeyInfo> listKeys(String volumeName, String bucketName,
String startKey, String keyPrefix, int maxKeys) throws IOException {

List<OmKeyInfo> result = new ArrayList<>();
if (maxKeys <= 0) {
return result;
}

if (Strings.isNullOrEmpty(volumeName)) {
throw new OMException("Volume name is required.",
ResultCodes.VOLUME_NOT_FOUND);
@@ -688,26 +696,85 @@ public List<OmKeyInfo> listKeys(String volumeName, String bucketName,
seekPrefix = getBucketKey(volumeName, bucketName + OM_KEY_PREFIX);
}
int currentCount = 0;
try (TableIterator<String, ? extends KeyValue<String, OmKeyInfo>> keyIter =
getKeyTable()
.iterator()) {
KeyValue<String, OmKeyInfo> kv = keyIter.seek(seekKey);
while (currentCount < maxKeys && keyIter.hasNext()) {
kv = keyIter.next();
// Skip the Start key if needed.
if (kv != null && skipStartKey && kv.getKey().equals(seekKey)) {
continue;


TreeMap<String, OmKeyInfo> cacheKeyMap = new TreeMap<>();
Set<String> deletedKeySet = new TreeSet<>();
Iterator<Map.Entry<CacheKey<String>, CacheValue<OmKeyInfo>>> iterator =
keyTable.cacheIterator();

//TODO: We can avoid this iteration if the table cache stored entries in a
// TreeMap. Currently the cache uses a HashMap, whose get is a constant-time
// operation, whereas TreeMap's get is O(log n). Since get is a frequent
// operation on the table, moving to a TreeMap would slow it down. So, for
// now, list iterates over the cache map and builds a TreeMap of entries
// that match keyPrefix and are greater than or equal to startKey. We can
// revisit this if the list operation becomes slow.
while (iterator.hasNext()) {
Contributor: How many keys are expected in this cache, and how many in the tree?

Contributor: I feel we are better off leaving the old code in place, where we can read from the DB. At worst, we might have to make sure the cache is flushed to the DB before doing the list operation, though practically it may not matter.

Contributor: I am OK with putting this change in if we can prove that we can list a large number of keys. You might want to borrow the DB from @nandakumar131 and see if you can list keys with this patch, just a thought.

Contributor (author): The key cache is not a full cache, so if the double-buffer flush keeps up in the background, it should hold only a couple hundred entries. When I started freon with 10 threads, the maximum iteration count I saw was 200, so the cache held roughly 200 entries. (I have not yet tried this on busy clusters with slow disks.)

Contributor (author): With the new code, a list must consider entries from both the buffer and the DB, since we return the response to the end user right after adding entries to the cache. So if the user lists immediately after creating a bucket, the bucket might or might not be visible until the double buffer flushes; until then the entries live only in the cache. (This is not a problem for non-HA, where we return the response only after the flush.)

Map.Entry< CacheKey<String>, CacheValue<OmKeyInfo>> entry =
iterator.next();

String key = entry.getKey().getCacheKey();
OmKeyInfo omKeyInfo = entry.getValue().getCacheValue();
// Make sure the cache entry is not for a delete-key request.
if (omKeyInfo != null) {
if (key.startsWith(seekPrefix) && key.compareTo(seekKey) >= 0) {
cacheKeyMap.put(key, omKeyInfo);
}
} else {
deletedKeySet.add(key);
}
}

// Get up to maxKeys entries from the DB, if available.

try (TableIterator<String, ? extends KeyValue<String, OmKeyInfo>>
keyIter = getKeyTable().iterator()) {
KeyValue<String, OmKeyInfo> kv;
keyIter.seek(seekKey);
// we may iterate up to maxKeys + 1 entries here because, when
// skipStartKey is true, the start key entry is skipped from the result.
while (currentCount < maxKeys + 1 && keyIter.hasNext()) {
kv = keyIter.next();
if (kv != null && kv.getKey().startsWith(seekPrefix)) {
result.add(kv.getValue());
currentCount++;

// Consider only entries that are not marked for delete.
if (!deletedKeySet.contains(kv.getKey())) {
cacheKeyMap.put(kv.getKey(), kv.getValue());
currentCount++;
}
} else {
// The SeekPrefix does not match any more, we can break out of the
// loop.
break;
}
}
}

// DB and cache entries are now merged; return up to maxKeys entries
// from the sorted map.
currentCount = 0;

for (Map.Entry<String, OmKeyInfo> cacheKey : cacheKeyMap.entrySet()) {
Contributor: The second iteration is unfortunate. We should see if there is a way to avoid it.

if (cacheKey.getKey().equals(seekKey) && skipStartKey) {
continue;
}

result.add(cacheKey.getValue());
currentCount++;

if (currentCount == maxKeys) {
break;
}
}

// Clear map and set.
cacheKeyMap.clear();
deletedKeySet.clear();

return result;
}
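The merged listing above can be sketched in isolation. The following is a minimal, illustrative model — plain java.util maps standing in for the OM key table and its cache, and all names are hypothetical — not the actual OM implementation:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/**
 * Sketch of the listKeys merge strategy: sorted DB entries are merged with
 * unflushed cache entries, delete tombstones (null cache values) are
 * honoured, and at most maxKeys keys greater than startKey that match
 * prefix are returned.
 */
public class ListKeysSketch {

  public static List<String> listKeys(Map<String, String> cache,
      TreeMap<String, String> db, String prefix, String startKey,
      int maxKeys) {
    TreeMap<String, String> merged = new TreeMap<>();

    // Walk the sorted DB from the first key >= prefix; once a key no
    // longer matches the prefix, no later key can, so break out.
    for (Map.Entry<String, String> e : db.tailMap(prefix).entrySet()) {
      String key = e.getKey();
      if (!key.startsWith(prefix)) {
        break;
      }
      boolean tombstoned = cache.containsKey(key) && cache.get(key) == null;
      if (!tombstoned && key.compareTo(startKey) > 0) {
        merged.put(key, e.getValue());
      }
    }

    // Cache entries not yet flushed to the DB; a null value means deleted.
    for (Map.Entry<String, String> e : cache.entrySet()) {
      String key = e.getKey();
      if (e.getValue() != null && key.startsWith(prefix)
          && key.compareTo(startKey) > 0) {
        merged.put(key, e.getValue());
      }
    }

    List<String> result = new ArrayList<>(merged.keySet());
    return result.subList(0, Math.min(maxKeys, result.size()));
  }
}
```

Because the TreeMap keeps the merged view sorted, truncating to maxKeys at the end returns the same page a purely DB-backed scan would, while still reflecting unflushed creates and deletes.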

@@ -19,9 +19,11 @@
import com.google.common.base.Optional;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.StorageType;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.utils.db.cache.CacheKey;
import org.apache.hadoop.hdds.utils.db.cache.CacheValue;
import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;
import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
import org.apache.hadoop.ozone.om.request.TestOMRequestUtils;
import org.junit.Assert;
import org.junit.Before;
@@ -188,4 +190,228 @@ private void addBucketsToCache(String volumeName, String bucketName) {
new CacheValue<>(Optional.of(omBucketInfo), 1));
}

@Test
public void testListKeys() throws Exception {

String volumeNameA = "volumeA";
String volumeNameB = "volumeB";
String ozoneBucket = "ozoneBucket";
String hadoopBucket = "hadoopBucket";


// Create volumes and buckets.
TestOMRequestUtils.addVolumeToDB(volumeNameA, omMetadataManager);
TestOMRequestUtils.addVolumeToDB(volumeNameB, omMetadataManager);
addBucketsToCache(volumeNameA, ozoneBucket);
addBucketsToCache(volumeNameB, hadoopBucket);


String prefixKeyA = "key-a";
String prefixKeyB = "key-b";
TreeSet<String> keysASet = new TreeSet<>();
TreeSet<String> keysBSet = new TreeSet<>();
for (int i = 1; i <= 100; i++) {
if (i % 2 == 0) {
keysASet.add(
prefixKeyA + i);
addKeysToOM(volumeNameA, ozoneBucket, prefixKeyA + i, i);
} else {
keysBSet.add(
prefixKeyB + i);
addKeysToOM(volumeNameA, hadoopBucket, prefixKeyB + i, i);
}
}


TreeSet<String> keysAVolumeBSet = new TreeSet<>();
TreeSet<String> keysBVolumeBSet = new TreeSet<>();
for (int i = 1; i <= 100; i++) {
if (i % 2 == 0) {
keysAVolumeBSet.add(
prefixKeyA + i);
addKeysToOM(volumeNameB, ozoneBucket, prefixKeyA + i, i);
} else {
keysBVolumeBSet.add(
prefixKeyB + i);
addKeysToOM(volumeNameB, hadoopBucket, prefixKeyB + i, i);
}
}


// List all keys which have prefix "key-a"
List<OmKeyInfo> omKeyInfoList =
omMetadataManager.listKeys(volumeNameA, ozoneBucket,
null, prefixKeyA, 100);

Assert.assertEquals(50, omKeyInfoList.size());

for (OmKeyInfo omKeyInfo : omKeyInfoList) {
Assert.assertTrue(omKeyInfo.getKeyName().startsWith(
prefixKeyA));
}


String startKey = prefixKeyA + 10;
omKeyInfoList =
omMetadataManager.listKeys(volumeNameA, ozoneBucket,
startKey, prefixKeyA, 100);

Assert.assertEquals(keysASet.tailSet(
startKey).size() - 1, omKeyInfoList.size());

startKey = prefixKeyA + 38;
omKeyInfoList =
omMetadataManager.listKeys(volumeNameA, ozoneBucket,
startKey, prefixKeyA, 100);

Assert.assertEquals(keysASet.tailSet(
startKey).size() - 1, omKeyInfoList.size());

for (OmKeyInfo omKeyInfo : omKeyInfoList) {
Assert.assertTrue(omKeyInfo.getKeyName().startsWith(
prefixKeyA));
Assert.assertFalse(omKeyInfo.getKeyName().equals(
prefixKeyA + 38));
}



omKeyInfoList = omMetadataManager.listKeys(volumeNameB, hadoopBucket,
null, prefixKeyB, 100);

Assert.assertEquals(50, omKeyInfoList.size());

for (OmKeyInfo omKeyInfo : omKeyInfoList) {
Assert.assertTrue(omKeyInfo.getKeyName().startsWith(
prefixKeyB));
}

// Page through keys 10 at a time, retrieving all keys in
// volumeNameB/hadoopBucket with prefix "key-b".
startKey = null;
TreeSet<String> expectedKeys = new TreeSet<>();
for (int i=0; i<5; i++) {

omKeyInfoList = omMetadataManager.listKeys(volumeNameB, hadoopBucket,
startKey, prefixKeyB, 10);

Assert.assertEquals(10, omKeyInfoList.size());

for (OmKeyInfo omKeyInfo : omKeyInfoList) {
expectedKeys.add(omKeyInfo.getKeyName());
Assert.assertTrue(omKeyInfo.getKeyName().startsWith(
prefixKeyB));
startKey = omKeyInfo.getKeyName();
}
}

Assert.assertEquals(expectedKeys, keysBVolumeBSet);


// As we have now iterated over all 50 keys, the next call should
// return an empty list.
omKeyInfoList = omMetadataManager.listKeys(volumeNameB, hadoopBucket,
startKey, prefixKeyB, 10);

Assert.assertEquals(0, omKeyInfoList.size());

}
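The paging pattern this test drives — passing the last key of the previous page as the next startKey — can be sketched independently. This is an illustrative model with hypothetical names, not OM code:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;

// Sketch of client-side paging over a sorted key space: each call resumes
// strictly after the last key of the previous page, mirroring how the test
// feeds the last returned key back into listKeys as startKey.
public class PagingSketch {

  /** Return up to pageSize keys strictly greater than startKey. */
  public static List<String> page(TreeSet<String> keys, String startKey,
      int pageSize) {
    List<String> page = new ArrayList<>();
    for (String key : keys.tailSet(startKey, false)) {
      if (page.size() == pageSize) {
        break;
      }
      page.add(key);
    }
    return page;
  }

  /** Drain the whole key space page by page; terminates on an empty page. */
  public static List<String> listAll(TreeSet<String> keys, int pageSize) {
    List<String> all = new ArrayList<>();
    String startKey = "";
    List<String> page;
    while (!(page = page(keys, startKey, pageSize)).isEmpty()) {
      all.addAll(page);
      startKey = page.get(page.size() - 1); // resume after the last key
    }
    return all;
  }
}
```

Because startKey is exclusive, the final call after the last full page returns an empty list — the same termination condition the test asserts.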

@Test
public void testListKeysWithFewDeleteEntriesInCache() throws Exception {
Contributor: How are you ensuring entries are in the cache? For that you would have to pause the double-buffer flush, right?

Contributor (author): In this test case we add entries to the cache manually; it is not an integration test.

String volumeNameA = "volumeA";
String ozoneBucket = "ozoneBucket";

// Create volumes and bucket.
TestOMRequestUtils.addVolumeToDB(volumeNameA, omMetadataManager);

addBucketsToCache(volumeNameA, ozoneBucket);

String prefixKeyA = "key-a";
TreeSet<String> keysASet = new TreeSet<>();
TreeSet<String> deleteKeySet = new TreeSet<>();


for (int i = 1; i <= 100; i++) {
if (i % 2 == 0) {
keysASet.add(
prefixKeyA + i);
addKeysToOM(volumeNameA, ozoneBucket, prefixKeyA + i, i);
} else {
addKeysToOM(volumeNameA, ozoneBucket, prefixKeyA + i, i);
String key = omMetadataManager.getOzoneKey(volumeNameA,
ozoneBucket, prefixKeyA + i);
// Mark as deleted in cache.
omMetadataManager.getKeyTable().addCacheEntry(
new CacheKey<>(key),
new CacheValue<>(Optional.absent(), 100L));
deleteKeySet.add(key);
}
}

// Now list keys which match with prefixKeyA.
List<OmKeyInfo> omKeyInfoList =
omMetadataManager.listKeys(volumeNameA, ozoneBucket,
null, prefixKeyA, 100);

// Of the 100 keys in total, 50 are marked for delete, so only 50 should be listed.
Assert.assertEquals(50, omKeyInfoList.size());

TreeSet<String> expectedKeys = new TreeSet<>();

for (OmKeyInfo omKeyInfo : omKeyInfoList) {
expectedKeys.add(omKeyInfo.getKeyName());
Assert.assertTrue(omKeyInfo.getKeyName().startsWith(prefixKeyA));
}

Assert.assertEquals(expectedKeys, keysASet);


// Now get key count by 10.
String startKey = null;
expectedKeys = new TreeSet<>();
for (int i = 0; i < 5; i++) {

omKeyInfoList = omMetadataManager.listKeys(volumeNameA, ozoneBucket,
startKey, prefixKeyA, 10);

Assert.assertEquals(10, omKeyInfoList.size());

for (OmKeyInfo omKeyInfo : omKeyInfoList) {
expectedKeys.add(omKeyInfo.getKeyName());
Assert.assertTrue(omKeyInfo.getKeyName().startsWith(
prefixKeyA));
startKey = omKeyInfo.getKeyName();
}
}

Assert.assertEquals(keysASet, expectedKeys);


// As we have now iterated over all 50 keys, the next call should
// return an empty list.
omKeyInfoList = omMetadataManager.listKeys(volumeNameA, ozoneBucket,
startKey, prefixKeyA, 10);

Assert.assertEquals(0, omKeyInfoList.size());



}

private void addKeysToOM(String volumeName, String bucketName,
String keyName, int i) throws Exception {

if (i % 2 == 0) {
TestOMRequestUtils.addKeyToTable(false, volumeName, bucketName, keyName,
1000L, HddsProtos.ReplicationType.RATIS,
HddsProtos.ReplicationFactor.ONE, omMetadataManager);
} else {
TestOMRequestUtils.addKeyToTableCache(volumeName, bucketName, keyName,
HddsProtos.ReplicationType.RATIS, HddsProtos.ReplicationFactor.ONE,
omMetadataManager);
}
}

}
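The delete-tombstone convention these tests rely on — a cache entry holding an absent value marks the key as deleted — can be modeled in a few lines. A minimal sketch with hypothetical names, using java.util.Optional in place of Guava's Optional:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Simplified model of the table-cache tombstone convention: an entry whose
// value is an empty Optional marks the key as deleted. Readers must treat
// "present with empty value" (deleted) differently from "absent" (the
// cache simply says nothing about this key).
public class TombstoneCacheSketch {
  private final Map<String, Optional<String>> cache = new HashMap<>();

  public void put(String key, String value) {
    cache.put(key, Optional.of(value));
  }

  public void markDeleted(String key) {
    cache.put(key, Optional.empty()); // tombstone
  }

  /** True only when the cache positively records a delete for this key. */
  public boolean isDeleted(String key) {
    Optional<String> value = cache.get(key);
    return value != null && !value.isPresent();
  }
}
```

This mirrors why the tests insert `CacheValue<>(Optional.absent(), ...)` entries by hand: a tombstone must shadow the matching DB row until the double-buffer flush removes it.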