Commit 37c6ccc

feat(snowflake-driver): support azure export buckets (#8730)
* add azure export bucket options to Snowflake driver
* add env var for Azure SAS Token
* add STORAGE_INTEGRATION variant to export bucket flows
* simplify isUnloadSupported()
* add extractFilesFromGCS() to BaseDriver class
* implement extractFilesFromAzure() in Snowflake driver
* some refactoring in extractFilesFromAzure()
* debug and fix extractFilesFromAzure()
* remove error if no csv files were exported (that's ok)
* move extractFilesFromAzure() to BaseDriver from Snowflake
* switch Athena driver to use extractUnloadedFilesFromS3() from the BaseDriver impl
* switch Redshift driver to use extractUnloadedFilesFromS3() from the BaseDriver impl
* switch Databricks driver to use extractUnloadedFilesFromS3() and extractFilesFromAzure() from the BaseDriver impl
* remove unused code / clean up the Databricks driver
* update extractFilesFromGCS() in BaseDriver to receive credentials instead of a Storage instance
* remove dependency on @google-cloud/storage in Databricks for the export bucket flow
* fix Databricks driver export bucket flows after refactoring
* set up CI to test ClickHouse with an S3 export bucket
* rename Databricks export bucket test to reflect the S3 type
* fix clickhouse-export-bucket-s3-full script in package.json
* set up CI to test Snowflake with an S3 export bucket
* rename Databricks test snapshots after renaming the test job
* improve the extractUnloadedFilesFromS3 flow to support different bucket naming schemes
* add Snowflake test snapshots for the export bucket test
* add ClickHouse test snapshots for the export bucket test
* update Databricks with-bucket test snapshots
* rename Athena test to reflect that it uses an export bucket
* reformat CLOUD_DATABASES in the drivers-tests CI job + add clickhouse-export-bucket-s3
* add Snowflake export bucket to Azure driver test
* add Databricks export bucket to Azure driver test
* add Snowflake export bucket to Azure via storage integration driver test
* add Snowflake export bucket to GCS via storage integration driver test
* improve/refactor exported files filtering in extractFilesFromAzure()
* set env secrets for drivers tests
* fix createExportBucket in Snowflake (making sasToken optional)
* fix CUBEJS_DB_EXPORT_BUCKET env var for tests
* remove includeIncrementalSchemaSuite from every test suite besides one + add comment
* fix CUBEJS_DB_EXPORT_BUCKET env var for tests in Databricks
* fix Databricks export-to-Azure flow + align test config
* fix blob object filtering during unloading in Azure
* sync the @google-cloud/storage package version used across packages
* yarn lock sync
* remove includeIncrementalSchemaSuite where it is not needed in some tests
* rename bigquery → bigquery-export-bucket-gcs to align with the rest of the tests
* add comments about exporting URLs from S3 in BigQueryDriver
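As a rough illustration of the two Snowflake unload variants the list above mentions (SAS-token credentials vs. a STORAGE_INTEGRATION), here is a minimal sketch. The function and option names are placeholders and not the Snowflake driver's actual code; only the Snowflake `COPY INTO <location>` syntax itself is standard.

```ts
// Sketch only: shows how the COPY INTO statement could differ between the
// SAS-token and STORAGE_INTEGRATION flows. `exportBucket` is assumed to look
// like "account.blob.core.windows.net/container"; exactly one of
// `storageIntegration` / `sasToken` is expected to be set.
function buildAzureUnloadSql(
  tableName: string,
  exportBucket: string,
  opts: { storageIntegration?: string; sasToken?: string },
): string {
  const auth = opts.storageIntegration
    ? `STORAGE_INTEGRATION = ${opts.storageIntegration}`
    : `CREDENTIALS = (AZURE_SAS_TOKEN = '${opts.sasToken}')`;

  return (
    `COPY INTO 'azure://${exportBucket}/${tableName}/' ` +
    `FROM ${tableName} ${auth} ` +
    'FILE_FORMAT = (TYPE = CSV COMPRESSION = GZIP) HEADER = TRUE'
  );
}
```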
1 parent 9d5cfcc commit 37c6ccc

37 files changed: +110401 −2567 lines

.github/workflows/drivers-tests.yml

Lines changed: 28 additions & 4 deletions
@@ -175,7 +175,18 @@ jobs:
     needs: [latest-tag-sha, build]
     if: (needs['latest-tag-sha'].outputs.sha != github.sha)
     env:
-      CLOUD_DATABASES: athena bigquery databricks-jdbc databricks-jdbc-export-bucket snowflake
+      CLOUD_DATABASES: >
+        athena-export-bucket-s3
+        bigquery-export-bucket-gcs
+        clickhouse-export-bucket-s3
+        databricks-jdbc
+        databricks-jdbc-export-bucket-s3
+        databricks-jdbc-export-bucket-azure
+        snowflake
+        snowflake-export-bucket-s3
+        snowflake-export-bucket-azure
+        snowflake-export-bucket-azure-via-storage-integration
+        snowflake-export-bucket-gcs
       # As per docs:
       # Secrets cannot be directly referenced in if: conditionals. Instead, consider setting
       # secrets as job-level environment variables, then referencing the environment variables
@@ -186,15 +197,21 @@ jobs:
         node:
           - 20.x
         database:
-          - athena
-          - bigquery
+          - athena-export-bucket-s3
+          - bigquery-export-bucket-gcs
           - clickhouse
+          - clickhouse-export-bucket-s3
           - databricks-jdbc
-          - databricks-jdbc-export-bucket
+          - databricks-jdbc-export-bucket-s3
+          - databricks-jdbc-export-bucket-azure
           - mssql
           - mysql
           - postgres
           - snowflake
+          - snowflake-export-bucket-s3
+          - snowflake-export-bucket-azure
+          - snowflake-export-bucket-azure-via-storage-integration
+          - snowflake-export-bucket-gcs
       fail-fast: false

     steps:
@@ -258,6 +275,13 @@ jobs:
         # BigQuery
         DRIVERS_TESTS_CUBEJS_DB_BQ_CREDENTIALS: ${{ secrets.DRIVERS_TESTS_CUBEJS_DB_BQ_CREDENTIALS }}

+        #GCS
+        DRIVERS_TESTS_CUBEJS_DB_EXPORT_GCS_CREDENTIALS: ${{ secrets.DRIVERS_TESTS_CUBEJS_DB_EXPORT_GCS_CREDENTIALS }}
+
+        # Azure
+        DRIVERS_TESTS_CUBEJS_DB_EXPORT_BUCKET_AZURE_KEY: ${{ secrets.DRIVERS_TESTS_CUBEJS_DB_EXPORT_BUCKET_AZURE_KEY }}
+        DRIVERS_TESTS_CUBEJS_DB_EXPORT_BUCKET_AZURE_SAS_TOKEN: ${{ secrets.DRIVERS_TESTS_CUBEJS_DB_EXPORT_BUCKET_AZURE_SAS_TOKEN }}
+
         # Databricks
         DRIVERS_TESTS_CUBEJS_DB_DATABRICKS_URL: ${{ secrets.DRIVERS_TESTS_CUBEJS_DB_DATABRICKS_URL }}
         DRIVERS_TESTS_CUBEJS_DB_DATABRICKS_TOKEN: ${{ secrets.DRIVERS_TESTS_CUBEJS_DB_DATABRICKS_TOKEN }}

packages/cubejs-athena-driver/package.json

Lines changed: 0 additions & 2 deletions
@@ -29,8 +29,6 @@
   "types": "dist/src/index.d.ts",
   "dependencies": {
     "@aws-sdk/client-athena": "^3.22.0",
-    "@aws-sdk/client-s3": "^3.49.0",
-    "@aws-sdk/s3-request-presigner": "^3.49.0",
     "@cubejs-backend/base-driver": "^0.36.0",
     "@cubejs-backend/shared": "^0.36.0",
     "sqlstring": "^2.3.1"

packages/cubejs-athena-driver/src/AthenaDriver.ts

Lines changed: 10 additions & 25 deletions
@@ -17,8 +17,6 @@ import {
   ColumnInfo,
   StartQueryExecutionCommandInput,
 } from '@aws-sdk/client-athena';
-import { S3, GetObjectCommand } from '@aws-sdk/client-s3';
-import { getSignedUrl } from '@aws-sdk/s3-request-presigner';
 import * as stream from 'stream';
 import {
   BaseDriver,
@@ -126,7 +124,7 @@ export class AthenaDriver extends BaseDriver implements DriverInterface {
       getEnv('athenaAwsSecret', { dataSource });

     const { schema, ...restConfig } = config;
-
+
     this.schema = schema ||
       getEnv('dbName', { dataSource }) ||
       getEnv('dbSchema', { dataSource });
@@ -438,31 +436,18 @@
   * Returns an array of signed URLs of the unloaded csv files.
   */
  private async getCsvFiles(tableName: string): Promise<string[]> {
-    const client = new S3({
-      credentials: this.config.credentials,
-      region: this.config.region,
-    });
    const { bucket, prefix } = AthenaDriver.splitS3Path(
      `${this.config.exportBucket}/${tableName}`
    );
-    const list = await client.listObjectsV2({
-      Bucket: bucket,
-      Prefix: prefix.slice(1), // skip leading
-    });
-    if (!list.Contents) {
-      return [];
-    } else {
-      const files = await Promise.all(
-        list.Contents.map(async (file) => {
-          const command = new GetObjectCommand({
-            Bucket: bucket,
-            Key: file.Key,
-          });
-          return getSignedUrl(client, command, { expiresIn: 3600 });
-        })
-      );
-      return files;
-    }
+
+    return this.extractUnloadedFilesFromS3(
+      {
+        credentials: this.config.credentials,
+        region: this.config.region,
+      },
+      bucket,
+      prefix.slice(1),
+    );
  }

  public informationSchemaQuery() {
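For reference, a hypothetical driver following the same pattern as the change above — delegating listing and signing to the shared BaseDriver helper — could look roughly like this. The class and its fields are illustrative only; the helper itself is defined in BaseDriver.ts further down, and this assumes it is accessible to subclasses.

```ts
import { S3ClientConfig } from '@aws-sdk/client-s3';
import { BaseDriver } from '@cubejs-backend/base-driver';

// Illustrative subclass: the point is the single call to the shared helper,
// which lists objects under the prefix and returns signed GET urls.
abstract class MyS3ExportDriver extends BaseDriver {
  protected constructor(
    private readonly s3Options: S3ClientConfig,
    private readonly exportBucket: string,
  ) {
    super();
  }

  // Signed URLs for csv files unloaded under `<exportBucket>/<tableName>/`.
  protected async getCsvFiles(tableName: string): Promise<string[]> {
    return this.extractUnloadedFilesFromS3(this.s3Options, this.exportBucket, `${tableName}/`);
  }
}
```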

packages/cubejs-backend-shared/src/env.ts

Lines changed: 17 additions & 4 deletions
@@ -718,7 +718,7 @@ const variables: Record<string, (...args: any) => any> = {
   ),

   /**
-   * AWS Key for the AWS based export bucket srorage.
+   * AWS Key for the AWS based export bucket storage.
    */
   dbExportBucketAwsKey: ({
     dataSource,
@@ -731,7 +731,7 @@ const variables: Record<string, (...args: any) => any> = {
   ),

   /**
-   * AWS Secret for the AWS based export bucket srorage.
+   * AWS Secret for the AWS based export bucket storage.
    */
   dbExportBucketAwsSecret: ({
     dataSource,
@@ -744,7 +744,7 @@ const variables: Record<string, (...args: any) => any> = {
   ),

   /**
-   * AWS Region for the AWS based export bucket srorage.
+   * AWS Region for the AWS based export bucket storage.
    */
   dbExportBucketAwsRegion: ({
     dataSource,
@@ -757,7 +757,7 @@ const variables: Record<string, (...args: any) => any> = {
   ),

   /**
-   * Azure Key for the Azure based export bucket srorage.
+   * Azure Key for the Azure based export bucket storage.
    */
   dbExportBucketAzureKey: ({
     dataSource,
@@ -769,6 +769,19 @@ const variables: Record<string, (...args: any) => any> = {
     ]
   ),

+  /**
+   * Azure SAS Token for the Azure based export bucket storage.
+   */
+  dbExportAzureSasToken: ({
+    dataSource,
+  }: {
+    dataSource: string,
+  }) => (
+    process.env[
+      keyByDataSource('CUBEJS_DB_EXPORT_BUCKET_AZURE_SAS_TOKEN', dataSource)
+    ]
+  ),
+
   /**
    * Export bucket options for Integration based.
    */
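A driver would read the new variable through the shared `getEnv` helper, keyed per data source. A minimal sketch follows; the multi-data-source prefix mentioned in the comment is an assumption based on the `keyByDataSource` naming convention, not something introduced by this commit.

```ts
import { getEnv } from '@cubejs-backend/shared';

// For the default data source this resolves to
// process.env.CUBEJS_DB_EXPORT_BUCKET_AZURE_SAS_TOKEN; named data sources get
// the usual CUBEJS_DS_<NAME>_... prefix via keyByDataSource (assumed here).
const sasToken = getEnv('dbExportAzureSasToken', { dataSource: 'default' });

if (!sasToken) {
  // The token is optional: without it, drivers fall back to the account key
  // (dbExportBucketAzureKey) for authenticating against the container.
  console.log('No Azure SAS token configured; using the shared account key instead');
}
```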

packages/cubejs-base-driver/package.json

Lines changed: 2 additions & 0 deletions
@@ -31,7 +31,9 @@
   "dependencies": {
     "@aws-sdk/client-s3": "^3.49.0",
     "@aws-sdk/s3-request-presigner": "^3.49.0",
+    "@azure/storage-blob": "^12.9.0",
     "@cubejs-backend/shared": "^0.36.0",
+    "@google-cloud/storage": "^7.13.0",
     "ramda": "^0.27.0"
   },
   "devDependencies": {

packages/cubejs-base-driver/src/BaseDriver.ts

Lines changed: 89 additions & 1 deletion
@@ -18,6 +18,14 @@ import { reduce } from 'ramda';
 import fs from 'fs';
 import { getSignedUrl } from '@aws-sdk/s3-request-presigner';
 import { S3, GetObjectCommand, S3ClientConfig } from '@aws-sdk/client-s3';
+import { Storage } from '@google-cloud/storage';
+import {
+  BlobServiceClient,
+  StorageSharedKeyCredential,
+  ContainerSASPermissions,
+  SASProtocol,
+  generateBlobSASQueryParameters,
+} from '@azure/storage-blob';

 import { cancelCombinator } from './utils';
 import {
@@ -44,6 +52,15 @@
   ForeignKeysQueryResult,
 } from './driver.interface';

+export type AzureStorageClientConfig = {
+  azureKey: string,
+  sasToken?: string,
+};
+
+export type GoogleStorageClientConfig = {
+  credentials: any,
+};
+
 const sortByKeys = (unordered: any) => {
   const ordered: any = {};

@@ -467,7 +484,7 @@
     const conditionString = conditions.join(' OR ');

     const query = this.getColumnsForSpecificTablesQuery(conditionString);
-
+
     const [primaryKeys, foreignKeys] = await Promise.all([
       this.primaryKeys(conditionString, parameters),
       this.foreignKeys(conditionString, parameters)
@@ -648,6 +665,10 @@
     prefix: string
   ): Promise<string[]> {
     const storage = new S3(clientOptions);
+    // It looks that different driver configurations use different formats
+    // for the bucket - some expect only names, some - full url-like names.
+    // So we unify this.
+    bucketName = bucketName.replace(/^[a-zA-Z]+:\/\//, '');

     const list = await storage.listObjectsV2({
       Bucket: bucketName,
@@ -672,4 +693,71 @@

     throw new Error('Unable to retrieve list of files from S3 storage after unloading.');
   }
+
+  /**
+   * Returns an array of signed GCS URLs of the unloaded csv files.
+   */
+  protected async extractFilesFromGCS(
+    gcsConfig: GoogleStorageClientConfig,
+    bucketName: string,
+    tableName: string
+  ): Promise<string[]> {
+    const storage = new Storage({
+      credentials: gcsConfig.credentials,
+      projectId: gcsConfig.credentials.project_id
+    });
+    const bucket = storage.bucket(bucketName);
+    const [files] = await bucket.getFiles({ prefix: `${tableName}/` });
+    if (files.length) {
+      const csvFile = await Promise.all(files.map(async (file) => {
+        const [url] = await file.getSignedUrl({
+          action: 'read',
+          expires: new Date(new Date().getTime() + 60 * 60 * 1000)
+        });
+        return url;
+      }));
+      return csvFile;
+    } else {
+      return [];
+    }
+  }
+
+  protected async extractFilesFromAzure(
+    azureConfig: AzureStorageClientConfig,
+    bucketName: string,
+    tableName: string
+  ): Promise<string[]> {
+    const parts = bucketName.split('.blob.core.windows.net/');
+    const account = parts[0];
+    const container = parts[1].split('/')[0];
+    const credential = new StorageSharedKeyCredential(account, azureConfig.azureKey);
+    const url = `https://${account}.blob.core.windows.net`;
+    const blobServiceClient = azureConfig.sasToken ?
+      new BlobServiceClient(`${url}?${azureConfig.sasToken}`) :
+      new BlobServiceClient(url, credential);
+
+    const csvFiles: string[] = [];
+    const containerClient = blobServiceClient.getContainerClient(container);
+    const blobsList = containerClient.listBlobsFlat({ prefix: `${tableName}/` });
+    for await (const blob of blobsList) {
+      if (blob.name && (blob.name.endsWith('.csv.gz') || blob.name.endsWith('.csv'))) {
+        const sas = generateBlobSASQueryParameters(
+          {
+            containerName: container,
+            blobName: blob.name,
+            permissions: ContainerSASPermissions.parse('r'),
+            startsOn: new Date(new Date().valueOf()),
+            expiresOn:
+              new Date(new Date().valueOf() + 1000 * 60 * 60),
+            protocol: SASProtocol.Https,
+            version: '2020-08-04',
+          },
+          credential,
+        ).toString();
+        csvFiles.push(`${url}/${container}/${blob.name}?${sas}`);
+      }
+    }
+
+    return csvFiles;
+  }
 }
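To show how a concrete driver might consume the two new helpers once the warehouse has finished unloading, here is a minimal sketch. It assumes `AzureStorageClientConfig` and `GoogleStorageClientConfig` are re-exported from the package entry point; the class and method names are illustrative, not part of this commit.

```ts
import {
  BaseDriver,
  AzureStorageClientConfig,
  GoogleStorageClientConfig,
} from '@cubejs-backend/base-driver';

// Illustrative tail end of an unload flow: pick the helper that matches the
// configured export bucket and return signed urls for the unloaded csv files.
// Azure expects a bucket like "account.blob.core.windows.net/container";
// GCS expects a plain bucket name.
abstract class MyWarehouseDriver extends BaseDriver {
  protected async signUnloadedFiles(
    tableName: string,
    exportBucket: string,
    azure?: AzureStorageClientConfig,
    gcs?: GoogleStorageClientConfig,
  ): Promise<string[]> {
    if (azure) {
      // With a SAS token the BlobServiceClient is built from the token;
      // otherwise the shared-key credential is used to sign each blob (see above).
      return this.extractFilesFromAzure(azure, exportBucket, tableName);
    }
    if (gcs) {
      return this.extractFilesFromGCS(gcs, exportBucket, tableName);
    }
    throw new Error('No export bucket credentials configured');
  }
}
```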

packages/cubejs-bigquery-driver/package.json

Lines changed: 1 addition & 1 deletion
@@ -32,7 +32,7 @@
     "@cubejs-backend/dotenv": "^9.0.2",
     "@cubejs-backend/shared": "^0.36.0",
     "@google-cloud/bigquery": "^7.7.0",
-    "@google-cloud/storage": "^7.11.1",
+    "@google-cloud/storage": "^7.13.0",
     "ramda": "^0.27.2"
   },
   "devDependencies": {

packages/cubejs-bigquery-driver/src/BigQueryDriver.ts

Lines changed: 5 additions & 0 deletions
@@ -325,6 +325,11 @@ export class BigQueryDriver extends BaseDriver implements DriverInterface {
     const bigQueryTable = this.bigquery.dataset(schema).table(tableName);
     const [job] = await bigQueryTable.createExtractJob(destination, { format: 'CSV', gzip: true });
     await this.waitForJobResult(job, { table }, false);
+    // There is an implementation for extracting and signing urls from S3
+    // @see BaseDriver->extractUnloadedFilesFromS3()
+    // Please use that if you need. Here is a different flow
+    // because bigquery requires storage/bucket object for other things,
+    // and there is no need to initiate another one (created in extractUnloadedFilesFromS3()).
     const [files] = await this.bucket.getFiles({ prefix: `${table}-` });
     const urls = await Promise.all(files.map(async file => {
       const [url] = await file.getSignedUrl({

packages/cubejs-databricks-jdbc-driver/package.json

Lines changed: 0 additions & 3 deletions
@@ -28,9 +28,6 @@
     "bin"
   ],
   "dependencies": {
-    "@aws-sdk/client-s3": "^3.49.0",
-    "@aws-sdk/s3-request-presigner": "^3.49.0",
-    "@azure/storage-blob": "^12.9.0",
     "@cubejs-backend/base-driver": "^0.36.0",
     "@cubejs-backend/jdbc-driver": "^0.36.0",
     "@cubejs-backend/schema-compiler": "^0.36.4",
