Skip to content

Commit 33e630b

Browse files
authored
Merge branch 'apache:trunk' into HADOOP-18359
2 parents 87e9cf3 + 845cf8b commit 33e630b

File tree

6 files changed

+127
-64
lines changed

6 files changed

+127
-64
lines changed

hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/NameNodeProxiesClient.java

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -349,9 +349,6 @@ public static ClientProtocol createProxyWithAlignmentContext(
349349
boolean withRetries, AtomicBoolean fallbackToSimpleAuth,
350350
AlignmentContext alignmentContext)
351351
throws IOException {
352-
if (alignmentContext == null) {
353-
alignmentContext = new ClientGSIContext();
354-
}
355352
RPC.setProtocolEngine(conf, ClientNamenodeProtocolPB.class,
356353
ProtobufRpcEngine2.class);
357354

hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/MiniRouterDFSCluster.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,7 @@
9191
import org.apache.hadoop.hdfs.server.namenode.FSImage;
9292
import org.apache.hadoop.hdfs.server.namenode.NameNode;
9393
import org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider;
94+
import org.apache.hadoop.hdfs.server.namenode.ha.ObserverReadProxyProvider;
9495
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
9596
import org.apache.hadoop.http.HttpConfig;
9697
import org.apache.hadoop.net.NetUtils;
@@ -233,6 +234,20 @@ public FileSystem getFileSystem() throws IOException {
233234
return DistributedFileSystem.get(conf);
234235
}
235236

237+
public FileSystem getFileSystemWithObserverReadsEnabled() throws IOException {
238+
Configuration observerReadConf = new Configuration(conf);
239+
observerReadConf.set(DFS_NAMESERVICES,
240+
observerReadConf.get(DFS_NAMESERVICES)+ ",router-service");
241+
observerReadConf.set(DFS_HA_NAMENODES_KEY_PREFIX + ".router-service", "router1");
242+
observerReadConf.set(DFS_NAMENODE_RPC_ADDRESS_KEY+ ".router-service.router1",
243+
getFileSystemURI().toString());
244+
observerReadConf.set(HdfsClientConfigKeys.Failover.PROXY_PROVIDER_KEY_PREFIX
245+
+ "." + "router-service", ObserverReadProxyProvider.class.getName());
246+
DistributedFileSystem.setDefaultUri(observerReadConf, "hdfs://router-service");
247+
248+
return DistributedFileSystem.get(observerReadConf);
249+
}
250+
236251
public DFSClient getClient(UserGroupInformation user)
237252
throws IOException, URISyntaxException, InterruptedException {
238253

hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestObserverWithRouter.java

Lines changed: 62 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import static org.junit.Assert.assertEquals;
2222
import static org.junit.Assert.assertNotEquals;
2323
import static org.junit.Assert.assertTrue;
24+
import static org.junit.Assert.assertThrows;
2425
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HA_NAMENODE_ID_KEY;
2526
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMESERVICE_ID;
2627
import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_MONITOR_NAMENODE;
@@ -41,15 +42,40 @@
4142
import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamenodeServiceState;
4243
import org.apache.hadoop.hdfs.server.federation.resolver.MembershipNamenodeResolver;
4344
import org.apache.hadoop.hdfs.server.namenode.NameNode;
44-
import org.junit.After;
45-
import org.junit.Test;
45+
import org.junit.jupiter.api.Test;
46+
import org.junit.jupiter.api.AfterEach;
47+
import org.junit.jupiter.api.BeforeEach;
48+
import org.junit.jupiter.api.Tag;
49+
import org.junit.jupiter.api.TestInfo;
4650

47-
public class TestObserverWithRouter {
4851

52+
public class TestObserverWithRouter {
53+
private static final String SKIP_BEFORE_EACH_CLUSTER_STARTUP = "SkipBeforeEachClusterStartup";
4954
private MiniRouterDFSCluster cluster;
55+
private RouterContext routerContext;
56+
private FileSystem fileSystem;
5057

51-
public void startUpCluster(int numberOfObserver) throws Exception {
52-
startUpCluster(numberOfObserver, null);
58+
@BeforeEach
59+
void init(TestInfo info) throws Exception {
60+
if (info.getTags().contains(SKIP_BEFORE_EACH_CLUSTER_STARTUP)) {
61+
return;
62+
}
63+
startUpCluster(2, null);
64+
}
65+
66+
@AfterEach
67+
public void teardown() throws IOException {
68+
if (cluster != null) {
69+
cluster.shutdown();
70+
cluster = null;
71+
}
72+
73+
routerContext = null;
74+
75+
if (fileSystem != null) {
76+
fileSystem.close();
77+
fileSystem = null;
78+
}
5379
}
5480

5581
public void startUpCluster(int numberOfObserver, Configuration confOverrides) throws Exception {
@@ -95,31 +121,39 @@ public void startUpCluster(int numberOfObserver, Configuration confOverrides) th
95121
cluster.installMockLocations();
96122

97123
cluster.waitActiveNamespaces();
124+
routerContext = cluster.getRandomRouter();
125+
fileSystem = routerContext.getFileSystemWithObserverReadsEnabled();
98126
}
99127

100-
@After
101-
public void teardown() throws IOException {
102-
if (cluster != null) {
103-
cluster.shutdown();
104-
cluster = null;
105-
}
128+
@Test
129+
public void testObserverRead() throws Exception {
130+
internalTestObserverRead();
106131
}
107132

133+
/**
134+
* Tests that without adding config to use ObserverProxyProvider, the client shouldn't
135+
* have reads served by Observers.
136+
* Fixes regression in HDFS-13522.
137+
*/
108138
@Test
109-
public void testObserverRead() throws Exception {
110-
startUpCluster(1);
111-
RouterContext routerContext = cluster.getRandomRouter();
139+
public void testReadWithoutObserverClientConfigurations() throws Exception {
140+
fileSystem.close();
141+
fileSystem = routerContext.getFileSystem();
142+
assertThrows(AssertionError.class, this::internalTestObserverRead);
143+
}
144+
145+
public void internalTestObserverRead()
146+
throws Exception {
112147
List<? extends FederationNamenodeContext> namenodes = routerContext
113148
.getRouter().getNamenodeResolver()
114149
.getNamenodesForNameserviceId(cluster.getNameservices().get(0), true);
115150
assertEquals("First namenode should be observer", namenodes.get(0).getState(),
116151
FederationNamenodeServiceState.OBSERVER);
117-
FileSystem fileSystem = routerContext.getFileSystem();
118152
Path path = new Path("/testFile");
119-
// Send Create call to active
153+
// Send create call
120154
fileSystem.create(path).close();
121155

122-
// Send read request to observer
156+
// Send read request
123157
fileSystem.open(path).close();
124158

125159
long rpcCountForActive = routerContext.getRouter().getRpcServer()
@@ -131,21 +165,19 @@ public void testObserverRead() throws Exception {
131165
.getRPCMetrics().getObserverProxyOps();
132166
// getBlockLocations should be sent to observer
133167
assertEquals("One call should be sent to observer", 1, rpcCountForObserver);
134-
fileSystem.close();
135168
}
136169

137170
@Test
171+
@Tag(SKIP_BEFORE_EACH_CLUSTER_STARTUP)
138172
public void testObserverReadWithoutFederatedStatePropagation() throws Exception {
139173
Configuration confOverrides = new Configuration(false);
140174
confOverrides.setInt(RBFConfigKeys.DFS_ROUTER_OBSERVER_FEDERATED_STATE_PROPAGATION_MAXSIZE, 0);
141-
startUpCluster(1, confOverrides);
142-
RouterContext routerContext = cluster.getRandomRouter();
175+
startUpCluster(2, confOverrides);
143176
List<? extends FederationNamenodeContext> namenodes = routerContext
144177
.getRouter().getNamenodeResolver()
145178
.getNamenodesForNameserviceId(cluster.getNameservices().get(0), true);
146179
assertEquals("First namenode should be observer", namenodes.get(0).getState(),
147180
FederationNamenodeServiceState.OBSERVER);
148-
FileSystem fileSystem = routerContext.getFileSystem();
149181
Path path = new Path("/testFile");
150182
// Send Create call to active
151183
fileSystem.create(path).close();
@@ -161,22 +193,19 @@ public void testObserverReadWithoutFederatedStatePropagation() throws Exception
161193
long rpcCountForObserver = routerContext.getRouter().getRpcServer()
162194
.getRPCMetrics().getObserverProxyOps();
163195
assertEquals("No call should be sent to observer", 0, rpcCountForObserver);
164-
fileSystem.close();
165196
}
166197

167198
@Test
199+
@Tag(SKIP_BEFORE_EACH_CLUSTER_STARTUP)
168200
public void testDisablingObserverReadUsingNameserviceOverride() throws Exception {
169201
// Disable observer reads using per-nameservice override
170202
Configuration confOverrides = new Configuration(false);
171203
confOverrides.set(RBFConfigKeys.DFS_ROUTER_OBSERVER_READ_OVERRIDES, "ns0");
172-
startUpCluster(1, confOverrides);
204+
startUpCluster(2, confOverrides);
173205

174-
RouterContext routerContext = cluster.getRandomRouter();
175-
FileSystem fileSystem = routerContext.getFileSystem();
176206
Path path = new Path("/testFile");
177207
fileSystem.create(path).close();
178208
fileSystem.open(path).close();
179-
fileSystem.close();
180209

181210
long rpcCountForActive = routerContext.getRouter().getRpcServer()
182211
.getRPCMetrics().getActiveProxyOps();
@@ -190,17 +219,15 @@ public void testDisablingObserverReadUsingNameserviceOverride() throws Exception
190219

191220
@Test
192221
public void testReadWhenObserverIsDown() throws Exception {
193-
startUpCluster(1);
194-
RouterContext routerContext = cluster.getRandomRouter();
195-
FileSystem fileSystem = routerContext.getFileSystem();
196222
Path path = new Path("/testFile1");
197223
// Send Create call to active
198224
fileSystem.create(path).close();
199225

200226
// Stop observer NN
201227
int nnIndex = stopObserver(1);
202-
203228
assertNotEquals("No observer found", 3, nnIndex);
229+
nnIndex = stopObserver(1);
230+
assertNotEquals("No observer found", 4, nnIndex);
204231

205232
// Send read request
206233
fileSystem.open(path).close();
@@ -215,14 +242,10 @@ public void testReadWhenObserverIsDown() throws Exception {
215242
.getRPCMetrics().getObserverProxyOps();
216243
assertEquals("No call should send to observer", 0,
217244
rpcCountForObserver);
218-
fileSystem.close();
219245
}
220246

221247
@Test
222248
public void testMultipleObserver() throws Exception {
223-
startUpCluster(2);
224-
RouterContext routerContext = cluster.getRandomRouter();
225-
FileSystem fileSystem = routerContext.getFileSystem();
226249
Path path = new Path("/testFile1");
227250
// Send Create call to active
228251
fileSystem.create(path).close();
@@ -267,7 +290,6 @@ public void testMultipleObserver() throws Exception {
267290
.getRpcServer().getRPCMetrics().getObserverProxyOps();
268291
assertEquals("No call should send to observer",
269292
expectedObserverRpc, rpcCountForObserver);
270-
fileSystem.close();
271293
}
272294

273295
private int stopObserver(int num) {
@@ -288,9 +310,9 @@ private int stopObserver(int num) {
288310
// test router observer with multiple to know which observer NN received
289311
// requests
290312
@Test
313+
@Tag(SKIP_BEFORE_EACH_CLUSTER_STARTUP)
291314
public void testMultipleObserverRouter() throws Exception {
292315
StateStoreDFSCluster innerCluster;
293-
RouterContext routerContext;
294316
MembershipNamenodeResolver resolver;
295317

296318
String ns0;
@@ -356,14 +378,12 @@ public void testMultipleObserverRouter() throws Exception {
356378
namespaceInfo0.get(1).getNamenodeId());
357379
assertEquals(namespaceInfo1.get(0).getState(),
358380
FederationNamenodeServiceState.OBSERVER);
381+
382+
innerCluster.shutdown();
359383
}
360384

361385
@Test
362386
public void testUnavailableObserverNN() throws Exception {
363-
startUpCluster(2);
364-
RouterContext routerContext = cluster.getRandomRouter();
365-
FileSystem fileSystem = routerContext.getFileSystem();
366-
367387
stopObserver(2);
368388

369389
Path path = new Path("/testFile");
@@ -397,12 +417,10 @@ public void testUnavailableObserverNN() throws Exception {
397417
assertTrue("There must be unavailable namenodes", hasUnavailable);
398418
}
399419

420+
421+
400422
@Test
401423
public void testRouterMsync() throws Exception {
402-
startUpCluster(1);
403-
RouterContext routerContext = cluster.getRandomRouter();
404-
405-
FileSystem fileSystem = routerContext.getFileSystem();
406424
Path path = new Path("/testFile");
407425

408426
// Send Create call to active
@@ -420,6 +438,5 @@ public void testRouterMsync() throws Exception {
420438
// 2 msync calls should be sent. One to each active namenode in the two namespaces.
421439
assertEquals("Four calls should be sent to active", 4,
422440
rpcCountForActive);
423-
fileSystem.close();
424441
}
425442
}

hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/webapps/static/federation/federation.js

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ $(document).ready(function() {
2828
var capabilityArr = scTableData.filter(item => (item.subcluster === row.id()));
2929
var capabilityObj = JSON.parse(capabilityArr[0].capability).clusterMetrics;
3030
row.child(
31-
'<table>' +
31+
'<table style="line-height:25px;" >' +
3232
' <tr>' +
3333
' <td>' +
3434
' <h3>Application Metrics</h3>' +
@@ -42,11 +42,12 @@ $(document).ready(function() {
4242
' <td>' +
4343
' <h3>Resource Metrics</h3>' +
4444
' <h4>Memory</h4>' +
45-
' TotalMB : ' + capabilityObj.totalMB + ' </p>' +
46-
' ReservedMB : ' + capabilityObj.reservedMB + ' </p>' +
47-
' AvailableMB : ' + capabilityObj.availableMB + ' </p>' +
48-
' AllocatedMB : ' + capabilityObj.allocatedMB + ' </p>' +
49-
' PendingMB : ' + capabilityObj.pendingMB + ' </p>' +
45+
' Total Memory : ' + capabilityArr[0].totalmemory + ' </p>' +
46+
' Reserved Memory : ' + capabilityArr[0].reservedmemory + ' </p>' +
47+
' Available Memory : ' + capabilityArr[0].availablememory + ' </p>' +
48+
' Allocated Memory : ' + capabilityArr[0].allocatedmemory + ' </p>' +
49+
' Pending Memory : ' + capabilityArr[0].pendingmemory + ' </p>' +
50+
' <hr />' +
5051
' <h4>VirtualCores</h4>' +
5152
' TotalVirtualCores : ' + capabilityObj.totalVirtualCores + ' </p>' +
5253
' ReservedVirtualCores : ' + capabilityObj.reservedVirtualCores + ' </p>' +

hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/FederationBlock.java

Lines changed: 30 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -23,12 +23,13 @@
2323
import java.util.List;
2424
import java.util.Map;
2525
import java.util.HashMap;
26+
import java.util.Date;
2627

2728
import com.google.gson.Gson;
2829
import org.apache.hadoop.util.StringUtils;
29-
import org.apache.commons.lang3.time.DateFormatUtils;
3030
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
3131
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo;
32+
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterState;
3233
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ClusterMetricsInfo;
3334
import org.apache.hadoop.yarn.server.router.Router;
3435
import org.apache.hadoop.yarn.webapp.hamlet2.Hamlet;
@@ -149,32 +150,51 @@ private void initHtmlPageFederation(Block html, boolean isEnabled) {
149150
ClusterMetricsInfo subClusterInfo = getClusterMetricsInfo(capability);
150151

151152
// Prepare LastStartTime & LastHeartBeat
152-
String lastStartTime =
153-
DateFormatUtils.format(subcluster.getLastStartTime(), DATE_PATTERN);
154-
String lastHeartBeat =
155-
DateFormatUtils.format(subcluster.getLastHeartBeat(), DATE_PATTERN);
153+
Date lastStartTime = new Date(subcluster.getLastStartTime());
154+
Date lastHeartBeat = new Date(subcluster.getLastHeartBeat());
156155

157156
// Prepare Resource
158157
long totalMB = subClusterInfo.getTotalMB();
159158
String totalMBDesc = StringUtils.byteDesc(totalMB * BYTES_IN_MB);
160159
long totalVirtualCores = subClusterInfo.getTotalVirtualCores();
161-
String resources = String.format("<Memory:%s, VCore:%s>", totalMBDesc, totalVirtualCores);
160+
String resources = String.format("<memory:%s, vCores:%s>", totalMBDesc, totalVirtualCores);
162161

163162
// Prepare Node
164163
long totalNodes = subClusterInfo.getTotalNodes();
165164
long activeNodes = subClusterInfo.getActiveNodes();
166-
String nodes = String.format("<Total Nodes:%s, Active Nodes:%s>", totalNodes, activeNodes);
165+
String nodes = String.format("<totalNodes:%s, activeNodes:%s>", totalNodes, activeNodes);
167166

168167
// Prepare HTML Table
168+
String stateStyle = "color:#dc3545;font-weight:bolder";
169+
SubClusterState state = subcluster.getState();
170+
if (SubClusterState.SC_RUNNING == state) {
171+
stateStyle = "color:#28a745;font-weight:bolder";
172+
}
173+
169174
tbody.tr().$id(subClusterIdText)
170175
.td().$class("details-control").a(herfWebAppAddress, subClusterIdText).__()
171-
.td(subcluster.getState().name())
172-
.td(lastStartTime)
173-
.td(lastHeartBeat)
176+
.td().$style(stateStyle).__(state.name()).__()
177+
.td().__(lastStartTime).__()
178+
.td().__(lastHeartBeat).__()
174179
.td(resources)
175180
.td(nodes)
176181
.__();
177182

183+
// Formatted memory information
184+
long allocatedMB = subClusterInfo.getAllocatedMB();
185+
String allocatedMBDesc = StringUtils.byteDesc(allocatedMB * BYTES_IN_MB);
186+
long availableMB = subClusterInfo.getAvailableMB();
187+
String availableMBDesc = StringUtils.byteDesc(availableMB * BYTES_IN_MB);
188+
long pendingMB = subClusterInfo.getPendingMB();
189+
String pendingMBDesc = StringUtils.byteDesc(pendingMB * BYTES_IN_MB);
190+
long reservedMB = subClusterInfo.getReservedMB();
191+
String reservedMBDesc = StringUtils.byteDesc(reservedMB * BYTES_IN_MB);
192+
193+
subclusterMap.put("totalmemory", totalMBDesc);
194+
subclusterMap.put("allocatedmemory", allocatedMBDesc);
195+
subclusterMap.put("availablememory", availableMBDesc);
196+
subclusterMap.put("pendingmemory", pendingMBDesc);
197+
subclusterMap.put("reservedmemory", reservedMBDesc);
178198
subclusterMap.put("subcluster", subClusterId.getId());
179199
subclusterMap.put("capability", capability);
180200
lists.add(subclusterMap);

0 commit comments

Comments (0)