@@ -141,9 +141,9 @@ private FanOutOneBlockAsyncDFSOutputHelper() {
141
141
142
142
private interface LeaseManager {
143
143
144
- void begin (DFSClient client , long inodeId );
144
+ void begin (DFSClient client , HdfsFileStatus stat );
145
145
146
- void end (DFSClient client , long inodeId );
146
+ void end (DFSClient client , HdfsFileStatus stat );
147
147
}
148
148
149
149
private static final LeaseManager LEASE_MANAGER ;
@@ -202,7 +202,58 @@ public boolean isClientRunning(DFSClient client) {
202
202
};
203
203
}
204
204
205
- private static LeaseManager createLeaseManager () throws NoSuchMethodException {
205
+ private static LeaseManager createLeaseManager3_4 () throws NoSuchMethodException {
206
+ Method beginFileLeaseMethod =
207
+ DFSClient .class .getDeclaredMethod ("beginFileLease" , String .class , DFSOutputStream .class );
208
+ beginFileLeaseMethod .setAccessible (true );
209
+ Method endFileLeaseMethod = DFSClient .class .getDeclaredMethod ("endFileLease" , String .class );
210
+ endFileLeaseMethod .setAccessible (true );
211
+ Method getConfigurationMethod = DFSClient .class .getDeclaredMethod ("getConfiguration" );
212
+ getConfigurationMethod .setAccessible (true );
213
+ Method getNamespaceMehtod = HdfsFileStatus .class .getDeclaredMethod ("getNamespace" );
214
+
215
+ return new LeaseManager () {
216
+
217
+ private static final String DFS_OUTPUT_STREAM_UNIQ_DEFAULT_KEY =
218
+ "dfs.client.output.stream.uniq.default.key" ;
219
+ private static final String DFS_OUTPUT_STREAM_UNIQ_DEFAULT_KEY_DEFAULT = "DEFAULT" ;
220
+
221
+ private String getUniqId (DFSClient client , HdfsFileStatus stat )
222
+ throws IllegalAccessException , IllegalArgumentException , InvocationTargetException {
223
+ // Copied from DFSClient in Hadoop 3.4.0
224
+ long fileId = stat .getFileId ();
225
+ String namespace = (String ) getNamespaceMehtod .invoke (stat );
226
+ if (namespace == null ) {
227
+ Configuration conf = (Configuration ) getConfigurationMethod .invoke (client );
228
+ String defaultKey = conf .get (DFS_OUTPUT_STREAM_UNIQ_DEFAULT_KEY ,
229
+ DFS_OUTPUT_STREAM_UNIQ_DEFAULT_KEY_DEFAULT );
230
+ return defaultKey + "_" + fileId ;
231
+ } else {
232
+ return namespace + "_" + fileId ;
233
+ }
234
+ }
235
+
236
+ @ Override
237
+ public void begin (DFSClient client , HdfsFileStatus stat ) {
238
+ try {
239
+ beginFileLeaseMethod .invoke (client , getUniqId (client , stat ), null );
240
+ } catch (IllegalAccessException | InvocationTargetException e ) {
241
+ throw new RuntimeException (e );
242
+ }
243
+ }
244
+
245
+ @ Override
246
+ public void end (DFSClient client , HdfsFileStatus stat ) {
247
+ try {
248
+ endFileLeaseMethod .invoke (client , getUniqId (client , stat ));
249
+ } catch (IllegalAccessException | InvocationTargetException e ) {
250
+ throw new RuntimeException (e );
251
+ }
252
+ }
253
+ };
254
+ }
255
+
256
+ private static LeaseManager createLeaseManager3 () throws NoSuchMethodException {
206
257
Method beginFileLeaseMethod =
207
258
DFSClient .class .getDeclaredMethod ("beginFileLease" , long .class , DFSOutputStream .class );
208
259
beginFileLeaseMethod .setAccessible (true );
@@ -211,25 +262,35 @@ private static LeaseManager createLeaseManager() throws NoSuchMethodException {
211
262
return new LeaseManager () {
212
263
213
264
@ Override
214
- public void begin (DFSClient client , long inodeId ) {
265
+ public void begin (DFSClient client , HdfsFileStatus stat ) {
215
266
try {
216
- beginFileLeaseMethod .invoke (client , inodeId , null );
267
+ beginFileLeaseMethod .invoke (client , stat . getFileId () , null );
217
268
} catch (IllegalAccessException | InvocationTargetException e ) {
218
269
throw new RuntimeException (e );
219
270
}
220
271
}
221
272
222
273
@ Override
223
- public void end (DFSClient client , long inodeId ) {
274
+ public void end (DFSClient client , HdfsFileStatus stat ) {
224
275
try {
225
- endFileLeaseMethod .invoke (client , inodeId );
276
+ endFileLeaseMethod .invoke (client , stat . getFileId () );
226
277
} catch (IllegalAccessException | InvocationTargetException e ) {
227
278
throw new RuntimeException (e );
228
279
}
229
280
}
230
281
};
231
282
}
232
283
284
+ private static LeaseManager createLeaseManager () throws NoSuchMethodException {
285
+ try {
286
+ return createLeaseManager3_4 ();
287
+ } catch (NoSuchMethodException e ) {
288
+ LOG .debug ("DFSClient::beginFileLease wrong arguments, should be hadoop 3.3 or below" );
289
+ }
290
+
291
+ return createLeaseManager3 ();
292
+ }
293
+
233
294
private static FileCreator createFileCreator3_3 () throws NoSuchMethodException {
234
295
Method createMethod = ClientProtocol .class .getMethod ("create" , String .class , FsPermission .class ,
235
296
String .class , EnumSetWritable .class , boolean .class , short .class , long .class ,
@@ -320,12 +381,12 @@ public boolean progress() {
320
381
}
321
382
}
322
383
323
- static void beginFileLease (DFSClient client , long inodeId ) {
324
- LEASE_MANAGER .begin (client , inodeId );
384
+ static void beginFileLease (DFSClient client , HdfsFileStatus stat ) {
385
+ LEASE_MANAGER .begin (client , stat );
325
386
}
326
387
327
- static void endFileLease (DFSClient client , long inodeId ) {
328
- LEASE_MANAGER .end (client , inodeId );
388
+ static void endFileLease (DFSClient client , HdfsFileStatus stat ) {
389
+ LEASE_MANAGER .end (client , stat );
329
390
}
330
391
331
392
static DataChecksum createChecksum (DFSClient client ) {
@@ -552,7 +613,7 @@ private static FanOutOneBlockAsyncDFSOutput createOutput(DistributedFileSystem d
552
613
throw new NameNodeException (e );
553
614
}
554
615
}
555
- beginFileLease (client , stat . getFileId () );
616
+ beginFileLease (client , stat );
556
617
boolean succ = false ;
557
618
LocatedBlock locatedBlock = null ;
558
619
List <Future <Channel >> futureList = null ;
@@ -576,8 +637,8 @@ private static FanOutOneBlockAsyncDFSOutput createOutput(DistributedFileSystem d
576
637
}
577
638
Encryptor encryptor = createEncryptor (conf , stat , client );
578
639
FanOutOneBlockAsyncDFSOutput output =
579
- new FanOutOneBlockAsyncDFSOutput (conf , dfs , client , namenode , clientName , src ,
580
- stat . getFileId (), locatedBlock , encryptor , datanodes , summer , ALLOC , monitor );
640
+ new FanOutOneBlockAsyncDFSOutput (conf , dfs , client , namenode , clientName , src , stat ,
641
+ locatedBlock , encryptor , datanodes , summer , ALLOC , monitor );
581
642
succ = true ;
582
643
return output ;
583
644
} catch (RemoteException e ) {
@@ -616,7 +677,7 @@ public void operationComplete(Future<Channel> future) throws Exception {
616
677
});
617
678
}
618
679
}
619
- endFileLease (client , stat . getFileId () );
680
+ endFileLease (client , stat );
620
681
}
621
682
}
622
683
}
@@ -654,11 +715,11 @@ public static boolean shouldRetryCreate(RemoteException e) {
654
715
}
655
716
656
717
static void completeFile (DFSClient client , ClientProtocol namenode , String src , String clientName ,
657
- ExtendedBlock block , long fileId ) {
718
+ ExtendedBlock block , HdfsFileStatus stat ) {
658
719
for (int retry = 0 ;; retry ++) {
659
720
try {
660
- if (namenode .complete (src , clientName , block , fileId )) {
661
- endFileLease (client , fileId );
721
+ if (namenode .complete (src , clientName , block , stat . getFileId () )) {
722
+ endFileLease (client , stat );
662
723
return ;
663
724
} else {
664
725
LOG .warn ("complete file " + src + " not finished, retry = " + retry );
0 commit comments