Skip to content

Commit 3083d44

Browse files
LeoLeeeeeeKeeProMise
authored andcommitted
HDFS-17597. [ARR] RouterSnapshot supports asynchronous rpc. (apache#6994). Contributed by Wenqi Li.
Reviewed-by: Jian Zhang <keepromise@apache.org> Signed-off-by: He Xiaoqiao <hexiaoqiao@apache.org>
1 parent abb9346 commit 3083d44

File tree

2 files changed

+410
-0
lines changed

2 files changed

+410
-0
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,203 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
* <p>
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
* <p>
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
package org.apache.hadoop.hdfs.server.federation.router;
19+
20+
import org.apache.hadoop.hdfs.DFSUtil;
21+
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
22+
import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport;
23+
import org.apache.hadoop.hdfs.protocol.SnapshotDiffReportListing;
24+
import org.apache.hadoop.hdfs.protocol.SnapshotStatus;
25+
import org.apache.hadoop.hdfs.protocol.SnapshottableDirectoryStatus;
26+
import org.apache.hadoop.hdfs.server.federation.resolver.ActiveNamenodeResolver;
27+
import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamespaceInfo;
28+
import org.apache.hadoop.hdfs.server.federation.resolver.RemoteLocation;
29+
import org.apache.hadoop.hdfs.server.federation.router.async.ApplyFunction;
30+
import org.apache.hadoop.hdfs.server.namenode.NameNode;
31+
32+
import java.io.IOException;
33+
import java.util.Collection;
34+
import java.util.List;
35+
import java.util.Map;
36+
import java.util.Set;
37+
38+
import static org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.asyncApply;
39+
import static org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.asyncReturn;
40+
41+
/**
42+
* Module that implements all the asynchronous RPC calls related to snapshots in
43+
* {@link ClientProtocol} in the {@link RouterRpcServer}.
44+
*/
45+
public class RouterAsyncSnapshot extends RouterSnapshot {
46+
/** RPC server to receive client calls. */
47+
private final RouterRpcServer rpcServer;
48+
/** RPC clients to connect to the Namenodes. */
49+
private final RouterRpcClient rpcClient;
50+
/** Find generic locations. */
51+
private final ActiveNamenodeResolver namenodeResolver;
52+
53+
public RouterAsyncSnapshot(RouterRpcServer server) {
54+
super(server);
55+
this.rpcServer = server;
56+
this.rpcClient = this.rpcServer.getRPCClient();
57+
this.namenodeResolver = rpcServer.getNamenodeResolver();
58+
}
59+
60+
@Override
61+
public String createSnapshot(String snapshotRoot, String snapshotName) throws IOException {
62+
rpcServer.checkOperation(NameNode.OperationCategory.WRITE);
63+
64+
final List<RemoteLocation> locations =
65+
rpcServer.getLocationsForPath(snapshotRoot, true, false);
66+
RemoteMethod method = new RemoteMethod("createSnapshot",
67+
new Class<?>[] {String.class, String.class}, new RemoteParam(),
68+
snapshotName);
69+
70+
if (rpcServer.isInvokeConcurrent(snapshotRoot)) {
71+
rpcClient.invokeConcurrent(locations, method, String.class);
72+
asyncApply((ApplyFunction<Map<RemoteLocation, String>, String>)
73+
results -> {
74+
Map.Entry<RemoteLocation, String> firstelement =
75+
results.entrySet().iterator().next();
76+
RemoteLocation loc = firstelement.getKey();
77+
String result = firstelement.getValue();
78+
return result.replaceFirst(loc.getDest(), loc.getSrc());
79+
});
80+
} else {
81+
rpcClient.invokeSequential(method, locations, String.class, null);
82+
asyncApply((ApplyFunction<RemoteResult<RemoteLocation, String>, String>)
83+
response -> {
84+
RemoteLocation loc = response.getLocation();
85+
String invokedResult = response.getResult();
86+
return invokedResult.replaceFirst(loc.getDest(), loc.getSrc());
87+
});
88+
}
89+
return asyncReturn(String.class);
90+
}
91+
92+
@Override
93+
public SnapshottableDirectoryStatus[] getSnapshottableDirListing() throws IOException {
94+
rpcServer.checkOperation(NameNode.OperationCategory.READ);
95+
96+
RemoteMethod method = new RemoteMethod("getSnapshottableDirListing");
97+
Set<FederationNamespaceInfo> nss = namenodeResolver.getNamespaces();
98+
rpcClient.invokeConcurrent(
99+
nss, method, true, false, SnapshottableDirectoryStatus[].class);
100+
asyncApply((ApplyFunction<Map<FederationNamespaceInfo, SnapshottableDirectoryStatus[]>,
101+
SnapshottableDirectoryStatus[]>)
102+
ret -> RouterRpcServer.merge(ret, SnapshottableDirectoryStatus.class));
103+
return asyncReturn(SnapshottableDirectoryStatus[].class);
104+
}
105+
106+
@Override
107+
public SnapshotStatus[] getSnapshotListing(String snapshotRoot) throws IOException {
108+
rpcServer.checkOperation(NameNode.OperationCategory.READ);
109+
final List<RemoteLocation> locations =
110+
rpcServer.getLocationsForPath(snapshotRoot, true, false);
111+
RemoteMethod remoteMethod = new RemoteMethod("getSnapshotListing",
112+
new Class<?>[]{String.class},
113+
new RemoteParam());
114+
if (rpcServer.isInvokeConcurrent(snapshotRoot)) {
115+
rpcClient.invokeConcurrent(
116+
locations, remoteMethod, true, false, SnapshotStatus[].class);
117+
asyncApply((ApplyFunction<Map<RemoteLocation, SnapshotStatus[]>, SnapshotStatus[]>)
118+
ret -> {
119+
SnapshotStatus[] response = ret.values().iterator().next();
120+
String src = ret.keySet().iterator().next().getSrc();
121+
String dst = ret.keySet().iterator().next().getDest();
122+
for (SnapshotStatus s : response) {
123+
String mountPath = DFSUtil.bytes2String(s.getParentFullPath()).
124+
replaceFirst(src, dst);
125+
s.setParentFullPath(DFSUtil.string2Bytes(mountPath));
126+
}
127+
return response;
128+
});
129+
} else {
130+
rpcClient
131+
.invokeSequential(remoteMethod, locations, SnapshotStatus[].class,
132+
null);
133+
asyncApply((ApplyFunction<RemoteResult<RemoteLocation, SnapshotStatus[]>, SnapshotStatus[]>)
134+
invokedResponse -> {
135+
RemoteLocation loc = invokedResponse.getLocation();
136+
SnapshotStatus[] response = invokedResponse.getResult();
137+
for (SnapshotStatus s : response) {
138+
String mountPath = DFSUtil.bytes2String(s.getParentFullPath()).
139+
replaceFirst(loc.getDest(), loc.getSrc());
140+
s.setParentFullPath(DFSUtil.string2Bytes(mountPath));
141+
}
142+
return response;
143+
});
144+
}
145+
return asyncReturn(SnapshotStatus[].class);
146+
}
147+
148+
@Override
149+
public SnapshotDiffReport getSnapshotDiffReport(
150+
String snapshotRoot, String earlierSnapshotName,
151+
String laterSnapshotName) throws IOException {
152+
rpcServer.checkOperation(NameNode.OperationCategory.READ);
153+
154+
final List<RemoteLocation> locations =
155+
rpcServer.getLocationsForPath(snapshotRoot, true, false);
156+
RemoteMethod remoteMethod = new RemoteMethod("getSnapshotDiffReport",
157+
new Class<?>[] {String.class, String.class, String.class},
158+
new RemoteParam(), earlierSnapshotName, laterSnapshotName);
159+
160+
if (rpcServer.isInvokeConcurrent(snapshotRoot)) {
161+
rpcClient.invokeConcurrent(
162+
locations, remoteMethod, true, false, SnapshotDiffReport.class);
163+
asyncApply((ApplyFunction<Map<RemoteLocation, SnapshotDiffReport>, SnapshotDiffReport>)
164+
ret -> ret.values().iterator().next());
165+
return asyncReturn(SnapshotDiffReport.class);
166+
} else {
167+
return rpcClient.invokeSequential(
168+
locations, remoteMethod, SnapshotDiffReport.class, null);
169+
}
170+
}
171+
172+
@Override
173+
public SnapshotDiffReportListing getSnapshotDiffReportListing(
174+
String snapshotRoot, String earlierSnapshotName, String laterSnapshotName,
175+
byte[] startPath, int index) throws IOException {
176+
rpcServer.checkOperation(NameNode.OperationCategory.READ);
177+
178+
final List<RemoteLocation> locations =
179+
rpcServer.getLocationsForPath(snapshotRoot, true, false);
180+
Class<?>[] params = new Class<?>[] {
181+
String.class, String.class, String.class,
182+
byte[].class, int.class};
183+
RemoteMethod remoteMethod = new RemoteMethod(
184+
"getSnapshotDiffReportListing", params,
185+
new RemoteParam(), earlierSnapshotName, laterSnapshotName,
186+
startPath, index);
187+
188+
if (rpcServer.isInvokeConcurrent(snapshotRoot)) {
189+
rpcClient.invokeConcurrent(locations, remoteMethod, false, false,
190+
SnapshotDiffReportListing.class);
191+
asyncApply((ApplyFunction<Map<RemoteLocation, SnapshotDiffReportListing>,
192+
SnapshotDiffReportListing>) ret -> {
193+
Collection<SnapshotDiffReportListing> listings = ret.values();
194+
SnapshotDiffReportListing listing0 = listings.iterator().next();
195+
return listing0;
196+
});
197+
return asyncReturn(SnapshotDiffReportListing.class);
198+
} else {
199+
return rpcClient.invokeSequential(
200+
locations, remoteMethod, SnapshotDiffReportListing.class, null);
201+
}
202+
}
203+
}

0 commit comments

Comments
 (0)