Skip to content

Commit 44b3a50

Browse files
authored
Merge branch 'apache:trunk' into YARN-11374
2 parents 3eb9f32 + 9668a85 commit 44b3a50

File tree

33 files changed

+1581
-243
lines changed

33 files changed

+1581
-243
lines changed

hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/MembershipNamenodeResolver.java

Lines changed: 41 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -193,13 +193,53 @@ private void updateNameNodeState(final String nsId,
193193
}
194194
}
195195

196+
/**
197+
* Try to shuffle the multiple observer namenodes if listObserversFirst is true.
198+
* @param inputNameNodes the input FederationNamenodeContext list. If listObserversFirst is true,
199+
* all observers will be placed at the front of the collection.
200+
* @param listObserversFirst true if we need to shuffle the multiple front observer namenodes.
201+
* @return a list of FederationNamenodeContext.
202+
* @param <T> a subclass of FederationNamenodeContext.
203+
*/
204+
private <T extends FederationNamenodeContext> List<T> shuffleObserverNN(
205+
List<T> inputNameNodes, boolean listObserversFirst) {
206+
if (!listObserversFirst) {
207+
return inputNameNodes;
208+
}
209+
// Get Observers first.
210+
List<T> observerList = new ArrayList<>();
211+
for (T t : inputNameNodes) {
212+
if (t.getState() == OBSERVER) {
213+
observerList.add(t);
214+
} else {
215+
// The inputNameNodes are already sorted, so it can break
216+
// when the first non-observer is encountered.
217+
break;
218+
}
219+
}
220+
// Returns the inputNameNodes if no shuffle is required
221+
if (observerList.size() <= 1) {
222+
return inputNameNodes;
223+
}
224+
225+
// Shuffle multiple Observers
226+
Collections.shuffle(observerList);
227+
228+
List<T> ret = new ArrayList<>(inputNameNodes.size());
229+
ret.addAll(observerList);
230+
for (int i = observerList.size(); i < inputNameNodes.size(); i++) {
231+
ret.add(inputNameNodes.get(i));
232+
}
233+
return Collections.unmodifiableList(ret);
234+
}
235+
196236
@Override
197237
public List<? extends FederationNamenodeContext> getNamenodesForNameserviceId(
198238
final String nsId, boolean listObserversFirst) throws IOException {
199239

200240
List<? extends FederationNamenodeContext> ret = cacheNS.get(Pair.of(nsId, listObserversFirst));
201241
if (ret != null) {
202-
return ret;
242+
return shuffleObserverNN(ret, listObserversFirst);
203243
}
204244

205245
// Not cached, generate the value

hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterClientProtocol.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -702,8 +702,9 @@ public boolean truncate(String src, long newLength, String clientName)
702702
RemoteMethod method = new RemoteMethod("truncate",
703703
new Class<?>[] {String.class, long.class, String.class},
704704
new RemoteParam(), newLength, clientName);
705+
// Truncate can return true/false, so don't expect a result
705706
return rpcClient.invokeSequential(locations, method, Boolean.class,
706-
Boolean.TRUE);
707+
null);
707708
}
708709

709710
@Override

hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/resolver/TestNamenodeResolver.java

Lines changed: 92 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,98 @@ public void setup() throws IOException, InterruptedException {
9090
assertTrue(cleared);
9191
}
9292

93+
@Test
94+
public void testShuffleObserverNNs() throws Exception {
95+
// Add an active entry to the store
96+
NamenodeStatusReport activeReport = createNamenodeReport(
97+
NAMESERVICES[0], NAMENODES[0], HAServiceState.ACTIVE);
98+
assertTrue(namenodeResolver.registerNamenode(activeReport));
99+
100+
// Add a standby entry to the store
101+
NamenodeStatusReport standbyReport = createNamenodeReport(
102+
NAMESERVICES[0], NAMENODES[1], HAServiceState.STANDBY);
103+
assertTrue(namenodeResolver.registerNamenode(standbyReport));
104+
105+
// Load cache
106+
stateStore.refreshCaches(true);
107+
108+
// Get namenodes from state store.
109+
List<? extends FederationNamenodeContext> withoutObserver =
110+
namenodeResolver.getNamenodesForNameserviceId(NAMESERVICES[0], true);
111+
assertEquals(2, withoutObserver.size());
112+
assertEquals(FederationNamenodeServiceState.ACTIVE, withoutObserver.get(0).getState());
113+
assertEquals(FederationNamenodeServiceState.STANDBY, withoutObserver.get(1).getState());
114+
115+
// Get namenodes from cache.
116+
withoutObserver = namenodeResolver.getNamenodesForNameserviceId(NAMESERVICES[0], true);
117+
assertEquals(2, withoutObserver.size());
118+
assertEquals(FederationNamenodeServiceState.ACTIVE, withoutObserver.get(0).getState());
119+
assertEquals(FederationNamenodeServiceState.STANDBY, withoutObserver.get(1).getState());
120+
121+
// Add an observer entry to the store
122+
NamenodeStatusReport observerReport1 = createNamenodeReport(
123+
NAMESERVICES[0], NAMENODES[2], HAServiceState.OBSERVER);
124+
assertTrue(namenodeResolver.registerNamenode(observerReport1));
125+
126+
// Load cache
127+
stateStore.refreshCaches(true);
128+
129+
// Get namenodes from state store.
130+
List<? extends FederationNamenodeContext> observerList =
131+
namenodeResolver.getNamenodesForNameserviceId(NAMESERVICES[0], true);
132+
assertEquals(3, observerList.size());
133+
assertEquals(FederationNamenodeServiceState.OBSERVER, observerList.get(0).getState());
134+
assertEquals(FederationNamenodeServiceState.ACTIVE, observerList.get(1).getState());
135+
assertEquals(FederationNamenodeServiceState.STANDBY, observerList.get(2).getState());
136+
137+
// Get namenodes from cache.
138+
observerList = namenodeResolver.getNamenodesForNameserviceId(NAMESERVICES[0], true);
139+
assertEquals(3, observerList.size());
140+
assertEquals(FederationNamenodeServiceState.OBSERVER, observerList.get(0).getState());
141+
assertEquals(FederationNamenodeServiceState.ACTIVE, observerList.get(1).getState());
142+
assertEquals(FederationNamenodeServiceState.STANDBY, observerList.get(2).getState());
143+
144+
// Add one new observer entry to the store
145+
NamenodeStatusReport observerReport2 = createNamenodeReport(
146+
NAMESERVICES[0], NAMENODES[3], HAServiceState.OBSERVER);
147+
assertTrue(namenodeResolver.registerNamenode(observerReport2));
148+
149+
// Load cache
150+
stateStore.refreshCaches(true);
151+
152+
// Get namenodes from state store.
153+
List<? extends FederationNamenodeContext> observerList2 =
154+
namenodeResolver.getNamenodesForNameserviceId(NAMESERVICES[0], true);
155+
assertEquals(4, observerList2.size());
156+
assertEquals(FederationNamenodeServiceState.OBSERVER, observerList2.get(0).getState());
157+
assertEquals(FederationNamenodeServiceState.OBSERVER, observerList2.get(1).getState());
158+
assertEquals(FederationNamenodeServiceState.ACTIVE, observerList2.get(2).getState());
159+
assertEquals(FederationNamenodeServiceState.STANDBY, observerList2.get(3).getState());
160+
161+
// Get namenodes from cache.
162+
observerList2 = namenodeResolver.getNamenodesForNameserviceId(NAMESERVICES[0], true);
163+
assertEquals(4, observerList2.size());
164+
assertEquals(FederationNamenodeServiceState.OBSERVER, observerList2.get(0).getState());
165+
assertEquals(FederationNamenodeServiceState.OBSERVER, observerList2.get(1).getState());
166+
assertEquals(FederationNamenodeServiceState.ACTIVE, observerList2.get(2).getState());
167+
assertEquals(FederationNamenodeServiceState.STANDBY, observerList2.get(3).getState());
168+
169+
// Test shuffler
170+
List<? extends FederationNamenodeContext> observerList3;
171+
boolean hit = false;
172+
for (int i = 0; i < 1000; i++) {
173+
observerList3 = namenodeResolver.getNamenodesForNameserviceId(NAMESERVICES[0], true);
174+
assertEquals(FederationNamenodeServiceState.OBSERVER, observerList3.get(0).getState());
175+
assertEquals(FederationNamenodeServiceState.OBSERVER, observerList3.get(1).getState());
176+
if (observerList3.get(0).getNamenodeId().equals(observerList2.get(1).getNamenodeId()) &&
177+
observerList3.get(1).getNamenodeId().equals(observerList2.get(0).getNamenodeId())) {
178+
hit = true;
179+
break;
180+
}
181+
}
182+
assertTrue(hit);
183+
}
184+
93185
@Test
94186
public void testStateStoreDisconnected() throws Exception {
95187

hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterAllResolver.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
import org.apache.hadoop.fs.FileStatus;
3434
import org.apache.hadoop.fs.FileSystem;
3535
import org.apache.hadoop.fs.Path;
36+
import org.apache.hadoop.hdfs.DistributedFileSystem;
3637
import org.apache.hadoop.hdfs.server.federation.RouterConfigBuilder;
3738
import org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster.NamenodeContext;
3839
import org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster.RouterContext;
@@ -46,6 +47,7 @@
4647
import org.apache.hadoop.hdfs.server.federation.store.protocol.GetMountTableEntriesRequest;
4748
import org.apache.hadoop.hdfs.server.federation.store.protocol.GetMountTableEntriesResponse;
4849
import org.apache.hadoop.hdfs.server.federation.store.records.MountTable;
50+
import org.apache.hadoop.hdfs.server.namenode.TestFileTruncate;
4951
import org.junit.After;
5052
import org.junit.Before;
5153
import org.junit.Test;
@@ -191,6 +193,18 @@ private void testAll(final String path) throws Exception {
191193
assertDirsEverywhere(path, 9);
192194
assertFilesDistributed(path, 15);
193195

196+
// Test truncate
197+
String testTruncateFile = path + "/dir2/dir22/dir220/file-truncate.txt";
198+
createTestFile(routerFs, testTruncateFile);
199+
Path testTruncateFilePath = new Path(testTruncateFile);
200+
routerFs.truncate(testTruncateFilePath, 10);
201+
TestFileTruncate.checkBlockRecovery(testTruncateFilePath,
202+
(DistributedFileSystem) routerFs);
203+
assertEquals("Truncate file fails", 10,
204+
routerFs.getFileStatus(testTruncateFilePath).getLen());
205+
assertDirsEverywhere(path, 9);
206+
assertFilesDistributed(path, 16);
207+
194208
// Removing a directory should remove it from every subcluster
195209
routerFs.delete(new Path(path + "/dir2/dir22/dir220"), true);
196210
assertDirsEverywhere(path, 8);

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java

Lines changed: 16 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1654,18 +1654,31 @@ synchronized void logEdit(final int length, final byte[] data) {
16541654
endTransaction(start);
16551655
}
16561656

1657+
void recoverUnclosedStreams() throws IOException {
1658+
recoverUnclosedStreams(false);
1659+
}
1660+
16571661
/**
16581662
* Run recovery on all journals to recover any unclosed segments
16591663
*/
1660-
synchronized void recoverUnclosedStreams() {
1664+
synchronized void recoverUnclosedStreams(boolean terminateOnFailure) throws IOException {
16611665
Preconditions.checkState(
16621666
state == State.BETWEEN_LOG_SEGMENTS,
16631667
"May not recover segments - wrong state: %s", state);
16641668
try {
16651669
journalSet.recoverUnfinalizedSegments();
16661670
} catch (IOException ex) {
1667-
// All journals have failed, it is handled in logSync.
1668-
// TODO: are we sure this is OK?
1671+
if (terminateOnFailure) {
1672+
final String msg = "Unable to recover log segments: "
1673+
+ "too few journals successfully recovered.";
1674+
LOG.error(msg, ex);
1675+
synchronized (journalSetLock) {
1676+
IOUtils.cleanupWithLogger(LOG, journalSet);
1677+
}
1678+
terminate(1, msg);
1679+
} else {
1680+
throw ex;
1681+
}
16691682
}
16701683
}
16711684

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1389,7 +1389,7 @@ void startActiveServices() throws IOException {
13891389
// During startup, we're already open for write during initialization.
13901390
editLog.initJournalsForWrite();
13911391
// May need to recover
1392-
editLog.recoverUnclosedStreams();
1392+
editLog.recoverUnclosedStreams(true);
13931393

13941394
LOG.info("Catching up to latest edits from old active before " +
13951395
"taking over writer role in edits logs");

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/EditLogTailer.java

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -311,7 +311,8 @@ public Void run() throws Exception {
311311
startTime - lastLoadTimeMs);
312312
// It is already under the name system lock and the checkpointer
313313
// thread is already stopped. No need to acquire any other lock.
314-
editsTailed = doTailEdits();
314+
// HDFS-16689. Disable inProgress to use the streaming mechanism
315+
editsTailed = doTailEdits(false);
315316
} catch (InterruptedException e) {
316317
throw new IOException(e);
317318
} finally {
@@ -323,9 +324,13 @@ public Void run() throws Exception {
323324
}
324325
});
325326
}
326-
327+
327328
@VisibleForTesting
328329
public long doTailEdits() throws IOException, InterruptedException {
330+
return doTailEdits(inProgressOk);
331+
}
332+
333+
private long doTailEdits(boolean enableInProgress) throws IOException, InterruptedException {
329334
Collection<EditLogInputStream> streams;
330335
FSImage image = namesystem.getFSImage();
331336

@@ -334,7 +339,7 @@ public long doTailEdits() throws IOException, InterruptedException {
334339
long startTime = timer.monotonicNow();
335340
try {
336341
streams = editLog.selectInputStreams(lastTxnId + 1, 0,
337-
null, inProgressOk, true);
342+
null, enableInProgress, true);
338343
} catch (IOException ioe) {
339344
// This is acceptable. If we try to tail edits in the middle of an edits
340345
// log roll, i.e. the last one has been finalized but the new inprogress

hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/TestNNWithQJM.java

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,6 @@
3333
import org.apache.hadoop.ipc.RemoteException;
3434
import org.apache.hadoop.test.GenericTestUtils;
3535
import org.apache.hadoop.util.ExitUtil;
36-
import org.apache.hadoop.util.ExitUtil.ExitException;
3736
import org.junit.After;
3837
import org.junit.Before;
3938
import org.junit.Test;
@@ -197,10 +196,9 @@ public void testMismatchedNNIsRejected() throws Exception {
197196
.manageNameDfsDirs(false).format(false).checkExitOnShutdown(false)
198197
.build();
199198
fail("New NN with different namespace should have been rejected");
200-
} catch (ExitException ee) {
199+
} catch (IOException ioe) {
201200
GenericTestUtils.assertExceptionContains(
202-
"Unable to start log segment 1: too few journals", ee);
203-
assertTrue("Didn't terminate properly ", ExitUtil.terminateCalled());
201+
"recoverUnfinalizedSegments failed for too many journals", ioe);
204202
}
205203
}
206204
}

0 commit comments

Comments
 (0)