@@ -231,6 +231,22 @@ private static FileCreator createFileCreator() throws NoSuchMethodException {
231
231
return createFileCreator3 ();
232
232
}
233
233
234
+ // hadoop 3.3.1 changed the return value of this method from DatanodeInfo[] to
235
+ // DatanodeInfoWithStorage[], which causes the JVM can not locate the method if we are compiled
236
+ // with hadoop 3.2 and then link with hadoop 3.3, so here we need to use reflection to make it
237
+ // work for both hadoop versions, otherwise we need to publish more artifacts for different hadoop
238
+ // versions...
239
+ private static final Method GET_LOCATED_BLOCK_LOCATIONS_METHOD ;
240
+
241
+ private static DatanodeInfo [] getLocatedBlockLocations (LocatedBlock block ) {
242
+ try {
243
+ // DatanodeInfoWithStorage[] can be casted to DatanodeInfo[] directly
244
+ return (DatanodeInfo []) GET_LOCATED_BLOCK_LOCATIONS_METHOD .invoke (block );
245
+ } catch (IllegalAccessException | InvocationTargetException e ) {
246
+ throw new RuntimeException (e );
247
+ }
248
+ }
249
+
234
250
// cancel the processing if DFSClient is already closed.
235
251
static final class CancelOnClose implements CancelableProgressable {
236
252
@@ -250,6 +266,7 @@ public boolean progress() {
250
266
try {
251
267
LEASE_MANAGER = createLeaseManager ();
252
268
FILE_CREATOR = createFileCreator ();
269
+ GET_LOCATED_BLOCK_LOCATIONS_METHOD = LocatedBlock .class .getMethod ("getLocations" );
253
270
} catch (Exception e ) {
254
271
String msg = "Couldn't properly initialize access to HDFS internals. Please "
255
272
+ "update your WAL Provider to not make use of the 'asyncfs' provider. See "
@@ -383,7 +400,7 @@ private static List<Future<Channel>> connectToDataNodes(Configuration conf, DFSC
383
400
BlockConstructionStage stage , DataChecksum summer , EventLoopGroup eventLoopGroup ,
384
401
Class <? extends Channel > channelClass ) {
385
402
StorageType [] storageTypes = locatedBlock .getStorageTypes ();
386
- DatanodeInfo [] datanodeInfos = locatedBlock . getLocations ( );
403
+ DatanodeInfo [] datanodeInfos = getLocatedBlockLocations ( locatedBlock );
387
404
boolean connectToDnViaHostname =
388
405
conf .getBoolean (DFS_CLIENT_USE_DN_HOSTNAME , DFS_CLIENT_USE_DN_HOSTNAME_DEFAULT );
389
406
int timeoutMs = conf .getInt (DFS_CLIENT_SOCKET_TIMEOUT_KEY , READ_TIMEOUT );
0 commit comments