
Commit dadfceb

Merge branch 'apache:trunk' into YARN-11350-V2

2 parents: d7b5b3d + 4af4997

154 files changed: +2762 -671 lines changed


LICENSE-binary

Lines changed: 1 addition & 1 deletion
@@ -244,7 +244,7 @@ com.nimbusds:nimbus-jose-jwt:9.8.1
 com.squareup.okhttp3:okhttp:4.10.0
 com.squareup.okio:okio:3.2.0
 com.zaxxer:HikariCP:4.0.3
-commons-beanutils:commons-beanutils:1.9.3
+commons-beanutils:commons-beanutils:1.9.4
 commons-cli:commons-cli:1.2
 commons-codec:commons-codec:1.11
 commons-collections:commons-collections:3.2.2

hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/VectoredReadUtils.java

Lines changed: 7 additions & 0 deletions
@@ -307,9 +307,16 @@ public static ByteBuffer sliceTo(ByteBuffer readData, long readOffset,
       FileRange request) {
     int offsetChange = (int) (request.getOffset() - readOffset);
     int requestLength = request.getLength();
+    // Create a new buffer that is backed by the original contents
+    // The buffer will have position 0 and the same limit as the original one
     readData = readData.slice();
+    // Change the offset and the limit of the buffer as the reader wants to see
+    // only relevant data
     readData.position(offsetChange);
     readData.limit(offsetChange + requestLength);
+    // Create a new buffer after the limit change so that only that portion of the data is
+    // returned to the reader.
+    readData = readData.slice();
     return readData;
   }

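The double slice() matters because ByteBuffer.slice() shares the underlying bytes but resets the view's position to 0. A minimal standalone sketch of that behaviour (not part of the commit; the 16-byte buffer, offset 4 and length 8 are made-up values):

import java.nio.ByteBuffer;

public class SliceToDemo {
  public static void main(String[] args) {
    ByteBuffer readData = ByteBuffer.allocate(16);
    for (byte i = 0; i < 16; i++) {
      readData.put(i);
    }
    readData.flip();                         // position 0, limit 16

    // First slice: a view backed by the same bytes, position 0, limit 16.
    ByteBuffer view = readData.slice();
    view.position(4);                        // skip to the requested offset
    view.limit(4 + 8);                       // expose only the requested length

    // Second slice: a view of just bytes [4, 12) whose position is 0 again,
    // so the caller reads the requested range starting from position 0.
    ByteBuffer result = view.slice();
    System.out.println(result.position());   // 0
    System.out.println(result.remaining());  // 8
    System.out.println(result.get(0));       // 4, the first byte of the requested range
  }
}

Without the second slice() the returned buffer would still sit at position offsetChange, which is exactly what the new assertions in TestVectoredReadUtils below check for.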
hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestVectoredReadUtils.java

Lines changed: 6 additions & 0 deletions
@@ -61,6 +61,9 @@ public void testSliceTo() {
         .describedAs("Slicing on the same offset shouldn't " +
             "create a new buffer")
         .isEqualTo(slice);
+    Assertions.assertThat(slice.position())
+        .describedAs("Slicing should return buffers starting from position 0")
+        .isEqualTo(0);

     // try slicing a range
     final int offset = 100;
@@ -77,6 +80,9 @@ public void testSliceTo() {
         .describedAs("Slicing should use the same underlying " +
             "data")
         .isEqualTo(slice.array());
+    Assertions.assertThat(slice.position())
+        .describedAs("Slicing should return buffers starting from position 0")
+        .isEqualTo(0);
     // test the contents of the slice
     intBuffer = slice.asIntBuffer();
     for(int i=0; i < sliceLength / Integer.BYTES; ++i) {

hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/NameNodeProxiesClient.java

Lines changed: 6 additions & 0 deletions
@@ -349,6 +349,12 @@ public static ClientProtocol createProxyWithAlignmentContext(
       boolean withRetries, AtomicBoolean fallbackToSimpleAuth,
       AlignmentContext alignmentContext)
       throws IOException {
+    if (alignmentContext == null &&
+        conf.getBoolean(HdfsClientConfigKeys.DFS_RBF_OBSERVER_READ_ENABLE,
+            HdfsClientConfigKeys.DFS_RBF_OBSERVER_READ_ENABLE_DEFAULT)) {
+      alignmentContext = new ClientGSIContext();
+    }
+
     RPC.setProtocolEngine(conf, ClientNamenodeProtocolPB.class,
         ProtobufRpcEngine2.class);

hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java

Lines changed: 2 additions & 0 deletions
@@ -78,6 +78,8 @@ public interface HdfsClientConfigKeys {
   int DFS_NAMENODE_HTTPS_PORT_DEFAULT = 9871;
   String DFS_NAMENODE_HTTPS_ADDRESS_KEY = "dfs.namenode.https-address";
   String DFS_HA_NAMENODES_KEY_PREFIX = "dfs.ha.namenodes";
+  String DFS_RBF_OBSERVER_READ_ENABLE = "dfs.client.rbf.observer.read.enable";
+  boolean DFS_RBF_OBSERVER_READ_ENABLE_DEFAULT = false;
   int DFS_NAMENODE_RPC_PORT_DEFAULT = 8020;
   String DFS_NAMENODE_KERBEROS_PRINCIPAL_KEY =
       "dfs.namenode.kerberos.principal";

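Together with the NameNodeProxiesClient change above, these keys let a client opt into observer reads through routers purely via configuration. A hedged sketch of enabling the flag programmatically (setting dfs.client.rbf.observer.read.enable=true in hdfs-site.xml is equivalent; the class below is illustrative, not part of the commit):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;

public class EnableRbfObserverReads {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // With this flag on and no explicit AlignmentContext supplied,
    // createProxyWithAlignmentContext() now builds a ClientGSIContext.
    conf.setBoolean(HdfsClientConfigKeys.DFS_RBF_OBSERVER_READ_ENABLE, true);
    System.out.println(conf.getBoolean(
        HdfsClientConfigKeys.DFS_RBF_OBSERVER_READ_ENABLE,
        HdfsClientConfigKeys.DFS_RBF_OBSERVER_READ_ENABLE_DEFAULT));
  }
}
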
hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ConnectionManager.java

Lines changed: 3 additions & 2 deletions
@@ -226,13 +226,14 @@ public ConnectionContext getConnection(UserGroupInformation ugi,
           this.pools.put(connectionId, pool);
           this.connectionPoolToNamespaceMap.put(connectionId, nsId);
         }
-        long clientStateId = RouterStateIdContext.getClientStateIdFromCurrentCall(nsId);
-        pool.getPoolAlignmentContext().advanceClientStateId(clientStateId);
       } finally {
         writeLock.unlock();
       }
     }

+    long clientStateId = RouterStateIdContext.getClientStateIdFromCurrentCall(nsId);
+    pool.getPoolAlignmentContext().advanceClientStateId(clientStateId);
+
     ConnectionContext conn = pool.getConnection();

     // Add a new connection to the pool if it wasn't usable

hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/PoolAlignmentContext.java

Lines changed: 7 additions & 0 deletions
@@ -20,6 +20,8 @@

 import java.io.IOException;
 import java.util.concurrent.atomic.LongAccumulator;
+
+import org.apache.hadoop.classification.VisibleForTesting;
 import org.apache.hadoop.ipc.AlignmentContext;
 import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos;

@@ -99,4 +101,9 @@ public boolean isCoordinatedCall(String protocolName, String method) {
   public void advanceClientStateId(Long clientStateId) {
     poolLocalStateId.accumulate(clientStateId);
   }
+
+  @VisibleForTesting
+  public long getPoolLocalStateId() {
+    return this.poolLocalStateId.get();
+  }
 }

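The new getter exposes the pool's local state id to tests. Assuming poolLocalStateId is a max-accumulating LongAccumulator (the usual choice for a monotonically advancing state id; its construction is not shown in this hunk), advanceClientStateId() can only move the pool's view forward:

import java.util.concurrent.atomic.LongAccumulator;

public class StateIdAccumulatorDemo {
  public static void main(String[] args) {
    // Accumulates with max, so stale (lower) client state ids are ignored.
    LongAccumulator poolLocalStateId = new LongAccumulator(Math::max, Long.MIN_VALUE);
    poolLocalStateId.accumulate(42L);
    poolLocalStateId.accumulate(17L);            // arrives late, does not move the id back
    System.out.println(poolLocalStateId.get());  // 42
  }
}
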
hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/StateStoreService.java

Lines changed: 9 additions & 0 deletions
@@ -272,6 +272,15 @@ public <T extends RecordStore<?>> T getRegisteredRecordStore(
     return null;
   }

+  /**
+   * Get the list of all RecordStores.
+   * @return a list of each RecordStore.
+   */
+  @SuppressWarnings("unchecked")
+  public <T extends RecordStore<? extends BaseRecord>> List<T> getRecordStores() {
+    return new ArrayList<>((Collection<T>) recordStores.values());
+  }
+
   /**
    * List of records supported by this State Store.
    *

hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreFileBaseImpl.java

Lines changed: 6 additions & 12 deletions
@@ -85,7 +85,8 @@ protected abstract <T extends BaseRecord> BufferedReader getReader(
    * @param path Path of the record to write.
    * @return Writer for the record.
    */
-  protected abstract <T extends BaseRecord> BufferedWriter getWriter(
+  @VisibleForTesting
+  public abstract <T extends BaseRecord> BufferedWriter getWriter(
       String path);

   /**
@@ -348,25 +349,18 @@ public <T extends BaseRecord> boolean putAll(
     for (Entry<String, T> entry : toWrite.entrySet()) {
       String recordPath = entry.getKey();
       String recordPathTemp = recordPath + "." + now() + TMP_MARK;
-      BufferedWriter writer = getWriter(recordPathTemp);
-      try {
+      boolean recordWrittenSuccessfully = true;
+      try (BufferedWriter writer = getWriter(recordPathTemp)) {
         T record = entry.getValue();
         String line = serializeString(record);
         writer.write(line);
       } catch (IOException e) {
         LOG.error("Cannot write {}", recordPathTemp, e);
+        recordWrittenSuccessfully = false;
         success = false;
-      } finally {
-        if (writer != null) {
-          try {
-            writer.close();
-          } catch (IOException e) {
-            LOG.error("Cannot close the writer for {}", recordPathTemp, e);
-          }
-        }
       }
       // Commit
-      if (!rename(recordPathTemp, recordPath)) {
+      if (recordWrittenSuccessfully && !rename(recordPathTemp, recordPath)) {
         LOG.error("Failed committing record into {}", recordPath);
         success = false;
       }

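The rewrite swaps the manual finally/close block for try-with-resources and commits the temporary file only when the write succeeded. A self-contained sketch of the same write-then-commit pattern using plain java.nio instead of the State Store's getWriter()/rename() helpers (paths and payload are illustrative only):

import java.io.BufferedWriter;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardCopyOption;

public class WriteThenCommit {

  // Write the serialized record to a temporary file; commit it with a rename
  // only if the write (and the implicit close) succeeded.
  static boolean putRecord(Path dir, String name, String serialized) {
    Path temp = dir.resolve(name + ".tmp");
    Path dest = dir.resolve(name);
    boolean writtenSuccessfully = true;
    try (BufferedWriter writer = Files.newBufferedWriter(temp)) {
      writer.write(serialized);      // writer is closed automatically,
    } catch (IOException e) {        // even when write() throws
      writtenSuccessfully = false;
    }
    if (!writtenSuccessfully) {
      return false;                  // never commit a partial record
    }
    try {
      Files.move(temp, dest, StandardCopyOption.REPLACE_EXISTING);
      return true;
    } catch (IOException e) {
      return false;
    }
  }

  public static void main(String[] args) {
    Path dir = Paths.get(System.getProperty("java.io.tmpdir"));
    System.out.println(putRecord(dir, "membership-record", "{\"state\":\"ACTIVE\"}"));
  }
}
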
hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreFileImpl.java

Lines changed: 3 additions & 1 deletion
@@ -31,6 +31,7 @@
 import java.util.List;

 import org.apache.commons.lang3.ArrayUtils;
+import org.apache.hadoop.classification.VisibleForTesting;
 import org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys;
 import org.apache.hadoop.hdfs.server.federation.store.records.BaseRecord;
 import org.slf4j.Logger;
@@ -125,7 +126,8 @@ protected <T extends BaseRecord> BufferedReader getReader(String filename) {
   }

   @Override
-  protected <T extends BaseRecord> BufferedWriter getWriter(String filename) {
+  @VisibleForTesting
+  public <T extends BaseRecord> BufferedWriter getWriter(String filename) {
     BufferedWriter writer = null;
     try {
       LOG.debug("Writing file: {}", filename);

hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreFileSystemImpl.java

Lines changed: 6 additions & 13 deletions
@@ -28,13 +28,14 @@
 import java.util.Collections;
 import java.util.List;

+import org.apache.hadoop.classification.VisibleForTesting;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.Options;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys;
 import org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreDriver;
 import org.apache.hadoop.hdfs.server.federation.store.records.BaseRecord;
@@ -82,17 +83,8 @@ protected boolean mkdir(String path) {
   @Override
   protected boolean rename(String src, String dst) {
     try {
-      if (fs instanceof DistributedFileSystem) {
-        DistributedFileSystem dfs = (DistributedFileSystem)fs;
-        dfs.rename(new Path(src), new Path(dst), Options.Rename.OVERWRITE);
-        return true;
-      } else {
-        // Replace should be atomic but not available
-        if (fs.exists(new Path(dst))) {
-          fs.delete(new Path(dst), true);
-        }
-        return fs.rename(new Path(src), new Path(dst));
-      }
+      FileUtil.rename(fs, new Path(src), new Path(dst), Options.Rename.OVERWRITE);
+      return true;
     } catch (Exception e) {
       LOG.error("Cannot rename {} to {}", src, dst, e);
       return false;
@@ -148,7 +140,8 @@ protected <T extends BaseRecord> BufferedReader getReader(String pathName) {
   }

   @Override
-  protected <T extends BaseRecord> BufferedWriter getWriter(String pathName) {
+  @VisibleForTesting
+  public <T extends BaseRecord> BufferedWriter getWriter(String pathName) {
     BufferedWriter writer = null;
     Path path = new Path(pathName);
     try {

hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/tools/federation/RouterAdmin.java

Lines changed: 71 additions & 3 deletions
@@ -18,6 +18,7 @@
 package org.apache.hadoop.hdfs.tools.federation;

 import java.io.IOException;
+import java.io.PrintStream;
 import java.net.InetSocketAddress;
 import java.util.Arrays;
 import java.util.Collection;
@@ -26,6 +27,7 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.TreeMap;
 import java.util.regex.Pattern;

 import org.apache.hadoop.classification.InterfaceAudience.Private;
@@ -46,6 +48,10 @@
 import org.apache.hadoop.hdfs.server.federation.router.RouterClient;
 import org.apache.hadoop.hdfs.server.federation.router.RouterQuotaUsage;
 import org.apache.hadoop.hdfs.server.federation.router.RouterStateManager;
+import org.apache.hadoop.hdfs.server.federation.store.CachedRecordStore;
+import org.apache.hadoop.hdfs.server.federation.store.RecordStore;
+import org.apache.hadoop.hdfs.server.federation.store.StateStoreService;
+import org.apache.hadoop.hdfs.server.federation.store.StateStoreUtils;
 import org.apache.hadoop.hdfs.server.federation.store.protocol.AddMountTableEntryRequest;
 import org.apache.hadoop.hdfs.server.federation.store.protocol.AddMountTableEntryResponse;
 import org.apache.hadoop.hdfs.server.federation.store.protocol.DisableNameserviceRequest;
@@ -70,7 +76,9 @@
 import org.apache.hadoop.hdfs.server.federation.store.protocol.RemoveMountTableEntryResponse;
 import org.apache.hadoop.hdfs.server.federation.store.protocol.UpdateMountTableEntryRequest;
 import org.apache.hadoop.hdfs.server.federation.store.protocol.UpdateMountTableEntryResponse;
+import org.apache.hadoop.hdfs.server.federation.store.records.BaseRecord;
 import org.apache.hadoop.hdfs.server.federation.store.records.MountTable;
+import org.apache.hadoop.hdfs.server.federation.store.records.impl.pb.PBRecord;
 import org.apache.hadoop.ipc.ProtobufRpcEngine2;
 import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.ipc.RefreshResponse;
@@ -97,6 +105,7 @@
 public class RouterAdmin extends Configured implements Tool {

   private static final Logger LOG = LoggerFactory.getLogger(RouterAdmin.class);
+  private static final String DUMP_COMMAND = "-dumpState";

   private RouterClient client;

@@ -133,7 +142,7 @@ private String getUsage(String cmd) {
     String[] commands =
         {"-add", "-update", "-rm", "-ls", "-getDestination", "-setQuota",
             "-setStorageTypeQuota", "-clrQuota", "-clrStorageTypeQuota",
-            "-safemode", "-nameservice", "-getDisabledNameservices",
+            DUMP_COMMAND, "-safemode", "-nameservice", "-getDisabledNameservices",
             "-refresh", "-refreshRouterArgs",
             "-refreshSuperUserGroupsConfiguration", "-refreshCallQueue"};
     StringBuilder usage = new StringBuilder();
@@ -187,6 +196,8 @@ private String getUsage(String cmd) {
       return "\t[-refreshSuperUserGroupsConfiguration]";
     } else if (cmd.equals("-refreshCallQueue")) {
       return "\t[-refreshCallQueue]";
+    } else if (cmd.equals(DUMP_COMMAND)) {
+      return "\t[" + DUMP_COMMAND + "]";
     }
     return getUsage(null);
   }
@@ -224,7 +235,8 @@ private void validateMax(String[] arg) {
       if (arg.length > 1) {
         throw new IllegalArgumentException("No arguments allowed");
       }
-    } else if (arg[0].equals("-refreshCallQueue")) {
+    } else if (arg[0].equals("-refreshCallQueue") ||
+        arg[0].equals(DUMP_COMMAND)) {
       if (arg.length > 1) {
         throw new IllegalArgumentException("No arguments allowed");
       }
@@ -286,6 +298,15 @@ private boolean validateMin(String[] argv) {
     return true;
   }

+  /**
+   * Does this command run in the local process?
+   * @param cmd the string of the command
+   * @return is this a local command?
+   */
+  boolean isLocalCommand(String cmd) {
+    return DUMP_COMMAND.equals(cmd);
+  }
+
   @Override
   public int run(String[] argv) throws Exception {
     if (argv.length < 1) {
@@ -303,6 +324,10 @@ public int run(String[] argv) throws Exception {
       System.err.println("Not enough parameters specificed for cmd " + cmd);
       printUsage(cmd);
       return exitCode;
+    } else if (isLocalCommand(argv[0])) {
+      if (DUMP_COMMAND.equals(argv[0])) {
+        return dumpStateStore(getConf(), System.out) ? 0 : -1;
+      }
     }
     String address = null;
     // Initialize RouterClient
@@ -1301,6 +1326,49 @@ private int refreshCallQueue() throws IOException {
     return returnCode;
   }

+  /**
+   * Dumps the contents of the StateStore to stdout.
+   * @return true if it was successful
+   */
+  public static boolean dumpStateStore(Configuration conf,
+      PrintStream output) throws IOException {
+    StateStoreService service = new StateStoreService();
+    conf.setBoolean(RBFConfigKeys.DFS_ROUTER_METRICS_ENABLE, false);
+    service.init(conf);
+    service.loadDriver();
+    if (!service.isDriverReady()) {
+      System.err.println("Can't initialize driver");
+      return false;
+    }
+    // Get the stores sorted by name
+    Map<String, RecordStore<? extends BaseRecord>> stores = new TreeMap<>();
+    for(RecordStore<? extends BaseRecord> store: service.getRecordStores()) {
+      String recordName = StateStoreUtils.getRecordName(store.getRecordClass());
+      stores.put(recordName, store);
+    }
+    for (Entry<String, RecordStore<? extends BaseRecord>> pair: stores.entrySet()) {
+      String recordName = pair.getKey();
+      RecordStore<? extends BaseRecord> store = pair.getValue();
+      output.println("---- " + recordName + " ----");
+      if (store instanceof CachedRecordStore) {
+        for (Object record: ((CachedRecordStore) store).getCachedRecords()) {
+          if (record instanceof BaseRecord && record instanceof PBRecord) {
+            BaseRecord baseRecord = (BaseRecord) record;
+            // Generate the pseudo-json format of the protobuf record
+            String recordString = ((PBRecord) record).getProto().toString();
+            // Indent each line
+            recordString = "    " + recordString.replaceAll("\n", "\n    ");
+            output.println(String.format("  %s:", baseRecord.getPrimaryKey()));
+            output.println(recordString);
+          }
+        }
+        output.println();
+      }
+    }
+    service.stop();
+    return true;
+  }
+
   /**
    * Normalize a path for that filesystem.
    *
@@ -1341,4 +1409,4 @@ public FsPermission getMode() {
     return mode;
   }
 }
-}
+}

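Since dumpStateStore() is public and static, the same dump can be driven from code as well as from the new -dumpState subcommand. A hedged sketch, assuming the Router's State Store settings (driver class, connection details) are picked up from the configuration files on the classpath:

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.tools.federation.RouterAdmin;

public class DumpRouterState {
  public static void main(String[] args) throws IOException {
    // Equivalent to `hdfs dfsrouteradmin -dumpState`: prints one section per
    // cached record store to the given stream.
    Configuration conf = new Configuration();
    boolean ok = RouterAdmin.dumpStateStore(conf, System.out);
    System.exit(ok ? 0 : -1);
  }
}
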
hadoop-hdfs-project/hadoop-hdfs-rbf/src/site/markdown/HDFSRouterFederation.md

Lines changed: 11 additions & 0 deletions
@@ -328,6 +328,17 @@ To trigger a runtime-refresh of the resource specified by \<key\> on \<host:ipc\

     [hdfs]$ $HADOOP_HOME/bin/hdfs dfsrouteradmin -refreshRouterArgs <host:ipc_port> <key> [arg1..argn]

+### Router state dump
+
+To diagnose the current state of the routers, you can use the
+dumpState command. It generates a text dump of the records in the
+State Store. Since it uses the configuration to find and read the
+state store, it is often easiest to use the machine where the routers
+run. The command runs locally, so the routers do not have to be up to
+use this command.
+
+    [hdfs]$ $HADOOP_HOME/bin/hdfs dfsrouteradmin -dumpState
+
 Client configuration
 --------------------
