HDDS-1986. Fix listkeys API. #1588
```diff
@@ -23,6 +23,9 @@
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.TreeSet;
 import java.util.stream.Collectors;

 import org.apache.hadoop.hdds.client.BlockID;
```
```diff
@@ -653,7 +656,12 @@ public List<OmBucketInfo> listBuckets(final String volumeName,
   @Override
   public List<OmKeyInfo> listKeys(String volumeName, String bucketName,
       String startKey, String keyPrefix, int maxKeys) throws IOException {

+    List<OmKeyInfo> result = new ArrayList<>();
+    if (maxKeys <= 0) {
+      return result;
+    }
+
     if (Strings.isNullOrEmpty(volumeName)) {
       throw new OMException("Volume name is required.",
           ResultCodes.VOLUME_NOT_FOUND);
```
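For context, the key table uses a flat, sorted key space of the form `/volume/bucket/key`, so a bucket listing is a range scan bounded by a prefix. A minimal sketch of that scan shape, with a plain `TreeMap` standing in for the RocksDB-backed table (all names here are hypothetical, not the OM API):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

public class PrefixScanSketch {

  // List up to maxKeys keys that start with seekPrefix, beginning the scan
  // at seekKey (inclusive) -- the same range-scan shape listKeys performs.
  static List<String> scan(TreeMap<String, String> table,
      String seekKey, String seekPrefix, int maxKeys) {
    List<String> result = new ArrayList<>();
    for (String key : table.tailMap(seekKey, true).keySet()) {
      if (!key.startsWith(seekPrefix)) {
        break;  // keys are sorted: once the prefix stops matching, stop
      }
      result.add(key);
      if (result.size() == maxKeys) {
        break;
      }
    }
    return result;
  }

  public static void main(String[] args) {
    TreeMap<String, String> table = new TreeMap<>();
    table.put("/volA/bucket1/key-a1", "v1");
    table.put("/volA/bucket1/key-a2", "v2");
    table.put("/volA/bucket2/key-a1", "v3");

    // Only bucket1 keys fall inside the /volA/bucket1/ prefix range.
    System.out.println(scan(table, "/volA/bucket1/", "/volA/bucket1/", 10));
  }
}
```

Because the table is sorted, the scan can stop at the first key that no longer matches the prefix, which is why the early `break` below is correct.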
```diff
@@ -688,26 +696,85 @@ public List<OmKeyInfo> listKeys(String volumeName, String bucketName,
       seekPrefix = getBucketKey(volumeName, bucketName + OM_KEY_PREFIX);
     }
     int currentCount = 0;
-    try (TableIterator<String, ? extends KeyValue<String, OmKeyInfo>> keyIter =
-        getKeyTable()
-        .iterator()) {
-      KeyValue<String, OmKeyInfo> kv = keyIter.seek(seekKey);
-      while (currentCount < maxKeys && keyIter.hasNext()) {
-        kv = keyIter.next();
-        // Skip the Start key if needed.
-        if (kv != null && skipStartKey && kv.getKey().equals(seekKey)) {
-          continue;
-        }
+
+    TreeMap<String, OmKeyInfo> cacheKeyMap = new TreeMap<>();
+    Set<String> deletedKeySet = new TreeSet<>();
+    Iterator<Map.Entry<CacheKey<String>, CacheValue<OmKeyInfo>>> iterator =
+        keyTable.cacheIterator();
+
+    // TODO: We could avoid this iteration if the table cache stored entries
+    // in a TreeMap. Currently a HashMap is used in the cache. HashMap get is
+    // a constant-time operation, whereas TreeMap get is O(log n). Since get
+    // is a frequent operation on the table, moving to a TreeMap would slow
+    // it down. So, for now, list iterates the cache map and constructs a
+    // TreeMap of entries that match keyPrefix and are greater than or equal
+    // to startKey. We can revisit this if the list operation becomes slow.
+    while (iterator.hasNext()) {
+      Map.Entry<CacheKey<String>, CacheValue<OmKeyInfo>> entry =
+          iterator.next();
+
+      String key = entry.getKey().getCacheKey();
+      OmKeyInfo omKeyInfo = entry.getValue().getCacheValue();
+      // Make sure the entry in the cache is not for a delete-key request.
+      if (omKeyInfo != null) {
+        if (key.startsWith(seekPrefix) && key.compareTo(seekKey) >= 0) {
+          cacheKeyMap.put(key, omKeyInfo);
+        }
+      } else {
+        deletedKeySet.add(key);
+      }
+    }
+
+    // Get up to maxKeys entries from the DB, if it has them.
+    try (TableIterator<String, ? extends KeyValue<String, OmKeyInfo>>
+        keyIter = getKeyTable().iterator()) {
+      KeyValue<String, OmKeyInfo> kv;
+      keyIter.seek(seekKey);
+      // We need to iterate up to maxKeys + 1 here because if skipStartKey
+      // is true, we skip that entry and return the rest.
+      while (currentCount < maxKeys + 1 && keyIter.hasNext()) {
+        kv = keyIter.next();
         if (kv != null && kv.getKey().startsWith(seekPrefix)) {
-          result.add(kv.getValue());
-          currentCount++;
+
+          // Consider only entries that are not marked for delete.
+          if (!deletedKeySet.contains(kv.getKey())) {
+            cacheKeyMap.put(kv.getKey(), kv.getValue());
+            currentCount++;
+          }
         } else {
           // The seekPrefix does not match any more; we can break out of the
           // loop.
           break;
         }
       }
     }
+
+    // Finally, DB entries and cache entries are merged; return up to
+    // maxKeys entries from the sorted map.
+    currentCount = 0;
+
+    for (Map.Entry<String, OmKeyInfo> cacheKey : cacheKeyMap.entrySet()) {
+      if (cacheKey.getKey().equals(seekKey) && skipStartKey) {
+        continue;
+      }
+
+      result.add(cacheKey.getValue());
+      currentCount++;
+
+      if (currentCount == maxKeys) {
+        break;
+      }
+    }
+
+    // Clear the map and set.
+    cacheKeyMap.clear();
+    deletedKeySet.clear();
+
     return result;
   }
```

Review comment: The second iteration is unfortunate. We should see if there is a way to avoid it.
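The core idea of the patch — overlay the in-memory table cache on the DB scan, treat cache entries with an absent value as delete tombstones, and trim the merged sorted map to `maxKeys` — can be sketched independently of the OM types. This is a hedged sketch with hypothetical names: a `HashMap` stands in for the table cache (with `null` as the tombstone) and a `TreeMap` for the RocksDB table.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;

public class CacheMergeSketch {

  // Merge DB entries and cache entries for one prefix, honoring tombstones.
  static List<String> listKeys(TreeMap<String, String> db,
      Map<String, String> cache,   // null value == delete tombstone
      String seekKey, String seekPrefix, int maxKeys) {

    TreeMap<String, String> merged = new TreeMap<>();
    Set<String> deleted = new TreeSet<>();

    // Pass 1: unordered cache scan, collecting matches and tombstones.
    for (Map.Entry<String, String> e : cache.entrySet()) {
      String key = e.getKey();
      if (e.getValue() == null) {
        deleted.add(key);
      } else if (key.startsWith(seekPrefix) && key.compareTo(seekKey) >= 0) {
        merged.put(key, e.getValue());
      }
    }

    // Pass 2: ordered DB scan from seekKey, skipping tombstoned keys.
    for (Map.Entry<String, String> e : db.tailMap(seekKey, true).entrySet()) {
      if (!e.getKey().startsWith(seekPrefix)) {
        break;                                    // prefix range exhausted
      }
      if (!deleted.contains(e.getKey())) {
        merged.putIfAbsent(e.getKey(), e.getValue());  // cache value wins
      }
      if (merged.size() > maxKeys) {
        break;                                    // enough candidates
      }
    }

    // Pass 3: trim the sorted merge to maxKeys.
    List<String> result = new ArrayList<>();
    for (String key : merged.keySet()) {
      result.add(key);
      if (result.size() == maxKeys) {
        break;
      }
    }
    return result;
  }

  public static void main(String[] args) {
    TreeMap<String, String> db = new TreeMap<>();
    db.put("/v/b/k1", "db1");
    db.put("/v/b/k2", "db2");
    db.put("/v/b/k3", "db3");

    Map<String, String> cache = new HashMap<>();
    cache.put("/v/b/k2", null);       // deleted in cache, flush pending
    cache.put("/v/b/k4", "cache4");   // created in cache, flush pending

    // k2 is suppressed by its tombstone; k4 is visible only via the cache.
    System.out.println(listKeys(db, cache, "/v/b/", "/v/b/", 10));
  }
}
```

Note the design point the reviewers debate below: because the cache is a `HashMap`, pass 1 must visit every cache entry regardless of prefix, which is cheap only while the cache stays small (i.e. while the double buffer flush keeps up).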
```diff
@@ -19,9 +19,11 @@
 import com.google.common.base.Optional;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.protocol.StorageType;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import org.apache.hadoop.hdds.utils.db.cache.CacheKey;
 import org.apache.hadoop.hdds.utils.db.cache.CacheValue;
 import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;
+import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
 import org.apache.hadoop.ozone.om.request.TestOMRequestUtils;
 import org.junit.Assert;
 import org.junit.Before;
```
```diff
@@ -188,4 +190,228 @@ private void addBucketsToCache(String volumeName, String bucketName) {
         new CacheValue<>(Optional.of(omBucketInfo), 1));
   }

+  @Test
+  public void testListKeys() throws Exception {
+
+    String volumeNameA = "volumeA";
+    String volumeNameB = "volumeB";
+    String ozoneBucket = "ozoneBucket";
+    String hadoopBucket = "hadoopBucket";
+
+    // Create volumes and buckets.
+    TestOMRequestUtils.addVolumeToDB(volumeNameA, omMetadataManager);
+    TestOMRequestUtils.addVolumeToDB(volumeNameB, omMetadataManager);
+    addBucketsToCache(volumeNameA, ozoneBucket);
+    addBucketsToCache(volumeNameB, hadoopBucket);
+
+    String prefixKeyA = "key-a";
+    String prefixKeyB = "key-b";
+    TreeSet<String> keysASet = new TreeSet<>();
+    TreeSet<String> keysBSet = new TreeSet<>();
+    for (int i = 1; i <= 100; i++) {
+      if (i % 2 == 0) {
+        keysASet.add(prefixKeyA + i);
+        addKeysToOM(volumeNameA, ozoneBucket, prefixKeyA + i, i);
+      } else {
+        keysBSet.add(prefixKeyB + i);
+        addKeysToOM(volumeNameA, hadoopBucket, prefixKeyB + i, i);
+      }
+    }
+
+    TreeSet<String> keysAVolumeBSet = new TreeSet<>();
+    TreeSet<String> keysBVolumeBSet = new TreeSet<>();
+    for (int i = 1; i <= 100; i++) {
+      if (i % 2 == 0) {
+        keysAVolumeBSet.add(prefixKeyA + i);
+        addKeysToOM(volumeNameB, ozoneBucket, prefixKeyA + i, i);
+      } else {
+        keysBVolumeBSet.add(prefixKeyB + i);
+        addKeysToOM(volumeNameB, hadoopBucket, prefixKeyB + i, i);
+      }
+    }
+
+    // List all keys which have the prefix "key-a".
+    List<OmKeyInfo> omKeyInfoList =
+        omMetadataManager.listKeys(volumeNameA, ozoneBucket,
+            null, prefixKeyA, 100);
+
+    Assert.assertEquals(50, omKeyInfoList.size());
+
+    for (OmKeyInfo omKeyInfo : omKeyInfoList) {
+      Assert.assertTrue(omKeyInfo.getKeyName().startsWith(prefixKeyA));
+    }
+
+    String startKey = prefixKeyA + 10;
+    omKeyInfoList =
+        omMetadataManager.listKeys(volumeNameA, ozoneBucket,
+            startKey, prefixKeyA, 100);
+
+    Assert.assertEquals(keysASet.tailSet(startKey).size() - 1,
+        omKeyInfoList.size());
+
+    startKey = prefixKeyA + 38;
+    omKeyInfoList =
+        omMetadataManager.listKeys(volumeNameA, ozoneBucket,
+            startKey, prefixKeyA, 100);
+
+    Assert.assertEquals(keysASet.tailSet(startKey).size() - 1,
+        omKeyInfoList.size());
+
+    for (OmKeyInfo omKeyInfo : omKeyInfoList) {
+      Assert.assertTrue(omKeyInfo.getKeyName().startsWith(prefixKeyA));
+      Assert.assertFalse(omKeyInfo.getBucketName().equals(prefixKeyA + 38));
+    }
+
+    omKeyInfoList = omMetadataManager.listKeys(volumeNameB, hadoopBucket,
+        null, prefixKeyB, 100);
+
+    Assert.assertEquals(50, omKeyInfoList.size());
+
+    for (OmKeyInfo omKeyInfo : omKeyInfoList) {
+      Assert.assertTrue(omKeyInfo.getKeyName().startsWith(prefixKeyB));
+    }
+
+    // Get keys in pages of 10; this way we get all keys in
+    // volumeB/hadoopBucket with the prefix "key-b".
+    startKey = null;
+    TreeSet<String> expectedKeys = new TreeSet<>();
+    for (int i = 0; i < 5; i++) {
+
+      omKeyInfoList = omMetadataManager.listKeys(volumeNameB, hadoopBucket,
+          startKey, prefixKeyB, 10);
+
+      Assert.assertEquals(10, omKeyInfoList.size());
+
+      for (OmKeyInfo omKeyInfo : omKeyInfoList) {
+        expectedKeys.add(omKeyInfo.getKeyName());
+        Assert.assertTrue(omKeyInfo.getKeyName().startsWith(prefixKeyB));
+        startKey = omKeyInfo.getKeyName();
+      }
+    }
+
+    Assert.assertEquals(expectedKeys, keysBVolumeBSet);
+
+    // As we have now iterated over all 50 keys, the next call should
+    // return an empty list.
+    omKeyInfoList = omMetadataManager.listKeys(volumeNameB, hadoopBucket,
+        startKey, prefixKeyB, 10);
+
+    Assert.assertEquals(0, omKeyInfoList.size());
+  }
+
+  @Test
+  public void testListKeysWithFewDeleteEntriesInCache() throws Exception {
+
+    String volumeNameA = "volumeA";
+    String ozoneBucket = "ozoneBucket";
+
+    // Create volume and bucket.
+    TestOMRequestUtils.addVolumeToDB(volumeNameA, omMetadataManager);
+    addBucketsToCache(volumeNameA, ozoneBucket);
+
+    String prefixKeyA = "key-a";
+    TreeSet<String> keysASet = new TreeSet<>();
+    TreeSet<String> deleteKeySet = new TreeSet<>();
+
+    for (int i = 1; i <= 100; i++) {
+      if (i % 2 == 0) {
+        keysASet.add(prefixKeyA + i);
+        addKeysToOM(volumeNameA, ozoneBucket, prefixKeyA + i, i);
+      } else {
+        addKeysToOM(volumeNameA, ozoneBucket, prefixKeyA + i, i);
+        String key = omMetadataManager.getOzoneKey(volumeNameA,
+            ozoneBucket, prefixKeyA + i);
+        // Mark as deleted in the cache.
+        omMetadataManager.getKeyTable().addCacheEntry(
+            new CacheKey<>(key),
+            new CacheValue<>(Optional.absent(), 100L));
+        deleteKeySet.add(key);
+      }
+    }
+
+    // Now list keys which match prefixKeyA.
+    List<OmKeyInfo> omKeyInfoList =
+        omMetadataManager.listKeys(volumeNameA, ozoneBucket,
+            null, prefixKeyA, 100);
+
+    // Of the 100 keys in total, 50 are marked for delete, so only 50 keys
+    // should be listed.
+    Assert.assertEquals(50, omKeyInfoList.size());
+
+    TreeSet<String> expectedKeys = new TreeSet<>();
+
+    for (OmKeyInfo omKeyInfo : omKeyInfoList) {
+      expectedKeys.add(omKeyInfo.getKeyName());
+      Assert.assertTrue(omKeyInfo.getKeyName().startsWith(prefixKeyA));
+    }
+
+    Assert.assertEquals(expectedKeys, keysASet);
+
+    // Now get keys in pages of 10.
+    String startKey = null;
+    expectedKeys = new TreeSet<>();
+    for (int i = 0; i < 5; i++) {
+
+      omKeyInfoList = omMetadataManager.listKeys(volumeNameA, ozoneBucket,
+          startKey, prefixKeyA, 10);
+
+      Assert.assertEquals(10, omKeyInfoList.size());
+
+      for (OmKeyInfo omKeyInfo : omKeyInfoList) {
+        expectedKeys.add(omKeyInfo.getKeyName());
+        Assert.assertTrue(omKeyInfo.getKeyName().startsWith(prefixKeyA));
+        startKey = omKeyInfo.getKeyName();
+      }
+    }
+
+    Assert.assertEquals(keysASet, expectedKeys);
+
+    // As we have now iterated over all 50 keys, the next call should
+    // return an empty list.
+    omKeyInfoList = omMetadataManager.listKeys(volumeNameA, ozoneBucket,
+        startKey, prefixKeyA, 10);
+
+    Assert.assertEquals(0, omKeyInfoList.size());
+  }
+
+  private void addKeysToOM(String volumeName, String bucketName,
+      String keyName, int i) throws Exception {
+
+    if (i % 2 == 0) {
+      TestOMRequestUtils.addKeyToTable(false, volumeName, bucketName, keyName,
+          1000L, HddsProtos.ReplicationType.RATIS,
+          HddsProtos.ReplicationFactor.ONE, omMetadataManager);
+    } else {
+      TestOMRequestUtils.addKeyToTableCache(volumeName, bucketName, keyName,
+          HddsProtos.ReplicationType.RATIS, HddsProtos.ReplicationFactor.ONE,
+          omMetadataManager);
+    }
+  }
 }
```

Review comment: How are you ensuring entries are in the cache? For that you would have to pause the double buffer flush, right?

Reply: In this test case we are adding entries to the cache manually. It is not an integration test.
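The paging pattern the tests exercise — pass the last key of the previous page as `startKey` and skip it on the next call — can be sketched on its own. This is a hypothetical stand-alone sketch, not the OM API; a `TreeMap` stands in for the key table:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

public class PagingSketch {

  // One page: up to maxKeys keys >= startKey, excluding startKey itself
  // when skipStart is true (i.e. it was the tail of the previous page).
  static List<String> page(TreeMap<String, String> table,
      String startKey, boolean skipStart, int maxKeys) {
    List<String> result = new ArrayList<>();
    for (String key : table.tailMap(startKey, !skipStart).keySet()) {
      result.add(key);
      if (result.size() == maxKeys) {
        break;
      }
    }
    return result;
  }

  // Drain the whole table page by page, the way the test loops do.
  static List<String> listAll(TreeMap<String, String> table, int pageSize) {
    List<String> all = new ArrayList<>();
    String startKey = "";
    boolean skipStart = false;     // the first page starts at the beginning
    while (true) {
      List<String> page = page(table, startKey, skipStart, pageSize);
      if (page.isEmpty()) {
        break;                     // no more keys: done
      }
      all.addAll(page);
      startKey = page.get(page.size() - 1);   // resume after the last key
      skipStart = true;
    }
    return all;
  }

  public static void main(String[] args) {
    TreeMap<String, String> table = new TreeMap<>();
    for (int i = 0; i < 25; i++) {
      table.put(String.format("key-%03d", i), "v");
    }
    // Three pages of 10, 10, and 5 keys; the fourth call returns empty.
    System.out.println(listAll(table, 10).size());
  }
}
```

This also shows why the production code iterates `maxKeys + 1` DB entries: when the start key must be skipped, one extra candidate is needed to still fill a full page.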
Review comment: How many keys are expected in this cache, and how many in the tree?

Review comment: I feel we are better off leaving the old code in place, where we read from the DB. At worst, we might have to make sure the cache is flushed to the DB before doing the list operation. But practically it may not matter.

Review comment: I am OK with putting this change in if we can prove that we can do large key listings. You might want to borrow the DB from @nandakumar131 and see if you can list keys with this patch, just a thought.

Reply: The key cache is not a full cache, so if the double buffer flush is keeping up in the background, it should hold only a couple of hundred entries. When I started freon with 10 threads, the maximum iteration count I saw was 200, so the cache held roughly 200 entries. (This was not tried on clusters with busy workloads or slow disks.)

Reply: With the new code, the list operation considers entries from both the buffer and the DB (since we return the response to the end user after adding entries to the cache). Otherwise, if the user lists immediately after creating a bucket, the bucket might or might not be visible until the double buffer flushes, because entries stay in the cache until then. (This is not a problem for non-HA, as there we return the response only after the flush.)