131
131
import static org .apache .hadoop .fs .azurebfs .constants .FileSystemConfigurations .DATA_BLOCKS_BUFFER_DEFAULT ;
132
132
import static org .apache .hadoop .fs .azurebfs .constants .InternalConstants .CAPABILITY_SAFE_READAHEAD ;
133
133
import static org .apache .hadoop .fs .azurebfs .services .AbfsErrors .ERR_CREATE_ON_ROOT ;
134
+ import static org .apache .hadoop .fs .azurebfs .services .AbfsErrors .ERR_INVALID_ABFS_STATE ;
134
135
import static org .apache .hadoop .fs .azurebfs .services .AbfsErrors .UNAUTHORIZED_SAS ;
135
136
import static org .apache .hadoop .fs .impl .PathCapabilitiesSupport .validatePathCapabilityArgs ;
136
137
import static org .apache .hadoop .fs .statistics .IOStatisticsLogging .logIOStatisticsAtLevel ;
@@ -148,7 +149,11 @@ public class AzureBlobFileSystem extends FileSystem
148
149
private URI uri ;
149
150
private Path workingDir ;
150
151
private AzureBlobFileSystemStore abfsStore ;
151
- private boolean isClosed ;
152
+
153
+ /**
154
+ * Flag to indicate whether the file system is closed or not initiated.
155
+ */
156
+ private boolean isClosed = true ;
152
157
private final String fileSystemId = UUID .randomUUID ().toString ();
153
158
154
159
private boolean delegationTokenEnabled = false ;
@@ -311,6 +316,7 @@ public void initialize(URI uri, Configuration configuration)
311
316
}
312
317
313
318
rateLimiting = RateLimitingFactory .create (abfsConfiguration .getRateLimit ());
319
+ isClosed = false ;
314
320
LOG .debug ("Initializing AzureBlobFileSystem for {} complete" , uri );
315
321
}
316
322
@@ -328,8 +334,8 @@ public String toString() {
328
334
final StringBuilder sb = new StringBuilder (
329
335
"AzureBlobFileSystem{" );
330
336
sb .append ("uri=" ).append (fullPathUri );
331
- sb .append (", user='" ).append (abfsStore .getUser ()).append ('\'' );
332
- sb .append (", primaryUserGroup='" ).append (abfsStore .getPrimaryGroup ()).append ('\'' );
337
+ sb .append (", user='" ).append (getAbfsStore () .getUser ()).append ('\'' );
338
+ sb .append (", primaryUserGroup='" ).append (getAbfsStore () .getPrimaryGroup ()).append ('\'' );
333
339
sb .append ("[" + CAPABILITY_SAFE_READAHEAD + "]" );
334
340
sb .append ('}' );
335
341
return sb .toString ();
@@ -353,7 +359,7 @@ public FSDataInputStream open(final Path path, final int bufferSize) throws IOEx
353
359
// bufferSize is unused.
354
360
LOG .debug (
355
361
"AzureBlobFileSystem.open path: {} bufferSize as configured in 'fs.azure.read.request.size': {}" ,
356
- path , abfsStore .getAbfsConfiguration ().getReadBufferSize ());
362
+ path , getAbfsStore () .getAbfsConfiguration ().getReadBufferSize ());
357
363
return open (path , Optional .empty ());
358
364
}
359
365
@@ -516,7 +522,7 @@ public FSDataOutputStream append(final Path f, final int bufferSize, final Progr
516
522
TracingContext tracingContext = new TracingContext (clientCorrelationId ,
517
523
fileSystemId , FSOperationType .APPEND , tracingHeaderFormat ,
518
524
listener );
519
- OutputStream outputStream = abfsStore
525
+ OutputStream outputStream = getAbfsStore ()
520
526
.openFileForWrite (qualifiedPath , statistics , false , tracingContext );
521
527
return new FSDataOutputStream (outputStream , statistics );
522
528
} catch (AzureBlobFileSystemException ex ) {
@@ -781,7 +787,7 @@ public boolean mkdirs(final Path f, final FsPermission permission) throws IOExce
781
787
TracingContext tracingContext = new TracingContext (clientCorrelationId ,
782
788
fileSystemId , FSOperationType .MKDIR , false , tracingHeaderFormat ,
783
789
listener );
784
- abfsStore .createDirectory (qualifiedPath ,
790
+ getAbfsStore () .createDirectory (qualifiedPath ,
785
791
permission == null ? FsPermission .getDirDefault () : permission ,
786
792
FsPermission .getUMask (getConf ()), tracingContext );
787
793
statIncrement (DIRECTORIES_CREATED );
@@ -794,10 +800,10 @@ public boolean mkdirs(final Path f, final FsPermission permission) throws IOExce
794
800
795
801
@ Override
796
802
public synchronized void close () throws IOException {
797
- if (isClosed ) {
803
+ if (isClosed () ) {
798
804
return ;
799
805
}
800
- if (abfsStore .getClient ().isMetricCollectionEnabled ()) {
806
+ if (getAbfsStore () .getClient ().isMetricCollectionEnabled ()) {
801
807
TracingContext tracingMetricContext = new TracingContext (
802
808
clientCorrelationId ,
803
809
fileSystemId , FSOperationType .GET_ATTR , true ,
@@ -818,7 +824,7 @@ public synchronized void close() throws IOException {
818
824
IOSTATISTICS_LOGGING_LEVEL_DEFAULT );
819
825
logIOStatisticsAtLevel (LOG , iostatisticsLoggingLevel , getIOStatistics ());
820
826
}
821
- IOUtils .cleanupWithLogger (LOG , abfsStore , delegationTokenManager ,
827
+ IOUtils .cleanupWithLogger (LOG , getAbfsStore () , delegationTokenManager ,
822
828
getAbfsClient ());
823
829
this .isClosed = true ;
824
830
if (LOG .isDebugEnabled ()) {
@@ -865,7 +871,7 @@ public void breakLease(final Path f) throws IOException {
865
871
TracingContext tracingContext = new TracingContext (clientCorrelationId ,
866
872
fileSystemId , FSOperationType .BREAK_LEASE , tracingHeaderFormat ,
867
873
listener );
868
- abfsStore .breakLease (qualifiedPath , tracingContext );
874
+ getAbfsStore () .breakLease (qualifiedPath , tracingContext );
869
875
} catch (AzureBlobFileSystemException ex ) {
870
876
checkException (f , ex );
871
877
}
@@ -883,6 +889,8 @@ public void breakLease(final Path f) throws IOException {
883
889
*/
884
890
@ Override
885
891
public Path makeQualified (Path path ) {
892
+ // Every API works on qualified paths. If store is null better to fail early.
893
+ Preconditions .checkState (getAbfsStore () != null );
886
894
// To support format: abfs://{dfs.nameservices}/file/path,
887
895
// path need to be first converted to URI, then get the raw path string,
888
896
// during which {dfs.nameservices} will be omitted.
@@ -916,7 +924,7 @@ public String getScheme() {
916
924
public Path getHomeDirectory () {
917
925
return makeQualified (new Path (
918
926
FileSystemConfigurations .USER_HOME_DIRECTORY_PREFIX
919
- + "/" + abfsStore .getUser ()));
927
+ + "/" + getAbfsStore () .getUser ()));
920
928
}
921
929
922
930
/**
@@ -938,7 +946,7 @@ public BlockLocation[] getFileBlockLocations(FileStatus file,
938
946
if (file .getLen () < start ) {
939
947
return new BlockLocation [0 ];
940
948
}
941
- final String blobLocationHost = abfsStore .getAbfsConfiguration ().getAzureBlockLocationHost ();
949
+ final String blobLocationHost = getAbfsStore () .getAbfsConfiguration ().getAzureBlockLocationHost ();
942
950
943
951
final String [] name = {blobLocationHost };
944
952
final String [] host = {blobLocationHost };
@@ -972,15 +980,15 @@ protected void finalize() throws Throwable {
972
980
* @return the short name of the user who instantiated the FS
973
981
*/
974
982
public String getOwnerUser () {
975
- return abfsStore .getUser ();
983
+ return getAbfsStore () .getUser ();
976
984
}
977
985
978
986
/**
979
987
* Get the group name of the owner of the FS.
980
988
* @return primary group name
981
989
*/
982
990
public String getOwnerUserPrimaryGroup () {
983
- return abfsStore .getPrimaryGroup ();
991
+ return getAbfsStore () .getPrimaryGroup ();
984
992
}
985
993
986
994
private boolean deleteRoot () throws IOException {
@@ -1052,7 +1060,7 @@ public void setOwner(final Path path, final String owner, final String group)
1052
1060
Path qualifiedPath = makeQualified (path );
1053
1061
1054
1062
try {
1055
- abfsStore .setOwner (qualifiedPath ,
1063
+ getAbfsStore () .setOwner (qualifiedPath ,
1056
1064
owner ,
1057
1065
group ,
1058
1066
tracingContext );
@@ -1089,15 +1097,15 @@ public void setXAttr(final Path path,
1089
1097
TracingContext tracingContext = new TracingContext (clientCorrelationId ,
1090
1098
fileSystemId , FSOperationType .SET_ATTR , true , tracingHeaderFormat ,
1091
1099
listener );
1092
- Hashtable <String , String > properties = abfsStore
1100
+ Hashtable <String , String > properties = getAbfsStore ()
1093
1101
.getPathStatus (qualifiedPath , tracingContext );
1094
1102
String xAttrName = ensureValidAttributeName (name );
1095
1103
boolean xAttrExists = properties .containsKey (xAttrName );
1096
1104
XAttrSetFlag .validate (name , xAttrExists , flag );
1097
1105
1098
- String xAttrValue = abfsStore .decodeAttribute (value );
1106
+ String xAttrValue = getAbfsStore () .decodeAttribute (value );
1099
1107
properties .put (xAttrName , xAttrValue );
1100
- abfsStore .setPathProperties (qualifiedPath , properties , tracingContext );
1108
+ getAbfsStore () .setPathProperties (qualifiedPath , properties , tracingContext );
1101
1109
} catch (AzureBlobFileSystemException ex ) {
1102
1110
checkException (path , ex );
1103
1111
}
@@ -1129,12 +1137,12 @@ public byte[] getXAttr(final Path path, final String name)
1129
1137
TracingContext tracingContext = new TracingContext (clientCorrelationId ,
1130
1138
fileSystemId , FSOperationType .GET_ATTR , true , tracingHeaderFormat ,
1131
1139
listener );
1132
- Hashtable <String , String > properties = abfsStore
1140
+ Hashtable <String , String > properties = getAbfsStore ()
1133
1141
.getPathStatus (qualifiedPath , tracingContext );
1134
1142
String xAttrName = ensureValidAttributeName (name );
1135
1143
if (properties .containsKey (xAttrName )) {
1136
1144
String xAttrValue = properties .get (xAttrName );
1137
- value = abfsStore .encodeAttribute (xAttrValue );
1145
+ value = getAbfsStore () .encodeAttribute (xAttrValue );
1138
1146
}
1139
1147
} catch (AzureBlobFileSystemException ex ) {
1140
1148
checkException (path , ex );
@@ -1172,7 +1180,7 @@ public void setPermission(final Path path, final FsPermission permission)
1172
1180
Path qualifiedPath = makeQualified (path );
1173
1181
1174
1182
try {
1175
- abfsStore .setPermission (qualifiedPath , permission , tracingContext );
1183
+ getAbfsStore () .setPermission (qualifiedPath , permission , tracingContext );
1176
1184
} catch (AzureBlobFileSystemException ex ) {
1177
1185
checkException (path , ex );
1178
1186
}
@@ -1209,7 +1217,7 @@ public void modifyAclEntries(final Path path, final List<AclEntry> aclSpec)
1209
1217
Path qualifiedPath = makeQualified (path );
1210
1218
1211
1219
try {
1212
- abfsStore .modifyAclEntries (qualifiedPath , aclSpec , tracingContext );
1220
+ getAbfsStore () .modifyAclEntries (qualifiedPath , aclSpec , tracingContext );
1213
1221
} catch (AzureBlobFileSystemException ex ) {
1214
1222
checkException (path , ex );
1215
1223
}
@@ -1244,7 +1252,7 @@ public void removeAclEntries(final Path path, final List<AclEntry> aclSpec)
1244
1252
Path qualifiedPath = makeQualified (path );
1245
1253
1246
1254
try {
1247
- abfsStore .removeAclEntries (qualifiedPath , aclSpec , tracingContext );
1255
+ getAbfsStore () .removeAclEntries (qualifiedPath , aclSpec , tracingContext );
1248
1256
} catch (AzureBlobFileSystemException ex ) {
1249
1257
checkException (path , ex );
1250
1258
}
@@ -1272,7 +1280,7 @@ public void removeDefaultAcl(final Path path) throws IOException {
1272
1280
Path qualifiedPath = makeQualified (path );
1273
1281
1274
1282
try {
1275
- abfsStore .removeDefaultAcl (qualifiedPath , tracingContext );
1283
+ getAbfsStore () .removeDefaultAcl (qualifiedPath , tracingContext );
1276
1284
} catch (AzureBlobFileSystemException ex ) {
1277
1285
checkException (path , ex );
1278
1286
}
@@ -1302,7 +1310,7 @@ public void removeAcl(final Path path) throws IOException {
1302
1310
Path qualifiedPath = makeQualified (path );
1303
1311
1304
1312
try {
1305
- abfsStore .removeAcl (qualifiedPath , tracingContext );
1313
+ getAbfsStore () .removeAcl (qualifiedPath , tracingContext );
1306
1314
} catch (AzureBlobFileSystemException ex ) {
1307
1315
checkException (path , ex );
1308
1316
}
@@ -1339,7 +1347,7 @@ public void setAcl(final Path path, final List<AclEntry> aclSpec)
1339
1347
Path qualifiedPath = makeQualified (path );
1340
1348
1341
1349
try {
1342
- abfsStore .setAcl (qualifiedPath , aclSpec , tracingContext );
1350
+ getAbfsStore () .setAcl (qualifiedPath , aclSpec , tracingContext );
1343
1351
} catch (AzureBlobFileSystemException ex ) {
1344
1352
checkException (path , ex );
1345
1353
}
@@ -1367,7 +1375,7 @@ public AclStatus getAclStatus(final Path path) throws IOException {
1367
1375
Path qualifiedPath = makeQualified (path );
1368
1376
1369
1377
try {
1370
- return abfsStore .getAclStatus (qualifiedPath , tracingContext );
1378
+ return getAbfsStore () .getAclStatus (qualifiedPath , tracingContext );
1371
1379
} catch (AzureBlobFileSystemException ex ) {
1372
1380
checkException (path , ex );
1373
1381
return null ;
@@ -1394,7 +1402,7 @@ public void access(final Path path, final FsAction mode) throws IOException {
1394
1402
TracingContext tracingContext = new TracingContext (clientCorrelationId ,
1395
1403
fileSystemId , FSOperationType .ACCESS , tracingHeaderFormat ,
1396
1404
listener );
1397
- this .abfsStore .access (qualifiedPath , mode , tracingContext );
1405
+ this .getAbfsStore () .access (qualifiedPath , mode , tracingContext );
1398
1406
} catch (AzureBlobFileSystemException ex ) {
1399
1407
checkCheckAccessException (path , ex );
1400
1408
}
@@ -1416,11 +1424,11 @@ public boolean exists(Path f) throws IOException {
1416
1424
public RemoteIterator <FileStatus > listStatusIterator (Path path )
1417
1425
throws IOException {
1418
1426
LOG .debug ("AzureBlobFileSystem.listStatusIterator path : {}" , path );
1419
- if (abfsStore .getAbfsConfiguration ().enableAbfsListIterator ()) {
1427
+ if (getAbfsStore () .getAbfsConfiguration ().enableAbfsListIterator ()) {
1420
1428
TracingContext tracingContext = new TracingContext (clientCorrelationId ,
1421
1429
fileSystemId , FSOperationType .LISTSTATUS , true , tracingHeaderFormat , listener );
1422
1430
AbfsListStatusRemoteIterator abfsLsItr =
1423
- new AbfsListStatusRemoteIterator (path , abfsStore ,
1431
+ new AbfsListStatusRemoteIterator (path , getAbfsStore () ,
1424
1432
tracingContext );
1425
1433
return RemoteIterators .typeCastingRemoteIterator (abfsLsItr );
1426
1434
} else {
@@ -1502,7 +1510,7 @@ private boolean fileSystemExists() throws IOException {
1502
1510
try {
1503
1511
TracingContext tracingContext = new TracingContext (clientCorrelationId ,
1504
1512
fileSystemId , FSOperationType .TEST_OP , tracingHeaderFormat , listener );
1505
- abfsStore .getFilesystemProperties (tracingContext );
1513
+ getAbfsStore () .getFilesystemProperties (tracingContext );
1506
1514
} catch (AzureBlobFileSystemException ex ) {
1507
1515
try {
1508
1516
checkException (null , ex );
@@ -1521,7 +1529,7 @@ private void createFileSystem(TracingContext tracingContext) throws IOException
1521
1529
LOG .debug (
1522
1530
"AzureBlobFileSystem.createFileSystem uri: {}" , uri );
1523
1531
try {
1524
- abfsStore .createFilesystem (tracingContext );
1532
+ getAbfsStore () .createFilesystem (tracingContext );
1525
1533
} catch (AzureBlobFileSystemException ex ) {
1526
1534
checkException (null , ex );
1527
1535
}
@@ -1744,14 +1752,21 @@ public boolean failed() {
1744
1752
1745
1753
@ VisibleForTesting
1746
1754
public AzureBlobFileSystemStore getAbfsStore () {
1755
+ if (abfsStore == null ) {
1756
+ throw new IllegalStateException (ERR_INVALID_ABFS_STATE );
1757
+ }
1747
1758
return abfsStore ;
1748
1759
}
1749
1760
1750
1761
@ VisibleForTesting
1751
1762
AbfsClient getAbfsClient () {
1752
- return abfsStore .getClient ();
1763
+ return getAbfsStore () .getClient ();
1753
1764
}
1754
1765
1766
+ @ VisibleForTesting
1767
+ boolean isClosed () {
1768
+ return isClosed ;
1769
+ }
1755
1770
/**
1756
1771
* Get any Delegation Token manager created by the filesystem.
1757
1772
* @return the DT manager or null.
0 commit comments