Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
Expand Down Expand Up @@ -172,7 +173,8 @@ private boolean isUpdateTime() {
*/
public void overrideExpiredRecords(QueryResult<R> query) throws IOException {
List<R> commitRecords = new ArrayList<>();
List<R> deleteRecords = new ArrayList<>();
List<R> toDeleteRecords = new ArrayList<>();
List<R> deletedRecords = new ArrayList<>();
List<R> newRecords = query.getRecords();
long currentDriverTime = query.getTimestamp();
if (newRecords == null || currentDriverTime <= 0) {
Expand All @@ -182,13 +184,8 @@ public void overrideExpiredRecords(QueryResult<R> query) throws IOException {
for (R record : newRecords) {
if (record.shouldBeDeleted(currentDriverTime)) {
String recordName = StateStoreUtils.getRecordName(record.getClass());
if (getDriver().remove(record)) {
deleteRecords.add(record);
LOG.info("Deleted State Store record {}: {}", recordName, record);
} else {
LOG.warn("Couldn't delete State Store record {}: {}", recordName,
record);
}
LOG.info("State Store record to delete {}: {}", recordName, record);
toDeleteRecords.add(record);
} else if (!record.isExpired() && record.checkExpired(currentDriverTime)) {
String recordName = StateStoreUtils.getRecordName(record.getClass());
LOG.info("Override State Store record {}: {}", recordName, record);
Expand All @@ -198,8 +195,15 @@ public void overrideExpiredRecords(QueryResult<R> query) throws IOException {
if (commitRecords.size() > 0) {
getDriver().putAll(commitRecords, true, false);
}
if (deleteRecords.size() > 0) {
newRecords.removeAll(deleteRecords);
if (!toDeleteRecords.isEmpty()) {
for (Map.Entry<R, Boolean> entry : getDriver().removeMultiple(toDeleteRecords).entrySet()) {
if (entry.getValue()) {
deletedRecords.add(entry.getKey());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Changing this to `newRecords.remove(entry.getKey())` would let us drop the separate `deletedRecords` list.

}
}
}
if (!deletedRecords.isEmpty()) {
newRecords.removeAll(deletedRecords);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import java.io.IOException;
import java.util.List;
import java.util.Map;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
Expand Down Expand Up @@ -127,6 +128,17 @@ <T extends BaseRecord> StateStoreOperationResult putAll(
@AtMostOnce
<T extends BaseRecord> boolean remove(T record) throws IOException;

/**
 * Remove multiple records.
 *
 * @param <T> Record class of the records.
 * @param records Records to be removed.
 * @return Map from each record to a boolean that is {@code true} when at least one
 *         entry was deleted for that record.
 * @throws IOException Throws exception if unable to query the data store.
 */
@AtMostOnce
<T extends BaseRecord> Map<T, Boolean> removeMultiple(List<T> records) throws IOException;

/**
* Remove all records of this class from the store.
*
Expand All @@ -152,4 +164,17 @@ <T extends BaseRecord> StateStoreOperationResult putAll(
<T extends BaseRecord> int remove(Class<T> clazz, Query<T> query)
throws IOException;

/**
 * Remove all records of a specific class that match any query in a list of queries.
 * Requires the getAll implementation to fetch fresh records on each call.
 *
 * @param clazz The class to match the records with.
 * @param queries Queries (logical OR) to filter what to remove.
 * @param <T> Record class of the records.
 * @return Map of query to number of records deleted by that query.
 *         NOTE(review): confirm whether a query that deletes nothing is guaranteed
 *         an entry with value 0, or may be absent from the map; callers that
 *         unbox {@code get(query)} directly depend on the answer.
 * @throws IOException Throws exception if unable to query the data store.
 */
@AtMostOnce
<T extends BaseRecord> Map<Query<T>, Integer> remove(Class<T> clazz, List<Query<T>> queries)
throws IOException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,10 @@

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
Expand Down Expand Up @@ -86,4 +89,37 @@ public <T extends BaseRecord> boolean remove(T record) throws IOException {
Class<T> recordClass = (Class<T>)StateStoreUtils.getRecordClass(clazz);
return remove(recordClass, query) == 1;
}

/**
 * Remove a batch of records and report, per record, whether anything was deleted.
 *
 * <p>When every record has the same runtime class, one {@link Query} is built per
 * record and the whole batch is handed to {@code remove(Class, List)}. Otherwise
 * each record is removed on its own through {@code remove(T)}.
 *
 * @param <T> Record class of the records.
 * @param records Records to be removed; an empty list yields an empty map.
 * @return Map from record to {@code true} if at least one entry was deleted for it.
 * @throws IOException If one of the underlying remove calls throws it.
 */
@Override
public <T extends BaseRecord> Map<T, Boolean> removeMultiple(List<T> records) throws IOException {
  // An assert is skipped unless the JVM runs with -ea, so guard explicitly:
  // records.get(0) below would otherwise throw on an empty list.
  if (records.isEmpty()) {
    return new HashMap<>();
  }

  // Fall back to iterative remove() calls if all records don't share 1 class
  Class<? extends BaseRecord> expectedClazz = records.get(0).getClass();
  if (!records.stream().allMatch(x -> x.getClass() == expectedClazz)) {
    Map<T, Boolean> result = new HashMap<>();
    for (T record : records) {
      // If an equal record appears twice, keep "deleted" once any attempt succeeded
      // instead of letting a later false overwrite an earlier true.
      result.merge(record, remove(record), Boolean::logicalOr);
    }
    return result;
  }

  final List<Query<T>> queries = new ArrayList<>();
  for (T record : records) {
    queries.add(new Query<>(record));
  }
  @SuppressWarnings("unchecked")
  Class<T> recordClass = (Class<T>) StateStoreUtils.getRecordClass(expectedClazz);
  Map<Query<T>, Integer> result = remove(recordClass, queries);
  // Two result entries may map to equal partial records; merge them so that
  // toMap does not throw IllegalStateException on the duplicate key.
  return result.entrySet().stream()
      .collect(Collectors.toMap(
          e -> e.getKey().getPartial(),
          e -> e.getValue() > 0,
          Boolean::logicalOr));
}

/**
 * Remove the records of one class matched by any of the given queries.
 *
 * <p>Each query is run separately through {@code remove(Class, Query)}, in list
 * order, and the count returned by that call is stored under the query.
 *
 * @param <T> Record class of the records.
 * @param clazz The class to match the records with.
 * @param queries Queries to run, one removal pass per query.
 * @return Map of query to the count returned by its removal pass.
 * @throws IOException If one of the single-query removals throws it.
 */
public <T extends BaseRecord> Map<Query<T>, Integer> remove(Class<T> clazz,
    List<Query<T>> queries) throws IOException {
  final Map<Query<T>, Integer> removedByQuery = new HashMap<>();
  for (final Query<T> current : queries) {
    final int removedCount = remove(clazz, current);
    removedByQuery.put(current, removedCount);
  }
  return removedByQuery;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,11 @@
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
Expand Down Expand Up @@ -284,51 +288,88 @@ public <T extends BaseRecord> StateStoreOperationResult putAll(
}

@Override
public <T extends BaseRecord> int remove(
Class<T> clazz, Query<T> query) throws IOException {
public <T extends BaseRecord> Map<Query<T>, Integer> remove(Class<T> clazz,
List<Query<T>> queries) throws IOException {
verifyDriverReady();
if (query == null) {
return 0;
// Track how many entries are deleted by each query
Map<Query<T>, Integer> ret = new HashMap<>();
final List<T> trueRemoved = Collections.synchronizedList(new ArrayList<>());
if (queries.isEmpty()) {
return ret;
}

// Read the current data
long start = monotonicNow();
List<T> records = null;
List<T> records;
try {
QueryResult<T> result = get(clazz);
records = result.getRecords();
} catch (IOException ex) {
LOG.error("Cannot get existing records", ex);
getMetrics().addFailure(monotonicNow() - start);
return 0;
return ret;
}

// Check the records to remove
String znode = getZNodeForClass(clazz);
List<T> recordsToRemove = filterMultiple(query, records);
Set<T> recordsToRemove = new HashSet<>();
Map<Query<T>, List<T>> queryToRecords = new HashMap<>();
for (Query<T> query : queries) {
List<T> filtered = filterMultiple(query, records);
queryToRecords.put(query, filtered);
recordsToRemove.addAll(filtered);
}

// Remove the records
int removed = 0;
for (T existingRecord : recordsToRemove) {
List<Callable<Void>> callables = new ArrayList<>();
recordsToRemove.forEach(existingRecord -> callables.add(() -> {
LOG.info("Removing \"{}\"", existingRecord);
try {
String primaryKey = getPrimaryKey(existingRecord);
String path = getNodePath(znode, primaryKey);
if (zkManager.delete(path)) {
removed++;
trueRemoved.add(existingRecord);
} else {
LOG.error("Did not remove \"{}\"", existingRecord);
}
} catch (Exception e) {
LOG.error("Cannot remove \"{}\"", existingRecord, e);
getMetrics().addFailure(monotonicNow() - start);
}
return null;
}));
try {
if (enableConcurrent) {
executorService.invokeAll(callables);
} else {
for (Callable<Void> callable : callables) {
callable.call();
}
}
} catch (Exception e) {
LOG.error("Record removal failed : {}", e.getMessage(), e);
throw new IOException(e);
}
long end = monotonicNow();
if (removed > 0) {
if (!trueRemoved.isEmpty()) {
getMetrics().addRemove(end - start);
}
return removed;
// Generate return map
for (Map.Entry<Query<T>, List<T>> entry : queryToRecords.entrySet()) {
for (T record : entry.getValue()) {
if (trueRemoved.contains(record)) {
ret.compute(entry.getKey(), (k, v) -> (v == null) ? 1 : v + 1);
break;
}
}
}
return ret;
}

/**
 * Remove the records of one class that match a single query.
 *
 * <p>Delegates to the batch {@code remove(Class, List)} overload with a one-element
 * list and reads this query's count back out of the returned map.
 *
 * @param <T> Record class of the records.
 * @param clazz The class to match the records with.
 * @param query Query to filter what to remove; {@code null} removes nothing.
 * @return Count reported for the query, or 0 if the batch result has no entry for it.
 * @throws IOException If the batch removal throws it.
 */
@Override
public <T extends BaseRecord> int remove(Class<T> clazz, Query<T> query)
    throws IOException {
  if (query == null) {
    return 0;
  }
  // The batch overload can return a map with no entry for this query (nothing
  // matched, nothing was deleted, or the initial read failed); unboxing the
  // missing value directly would throw NullPointerException.
  Integer removed = remove(clazz, Collections.singletonList(query)).get(query);
  return removed == null ? 0 : removed;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.spy;

Expand Down Expand Up @@ -84,6 +85,8 @@ public static void create() {
getConf().setLong(
RBFConfigKeys.FEDERATION_STORE_MEMBERSHIP_EXPIRATION_DELETION_MS,
TimeUnit.SECONDS.toMillis(2));
getConf().set(RBFConfigKeys.FEDERATION_STORE_MEMBERSHIP_ASYNC_OVERRIDE_CLASSES,
"org.apache.hadoop.hdfs.server.federation.store.impl.MembershipStoreImpl");
}

@Before
Expand Down Expand Up @@ -565,7 +568,7 @@ public void testRegistrationExpiredRaceCondition()
// Load cache
MembershipStore memStoreSpy = spy(membershipStore);
DelayAnswer delayer = new DelayAnswer(LOG);
doAnswer(delayer).when(memStoreSpy).overrideExpiredRecords(any());
doAnswer(delayer).when(memStoreSpy).overrideExpiredRecords(any(), anyBoolean());

ExecutorService pool = Executors.newFixedThreadPool(1);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,17 +140,28 @@ public void testAsyncPerformance() throws Exception {
insertList.add(newRecord);
}
// Insert Multiple on sync mode
long startSync = Time.now();
long startSyncPut = Time.now();
stateStoreDriver.putAll(insertList, true, false);
long endSync = Time.now();
long endSyncPut = Time.now();
// Removing 1000 records synchronously is painfully slow so test with only 5 records
// Then remove the rest with removeAll()
long startSyncRemove = Time.now();
for (MountTable entry : insertList.subList(0, 5)) {
stateStoreDriver.remove(entry);
}
long endSyncRemove = Time.now();
stateStoreDriver.removeAll(MembershipState.class);

stateStoreDriver.setEnableConcurrent(true);
// Insert Multiple on async mode
long startAsync = Time.now();
long startAsyncPut = Time.now();
stateStoreDriver.putAll(insertList, true, false);
long endAsync = Time.now();
assertTrue((endSync - startSync) > (endAsync - startAsync));
long endAsyncPut = Time.now();
long startAsyncRemove = Time.now();
stateStoreDriver.removeMultiple(insertList.subList(0, 5));
long endAsyncRemove = Time.now();
assertTrue((endSyncPut - startSyncPut) > (endAsyncPut - startAsyncPut));
assertTrue((endSyncRemove - startSyncRemove) > (endAsyncRemove - startAsyncRemove));
}

@Test
Expand Down