Added support for HNS soft delete (#20353)
gapra-msft authored May 3, 2021
1 parent 04dfc9e commit 3a0556b
Showing 59 changed files with 7,888 additions and 201 deletions.
@@ -254,6 +254,11 @@ public static final class UrlConstants {
*/
public static final String VERSIONID_QUERY_PARAMETER = "versionid";

/**
* The deletionId query parameter.
*/
public static final String DELETIONID_QUERY_PARAMETER = "deletionid";

/**
* The SAS service version parameter.
*/
3 changes: 3 additions & 0 deletions sdk/storage/azure-storage-file-datalake/CHANGELOG.md
@@ -3,6 +3,9 @@
## 12.5.0-beta.4 (Unreleased)
- Added support for the 2020-08-04 service version.
- Added support to specify Parquet Input Serialization when querying a file.
- Added support to undelete a file or directory.
- Added support to list deleted paths.
- Added support to get/set service properties.

## 12.5.0-beta.3 (2021-04-16)
- Updated `azure-storage-blob` version to `12.11.0-beta.3` to pickup fixes for blob output stream.
sdk/storage/azure-storage-file-datalake/src/main/java/com/azure/storage/file/datalake/DataLakeFileSystemAsyncClient.java
@@ -18,19 +18,24 @@
import com.azure.storage.blob.BlobContainerAsyncClient;
import com.azure.storage.blob.specialized.BlockBlobAsyncClient;
import com.azure.storage.common.StorageSharedKeyCredential;
import com.azure.storage.common.Utility;
import com.azure.storage.common.implementation.Constants;
import com.azure.storage.common.implementation.SasImplUtils;
import com.azure.storage.common.implementation.StorageImplUtils;
import com.azure.storage.file.datalake.implementation.AzureDataLakeStorageRestAPIImpl;
import com.azure.storage.file.datalake.implementation.AzureDataLakeStorageRestAPIImplBuilder;
import com.azure.storage.file.datalake.implementation.models.FileSystemsListBlobHierarchySegmentResponse;
import com.azure.storage.file.datalake.implementation.models.FileSystemsListPathsResponse;
import com.azure.storage.file.datalake.implementation.models.ListBlobsShowOnly;
import com.azure.storage.file.datalake.implementation.models.PathResourceType;
import com.azure.storage.file.datalake.implementation.util.DataLakeImplUtils;
import com.azure.storage.file.datalake.implementation.util.DataLakeSasImplUtil;
import com.azure.storage.file.datalake.models.DataLakeRequestConditions;
import com.azure.storage.file.datalake.models.DataLakeSignedIdentifier;
import com.azure.storage.file.datalake.models.FileSystemAccessPolicies;
import com.azure.storage.file.datalake.models.FileSystemProperties;
import com.azure.storage.file.datalake.models.ListPathsOptions;
import com.azure.storage.file.datalake.models.PathDeletedItem;
import com.azure.storage.file.datalake.models.PathHttpHeaders;
import com.azure.storage.file.datalake.models.PathItem;
import com.azure.storage.file.datalake.models.PublicAccessType;
@@ -46,9 +51,13 @@
import java.util.Objects;
import java.util.function.BiFunction;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static com.azure.core.util.FluxUtil.monoError;
import static com.azure.core.util.FluxUtil.pagedFluxError;
import static com.azure.core.util.FluxUtil.withContext;
import static com.azure.core.util.tracing.Tracer.AZ_TRACING_NAMESPACE_KEY;
import static com.azure.storage.common.Utility.STORAGE_TRACING_NAMESPACE_VALUE;

/**
* Client to a file system. It may only be instantiated through a {@link DataLakeFileSystemClientBuilder} or via the
@@ -83,6 +92,7 @@ public class DataLakeFileSystemAsyncClient {

private final ClientLogger logger = new ClientLogger(DataLakeFileSystemAsyncClient.class);
private final AzureDataLakeStorageRestAPIImpl azureDataLakeStorage;
private final AzureDataLakeStorageRestAPIImpl blobDataLakeStorageFs; // Used only for listing deleted paths
private final BlobContainerAsyncClient blobContainerAsyncClient;

private final String accountName;
@@ -107,6 +117,14 @@ public class DataLakeFileSystemAsyncClient {
.fileSystem(fileSystemName)
.version(serviceVersion.getVersion())
.buildClient();
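// The soft-delete operations (listing deleted paths, undelete) are served from the blob endpoint rather
// than dfs, so a second generated client targeting the converted blob URL is kept alongside the dfs one.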
String blobUrl = DataLakeImplUtils.endpointToDesiredEndpoint(url, "blob", "dfs");
this.blobDataLakeStorageFs = new AzureDataLakeStorageRestAPIImplBuilder()
.pipeline(pipeline)
.url(blobUrl)
.fileSystem(fileSystemName)
.version(serviceVersion.getVersion())
.buildClient();

this.serviceVersion = serviceVersion;

this.accountName = accountName;
@@ -456,8 +474,7 @@ public PagedFlux<PathItem> listPaths(ListPathsOptions options) {
PagedFlux<PathItem> listPathsWithOptionalTimeout(ListPathsOptions options,
Duration timeout) {
BiFunction<String, Integer, Mono<PagedResponse<PathItem>>> func =
(marker, pageSize) ->
{
(marker, pageSize) -> {
ListPathsOptions finalOptions;
if (pageSize != null) {
if (options == null) {
@@ -502,6 +519,89 @@ private Mono<FileSystemsListPathsResponse> listPathsSegment(String marker,
options.isUserPrincipalNameReturned(), Context.NONE), timeout);
}

/**
* Returns a reactive Publisher emitting, lazily as needed, all the files/directories in this file system that
* have been recently soft deleted. For more information, see the
* <a href="https://docs.microsoft.com/en-us/rest/api/storageservices/list-blobs">Azure Docs</a>.
*
* <p><strong>Code Samples</strong></p>
*
* {@codesnippet com.azure.storage.file.datalake.DataLakeFileSystemAsyncClient.listDeletedPaths}
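*
* A minimal usage sketch (illustrative only; assumes {@code client} is an instantiated
* {@link DataLakeFileSystemAsyncClient}):
*
* <pre>{@code
* client.listDeletedPaths()
*     .subscribe(item -> System.out.printf("%s (deletionId: %s)%n",
*         item.getPath(), item.getDeletionId()));
* }</pre>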
*
* @return A reactive response emitting the list of files/directories.
*/
@ServiceMethod(returns = ReturnType.COLLECTION)
public PagedFlux<PathDeletedItem> listDeletedPaths() {
try {
return this.listDeletedPaths(null);
} catch (RuntimeException ex) {
return pagedFluxError(logger, ex);
}
}

/**
* Returns a reactive Publisher emitting, lazily as needed, all the files/directories in this file system that
* have been recently soft deleted. For more
* information, see the <a href="https://docs.microsoft.com/rest/api/storageservices/list-blobs">Azure Docs</a>.
*
* Note: You can specify the page size by using byPage methods that accept an integer, such as
* {@link PagedFlux#byPage(int)}. Please refer to the REST docs above for limitations on page size.
*
* <p><strong>Code Samples</strong></p>
*
* {@codesnippet com.azure.storage.file.datalake.DataLakeFileSystemAsyncClient.listDeletedPaths#String}
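*
* A sketch of page-size control (illustrative only; {@code client} is assumed to be instantiated, and the
* prefix is a placeholder):
*
* <pre>{@code
* client.listDeletedPaths("folder1/")
*     .byPage(50) // at most 50 items per service round trip
*     .subscribe(page -> page.getValue().forEach(item -> System.out.println(item.getPath())));
* }</pre>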
*
* @param prefix Filters the results to return only paths that begin with the specified prefix.
* @return A reactive response emitting the list of files/directories.
*/
@ServiceMethod(returns = ReturnType.COLLECTION)
public PagedFlux<PathDeletedItem> listDeletedPaths(String prefix) {
try {
return new PagedFlux<>(pageSize -> withContext(context -> listDeletedPaths(null, pageSize, prefix,
null, context)),
(marker, pageSize) -> withContext(context -> listDeletedPaths(marker, pageSize, prefix, null,
context)));
} catch (RuntimeException ex) {
return pagedFluxError(logger, ex);
}
}

PagedFlux<PathDeletedItem> listDeletedPathsWithOptionalTimeout(String prefix, Duration timeout, Context context) {
return new PagedFlux<>(pageSize -> listDeletedPaths(null, pageSize, prefix, timeout, context),
(marker, pageSize) -> listDeletedPaths(marker, pageSize, prefix, timeout, context));
}

private Mono<PagedResponse<PathDeletedItem>> listDeletedPaths(String marker, Integer pageSize,
String prefix, Duration timeout, Context context) {
return listDeletedPathsSegment(marker, prefix, pageSize, timeout, context)
.map(response -> {
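// A segment may contain both deleted blob items and blob prefixes; both are mapped
// uniformly to PathDeletedItem before being surfaced in the page.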
List<PathDeletedItem> value = response.getValue().getSegment() == null
? Collections.emptyList()
: Stream.concat(
response.getValue().getSegment().getBlobItems().stream().map(Transforms::toPathDeletedItem),
response.getValue().getSegment().getBlobPrefixes().stream()
.map(Transforms::toPathDeletedItem)
).collect(Collectors.toList());
return new PagedResponseBase<>(
response.getRequest(),
response.getStatusCode(),
response.getHeaders(),
value,
response.getValue().getNextMarker(),
response.getDeserializedHeaders());
});
}

private Mono<FileSystemsListBlobHierarchySegmentResponse> listDeletedPathsSegment(String marker,
String prefix, Integer maxResults, Duration timeout, Context context) {
context = context == null ? Context.NONE : context;

return StorageImplUtils.applyOptionalTimeout(
this.blobDataLakeStorageFs.getFileSystems().listBlobHierarchySegmentWithResponseAsync(
prefix, null, marker, maxResults,
null, ListBlobsShowOnly.DELETED, null, null,
context.addData(AZ_TRACING_NAMESPACE_KEY, STORAGE_TRACING_NAMESPACE_VALUE)), timeout);
}

/**
* Creates a new file within a file system. By default this method will not overwrite an existing file. For more
* information, see the <a href="https://docs.microsoft.com/rest/api/storageservices/datalakestoragegen2/path/create">Azure Docs</a>.
@@ -744,6 +844,92 @@ public Mono<Response<Void>> deleteDirectoryWithResponse(String directoryName, bo
return getDirectoryAsyncClient(directoryName).deleteWithResponse(recursive, requestConditions);
}

/**
* Restores a soft deleted path in the file system. For more information see the
* <a href="https://docs.microsoft.com/rest/api/storageservices/datalakestoragegen2/path/delete">Azure
* Docs</a>.
*
* <p><strong>Code Samples</strong></p>
*
* {@codesnippet com.azure.storage.file.datalake.DataLakeFileSystemAsyncClient.undeletePath#String-String}
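*
* A combined sketch (illustrative only; {@code client} is assumed to be instantiated) that restores the
* first soft-deleted path it finds:
*
* <pre>{@code
* client.listDeletedPaths()
*     .next()
*     .flatMap(item -> client.undeletePath(item.getPath(), item.getDeletionId()))
*     .subscribe(restored -> System.out.println("Restored one soft-deleted path"));
* }</pre>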
*
* @param deletedPath The deleted path.
* @param deletionId The deletion ID associated with the soft-deleted path; it uniquely identifies the resource
* when multiple resources have been soft deleted at this location.
* You can get soft deleted paths and their associated deletion IDs with {@link #listDeletedPaths()}.
* @return A reactive response signalling completion.
* @throws NullPointerException if deletedPath or deletionId is null.
*/
@ServiceMethod(returns = ReturnType.SINGLE)
public Mono<DataLakePathAsyncClient> undeletePath(String deletedPath, String deletionId) {
return undeletePathWithResponse(deletedPath, deletionId).flatMap(FluxUtil::toMono);
}

/**
* Restores a soft deleted path in the file system. For more information see the
* <a href="https://docs.microsoft.com/rest/api/storageservices/datalakestoragegen2/path/delete">Azure
* Docs</a>.
*
* <p><strong>Code Samples</strong></p>
*
* {@codesnippet com.azure.storage.file.datalake.DataLakeFileSystemAsyncClient.undeletePathWithResponse#String-String}
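*
* A sketch (illustrative only; the path and deletion ID are placeholders) that inspects the raw response:
*
* <pre>{@code
* client.undeletePathWithResponse("deletedFile", "deletion-id")
*     .subscribe(response -> System.out.println("Undelete returned status " + response.getStatusCode()));
* }</pre>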
*
* @param deletedPath The deleted path.
* @param deletionId The deletion ID associated with the soft-deleted path; it uniquely identifies the resource
* when multiple resources have been soft deleted at this location.
* You can get soft deleted paths and their associated deletion IDs with {@link #listDeletedPaths()}.
* @return A reactive response signalling completion.
* @throws NullPointerException if deletedPath or deletionId is null.
*/
@ServiceMethod(returns = ReturnType.SINGLE)
public Mono<Response<DataLakePathAsyncClient>> undeletePathWithResponse(String deletedPath, String deletionId) {
try {
return withContext(context -> undeletePathWithResponse(deletedPath, deletionId, context));
} catch (RuntimeException ex) {
return monoError(logger, ex);
}
}

Mono<Response<DataLakePathAsyncClient>> undeletePathWithResponse(String deletedPath, String deletionId,
Context context) {
Objects.requireNonNull(deletedPath);
Objects.requireNonNull(deletionId);

context = context == null ? Context.NONE : context;
String blobUrl = DataLakeImplUtils.endpointToDesiredEndpoint(blobDataLakeStorageFs.getUrl(), "blob", "dfs");

// This builds a datalake impl that points to the blob endpoint, which is what serves the undelete operation
AzureDataLakeStorageRestAPIImpl blobDataLakeStoragePath = new AzureDataLakeStorageRestAPIImplBuilder()
.pipeline(blobDataLakeStorageFs.getHttpPipeline())
.url(blobUrl)
.fileSystem(blobDataLakeStorageFs.getFileSystem())
.path(Utility.urlDecode(deletedPath))
.version(serviceVersion.getVersion())
.buildClient();

// Initial REST call; the deletion ID is passed as the "?deletionid=<id>" query string
return blobDataLakeStoragePath.getPaths().undeleteWithResponseAsync(null,
String.format("?%s=%s", Constants.UrlConstants.DELETIONID_QUERY_PARAMETER, deletionId), null,
context.addData(AZ_TRACING_NAMESPACE_KEY, STORAGE_TRACING_NAMESPACE_VALUE))
.onErrorMap(DataLakeImplUtils::transformBlobStorageException)
// Construct the new client and final response from the undelete + getProperties responses
.map(response -> {
DataLakePathAsyncClient client = new DataLakePathAsyncClient(getHttpPipeline(), getAccountUrl(),
serviceVersion, accountName, fileSystemName, deletedPath,
PathResourceType.fromString(response.getDeserializedHeaders().getXMsResourceType()),
blobContainerAsyncClient.getBlobAsyncClient(deletedPath, null)
.getBlockBlobAsyncClient());
if (PathResourceType.DIRECTORY.equals(client.pathResourceType)) {
return new SimpleResponse<>(response, new DataLakeDirectoryAsyncClient(client));
} else if (PathResourceType.FILE.equals(client.pathResourceType)) {
return new SimpleResponse<>(response, new DataLakeFileAsyncClient(client));
} else {
throw logger.logExceptionAsError(new IllegalStateException("'pathClient' expected to be either "
+ "a file or directory client."));
}
});
}

/**
* Sets the file system's permissions. The permissions indicate whether paths in a file system may be accessed
* publicly. Note that, for each signed identifier, we will truncate the start and expiry times to the nearest