Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
124 changes: 106 additions & 18 deletions controlplane/src/core/blobstorage/s3.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,49 @@ import {
} from '@aws-sdk/client-s3';
import { BlobNotFoundError, BlobObject, type BlobStorage } from './index.js';

const maxConcurrency = 10; // Maximum number of delete requests started together when objects are deleted individually

/**
* Configuration options for S3BlobStorage
*/
export interface S3BlobStorageConfig {
/**
* Delete objects one request per key (DeleteObjectCommand) instead of sending
* one bulk DeleteObjectsCommand for each listed page of objects.
* Set to true for GCS compatibility, false for better S3 performance.
* When the option is omitted, bulk deletes are used.
* @default false
*/
useIndividualDeletes?: boolean;
}

/**
* Stores objects in S3 given an S3Client and a bucket name
*/
export class S3BlobStorage implements BlobStorage {
private readonly useIndividualDeletes: boolean;

constructor(
private s3Client: S3Client,
private bucketName: string,
) {}
config: S3BlobStorageConfig = {},
) {
this.useIndividualDeletes = config.useIndividualDeletes ?? false;
}

/**
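* Execute tasks with limited concurrency: they run in consecutive batches of at most `concurrency`, and the next batch starts only once every task in the current batch has resolved.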
* Retries are handled by AWS SDK internally using exponential backoff. Default 3 retries.
*/
private async executeWithConcurrency<T>(tasks: (() => Promise<T>)[], concurrency: number): Promise<T[]> {
  // Clamp the batch size to a positive integer. A step of 0 or a negative step would never
  // advance the loop index (endless loop), and NaN would end the loop after an empty first
  // batch, silently dropping every task. Infinity is kept as-is and runs everything at once.
  const batchSize = concurrency >= 1 ? Math.floor(concurrency) : 1;
  const results: T[] = [];

  for (let start = 0; start < tasks.length; start += batchSize) {
    // Tasks are thunks, so nothing from a later batch is started before this batch resolves.
    const batch = tasks.slice(start, start + batchSize);
    // Promise.all keeps the input order, so `results` lines up with `tasks`; a single
    // rejection propagates to the caller and no further batch is started.
    const batchResults = await Promise.all(batch.map((task) => task()));
    results.push(...batchResults);
  }

  return results;
}

async putObject<Metadata extends Record<string, string>>({
key,
Expand Down Expand Up @@ -88,28 +123,81 @@ export class S3BlobStorage implements BlobStorage {
}
}

async removeDirectory(data: { key: string; abortSignal?: AbortSignal }): Promise<number> {
const listCommand = new ListObjectsV2Command({
/**
* Delete objects using bulk DeleteObjectsCommand (efficient for S3)
*/
private async deleteObjectsBulk(objects: { Key?: string }[], abortSignal?: AbortSignal): Promise<number> {
// Sends one DeleteObjectsCommand for all given objects and resolves to the number of
// entries in the response's `Deleted` list. Throws if the response reports any error.
// Entries whose Key is missing or empty are dropped first; the `!` is safe because of the filter.
const objectsToDelete = objects.filter((item) => item.Key).map((item) => ({ Key: item.Key! }));

// Nothing left to delete: return without issuing a request.
if (objectsToDelete.length === 0) {
return 0;
}

const deleteCommand = new DeleteObjectsCommand({
Bucket: this.bucketName,
// NOTE(review): `data` is not a parameter of this method; this line looks like a removed
// line left behind by a flattened diff — confirm against the real file.
Prefix: data.key,
Delete: {
Objects: objectsToDelete,
// Quiet mode is disabled; `deleted.Deleted` is read below to count the deletions.
Quiet: false,
},
});
// NOTE(review): the next two lines use `listCommand` and `data`, neither of which is defined
// in this method, and the object literal is never closed — presumably diff residue; confirm.
const entries = await this.s3Client.send(listCommand, {
abortSignal: data.abortSignal,

const deleted = await this.s3Client.send(deleteCommand, { abortSignal });

// Any per-object error fails the whole call; the errors are serialized into the message.
if (deleted.Errors && deleted.Errors.length > 0) {
throw new Error(`Could not delete files: ${JSON.stringify(deleted.Errors)}`);
}

// A response without a `Deleted` list counts as zero deletions.
return deleted.Deleted?.length ?? 0;
}

/**
* Delete objects individually with limited concurrency (for GCS compatibility)
*/
private async deleteObjectsIndividually(objects: { Key?: string }[], abortSignal?: AbortSignal): Promise<number> {
// Sends one DeleteObjectCommand per keyed object and resolves to how many were sent successfully.
// Each task is a deferred function: no request is made until the task is invoked.
const deleteTasks = objects.map((item) => async () => {
if (item.Key) {
const deleteCommand = new DeleteObjectCommand({
Bucket: this.bucketName,
Key: item.Key,
});
await this.s3Client.send(deleteCommand, { abortSignal });
// The request resolved: count one deletion.
return 1;
}
// Entries without a key are skipped and add nothing to the count.
return 0;
});
// NOTE(review): the next three lines use `entries`, which is not defined in this method, and
// open blocks that are never closed — presumably removed lines left by a flattened diff; confirm.
const objectsToDelete = entries.Contents?.map((item) => ({ Key: item.Key }));
if (objectsToDelete && objectsToDelete.length > 0) {
const deleteCommand = new DeleteObjectsCommand({

// Run the tasks with `maxConcurrency` as the limit, then add up the per-task counts.
const deletedCounts = await this.executeWithConcurrency(deleteTasks, maxConcurrency);
return deletedCounts.reduce((sum: number, count: number) => sum + count, 0);
}

async removeDirectory(data: { key: string; abortSignal?: AbortSignal }): Promise<number> {
// Lists objects under the prefix `data.key` page by page, deletes each page's contents, and
// resolves to the sum of the counts returned by the delete helpers.
// NOTE(review): this span mixes in lines that look like removed lines from a flattened diff
// (marked below) — confirm against the real file.
let totalDeleted = 0;
// Undefined on the first request; afterwards holds the token for the next page.
let continuationToken: string | undefined;

do {
const listCommand = new ListObjectsV2Command({
Bucket: this.bucketName,
// NOTE(review): `Delete`/`objectsToDelete` do not belong to a list request and
// `objectsToDelete` is not defined here — presumably diff residue; confirm.
Delete: {
Objects: objectsToDelete,
Quiet: false,
},
Prefix: data.key,
ContinuationToken: continuationToken,
});

const entries = await this.s3Client.send(listCommand, {
abortSignal: data.abortSignal,
});
// NOTE(review): `deleteCommand` is not defined in this method and the `if` below is never
// closed — presumably diff residue; confirm.
const deleted = await this.s3Client.send(deleteCommand);
if (deleted.Errors) {
throw new Error(`could not delete files: ${deleted.Errors}`);

// Pages without contents are skipped.
if (entries.Contents && entries.Contents.length > 0) {
if (this.useIndividualDeletes) {
// One DeleteObjectCommand per key, for backends without DeleteObjectsCommand
totalDeleted += await this.deleteObjectsIndividually(entries.Contents, data.abortSignal);
} else {
// One DeleteObjectsCommand for the whole page (better S3 performance)
totalDeleted += await this.deleteObjectsBulk(entries.Contents, data.abortSignal);
}
}
}
// NOTE(review): returns from inside the loop using the undefined `objectsToDelete` —
// presumably diff residue; confirm.
return objectsToDelete?.length ?? 0;

// Continue only while the listing is truncated; an undefined token ends the loop.
continuationToken = entries.IsTruncated ? entries.NextContinuationToken : undefined;
} while (continuationToken);

return totalDeleted;
}
}
11 changes: 9 additions & 2 deletions controlplane/src/core/build-server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ import { BillingRepository } from './repositories/BillingRepository.js';
import { BillingService } from './services/BillingService.js';
import { UserRepository } from './repositories/UserRepository.js';
import { AIGraphReadmeQueue, createAIGraphReadmeWorker } from './workers/AIGraphReadmeWorker.js';
import { fastifyLoggerId, createS3ClientConfig, extractS3BucketName } from './util.js';
import { fastifyLoggerId, createS3ClientConfig, extractS3BucketName, isGoogleCloudStorageUrl } from './util.js';
import { ApiKeyRepository } from './repositories/ApiKeyRepository.js';
import { createDeleteOrganizationWorker, DeleteOrganizationQueue } from './workers/DeleteOrganizationWorker.js';
import {
Expand Down Expand Up @@ -106,6 +106,7 @@ export interface BuildConfig {
username?: string;
password?: string;
forcePathStyle?: boolean;
useIndividualDeletes?: boolean;
};
mailer: {
smtpEnabled: boolean;
Expand Down Expand Up @@ -310,7 +311,13 @@ export default async function build(opts: BuildConfig) {
const s3Config = createS3ClientConfig(bucketName, opts.s3Storage);

const s3Client = new S3Client(s3Config);
const blobStorage = new S3BlobStorage(s3Client, bucketName);
const blobStorage = new S3BlobStorage(s3Client, bucketName, {
// GCS does not support DeleteObjects; force individual deletes when detected.
useIndividualDeletes:
isGoogleCloudStorageUrl(opts.s3Storage.url) || isGoogleCloudStorageUrl(s3Config.endpoint as string)
? true
: opts.s3Storage.useIndividualDeletes ?? false,
});

const platformWebhooks = new PlatformWebhookService(opts.webhook?.url, opts.webhook?.key, logger);

Expand Down
7 changes: 7 additions & 0 deletions controlplane/src/core/env.schema.ts
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,13 @@ export const envVariables = z
.string()
.transform((val) => val === 'true')
.default('true'),
/**
* Whether to use individual deletes for S3 objects instead of bulking them.
*/
S3_USE_INDIVIDUAL_DELETES: z
.string()
.transform((val) => val === 'true')
.optional(),
/**
* Email
*/
Expand Down
23 changes: 23 additions & 0 deletions controlplane/src/core/util.ts
Original file line number Diff line number Diff line change
Expand Up @@ -404,6 +404,29 @@ export function webhookAxiosRetryCond(err: AxiosError) {
return isNetworkError(err) || isRetryableError(err);
}

/**
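* Determines whether the given string is a Google Cloud Storage address by checking whether the hostname is
* `storage.googleapis.com` (or a subdomain of it, such as `bucket-name.storage.googleapis.com`) or the protocol
* is `gs:`. Returns `false` when the string is empty or cannot be parsed as an absolute URL.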
*/
export function isGoogleCloudStorageUrl(s: string | null | undefined): boolean {
  // Missing or empty input can never be a GCS address. Accepting null/undefined in the
  // signature lets callers pass optional configuration values without a cast.
  if (!s) {
    return false;
  }

  let url: URL;
  try {
    url = new URL(s);
  } catch {
    // Not an absolute URL (for example a bare hostname or arbitrary text).
    return false;
  }

  // `gs://bucket/...` addresses are GCS regardless of the host part.
  if (url.protocol === 'gs:') {
    return true;
  }

  // Hostnames are case-insensitive, and a fully-qualified name may end with the root dot
  // (`storage.googleapis.com.`), which refers to the same host; normalize both away.
  const hostname = url.hostname.toLowerCase().replace(/\.$/, '');

  // Accept the API host itself or any subdomain of it. The leading dot in the suffix check
  // rejects look-alikes such as `evilstorage.googleapis.com`, and comparing the whole
  // hostname rejects `storage.googleapis.com.evil.com`.
  return hostname === 'storage.googleapis.com' || hostname.endsWith('.storage.googleapis.com');
}

export function createS3ClientConfig(bucketName: string, opts: S3StorageOptions): S3ClientConfig {
const url = new URL(opts.url);
const { region, username, password } = opts;
Expand Down
2 changes: 2 additions & 0 deletions controlplane/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ const {
S3_ACCESS_KEY_ID,
S3_SECRET_ACCESS_KEY,
S3_FORCE_PATH_STYLE,
S3_USE_INDIVIDUAL_DELETES,
SMTP_ENABLED,
SMTP_HOST,
SMTP_PORT,
Expand Down Expand Up @@ -128,6 +129,7 @@ const options: BuildConfig = {
username: S3_ACCESS_KEY_ID,
password: S3_SECRET_ACCESS_KEY,
forcePathStyle: S3_FORCE_PATH_STYLE,
useIndividualDeletes: S3_USE_INDIVIDUAL_DELETES,
},
mailer: {
smtpEnabled: SMTP_ENABLED,
Expand Down
22 changes: 21 additions & 1 deletion controlplane/test/utils.test.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { describe, expect, test } from 'vitest';
import { isValidLabelMatchers, mergeUrls, normalizeLabelMatchers } from '../src/core/util.js';
import { isValidLabelMatchers, mergeUrls, normalizeLabelMatchers, isGoogleCloudStorageUrl } from '../src/core/util.js';

describe('Utils', () => {
test('isValidLabelMatchers', () => {
Expand Down Expand Up @@ -29,4 +29,24 @@ describe('Utils', () => {
expect(mergeUrls('http://example.com/auth', '/path')).toBe('http://example.com/auth/path');
expect(mergeUrls('http://example.com/auth/', '/path')).toBe('http://example.com/auth/path');
});

// Covers host matching (apex host and bucket subdomains), the `gs:` scheme, rejection of
// non-GCS and look-alike hosts, and the guard/catch paths for empty or unparsable input.
describe('isGoogleCloudStorageUrl', () => {
  test('that true is returned when a valid Google Cloud Storage URL', () => {
    expect(isGoogleCloudStorageUrl('https://storage.googleapis.com/')).toBe(true);
    expect(isGoogleCloudStorageUrl('https://STORAGE.GOOGLEAPIS.COM')).toBe(true);
    expect(isGoogleCloudStorageUrl('https://storage.googleapis.com/bucket-name')).toBe(true);
    expect(isGoogleCloudStorageUrl('https://bucket-name.storage.googleapis.com/')).toBe(true);
  });

  test('that true is returned when an URL with the `gs` protocol', () => {
    expect(isGoogleCloudStorageUrl('gs://bucket-name')).toBe(true);
  });

  test('that false is returned when the URL is not a valid Google Cloud Storage URL', () => {
    expect(isGoogleCloudStorageUrl('http://minio/cosmo')).toBe(false);
    expect(isGoogleCloudStorageUrl('https://bucket-name.s3.amazonaws.com/')).toBe(false);
    expect(isGoogleCloudStorageUrl('https://bucket-name.s3.amazonaws.com')).toBe(false);
    expect(isGoogleCloudStorageUrl('https://storage.googleapis.com.evil.com')).toBe(false);
    // A sibling host that merely ends in the same letters must not match.
    expect(isGoogleCloudStorageUrl('https://evilstorage.googleapis.com')).toBe(false);
  });

  test('that false is returned when the value is empty or cannot be parsed as a URL', () => {
    // Exercises the empty-input guard.
    expect(isGoogleCloudStorageUrl('')).toBe(false);
    // Exercise the catch branch: neither value is an absolute URL, so `new URL` throws.
    expect(isGoogleCloudStorageUrl('not a url')).toBe(false);
    expect(isGoogleCloudStorageUrl('storage.googleapis.com')).toBe(false);
  });
});
});