Skip to content

Commit ea4e518

Browse files
committed
HDFS-17529. Improve router state store cache update
1 parent 8f92cda commit ea4e518

File tree

10 files changed

+216
-34
lines changed

10 files changed

+216
-34
lines changed

hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RBFConfigKeys.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -218,6 +218,8 @@ public class RBFConfigKeys extends CommonConfigurationKeysPublic {
218218
FEDERATION_STORE_PREFIX + "driver.class";
219219
public static final Class<? extends StateStoreDriver>
220220
FEDERATION_STORE_DRIVER_CLASS_DEFAULT = StateStoreZooKeeperImpl.class;
221+
public static final String FEDERATION_STORE_MEMBERSHIP_ASYNC_OVERRIDE_CLASSES =
222+
FEDERATION_STORE_PREFIX + "async.override.classes";
221223

222224
public static final String FEDERATION_STORE_CONNECTION_TEST_MS =
223225
FEDERATION_STORE_PREFIX + "connection.test";

hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/CachedRecordStore.java

Lines changed: 64 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,12 @@
2121
import java.util.ArrayList;
2222
import java.util.LinkedList;
2323
import java.util.List;
24+
import java.util.concurrent.Callable;
25+
import java.util.concurrent.ExecutionException;
26+
import java.util.concurrent.Future;
27+
import java.util.concurrent.LinkedBlockingQueue;
28+
import java.util.concurrent.ThreadPoolExecutor;
29+
import java.util.concurrent.TimeUnit;
2430
import java.util.concurrent.locks.Lock;
2531
import java.util.concurrent.locks.ReadWriteLock;
2632
import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -47,7 +53,8 @@ public abstract class CachedRecordStore<R extends BaseRecord>
4753

4854
/** Prevent loading the cache more than once every 500 ms. */
4955
private static final long MIN_UPDATE_MS = 500;
50-
56+
/** Whether expired records are overwritten and deleted in parallel on two separate threads. */
57+
private boolean asyncOverride = false;
5158

5259
/** Cached entries. */
5360
private List<R> records = new ArrayList<>();
@@ -94,6 +101,10 @@ protected CachedRecordStore(
94101
this.override = over;
95102
}
96103

104+
public void toggleAsyncOverride(boolean flag) {
105+
this.asyncOverride = flag;
106+
}
107+
97108
/**
98109
* Check that the cache of the State Store information is available.
99110
*
@@ -121,7 +132,7 @@ public boolean loadCache(boolean force) throws IOException {
121132

122133
// If we have any expired record, update the State Store
123134
if (this.override) {
124-
overrideExpiredRecords(result);
135+
overrideExpiredRecords(result, this.asyncOverride);
125136
}
126137
} catch (IOException e) {
127138
LOG.error("Cannot get \"{}\" records from the State Store",
@@ -168,11 +179,13 @@ private boolean isUpdateTime() {
168179
* removed.
169180
*
170181
* @param query RecordQueryResult containing the data to be inspected.
182+
* @param async If true, run the overwrite step and the delete step in parallel on two
*              threads; if false, run them one after the other on the calling thread.
171183
* @throws IOException If the values cannot be updated.
172184
*/
173-
public void overrideExpiredRecords(QueryResult<R> query) throws IOException {
185+
public void overrideExpiredRecords(QueryResult<R> query, boolean async) throws IOException {
174186
List<R> commitRecords = new ArrayList<>();
175-
List<R> deleteRecords = new ArrayList<>();
187+
List<R> toDeleteRecords = new ArrayList<>();
188+
List<R> deletedRecords = new ArrayList<>();
176189
List<R> newRecords = query.getRecords();
177190
long currentDriverTime = query.getTimestamp();
178191
if (newRecords == null || currentDriverTime <= 0) {
@@ -182,24 +195,58 @@ public void overrideExpiredRecords(QueryResult<R> query) throws IOException {
182195
for (R record : newRecords) {
183196
if (record.shouldBeDeleted(currentDriverTime)) {
184197
String recordName = StateStoreUtils.getRecordName(record.getClass());
185-
if (getDriver().remove(record)) {
186-
deleteRecords.add(record);
187-
LOG.info("Deleted State Store record {}: {}", recordName, record);
188-
} else {
189-
LOG.warn("Couldn't delete State Store record {}: {}", recordName,
190-
record);
191-
}
198+
LOG.info("State Store record to delete {}: {}", recordName, record);
199+
toDeleteRecords.add(record);
192200
} else if (!record.isExpired() && record.checkExpired(currentDriverTime)) {
193201
String recordName = StateStoreUtils.getRecordName(record.getClass());
194202
LOG.info("Override State Store record {}: {}", recordName, record);
195203
commitRecords.add(record);
196204
}
197205
}
198-
if (commitRecords.size() > 0) {
199-
getDriver().putAll(commitRecords, true, false);
200-
}
201-
if (deleteRecords.size() > 0) {
202-
newRecords.removeAll(deleteRecords);
206+
207+
List<Callable<Void>> callables = new ArrayList<>();
208+
callables.add(() -> {
209+
if (!commitRecords.isEmpty()) {
210+
getDriver().putAll(commitRecords, true, false);
211+
}
212+
return null;
213+
});
214+
215+
callables.add(() -> {
216+
if (!toDeleteRecords.isEmpty()) {
217+
deletedRecords.addAll(getDriver().removeMultiple(toDeleteRecords));
218+
}
219+
if (!deletedRecords.isEmpty()) {
220+
newRecords.removeAll(deletedRecords);
221+
}
222+
return null;
223+
});
224+
225+
if (async) {
226+
ThreadPoolExecutor executor = new ThreadPoolExecutor(2, 2,
227+
0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
228+
List<Future<Void>> futures = new ArrayList<>();
229+
futures.add(executor.submit(callables.get(0)));
230+
futures.add(executor.submit(callables.get(1)));
231+
try {
232+
for (Future<Void> future : futures) {
233+
future.get();
234+
}
235+
} catch (InterruptedException e) {
236+
LOG.error("Failed to override expired records.", e);
237+
throw new IOException(e);
238+
} catch (ExecutionException e) {
239+
throw new IOException(e);
240+
} finally {
241+
executor.shutdown();
242+
}
243+
} else {
244+
try {
245+
callables.get(0).call();
246+
callables.get(1).call();
247+
} catch (Exception e) {
248+
throw new IOException(e);
249+
}
203250
}
204251
}
205252

@@ -215,7 +262,7 @@ public void overrideExpiredRecord(R record) throws IOException {
215262
newRecords.add(record);
216263
long time = getDriver().getTime();
217264
QueryResult<R> query = new QueryResult<>(newRecords, time);
218-
overrideExpiredRecords(query);
265+
overrideExpiredRecords(query, false);
219266
}
220267

221268
/**

hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/StateStoreService.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -251,6 +251,11 @@ private <T extends RecordStore<?>> void addRecordStore(
251251

252252
T recordStore = RecordStore.newInstance(clazz, this.getDriver());
253253
Class<? extends BaseRecord> recordClass = recordStore.getRecordClass();
254+
if (recordStore instanceof CachedRecordStore && conf.getStringCollection(
255+
RBFConfigKeys.FEDERATION_STORE_MEMBERSHIP_ASYNC_OVERRIDE_CLASSES)
256+
.contains(clazz.getCanonicalName())) {
257+
((CachedRecordStore<?>) recordStore).toggleAsyncOverride(true);
258+
}
254259
this.recordStores.put(recordClass, recordStore);
255260

256261
// Subscribe for cache updates

hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/StateStoreRecordOperations.java

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -127,6 +127,17 @@ <T extends BaseRecord> StateStoreOperationResult putAll(
127127
@AtMostOnce
128128
<T extends BaseRecord> boolean remove(T record) throws IOException;
129129

130+
/**
131+
* Remove multiple records.
132+
*
133+
* @param <T> Record class of the records.
134+
* @param records Records to be removed.
135+
* @return Records that were removed.
136+
* @throws IOException Throws exception if unable to query the data store.
137+
*/
138+
@AtMostOnce
139+
<T extends BaseRecord> List<T> removeMultiple(List<T> records) throws IOException;
140+
130141
/**
131142
* Remove all records of this class from the store.
132143
*
@@ -152,4 +163,17 @@ <T extends BaseRecord> StateStoreOperationResult putAll(
152163
<T extends BaseRecord> int remove(Class<T> clazz, Query<T> query)
153164
throws IOException;
154165

166+
/**
167+
* Remove all records of a specific class that match any query in a list of queries.
168+
* Requires the getAll implementation to fetch fresh records on each call.
169+
*
170+
* @param clazz The class to match the records with.
171+
* @param queries Queries (logical OR) to filter what to remove.
172+
* @param <T> Record class of the records.
173+
* @return The partial record of every query that removed at least one record. These are
*         the query inputs, not the stored records that were actually deleted.
174+
* @throws IOException Throws exception if unable to query the data store.
175+
*/
176+
@AtMostOnce
177+
<T extends BaseRecord> List<T> remove(Class<T> clazz, List<Query<T>> queries)
178+
throws IOException;
155179
}

hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreBaseImpl.java

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -86,4 +86,39 @@ public <T extends BaseRecord> boolean remove(T record) throws IOException {
8686
Class<T> recordClass = (Class<T>)StateStoreUtils.getRecordClass(clazz);
8787
return remove(recordClass, query) == 1;
8888
}
89+
90+
@Override
91+
public <T extends BaseRecord> List<T> removeMultiple(List<T> records) throws IOException {
92+
assert !records.isEmpty();
93+
// Fall back to iterative remove() calls if all records don't share 1 class
94+
Class<? extends BaseRecord> expectedClazz = records.get(0).getClass();
95+
if (!records.stream().allMatch(x -> x.getClass() == expectedClazz)) {
96+
List<T> result = new ArrayList<>();
97+
for (T record : records) {
98+
if (remove(record)) {
99+
result.add(record);
100+
}
101+
}
102+
return result;
103+
}
104+
105+
final List<Query<T>> queries = new ArrayList<>();
106+
for (T record: records) {
107+
queries.add(new Query<>(record));
108+
}
109+
@SuppressWarnings("unchecked")
110+
Class<T> recordClass = (Class<T>) StateStoreUtils.getRecordClass(expectedClazz);
111+
return remove(recordClass, queries);
112+
}
113+
114+
public <T extends BaseRecord> List<T> remove(Class<T> clazz, List<Query<T>> queries)
115+
throws IOException {
116+
List<T> result = new ArrayList<>();
117+
for (Query<T> query : queries) {
118+
if (remove(clazz, query) > 0) {
119+
result.add(query.getPartial());
120+
}
121+
}
122+
return result;
123+
}
89124
}

hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreZooKeeperImpl.java

Lines changed: 51 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,11 @@
2525
import java.io.IOException;
2626
import java.util.ArrayList;
2727
import java.util.Collections;
28+
import java.util.HashMap;
29+
import java.util.HashSet;
2830
import java.util.List;
31+
import java.util.Map;
32+
import java.util.Set;
2933
import java.util.concurrent.Callable;
3034
import java.util.concurrent.Future;
3135
import java.util.concurrent.LinkedBlockingQueue;
@@ -284,53 +288,89 @@ public <T extends BaseRecord> StateStoreOperationResult putAll(
284288
}
285289

286290
@Override
287-
public <T extends BaseRecord> int remove(
288-
Class<T> clazz, Query<T> query) throws IOException {
291+
public <T extends BaseRecord> List<T> remove(Class<T> clazz, List<Query<T>> queries)
292+
throws IOException {
289293
verifyDriverReady();
290-
if (query == null) {
291-
return 0;
294+
final List<T> removed = new ArrayList<>();
295+
final List<T> trueRemoved = Collections.synchronizedList(new ArrayList<>());
296+
if (queries.isEmpty()) {
297+
return removed;
292298
}
293299

294300
// Read the current data
295301
long start = monotonicNow();
296-
List<T> records = null;
302+
List<T> records;
297303
try {
298304
QueryResult<T> result = get(clazz);
299305
records = result.getRecords();
300306
} catch (IOException ex) {
301307
LOG.error("Cannot get existing records", ex);
302308
getMetrics().addFailure(monotonicNow() - start);
303-
return 0;
309+
return removed;
304310
}
305311

306312
// Check the records to remove
307313
String znode = getZNodeForClass(clazz);
308-
List<T> recordsToRemove = filterMultiple(query, records);
314+
Set<T> recordsToRemove = new HashSet<>();
315+
Map<Query<T>, List<T>> queryToRecords = new HashMap<>();
316+
for (Query<T> query : queries) {
317+
List<T> filtered = filterMultiple(query, records);
318+
queryToRecords.put(query, filtered);
319+
recordsToRemove.addAll(filtered);
320+
}
309321

310322
// Remove the records
311-
int removed = 0;
312-
for (T existingRecord : recordsToRemove) {
323+
List<Callable<Void>> callables = new ArrayList<>();
324+
recordsToRemove.forEach(existingRecord -> callables.add(() -> {
313325
LOG.info("Removing \"{}\"", existingRecord);
314326
try {
315327
String primaryKey = getPrimaryKey(existingRecord);
316328
String path = getNodePath(znode, primaryKey);
317329
if (zkManager.delete(path)) {
318-
removed++;
330+
trueRemoved.add(existingRecord);
319331
} else {
320332
LOG.error("Did not remove \"{}\"", existingRecord);
321333
}
322334
} catch (Exception e) {
323335
LOG.error("Cannot remove \"{}\"", existingRecord, e);
324336
getMetrics().addFailure(monotonicNow() - start);
325337
}
338+
return null;
339+
}));
340+
try {
341+
if (enableConcurrent) {
342+
executorService.invokeAll(callables);
343+
} else {
344+
for (Callable<Void> callable : callables) {
345+
callable.call();
346+
}
347+
}
348+
} catch (Exception e) {
349+
LOG.error("Record removal failed : {}", e.getMessage(), e);
350+
throw new IOException(e);
326351
}
327352
long end = monotonicNow();
328-
if (removed > 0) {
353+
if (!trueRemoved.isEmpty()) {
329354
getMetrics().addRemove(end - start);
330355
}
356+
// Convert back to partials
357+
for (Map.Entry<Query<T>, List<T>> entry : queryToRecords.entrySet()) {
358+
for (T record : entry.getValue()) {
359+
if (trueRemoved.contains(record)) {
360+
removed.add(entry.getKey().getPartial());
361+
break;
362+
}
363+
}
364+
}
331365
return removed;
332366
}
333367

368+
@Override
369+
public <T extends BaseRecord> int remove(Class<T> clazz, Query<T> query)
370+
throws IOException {
371+
return remove(clazz, Collections.singletonList(query)).size();
372+
}
373+
334374
@Override
335375
public <T extends BaseRecord> boolean removeAll(Class<T> clazz)
336376
throws IOException {

hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/resources/hdfs-rbf-default.xml

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -370,6 +370,20 @@
370370
</description>
371371
</property>
372372

373+
<property>
374+
<name>dfs.federation.router.store.async.override.classes</name>
375+
<value></value>
376+
<description>
377+
CachedRecordStore periodic updates contain an overwrite part and a deletion part.
378+
Classes in this config will do these 2 steps in parallel when updating cache.
379+
Supported classes:
380+
org.apache.hadoop.hdfs.server.federation.store.impl.DisabledNameserviceStoreImpl
381+
org.apache.hadoop.hdfs.server.federation.store.impl.MembershipStoreImpl
382+
org.apache.hadoop.hdfs.server.federation.store.impl.MountTableStoreImpl
383+
org.apache.hadoop.hdfs.server.federation.store.impl.RouterStoreImpl
384+
</description>
385+
</property>
386+
373387
<property>
374388
<name>dfs.federation.router.store.connection.test</name>
375389
<value>60000</value>

hadoop-hdfs-project/hadoop-hdfs-rbf/src/site/markdown/HDFSRouterFederation.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -469,6 +469,7 @@ The connection to the State Store and the internal caching at the Router.
469469
| dfs.federation.router.store.connection.test | 60000 | How often to check for the connection to the State Store in milliseconds. |
470470
| dfs.federation.router.cache.ttl | 60000 | How often to refresh the State Store caches in milliseconds. |
471471
| dfs.federation.router.store.membership.expiration | 300000 | Expiration time in milliseconds for a membership record. |
472+
| dfs.federation.router.store.async.override.classes | | Comma-separated list of cached State Store classes that overwrite and delete expired records in parallel when updating their cache. |
472473
| dfs.federation.router.mount-table.cache.update | false | If true, Mount table cache is updated whenever a mount table entry is added, modified or removed for all the routers. |
473474
| dfs.federation.router.mount-table.cache.update.timeout | 1m | Max time to wait for all the routers to finish their mount table cache update. |
474475
| dfs.federation.router.mount-table.cache.update.client.max.time | 5m | Max time a RouterClient connection can be cached. |

0 commit comments

Comments
 (0)