Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pinot-plugins/pinot-file-system/pinot-adls/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@
<dependency>
<groupId>com.microsoft.azure</groupId>
<artifactId>azure-data-lake-store-sdk</artifactId>
<version>2.3.9</version>
<version>2.3.10</version>
</dependency>
<dependency>
<groupId>com.azure</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,7 @@
import com.azure.core.util.Context;
import com.azure.identity.ClientSecretCredential;
import com.azure.identity.ClientSecretCredentialBuilder;
import com.azure.storage.blob.BlobClient;
import com.azure.storage.blob.BlobServiceClient;
import com.azure.storage.blob.BlobServiceClientBuilder;
import com.azure.identity.DefaultAzureCredentialBuilder;
import com.azure.storage.common.StorageSharedKeyCredential;
import com.azure.storage.common.Utility;
import com.azure.storage.file.datalake.DataLakeDirectoryClient;
Expand Down Expand Up @@ -76,7 +74,7 @@ public class ADLSGen2PinotFS extends BasePinotFS {
private static final Logger LOGGER = LoggerFactory.getLogger(ADLSGen2PinotFS.class);

private enum AuthenticationType {
ACCESS_KEY, AZURE_AD, AZURE_AD_WITH_PROXY
ACCESS_KEY, AZURE_AD, AZURE_AD_WITH_PROXY, ANONYMOUS_ACCESS
}

private static final String AUTHENTICATION_TYPE = "authenticationType";
Expand All @@ -87,6 +85,8 @@ private enum AuthenticationType {
private static final String CLIENT_ID = "clientId";
private static final String CLIENT_SECRET = "clientSecret";
private static final String TENANT_ID = "tenantId";
private static final String MANAGED_IDENTITY_CLIENT_ID = "managedIdentityClientId";
private static final String AUTHORITY_HOST = "authorityHost";
private static final String PROXY_HOST = "proxyHost";
private static final String PROXY_PORT = "proxyPort";
private static final String PROXY_USERNAME = "proxyUsername";
Expand All @@ -107,7 +107,6 @@ private enum AuthenticationType {
private static final int BUFFER_SIZE = 4 * 1024 * 1024;

private DataLakeFileSystemClient _fileSystemClient;
private BlobServiceClient _blobServiceClient;

// If enabled, the PinotFS implementation guarantees that the bits you read are the same as the ones you wrote.
// However, computing the hash adds some overhead (roughly 3 seconds for a 1GB file).
Expand All @@ -116,9 +115,8 @@ private enum AuthenticationType {
/**
 * Creates an instance without assigning a file system client; {@code init(PinotConfiguration)} builds the
 * service client from the configuration and assigns {@code _fileSystemClient}.
 */
public ADLSGen2PinotFS() {
}

public ADLSGen2PinotFS(DataLakeFileSystemClient fileSystemClient, BlobServiceClient blobServiceClient) {
public ADLSGen2PinotFS(DataLakeFileSystemClient fileSystemClient) {
_fileSystemClient = fileSystemClient;
_blobServiceClient = blobServiceClient;
}

@Override
Expand All @@ -136,17 +134,17 @@ public void init(PinotConfiguration config) {
String clientId = config.getProperty(CLIENT_ID);
String clientSecret = config.getProperty(CLIENT_SECRET);
String tenantId = config.getProperty(TENANT_ID);
String managedIdentityClientId = config.getProperty(MANAGED_IDENTITY_CLIENT_ID);
String authorityHost = config.getProperty(AUTHORITY_HOST);
String proxyHost = config.getProperty(PROXY_HOST);
String proxyUsername = config.getProperty(PROXY_USERNAME);
String proxyPassword = config.getProperty(PROXY_PASSWORD);
String proxyPort = config.getProperty(PROXY_PORT);

String dfsServiceEndpointUrl = HTTPS_URL_PREFIX + accountName + AZURE_STORAGE_DNS_SUFFIX;
String blobServiceEndpointUrl = HTTPS_URL_PREFIX + accountName + AZURE_BLOB_DNS_SUFFIX;

DataLakeServiceClientBuilder dataLakeServiceClientBuilder =
new DataLakeServiceClientBuilder().endpoint(dfsServiceEndpointUrl);
BlobServiceClientBuilder blobServiceClientBuilder = new BlobServiceClientBuilder().endpoint(blobServiceEndpointUrl);

switch (authType) {
case ACCESS_KEY: {
Expand All @@ -156,7 +154,6 @@ public void init(PinotConfiguration config) {

StorageSharedKeyCredential sharedKeyCredential = new StorageSharedKeyCredential(accountName, accessKey);
dataLakeServiceClientBuilder.credential(sharedKeyCredential);
blobServiceClientBuilder.credential(sharedKeyCredential);
break;
}
case AZURE_AD: {
Expand All @@ -169,7 +166,6 @@ public void init(PinotConfiguration config) {
new ClientSecretCredentialBuilder().clientId(clientId).clientSecret(clientSecret).tenantId(tenantId)
.build();
dataLakeServiceClientBuilder.credential(clientSecretCredential);
blobServiceClientBuilder.credential(clientSecretCredential);
break;
}
case AZURE_AD_WITH_PROXY: {
Expand All @@ -191,20 +187,37 @@ public void init(PinotConfiguration config) {
clientSecretCredentialBuilder.httpClient(builder.build());

dataLakeServiceClientBuilder.credential(clientSecretCredentialBuilder.build());
blobServiceClientBuilder.credential(clientSecretCredentialBuilder.build());
break;
}
default:
throw new IllegalStateException("Expecting valid authType. One of (ACCESS_KEY, AZURE_AD, AZURE_AD_WITH_PROXY");
case ANONYMOUS_ACCESS: {
LOGGER.info("Authenticating using anonymous access");
break;
}
default: {
LOGGER.info("Authenticating using Azure default credential");
DefaultAzureCredentialBuilder defaultAzureCredentialBuilder = new DefaultAzureCredentialBuilder();
if (tenantId != null) {
LOGGER.info("Set tenant ID to {}", tenantId);
defaultAzureCredentialBuilder.tenantId(tenantId);
}
if (managedIdentityClientId != null) {
LOGGER.info("Set managed identity client ID to {}", managedIdentityClientId);
defaultAzureCredentialBuilder.managedIdentityClientId(managedIdentityClientId);
}
if (authorityHost != null) {
LOGGER.info("Set authority host to {}", authorityHost);
defaultAzureCredentialBuilder.authorityHost(authorityHost);
}
dataLakeServiceClientBuilder.credential(defaultAzureCredentialBuilder.build());
break;
}
}

_blobServiceClient = blobServiceClientBuilder.buildClient();
DataLakeServiceClient serviceClient = dataLakeServiceClientBuilder.buildClient();
_fileSystemClient = getOrCreateClientWithFileSystem(serviceClient, fileSystemName);

LOGGER.info("ADLSGen2PinotFS is initialized (accountName={}, fileSystemName={}, dfsServiceEndpointUrl={}, "
+ "blobServiceEndpointUrl={}, enableChecksum={})", accountName, fileSystemName, dfsServiceEndpointUrl,
blobServiceEndpointUrl, _enableChecksum);
+ "enableChecksum={})", accountName, fileSystemName, dfsServiceEndpointUrl, _enableChecksum);
}

/**
Expand Down Expand Up @@ -586,15 +599,8 @@ public boolean touch(URI uri)
@Override
public InputStream open(URI uri)
throws IOException {
// Use the Blob API because the read() function of the Data Lake client currently takes an "OutputStream" as input
// and flushes bytes to that output stream, which would have to be piped back into an input stream to implement
// this function. The Blob API, on the other hand, directly allows you to open an input stream.
BlobClient blobClient = _blobServiceClient.getBlobContainerClient(_fileSystemClient.getFileSystemName())
.getBlobClient(AzurePinotFSUtil.convertUriToUrlEncodedAzureStylePath(uri));

return blobClient.openInputStream();
// Another approach is to download the file to a temp path on the local disk and return the file input stream. In
// that case, we need to override "close()" and delete the temp file.
return _fileSystemClient.getFileClient(AzurePinotFSUtil.convertUriToUrlEncodedAzureStylePath(uri)).openInputStream()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for addressing this!

.getInputStream();
}

private boolean copySrcToDst(URI srcUri, URI dstUri)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,11 @@
import com.azure.core.http.rest.PagedIterable;
import com.azure.core.http.rest.SimpleResponse;
import com.azure.core.util.Context;
import com.azure.storage.blob.BlobClient;
import com.azure.storage.blob.BlobContainerClient;
import com.azure.storage.blob.BlobServiceClient;
import com.azure.storage.blob.specialized.BlobInputStream;
import com.azure.storage.file.datalake.DataLakeDirectoryClient;
import com.azure.storage.file.datalake.DataLakeFileClient;
import com.azure.storage.file.datalake.DataLakeFileSystemClient;
import com.azure.storage.file.datalake.DataLakeServiceClient;
import com.azure.storage.file.datalake.models.DataLakeFileOpenInputStreamResult;
import com.azure.storage.file.datalake.models.DataLakeStorageException;
import com.azure.storage.file.datalake.models.PathItem;
import com.azure.storage.file.datalake.models.PathProperties;
Expand All @@ -39,10 +36,11 @@
import java.time.Instant;
import java.time.OffsetDateTime;
import java.time.ZoneOffset;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.stream.Stream;
import org.apache.pinot.plugin.filesystem.ADLSGen2PinotFS;
import org.apache.pinot.plugin.filesystem.AzurePinotFSUtil;
import org.apache.pinot.spi.env.PinotConfiguration;
import org.apache.pinot.spi.filesystem.FileMetadata;
import org.mockito.Mock;
Expand All @@ -65,18 +63,14 @@ public class ADLSGen2PinotFSTest {
@Mock
private DataLakeDirectoryClient _mockDirectoryClient;
@Mock
private BlobServiceClient _mockBlobServiceClient;
@Mock
private BlobContainerClient _mockBlobContainerClient;
@Mock
private BlobClient _mockBlobClient;
@Mock
private BlobInputStream _mockBlobInputStream;
@Mock
private DataLakeServiceClient _mockServiceClient;
@Mock
private DataLakeFileClient _mockFileClient;
@Mock
private DataLakeFileOpenInputStreamResult _mockFileOpenInputStreamResult;
@Mock
private InputStream _mockInputStream;
@Mock
private DataLakeStorageException _mockDataLakeStorageException;
@Mock
private SimpleResponse _mockSimpleResponse;
Expand All @@ -95,16 +89,16 @@ public class ADLSGen2PinotFSTest {
@BeforeMethod
public void setup()
throws URISyntaxException {
MockitoAnnotations.initMocks(this);
_adlsGen2PinotFsUnderTest = new ADLSGen2PinotFS(_mockFileSystemClient, _mockBlobServiceClient);
MockitoAnnotations.openMocks(this);
_adlsGen2PinotFsUnderTest = new ADLSGen2PinotFS(_mockFileSystemClient);
_mockURI = new URI("mock://mock");
}

@AfterMethod
public void tearDown() {
verifyNoMoreInteractions(_mockDataLakeStorageException, _mockServiceClient, _mockFileSystemClient,
_mockSimpleResponse, _mockDirectoryClient, _mockPathItem, _mockPagedIterable, _mockPathProperties,
_mockFileClient, _mockBlobContainerClient, _mockBlobClient, _mockBlobServiceClient, _mockBlobInputStream);
_mockFileClient, _mockFileOpenInputStreamResult, _mockInputStream);
}

@Test(expectedExceptions = NullPointerException.class)
Expand Down Expand Up @@ -195,7 +189,7 @@ public void testIsDirectory()
public void testListFiles()
throws IOException {
when(_mockFileSystemClient.listPaths(any(), any())).thenReturn(_mockPagedIterable);
when(_mockPagedIterable.stream()).thenReturn(Collections.singletonList(_mockPathItem).stream());
when(_mockPagedIterable.stream()).thenReturn(Stream.of(_mockPathItem));
when(_mockPathItem.getName()).thenReturn("foo");

String[] actual = _adlsGen2PinotFsUnderTest.listFiles(_mockURI, true);
Expand All @@ -210,7 +204,7 @@ public void testListFiles()
public void testListFilesWithMetadata()
throws IOException {
when(_mockFileSystemClient.listPaths(any(), any())).thenReturn(_mockPagedIterable);
when(_mockPagedIterable.stream()).thenReturn(Collections.singletonList(_mockPathItem).stream());
when(_mockPagedIterable.stream()).thenReturn(Stream.of(_mockPathItem));
when(_mockPathItem.getName()).thenReturn("foo");
when(_mockPathItem.isDirectory()).thenReturn(false);
when(_mockPathItem.getContentLength()).thenReturn(1024L);
Expand Down Expand Up @@ -268,7 +262,7 @@ public void testDeleteDirectory()
when(_mockDirectoryClient.getProperties()).thenReturn(_mockPathProperties);
when(_mockPathProperties.getMetadata()).thenReturn(metadata);
when(_mockFileSystemClient.listPaths(any(), any())).thenReturn(_mockPagedIterable);
when(_mockPagedIterable.stream()).thenReturn(Collections.singletonList(_mockPathItem).stream());
when(_mockPagedIterable.stream()).thenReturn(Stream.of(_mockPathItem));
when(_mockPathItem.getName()).thenReturn("foo");
when(_mockFileSystemClient.deleteDirectoryWithResponse(eq(""), eq(true), eq(null), eq(null), eq(Context.NONE)))
.thenReturn(_mockSimpleResponse);
Expand Down Expand Up @@ -358,8 +352,7 @@ public void testExistsFalse()
}

@Test
public void testExistsException()
throws IOException {
public void testExistsException() {
when(_mockFileSystemClient.getDirectoryClient(any())).thenReturn(_mockDirectoryClient);
when(_mockDirectoryClient.getProperties()).thenThrow(_mockDataLakeStorageException);
when(_mockDataLakeStorageException.getStatusCode()).thenReturn(123);
Expand Down Expand Up @@ -441,17 +434,15 @@ public void testTouchException() {
@Test
public void open()
throws IOException {
when(_mockFileSystemClient.getFileSystemName()).thenReturn(MOCK_FILE_SYSTEM_NAME);
when(_mockBlobServiceClient.getBlobContainerClient(MOCK_FILE_SYSTEM_NAME)).thenReturn(_mockBlobContainerClient);
when(_mockBlobContainerClient.getBlobClient(any())).thenReturn(_mockBlobClient);
when(_mockBlobClient.openInputStream()).thenReturn(_mockBlobInputStream);
when(_mockFileSystemClient.getFileClient(any())).thenReturn(_mockFileClient);
when(_mockFileClient.openInputStream()).thenReturn(_mockFileOpenInputStreamResult);
when(_mockFileOpenInputStreamResult.getInputStream()).thenReturn(_mockInputStream);

InputStream actual = _adlsGen2PinotFsUnderTest.open(_mockURI);
Assert.assertEquals(actual, _mockBlobInputStream);
Assert.assertEquals(actual, _mockInputStream);

verify(_mockFileSystemClient).getFileSystemName();
verify(_mockBlobServiceClient).getBlobContainerClient(MOCK_FILE_SYSTEM_NAME);
verify(_mockBlobContainerClient).getBlobClient(any());
verify(_mockBlobClient).openInputStream();
verify(_mockFileSystemClient).getFileClient(AzurePinotFSUtil.convertUriToUrlEncodedAzureStylePath(_mockURI));
verify(_mockFileClient).openInputStream();
verify(_mockFileOpenInputStreamResult).getInputStream();
}
}