Skip to content

Commit

Permalink
use getRPCClient() instead of rpcClient in async methods,.
Browse files Browse the repository at this point in the history
  • Loading branch information
hfutatzhanghb committed Oct 14, 2024
1 parent 5123f66 commit cb4d99a
Showing 1 changed file with 6 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -823,7 +823,7 @@ <T> T invokeAtAvailableNsAsync(RemoteMethod method, Class<T> clazz)
asyncComplete(null);
if (!nsId.isEmpty()) {
asyncTry(() -> {
rpcClient.invokeSingle(nsId, method, clazz);
getRPCClient().invokeSingle(nsId, method, clazz);
});

asyncCatch((AsyncCatchFunction<T, IOException>)(res, ioe) -> {
Expand Down Expand Up @@ -898,7 +898,7 @@ <T> T invokeOnNsAsync(RemoteMethod method, Class<T> clazz, IOException ioe,
String nsId = fnInfo.getNameserviceId();
LOG.debug("Invoking {} on namespace {}", method, nsId);
asyncTry(() -> {
rpcClient.invokeSingle(nsId, method, clazz);
getRPCClient().invokeSingle(nsId, method, clazz);
asyncApply(result -> {
if (result != null && isExpectedClass(clazz, result)) {
foreach.breakNow();
Expand Down Expand Up @@ -1092,7 +1092,7 @@ private RemoteLocation getExistingLocationAsync(String src,
List<RemoteLocation> locations) throws IOException {
RemoteMethod method = new RemoteMethod("getFileInfo",
new Class<?>[] {String.class}, new RemoteParam());
rpcClient.invokeConcurrent(
getRPCClient().invokeConcurrent(
locations, method, true, false, HdfsFileStatus.class);
asyncApply((ApplyFunction<Map<RemoteLocation, HdfsFileStatus>, Object>) results -> {
for (RemoteLocation loc : locations) {
Expand Down Expand Up @@ -1378,7 +1378,7 @@ public DatanodeInfo[] getDatanodeReportAsync(
new Class<?>[] {DatanodeReportType.class}, type);

Set<FederationNamespaceInfo> nss = namenodeResolver.getNamespaces();
rpcClient.invokeConcurrent(nss, method, requireResponse, false,
getRPCClient().invokeConcurrent(nss, method, requireResponse, false,
timeOutMs, DatanodeInfo[].class);

asyncApply((ApplyFunction<Map<FederationNamespaceInfo, DatanodeInfo[]>,
Expand Down Expand Up @@ -1460,7 +1460,7 @@ public Map<String, DatanodeStorageReport[]> getDatanodeStorageReportMapAsync(
RemoteMethod method = new RemoteMethod("getDatanodeStorageReport",
new Class<?>[] {DatanodeReportType.class}, type);
Set<FederationNamespaceInfo> nss = namenodeResolver.getNamespaces();
rpcClient.invokeConcurrent(
getRPCClient().invokeConcurrent(
nss, method, requireResponse, false, timeOutMs, DatanodeStorageReport[].class);

asyncApply((ApplyFunction<Map<FederationNamespaceInfo, DatanodeStorageReport[]>,
Expand Down Expand Up @@ -2309,7 +2309,7 @@ public DatanodeInfo[] getSlowDatanodeReportAsync(boolean requireResponse, long t
RemoteMethod method = new RemoteMethod("getSlowDatanodeReport");

Set<FederationNamespaceInfo> nss = namenodeResolver.getNamespaces();
rpcClient.invokeConcurrent(nss, method, requireResponse, false,
getRPCClient().invokeConcurrent(nss, method, requireResponse, false,
timeOutMs, DatanodeInfo[].class);

asyncApply((ApplyFunction<Map<FederationNamespaceInfo, DatanodeInfo[]>,
Expand Down

0 comments on commit cb4d99a

Please sign in to comment.