import java.io.IOException;
import java.io.InterruptedIOException;
+ import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.ArrayList;
@@ -140,9 +141,9 @@ private FanOutOneBlockAsyncDFSOutputHelper() {

  private interface LeaseManager {

-    void begin(DFSClient client, HdfsFileStatus stat);
+    void begin(FanOutOneBlockAsyncDFSOutput output);

-    void end(DFSClient client, HdfsFileStatus stat);
+    void end(FanOutOneBlockAsyncDFSOutput output);
  }

  private static final LeaseManager LEASE_MANAGER;
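Note on the signature change: both reflection-based implementations further down now derive everything they need (the DFSClient, the file status, and the new dummy DFSOutputStream) from the output object itself. A minimal illustrative implementation, not part of this patch and using only the accessors that appear in the hunks below, would look like:

  // Illustration only: what a LeaseManager can reach through the output object.
  // The real implementations below call package-private DFSClient methods via reflection.
  private static final LeaseManager ILLUSTRATIVE_LEASE_MANAGER = new LeaseManager() {

    @Override
    public void begin(FanOutOneBlockAsyncDFSOutput output) {
      DFSClient client = output.getClient();           // formerly a separate parameter
      long fileId = output.getStat().getFileId();      // formerly passed as HdfsFileStatus
      DFSOutputStream dummy = output.getDummyStream(); // new: tracked by the lease renewer
      // ... begin the lease for (client, fileId/uniq key, dummy) ...
    }

    @Override
    public void end(FanOutOneBlockAsyncDFSOutput output) {
      // ... end the lease using the same identifier derived from the output ...
    }
  };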
@@ -178,6 +179,16 @@ Object createObject(ClientProtocol instance, String src, FsPermission masked, St
      CryptoProtocolVersion[] supportedVersions) throws Exception;
  }

+  // helper class for creating the dummy DFSOutputStream
+  private interface DummyDFSOutputStreamCreator {
+
+    DFSOutputStream createDummyDFSOutputStream(AsyncFSOutput output, DFSClient dfsClient,
+      String src, HdfsFileStatus stat, EnumSet<CreateFlag> flag, DataChecksum checksum);
+  }
+
+  private static final DummyDFSOutputStreamCreator DUMMY_DFS_OUTPUT_STREAM_CREATOR =
+    createDummyDFSOutputStreamCreator();
+
  private static final FileCreator FILE_CREATOR;

  // CreateFlag.SHOULD_REPLICATE is to make OutputStream on a EC directory support hflush/hsync, but
@@ -207,44 +218,28 @@ private static LeaseManager createLeaseManager3_4() throws NoSuchMethodException
    beginFileLeaseMethod.setAccessible(true);
    Method endFileLeaseMethod = DFSClient.class.getDeclaredMethod("endFileLease", String.class);
    endFileLeaseMethod.setAccessible(true);
-    Method getConfigurationMethod = DFSClient.class.getDeclaredMethod("getConfiguration");
-    getConfigurationMethod.setAccessible(true);
-    Method getNamespaceMehtod = HdfsFileStatus.class.getDeclaredMethod("getNamespace");
-
+    Method getUniqKeyMethod = DFSOutputStream.class.getMethod("getUniqKey");
    return new LeaseManager() {

-      private static final String DFS_OUTPUT_STREAM_UNIQ_DEFAULT_KEY =
-        "dfs.client.output.stream.uniq.default.key";
-      private static final String DFS_OUTPUT_STREAM_UNIQ_DEFAULT_KEY_DEFAULT = "DEFAULT";
-
-      private String getUniqId(DFSClient client, HdfsFileStatus stat)
-        throws IllegalAccessException, IllegalArgumentException, InvocationTargetException {
-        // Copied from DFSClient in Hadoop 3.4.0
-        long fileId = stat.getFileId();
-        String namespace = (String) getNamespaceMehtod.invoke(stat);
-        if (namespace == null) {
-          Configuration conf = (Configuration) getConfigurationMethod.invoke(client);
-          String defaultKey = conf.get(DFS_OUTPUT_STREAM_UNIQ_DEFAULT_KEY,
-            DFS_OUTPUT_STREAM_UNIQ_DEFAULT_KEY_DEFAULT);
-          return defaultKey + "_" + fileId;
-        } else {
-          return namespace + "_" + fileId;
-        }
+      private String getUniqKey(FanOutOneBlockAsyncDFSOutput output)
+        throws IllegalAccessException, InvocationTargetException {
+        return (String) getUniqKeyMethod.invoke(output.getDummyStream());
      }

      @Override
-      public void begin(DFSClient client, HdfsFileStatus stat) {
+      public void begin(FanOutOneBlockAsyncDFSOutput output) {
        try {
-          beginFileLeaseMethod.invoke(client, getUniqId(client, stat), null);
+          beginFileLeaseMethod.invoke(output.getClient(), getUniqKey(output),
+            output.getDummyStream());
        } catch (IllegalAccessException | InvocationTargetException e) {
          throw new RuntimeException(e);
        }
      }

      @Override
-      public void end(DFSClient client, HdfsFileStatus stat) {
+      public void end(FanOutOneBlockAsyncDFSOutput output) {
        try {
-          endFileLeaseMethod.invoke(client, getUniqId(client, stat));
+          endFileLeaseMethod.invoke(output.getClient(), getUniqKey(output));
        } catch (IllegalAccessException | InvocationTargetException e) {
          throw new RuntimeException(e);
        }
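For readability, this is roughly what the reflective calls in the Hadoop 3.4 branch above amount to if written directly against DFSClient's package-private API (a sketch only; the real code resolves these methods reflectively precisely because they are not public):

  // Non-reflective sketch of the 3.4 lease manager above (illustration only).
  void begin(FanOutOneBlockAsyncDFSOutput output) {
    // getUniqKey() comes from the dummy DFSOutputStream wrapped by the output
    String uniqKey = output.getDummyStream().getUniqKey();
    // beginFileLease(String, DFSOutputStream) hands the dummy stream to DFSClient's lease bookkeeping
    output.getClient().beginFileLease(uniqKey, output.getDummyStream());
  }

  void end(FanOutOneBlockAsyncDFSOutput output) {
    output.getClient().endFileLease(output.getDummyStream().getUniqKey());
  }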
@@ -261,18 +256,19 @@ private static LeaseManager createLeaseManager3() throws NoSuchMethodException {
    return new LeaseManager() {

      @Override
-      public void begin(DFSClient client, HdfsFileStatus stat) {
+      public void begin(FanOutOneBlockAsyncDFSOutput output) {
        try {
-          beginFileLeaseMethod.invoke(client, stat.getFileId(), null);
+          beginFileLeaseMethod.invoke(output.getClient(), output.getStat().getFileId(),
+            output.getDummyStream());
        } catch (IllegalAccessException | InvocationTargetException e) {
          throw new RuntimeException(e);
        }
      }

      @Override
-      public void end(DFSClient client, HdfsFileStatus stat) {
+      public void end(FanOutOneBlockAsyncDFSOutput output) {
        try {
-          endFileLeaseMethod.invoke(client, stat.getFileId());
+          endFileLeaseMethod.invoke(output.getClient(), output.getStat().getFileId());
        } catch (IllegalAccessException | InvocationTargetException e) {
          throw new RuntimeException(e);
        }
@@ -341,6 +337,28 @@ private static FileCreator createFileCreator() throws NoSuchMethodException {
    return createFileCreator2();
  }

+  private static final String DUMMY_DFS_OUTPUT_STREAM_CLASS =
+    "org.apache.hadoop.hdfs.DummyDFSOutputStream";
+
+  @SuppressWarnings("unchecked")
+  private static DummyDFSOutputStreamCreator createDummyDFSOutputStreamCreator() {
+    Constructor<? extends DFSOutputStream> constructor;
+    try {
+      constructor = (Constructor<? extends DFSOutputStream>) Class
+        .forName(DUMMY_DFS_OUTPUT_STREAM_CLASS).getConstructors()[0];
+      return (output, dfsClient, src, stat, flag, checksum) -> {
+        try {
+          return constructor.newInstance(output, dfsClient, src, stat, flag, checksum);
+        } catch (InstantiationException | IllegalAccessException | InvocationTargetException e) {
+          throw new RuntimeException(e);
+        }
+      };
+    } catch (Exception e) {
+      LOG.debug("can not find DummyDFSOutputStream, should be hadoop 2.x", e);
+      return (output, dfsClient, src, stat, flag, checksum) -> null;
+    }
+  }
+
  private static CreateFlag loadShouldReplicateFlag() {
    try {
      return CreateFlag.valueOf("SHOULD_REPLICATE");
@@ -380,12 +398,12 @@ public boolean progress() {
    }
  }

-  static void beginFileLease(DFSClient client, HdfsFileStatus stat) {
-    LEASE_MANAGER.begin(client, stat);
+  private static void beginFileLease(FanOutOneBlockAsyncDFSOutput output) {
+    LEASE_MANAGER.begin(output);
  }

-  static void endFileLease(DFSClient client, HdfsFileStatus stat) {
-    LEASE_MANAGER.end(client, stat);
+  static void endFileLease(FanOutOneBlockAsyncDFSOutput output) {
+    LEASE_MANAGER.end(output);
  }

  static DataChecksum createChecksum(DFSClient client) {
@@ -599,20 +617,19 @@ private static FanOutOneBlockAsyncDFSOutput createOutput(DistributedFileSystem d
      LOG.debug("When create output stream for {}, exclude list is {}, retry={}", src,
        getDataNodeInfo(toExcludeNodes), retry);
    }
+    EnumSetWritable<CreateFlag> createFlags = getCreateFlags(overwrite, noLocalWrite);
    HdfsFileStatus stat;
    try {
      stat = FILE_CREATOR.create(namenode, src,
        FsPermission.getFileDefault().applyUMask(FsPermission.getUMask(conf)), clientName,
-        getCreateFlags(overwrite, noLocalWrite), createParent, replication, blockSize,
-        CryptoProtocolVersion.supported());
+        createFlags, createParent, replication, blockSize, CryptoProtocolVersion.supported());
    } catch (Exception e) {
      if (e instanceof RemoteException) {
        throw (RemoteException) e;
      } else {
        throw new NameNodeException(e);
      }
    }
-    beginFileLease(client, stat);
    boolean succ = false;
    LocatedBlock locatedBlock = null;
    List<Future<Channel>> futureList = null;
@@ -637,7 +654,8 @@ private static FanOutOneBlockAsyncDFSOutput createOutput(DistributedFileSystem d
      Encryptor encryptor = createEncryptor(conf, stat, client);
      FanOutOneBlockAsyncDFSOutput output =
        new FanOutOneBlockAsyncDFSOutput(conf, dfs, client, namenode, clientName, src, stat,
-          locatedBlock, encryptor, datanodes, summer, ALLOC, monitor);
+          createFlags.get(), locatedBlock, encryptor, datanodes, summer, ALLOC, monitor);
+      beginFileLease(output);
      succ = true;
      return output;
    } catch (RemoteException e) {
@@ -676,7 +694,6 @@ public void operationComplete(Future<Channel> future) throws Exception {
            });
          }
        }
-        endFileLease(client, stat);
      }
    }
  }
@@ -713,13 +730,14 @@ public static boolean shouldRetryCreate(RemoteException e) {
    return e.getClassName().endsWith("RetryStartFileException");
  }

-  static void completeFile(DFSClient client, ClientProtocol namenode, String src, String clientName,
-    ExtendedBlock block, HdfsFileStatus stat) throws IOException {
+  static void completeFile(FanOutOneBlockAsyncDFSOutput output, DFSClient client,
+    ClientProtocol namenode, String src, String clientName, ExtendedBlock block,
+    HdfsFileStatus stat) throws IOException {
    int maxRetries = client.getConf().getNumBlockWriteLocateFollowingRetry();
    for (int retry = 0; retry < maxRetries; retry++) {
      try {
        if (namenode.complete(src, clientName, block, stat.getFileId())) {
-          endFileLease(client, stat);
+          endFileLease(output);
          return;
        } else {
          LOG.warn("complete file " + src + " not finished, retry = " + retry);
@@ -749,4 +767,10 @@ public static String getDataNodeInfo(Collection<DatanodeInfo> datanodeInfos) {
        .append(datanodeInfo.getInfoPort()).append(")").toString())
      .collect(Collectors.joining(",", "[", "]"));
  }
+
+  static DFSOutputStream createDummyDFSOutputStream(AsyncFSOutput output, DFSClient dfsClient,
+    String src, HdfsFileStatus stat, EnumSet<CreateFlag> flag, DataChecksum checksum) {
+    return DUMMY_DFS_OUTPUT_STREAM_CREATOR.createDummyDFSOutputStream(output, dfsClient, src, stat,
+      flag, checksum);
+  }
}
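Putting the pieces together, the lease lifecycle is now driven entirely by the output object rather than by (client, stat) pairs. A hypothetical helper, not part of the patch, showing how the reworked methods in this class are meant to be called:

  // Hypothetical usage sketch (signatures taken from this diff; not part of the patch).
  static void demoLeaseLifecycle(FanOutOneBlockAsyncDFSOutput output, DFSClient client,
      ClientProtocol namenode, String src, String clientName, ExtendedBlock block,
      HdfsFileStatus stat) throws IOException {
    // createOutput(...) above has already called beginFileLease(output) right after
    // constructing the output, so the lease is keyed off the dummy DFSOutputStream it holds.
    // ... data is written through the async output ...
    // Closing the file completes it on the NameNode and releases the lease via endFileLease(output):
    completeFile(output, client, namenode, src, clientName, block, stat);
  }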