From 7bd50cc3fb40bcbf7a65b1ac8b81ffd83c1aa5cb Mon Sep 17 00:00:00 2001
From: Walid Baruni
Date: Sun, 12 Jan 2025 14:15:33 +0200
Subject: [PATCH] Add partitioning strategies for S3 storage source (#4805)

This PR introduces configurable partitioning strategies for S3 input sources, enabling distributed job executions to efficiently process subsets of S3 objects. When a job is created with multiple executions (N > 1), each execution is assigned a unique partition index (0 to N-1) and will only process its designated subset of objects based on the configured partitioning strategy.

## Motivation
- Enable parallel processing of large S3 datasets across multiple job executions
- Allow users to control how objects are distributed based on their data organization patterns
- Provide deterministic object distribution for reproducible results

## Features
- Multiple partitioning strategies:
  - `none`: No partitioning, all objects available to all executions (default)
  - `object`: Partition by complete object key using consistent hashing
  - `regex`: Partition using regex pattern matches from object keys
  - `substring`: Partition based on a specific portion of object keys
  - `date`: Partition based on dates found in object keys
- Hash-based partitioning using FNV-1a ensures:
  - Deterministic assignment of objects to partitions
  - Distribution determined by the chosen strategy and the key patterns in the input data
- Robust handling of edge cases:
  - Fallback to partition 0 for unmatched objects
  - Proper handling of directories and empty paths
  - Unicode support for substring partitioning

## Example Usage

Basic object partitioning:
```yaml
source:
  type: s3
  params:
    bucket: mybucket
    key: data/*
    partition:
      type: object
```

Regex partitioning with capture groups (the pattern is single-quoted so YAML does not interpret `\d` as an escape sequence):
```yaml
source:
  type: s3
  params:
    bucket: mybucket
    key: data/*
    partition:
      type: regex
      pattern: 'data/(\d{4})/(\d{2})/.*\.csv'
```

Date-based partitioning:
```yaml
source:
  type: s3
  params:
    bucket: mybucket
    key: logs/*
    partition:
      type: date
      dateFormat: "2006-01-02"
```

## Testing
- Unit tests covering all partitioning strategies
- Integration tests with actual S3 storage
- Edge case handling and error scenarios
- Distribution analysis with various input patterns

## Summary by CodeRabbit

- **New Features**
  - Added S3 Object Partitioning system with support for multiple partitioning strategies (Object, Regex, Substring, Date)
  - Enhanced storage and compute modules to support execution-level context
- **Improvements**
  - Refined method signatures across multiple packages to include execution context
  - Updated error handling and message formatting in various storage and compute modules
  - Improved flexibility in resource calculation and bidding strategies
- **Bug Fixes**
  - Updated volume size calculation methods to handle more complex input scenarios
  - Enhanced validation for storage and partitioning configurations
- **Documentation**
  - Added comprehensive documentation for the S3 Object Partitioning system
  - Improved inline documentation for new features and method changes
---
 pkg/compute/bidder.go | 20 +-
 pkg/compute/capacity/calculators.go | 6 +-
 pkg/compute/capacity/disk/calculator.go | 7 +-
 pkg/compute/capacity/types.go | 2 +-
 pkg/s3/errors.go | 4 +-
 pkg/s3/partitioning.go | 299 +++++
 pkg/s3/partitioning.md | 258 ++++
 pkg/s3/partitioning_test.go | 1268 +++++++++++++++++++
 pkg/s3/types.go | 17 +-
 pkg/storage/inline/storage.go | 2 +-
 pkg/storage/inline/storage_test.go | 4 +-
pkg/storage/ipfs/storage.go | 2 +- pkg/storage/ipfs/storage_test.go | 4 +- pkg/storage/local_directory/storage.go | 2 +- pkg/storage/local_directory/storage_test.go | 2 +- pkg/storage/noop/noop.go | 2 +- pkg/storage/s3/storage.go | 70 +- pkg/storage/s3/storage_test.go | 214 +++- pkg/storage/s3/types.go | 13 +- pkg/storage/tracing/tracing.go | 4 +- pkg/storage/types.go | 2 +- pkg/storage/url/urldownload/storage.go | 2 +- pkg/storage/url/urldownload/storage_test.go | 4 +- 23 files changed, 2123 insertions(+), 85 deletions(-) create mode 100644 pkg/s3/partitioning.go create mode 100644 pkg/s3/partitioning.md create mode 100644 pkg/s3/partitioning_test.go diff --git a/pkg/compute/bidder.go b/pkg/compute/bidder.go index d7fb1de66a..588ba65a63 100644 --- a/pkg/compute/bidder.go +++ b/pkg/compute/bidder.go @@ -46,7 +46,7 @@ func NewBidder(params BidderParams) Bidder { } func (b Bidder) RunBidding(ctx context.Context, execution *models.Execution) error { - bidResult, err := b.doBidding(ctx, execution.Job) + bidResult, err := b.doBidding(ctx, execution) if err != nil { return b.handleError(ctx, execution, err) } @@ -62,11 +62,11 @@ func (b Bidder) RunBidding(ctx context.Context, execution *models.Execution) err // | true | true | true | // | true | false | false | // | false | N/A | false | -func (b Bidder) doBidding(ctx context.Context, job *models.Job) (*bidStrategyResponse, error) { +func (b Bidder) doBidding(ctx context.Context, execution *models.Execution) (*bidStrategyResponse, error) { // NB(forrest): always run semantic bidding before resource bidding since generally there isn't much point in // calling resource strategies that require DiskUsageCalculator.Calculate (a precursor to checking bidding) if // semantically the job cannot run. - semanticResponse, err := b.runSemanticBidding(ctx, job) + semanticResponse, err := b.runSemanticBidding(ctx, execution) if err != nil { return nil, err } @@ -77,7 +77,7 @@ func (b Bidder) doBidding(ctx context.Context, job *models.Job) (*bidStrategyRes } // else the request is semantically biddable, calculate resource usage and check resource-based bidding. - resourceResponse, err := b.runResourceBidding(ctx, job) + resourceResponse, err := b.runResourceBidding(ctx, execution) if err != nil { return nil, err } @@ -85,10 +85,10 @@ func (b Bidder) doBidding(ctx context.Context, job *models.Job) (*bidStrategyRes return resourceResponse, nil } -func (b Bidder) runSemanticBidding(ctx context.Context, job *models.Job) (*bidStrategyResponse, error) { +func (b Bidder) runSemanticBidding(ctx context.Context, execution *models.Execution) (*bidStrategyResponse, error) { // ask the bidding strategy if we should bid on this job request := bidstrategy.BidStrategyRequest{ - Job: *job, + Job: *execution.Job, } // assume we are bidding unless a request is rejected @@ -125,21 +125,21 @@ func (b Bidder) runSemanticBidding(ctx context.Context, job *models.Job) (*bidSt }, nil } -func (b Bidder) runResourceBidding(ctx context.Context, job *models.Job) (*bidStrategyResponse, error) { +func (b Bidder) runResourceBidding(ctx context.Context, execution *models.Execution) (*bidStrategyResponse, error) { // parse job resource config - parsedUsage, err := job.Task().ResourcesConfig.ToResources() + parsedUsage, err := execution.Job.Task().ResourcesConfig.ToResources() if err != nil { return nil, fmt.Errorf("parsing job resource config: %w", err) } // calculate resource usage of the job, failure here represents a compute failure. 
- resourceUsage, err := b.usageCalculator.Calculate(ctx, *job, *parsedUsage) + resourceUsage, err := b.usageCalculator.Calculate(ctx, execution, *parsedUsage) if err != nil { return nil, fmt.Errorf("calculating resource usage of job: %w", err) } // ask the bidding strategy if we should bid on this job request := bidstrategy.BidStrategyRequest{ - Job: *job, + Job: *execution.Job, } // assume we are bidding unless a request is rejected diff --git a/pkg/compute/capacity/calculators.go b/pkg/compute/capacity/calculators.go index e09116a907..949f3bce1a 100644 --- a/pkg/compute/capacity/calculators.go +++ b/pkg/compute/capacity/calculators.go @@ -21,7 +21,7 @@ func NewDefaultsUsageCalculator(params DefaultsUsageCalculatorParams) *DefaultsU } func (c *DefaultsUsageCalculator) Calculate( - ctx context.Context, job models.Job, parsedUsage models.Resources) (*models.Resources, error) { + ctx context.Context, execution *models.Execution, parsedUsage models.Resources) (*models.Resources, error) { return parsedUsage.Merge(c.defaults), nil } @@ -40,10 +40,10 @@ func NewChainedUsageCalculator(params ChainedUsageCalculatorParams) *ChainedUsag } func (c *ChainedUsageCalculator) Calculate( - ctx context.Context, job models.Job, parsedUsage models.Resources) (*models.Resources, error) { + ctx context.Context, execution *models.Execution, parsedUsage models.Resources) (*models.Resources, error) { aggregatedUsage := &parsedUsage for _, calculator := range c.calculators { - calculatedUsage, err := calculator.Calculate(ctx, job, parsedUsage) + calculatedUsage, err := calculator.Calculate(ctx, execution, parsedUsage) if err != nil { return nil, err } diff --git a/pkg/compute/capacity/disk/calculator.go b/pkg/compute/capacity/disk/calculator.go index 70afa3ce22..c152995c3c 100644 --- a/pkg/compute/capacity/disk/calculator.go +++ b/pkg/compute/capacity/disk/calculator.go @@ -23,16 +23,17 @@ func NewDiskUsageCalculator(params DiskUsageCalculatorParams) *DiskUsageCalculat } } -func (c *DiskUsageCalculator) Calculate(ctx context.Context, job models.Job, parsedUsage models.Resources) (*models.Resources, error) { +func (c *DiskUsageCalculator) Calculate( + ctx context.Context, execution *models.Execution, parsedUsage models.Resources) (*models.Resources, error) { requirements := &models.Resources{} var totalDiskRequirements uint64 = 0 - for _, input := range job.Task().InputSources { + for _, input := range execution.Job.Task().InputSources { strg, err := c.storages.Get(ctx, input.Source.Type) if err != nil { return nil, err } - volumeSize, err := strg.GetVolumeSize(ctx, *input) + volumeSize, err := strg.GetVolumeSize(ctx, execution, *input) if err != nil { return nil, bacerrors.Wrap(err, "error getting job disk space requirements") } diff --git a/pkg/compute/capacity/types.go b/pkg/compute/capacity/types.go index 530e104177..d92c3ce617 100644 --- a/pkg/compute/capacity/types.go +++ b/pkg/compute/capacity/types.go @@ -39,7 +39,7 @@ type UsageTracker interface { // UsageCalculator calculates the resource usage of a job. // Can also be used to populate the resource usage of a job with default values if not defined type UsageCalculator interface { - Calculate(ctx context.Context, job models.Job, parsedUsage models.Resources) (*models.Resources, error) + Calculate(ctx context.Context, execution *models.Execution, parsedUsage models.Resources) (*models.Resources, error) } // Provider returns the available capacity of a compute node. 
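The `UsageCalculator` interface above now receives the full `*models.Execution` instead of just the job, which is what allows resource usage to be computed per execution rather than per job. A minimal sketch of a calculator written against the new signature — the partition-based disk scaling heuristic is an illustrative assumption, not code from this patch:

```go
package example

import (
	"context"

	"github.com/bacalhau-project/bacalhau/pkg/models"
)

// PartitionAwareCalculator is a hypothetical UsageCalculator that assumes a
// partitioned S3 input splits its objects roughly evenly across executions,
// and scales the job's declared disk requirement down accordingly.
type PartitionAwareCalculator struct{}

func (c *PartitionAwareCalculator) Calculate(
	ctx context.Context, execution *models.Execution, parsedUsage models.Resources,
) (*models.Resources, error) {
	// Job.Count is the number of executions, i.e. the total number of
	// partitions each input source is split into (assumption: near-even
	// object distribution across partitions).
	if count := execution.Job.Count; count > 1 {
		parsedUsage.Disk /= uint64(count)
	}
	return &parsedUsage, nil
}
```

Passing the execution rather than the job means calculators can read both job-level config and execution-specific fields without any further interface changes.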
diff --git a/pkg/s3/errors.go b/pkg/s3/errors.go
index fa9493571e..8d7ee3665a 100644
--- a/pkg/s3/errors.go
+++ b/pkg/s3/errors.go
@@ -28,8 +28,8 @@ func NewS3PublisherError(code bacerrors.ErrorCode, message string) bacerrors.Err
 		WithComponent(PublisherComponent)
 }
 
-func NewS3InputSourceError(code bacerrors.ErrorCode, message string) bacerrors.Error {
-	return bacerrors.New("%s", message).
+func NewS3InputSourceError(code bacerrors.ErrorCode, format string, a ...any) bacerrors.Error {
+	return bacerrors.New(format, a...).
 		WithCode(code).
 		WithComponent(InputSourceComponent)
 }
diff --git a/pkg/s3/partitioning.go b/pkg/s3/partitioning.go
new file mode 100644
index 0000000000..3a3b8f0ab3
--- /dev/null
+++ b/pkg/s3/partitioning.go
@@ -0,0 +1,299 @@
+package s3
+
+import (
+	"hash/fnv"
+	"regexp"
+	"strings"
+	"time"
+)
+
+const (
+	// fallbackPartitionIndex is the partition index assigned to any object
+	// that does not match the configured partitioning pattern.
+	fallbackPartitionIndex = 0
+
+	// captureGroupDelimiter is ASCII Unit Separator (0x1F).
+	// Used to join regex capture groups when computing partition hash.
+	// This character is not allowed in S3 object keys (which only permit ASCII 32-126, plus 128-255),
+	// making it a safe delimiter that won't conflict with key contents.
+	captureGroupDelimiter = "\x1F"
+)
+
+// PartitionKeyType represents the type of partitioning to apply
+type PartitionKeyType string
+
+const (
+	PartitionKeyTypeNone      PartitionKeyType = "none"
+	PartitionKeyTypeObject    PartitionKeyType = "object"
+	PartitionKeyTypeRegex     PartitionKeyType = "regex"
+	PartitionKeyTypeSubstring PartitionKeyType = "substring"
+	PartitionKeyTypeDate      PartitionKeyType = "date"
+)
+
+// PartitionConfig defines how to generate partition keys from object paths
+type PartitionConfig struct {
+	Type PartitionKeyType
+
+	// For regex partitioning
+	Pattern string
+
+	// For substring partitioning
+	StartIndex int
+	EndIndex   int
+
+	// For date partitioning
+	DateFormat string
+}
+
+func (c *PartitionConfig) Validate() error {
+	// First validate the partition type itself
+	switch c.Type {
+	case PartitionKeyTypeNone, PartitionKeyTypeObject, PartitionKeyTypeRegex,
+		PartitionKeyTypeSubstring, PartitionKeyTypeDate:
+		// Valid types
+	default:
+		if c.Type != "" {
+			return NewS3InputSourceError(BadRequestErrorCode, "unsupported partition key type %s", c.Type)
+		}
+	}
+
+	// Then validate type-specific configurations
+	switch c.Type {
+	case PartitionKeyTypeRegex:
+		if c.Pattern == "" {
+			return NewS3InputSourceError(BadRequestErrorCode, "regex pattern cannot be empty")
+		}
+		if _, err := regexp.Compile(c.Pattern); err != nil {
+			return NewS3InputSourceError(BadRequestErrorCode, "invalid regex pattern: %s", err.Error())
+		}
+
+	case PartitionKeyTypeSubstring:
+		if c.StartIndex < 0 {
+			return NewS3InputSourceError(BadRequestErrorCode, "start index cannot be negative")
+		}
+		if c.EndIndex <= c.StartIndex {
+			return NewS3InputSourceError(BadRequestErrorCode, "end index must be greater than start index")
+		}
+
+	case PartitionKeyTypeDate:
+		if c.DateFormat == "" {
+			return NewS3InputSourceError(BadRequestErrorCode, "date format cannot be empty")
+		}
+
+		if err := validateDateFormat(c.DateFormat); err != nil {
+			return err
+		}
+	}
+	return nil
+}
+
+// validateDateFormat validates the date format string
+func validateDateFormat(layout string) error {
+	// Reference time: Jan 2, 2006 at 15:04:05 UTC
+	ref := time.Date(2006, 1, 2, 15, 4, 5, 0, time.UTC)
+
+	// Format the reference time using the layout
+	s := ref.Format(layout)
+
+	// Parse the
resulting string back into a time + _, err := time.Parse(layout, s) + if err != nil { + // If parsing fails, the layout is invalid + return NewS3InputSourceError(BadRequestErrorCode, "invalid date format: %s", layout) + } + + return nil +} + +// PartitionObjects applies the configured partitioning strategy to a slice of objects +func PartitionObjects( + objects []ObjectSummary, + totalPartitions int, + partitionIndex int, + source SourceSpec, +) ([]ObjectSummary, error) { + if err := source.Partition.Validate(); err != nil { + return nil, err + } + if totalPartitions <= 0 { + return nil, NewS3InputSourceError(BadRequestErrorCode, "job partitions/count must be greater than 0") + } + if partitionIndex < 0 || partitionIndex >= totalPartitions { + return nil, NewS3InputSourceError( + BadRequestErrorCode, "partition index must be between 0 and %d", totalPartitions-1) + } + + // filter out directories + objects = filterDirectories(objects) + + // If there is only 1 partition, just return the entire set + if totalPartitions == 1 { + return objects, nil + } + + // Handle both empty and "none" types the same way + if source.Partition.Type == "" || source.Partition.Type == PartitionKeyTypeNone { + // Return all objects unmodified for both empty and "none" types + return objects, nil + } + + // Sanitize the prefix for pattern matching + prefix := strings.TrimSpace(source.Key) + prefix = strings.TrimSuffix(prefix, "*") + + switch source.Partition.Type { + case PartitionKeyTypeObject: + return partitionByObject(objects, totalPartitions, partitionIndex) + case PartitionKeyTypeRegex: + return partitionByRegex(objects, totalPartitions, partitionIndex, prefix, source.Partition.Pattern) + case PartitionKeyTypeSubstring: + return partitionBySubstring( + objects, totalPartitions, partitionIndex, prefix, source.Partition.StartIndex, source.Partition.EndIndex) + case PartitionKeyTypeDate: + return partitionByDate( + objects, totalPartitions, partitionIndex, prefix, source.Partition.DateFormat) + default: + return nil, NewS3InputSourceError(BadRequestErrorCode, "unsupported partition key type: %s", source.Partition.Type) + } +} + +// partitionByObject partitions objects by hashing their full key +func partitionByObject(objects []ObjectSummary, totalPartitions int, partitionIndex int) ([]ObjectSummary, error) { + var result []ObjectSummary + for _, obj := range objects { + if getPartitionIndex(*obj.Key, totalPartitions) == partitionIndex { + result = append(result, obj) + } + } + return result, nil +} + +// partitionByRegex partitions objects by regex matching. If the pattern contains +// capture groups, partitioning is based on the concatenated capture groups. +// Otherwise, partitioning is based on the full match. Objects that don't match +// the pattern are assigned to partition 0. 
+func partitionByRegex(objects []ObjectSummary, totalPartitions int, partitionIndex int, prefix, pattern string) ( + []ObjectSummary, error) { + re, err := regexp.Compile(pattern) + if err != nil { + return nil, NewS3InputSourceError(BadRequestErrorCode, "invalid regex pattern: %s", err.Error()) + } + + var result []ObjectSummary + for _, obj := range objects { + key := sanitizeKeyForPatternMatching(*obj.Key, prefix) + matches := re.FindStringSubmatch(key) + + // Collect non-empty capture groups + var matchGroups []string + for i := 1; i < len(matches); i++ { + if matches[i] != "" { + matchGroups = append(matchGroups, matches[i]) + } + } + + var pIndex int + if len(matches) == 0 || matches[0] == "" { + // No match at all or empty match - use fallback + pIndex = fallbackPartitionIndex + } else if len(matchGroups) == 0 { + // No capture groups - use full match + pIndex = getPartitionIndex(matches[0], totalPartitions) + } else { + // Use capture groups + combinedKey := strings.Join(matchGroups, captureGroupDelimiter) + pIndex = getPartitionIndex(combinedKey, totalPartitions) + } + + if pIndex == partitionIndex { + result = append(result, obj) + } + } + return result, nil +} + +// partitionBySubstring partitions objects by taking a substring of their key +func partitionBySubstring( + objects []ObjectSummary, totalPartitions int, partitionIndex int, prefix string, startIndex int, endIndex int) ( + []ObjectSummary, error) { + var result []ObjectSummary + for _, obj := range objects { + key := sanitizeKeyForPatternMatching(*obj.Key, prefix) + + // Convert to rune slice for proper Unicode handling + runes := []rune(key) + + var pIndex int + if len(runes) < endIndex { + pIndex = fallbackPartitionIndex + } else { + substr := string(runes[startIndex:endIndex]) + pIndex = getPartitionIndex(substr, totalPartitions) + } + + if pIndex == partitionIndex { + result = append(result, obj) + } + } + return result, nil +} + +// partitionByDate partitions objects by parsing dates from their keys +func partitionByDate( + objects []ObjectSummary, totalPartitions, partitionIndex int, prefix, dateFormat string) ( + []ObjectSummary, error) { + var result []ObjectSummary + for _, obj := range objects { + key := sanitizeKeyForPatternMatching(*obj.Key, prefix) + + // We'll parse the first len(dateFormat) characters as the date + dateStr := key[:min(len(key), len(dateFormat))] + t, err := time.Parse(dateFormat, dateStr) + + var pIndex int + if err != nil { + // If it fails to parse, fallback to partition 0 + pIndex = fallbackPartitionIndex + } else { + dateKey := t.Format(dateFormat) + pIndex = getPartitionIndex(dateKey, totalPartitions) + } + + if pIndex == partitionIndex { + result = append(result, obj) + } + } + return result, nil +} + +// sanitizeKeyForPatternMatching returns the relative path after the prefix +func sanitizeKeyForPatternMatching(objectKey string, prefix string) string { + key := strings.TrimPrefix(objectKey, prefix) + return strings.TrimPrefix(key, "/") +} + +// getPartitionIndex returns the partition index for a given key using consistent hashing +func getPartitionIndex(key string, totalPartitions int) int { + if totalPartitions <= 0 { + return 0 + } + hash := hashString(key) + return int(hash % uint32(totalPartitions)) //nolint:gosec +} + +// hashString returns a uint32 hash of the input string using FNV-1a +func hashString(s string) uint32 { + h := fnv.New32a() + _, _ = h.Write([]byte(s)) + return h.Sum32() +} + +// filterDirectories filters out directories from the list of objects +func 
filterDirectories(objects []ObjectSummary) []ObjectSummary { + var result []ObjectSummary + for _, obj := range objects { + if !obj.IsDir { + result = append(result, obj) + } + } + return result +} diff --git a/pkg/s3/partitioning.md b/pkg/s3/partitioning.md new file mode 100644 index 0000000000..32ba31b45d --- /dev/null +++ b/pkg/s3/partitioning.md @@ -0,0 +1,258 @@ +# S3 Object Partitioning + +This documentation describes the partitioning system for S3 objects, which enables efficient distribution of object processing across multiple workers. The system supports multiple partitioning strategies to accommodate different use cases and data organization patterns. + +## Overview + +The partitioning system allows you to split a collection of S3 objects across multiple processors using consistent hashing. This ensures: +- Even distribution of objects +- Deterministic assignment of objects to partitions +- Support for various partitioning strategies +- Graceful handling of edge cases + +## Partition Types + +### 1. None (`PartitionKeyTypeNone`) +- No partitioning is applied +- All objects are processed as a single group +- Useful when partitioning is not needed or when handling small datasets + +### 2. Object (`PartitionKeyTypeObject`) +- Partitions based on the complete object key +- Uses consistent hashing of the entire object path +- Best for random or unpredictable key patterns +- Ensures even distribution across partitions + +### 3. Regex (`PartitionKeyTypeRegex`) +- Partitions using regex pattern matches from object keys +- Configuration requires: + - `Pattern`: A valid regex pattern +- Behavior: + - For patterns with capture groups: + - Combines all non-empty capture groups with a special delimiter (ASCII Unit Separator) + - Hashes the combined string to determine partition + - For patterns without capture groups: + - Uses the full matched string for hashing + - e.g., pattern `00\d` will use "001" for partitioning if it matches "001.txt" + - Falls back to partition 0 if no match is found in the key +- Useful for: + - Complex naming patterns requiring multiple parts (using capture groups) + - Simple pattern matching (without capture groups) + - Extracting specific parts of object keys + +### 4. Substring (`PartitionKeyTypeSubstring`) +- Partitions based on a specific portion of the object key +- Configuration requires: + - `StartIndex`: Beginning of substring (inclusive) + - `EndIndex`: End of substring (exclusive) +- Validates that: + - StartIndex ≥ 0 + - EndIndex > StartIndex +- Falls back to partition 0 if the key is shorter than EndIndex +- Useful for keys with fixed-width segments or known positions + +### 5. 
Date (`PartitionKeyTypeDate`)
+- Partitions based on dates found in object keys
+- Configuration requires:
+  - `DateFormat`: Go time format string (e.g., "2006-01-02")
+- Behavior:
+  - Attempts to parse date from the beginning of the key
+  - Uses parsed date formatted back to string for hashing
+  - Falls back to partition 0 if date parsing fails
+- Ideal for time-series data or date-based organization
+
+## Configuration
+
+### PartitionConfig Structure
+```go
+type PartitionConfig struct {
+	Type       PartitionKeyType
+	Pattern    string // For regex partitioning
+	StartIndex int    // For substring partitioning
+	EndIndex   int    // For substring partitioning
+	DateFormat string // For date partitioning
+}
+```
+
+### Validation Rules
+- Type must be one of the supported PartitionKeyTypes
+- Regex partitioning:
+  - Pattern cannot be empty
+  - Pattern must be a valid regex
+- Substring partitioning:
+  - StartIndex must be non-negative
+  - EndIndex must be greater than StartIndex
+- Date partitioning:
+  - DateFormat cannot be empty
+  - DateFormat must be a valid Go time format
+
+## Implementation Details
+
+### Key Processing
+1. Directories are filtered out before partitioning
+2. Object keys are sanitized by:
+   - Trimming the common prefix
+   - Removing leading slashes
+3. Special handling for single partition case:
+   - If total partitions = 1, returns all objects without processing
+
+### Consistent Hashing
+- Uses FNV-1a hash algorithm
+- Ensures deterministic distribution
+- Formula: `partition_index = hash(key) % total_partitions`
+
+### Error Handling
+- Validates configuration before processing
+- Provides fallback behavior for edge cases
+- Returns descriptive error messages for invalid configurations
+
+## Usage Examples
+
+### Regex Partitioning
+#### Regex Partitioning with Capture Groups
+```go
+config := PartitionConfig{
+	Type:    PartitionKeyTypeRegex,
+	Pattern: `data/(\d{4})/(\d{2})/.*\.csv`,
+}
+```
+This will partition based on year and month capture groups in paths like "data/2024/01/file.csv"
+
+#### Regex Partitioning without Capture Groups
+```go
+config := PartitionConfig{
+	Type:    PartitionKeyTypeRegex,
+	Pattern: `00\d\.txt`,
+}
+```
+This will partition based on the full matched string (e.g., "001.txt", "002.txt")
+
+### Date Partitioning
+```go
+config := PartitionConfig{
+	Type:       PartitionKeyTypeDate,
+	DateFormat: "2006-01-02",
+}
+```
+This will partition objects with keys starting with dates like "2024-01-15-data.csv"
+
+### Substring Partitioning
+```go
+config := PartitionConfig{
+	Type:       PartitionKeyTypeSubstring,
+	StartIndex: 5,
+	EndIndex:   13,
+}
+```
+This will partition based on characters 5-12 of the object key
+
+## Prefix Trimming Logic
+
+Before applying any partitioning strategy, the system processes object keys by removing the common prefix. This is handled by the `sanitizeKeyForPatternMatching` function using the following steps:
+
+1. First, the source prefix is sanitized:
+   - Whitespace is trimmed
+   - A trailing wildcard (*) is removed
+
+2. 
Then, for each object key: + - The sanitized prefix is removed from the beginning of the key + - Any leading forward slash (/) is removed + +### Prefix Trimming Examples + +Given source prefix: `"data/users/*"` +``` +Original Key | Trimmed Key (used for partitioning) +-------------------------------|-------------------------------- +data/users/2024/file.csv | 2024/file.csv +data/users/archived/2023.csv | archived/2023.csv +data/users//extra/file.csv | extra/file.csv +other/data/users/file.csv | other/data/users/file.csv (no match) +``` + +Given source prefix: `"logs/"` +``` +Original Key | Trimmed Key (used for partitioning) +-------------------------------|-------------------------------- +logs/2024-01-15/server.log | 2024-01-15/server.log +logs//app/debug.log | app/debug.log +logs/error.log | error.log +archive/logs/old.log | archive/logs/old.log (no match) +``` + +Given source prefix: `""` (empty) +``` +Original Key | Trimmed Key (used for partitioning) +-------------------------------|-------------------------------- +data.csv | data.csv (unchanged) +folder/file.txt | folder/file.txt (unchanged) +/root/file.bin | root/file.bin +``` + +### Impact on Partitioning Strategies + +The prefix trimming affects how each partition type processes keys: + +1. **Regex Partitioning** + - Pattern matches against the trimmed key + - Example with prefix `"data/"` and pattern `(\d{4})/(\d{2})`: + ``` + Original: data/2024/01/file.csv + Trimmed: 2024/01/file.csv + Matches: ["2024", "01"] + ``` + +2. **Substring Partitioning** + - Start and end indices apply to the trimmed key + - Example with prefix `"logs/"` and indices (0,10): + ``` + Original: logs/2024-01-15-server.log + Trimmed: 2024-01-15-server.log + Substring: "2024-01-15" + ``` + +3. **Date Partitioning** + - Date parsing starts at the beginning of the trimmed key + - Example with prefix `"archive/"`: + ``` + Original: archive/2024-01-15_backup.tar + Trimmed: 2024-01-15_backup.tar + Date Portion: "2024-01-15" + ``` + +### Best Practices for Prefix Usage + +1. Always consider the full path when designing prefixes: + - Include parent directories if they're part of the pattern + - Account for possible variations in path depth + +2. Be cautious with trailing slashes: + - They affect how the trimming behaves + - Consider standardizing their usage in your application + +3. When using wildcards: + - They're automatically trimmed from the prefix + - Use them to match variable portions of paths + +## Best Practices + +1. Choose the appropriate partition type based on your data organization: + - Use Object for random or unpredictable keys + - Use Regex for complex patterns requiring multiple parts + - Use Substring for fixed-width segments + - Use Date for time-series data + +2. Consider fallback behavior: + - All strategies fall back to partition 0 for unmatched cases + - Design key patterns to minimize fallback scenarios + +3. Performance considerations: + - Regex partitioning has higher computational overhead + - Substring and Date partitioning are more efficient + - Object partitioning provides the most even distribution + +4. 
Testing recommendations: + - Validate partition distribution with sample data + - Test edge cases and fallback scenarios + - Verify date formats with different timezone scenarios \ No newline at end of file diff --git a/pkg/s3/partitioning_test.go b/pkg/s3/partitioning_test.go new file mode 100644 index 0000000000..ed75ab2468 --- /dev/null +++ b/pkg/s3/partitioning_test.go @@ -0,0 +1,1268 @@ +package s3 + +/* spell-checker: disable */ + +import ( + "strings" + "testing" + + "github.com/stretchr/testify/suite" +) + +type PartitionTestSuite struct { + suite.Suite +} + +func TestPartitionSuite(t *testing.T) { + suite.Run(t, new(PartitionTestSuite)) +} + +func (s *PartitionTestSuite) TestConfigValidation() { + tests := []struct { + name string + config PartitionConfig + expectedErr string + }{ + // Object partitioning + { + name: "valid object config - minimal", + config: PartitionConfig{ + Type: PartitionKeyTypeObject, + }, + }, + { + name: "valid object config with unused fields", + config: PartitionConfig{ + Type: PartitionKeyTypeObject, + Pattern: "unused", + StartIndex: 1, + EndIndex: 2, + }, + }, + + // Regex partitioning + { + name: "valid regex - simple pattern", + config: PartitionConfig{ + Type: PartitionKeyTypeRegex, + Pattern: `\d+`, + }, + }, + { + name: "valid regex - complex pattern", + config: PartitionConfig{ + Type: PartitionKeyTypeRegex, + Pattern: `^(?:user|group)-(\d+)-\w+$`, + }, + }, + { + name: "empty regex pattern", + config: PartitionConfig{ + Type: PartitionKeyTypeRegex, + Pattern: "", + }, + expectedErr: "regex pattern cannot be empty", + }, + { + name: "invalid regex pattern - syntax error", + config: PartitionConfig{ + Type: PartitionKeyTypeRegex, + Pattern: "[unclosed", + }, + expectedErr: "invalid regex pattern", + }, + + // Substring partitioning + { + name: "valid substring - zero start", + config: PartitionConfig{ + Type: PartitionKeyTypeSubstring, + StartIndex: 0, + EndIndex: 5, + }, + }, + { + name: "valid substring - non-zero start", + config: PartitionConfig{ + Type: PartitionKeyTypeSubstring, + StartIndex: 3, + EndIndex: 10, + }, + }, + { + name: "negative start index", + config: PartitionConfig{ + Type: PartitionKeyTypeSubstring, + StartIndex: -1, + EndIndex: 5, + }, + expectedErr: "start index cannot be negative", + }, + { + name: "end index equals start index", + config: PartitionConfig{ + Type: PartitionKeyTypeSubstring, + StartIndex: 5, + EndIndex: 5, + }, + expectedErr: "end index must be greater than start index", + }, + { + name: "end index less than start index", + config: PartitionConfig{ + Type: PartitionKeyTypeSubstring, + StartIndex: 10, + EndIndex: 5, + }, + expectedErr: "end index must be greater than start index", + }, + + // Date partitioning + { + name: "valid date ", + config: PartitionConfig{ + Type: PartitionKeyTypeDate, + DateFormat: "2006-01-02", + }, + }, + + { + name: "valid date - with timezone", + config: PartitionConfig{ + Type: PartitionKeyTypeDate, + DateFormat: "2006-01-02T15:04:05Z07:00", + }, + }, + { + name: "empty date format", + config: PartitionConfig{ + Type: PartitionKeyTypeDate, + DateFormat: "", + }, + expectedErr: "date format cannot be empty", + }, + + { + name: "empty partition type", + config: PartitionConfig{ + Type: "", + }, + }, + // Invalid types + { + name: "invalid partition type", + config: PartitionConfig{ + Type: "invalid", + }, + expectedErr: "unsupported partition key type", + }, + } + + for _, tt := range tests { + s.Run(tt.name, func() { + err := tt.config.Validate() + if tt.expectedErr != "" { + 
s.Require().Error(err) + s.Contains(err.Error(), tt.expectedErr) + } else { + s.Require().NoError(err) + } + }) + } +} + +func (s *PartitionTestSuite) TestPartitionObjects_InvalidInputs() { + objects := []ObjectSummary{createObjectSummary("test.txt", false)} + source := SourceSpec{ + Key: "prefix/", + Partition: PartitionConfig{ + Type: PartitionKeyTypeObject, + }, + } + + tests := []struct { + name string + objects []ObjectSummary + totalPartitions int + partitionIndex int + expectedErr string + }{ + // Invalid partition counts + { + name: "negative total partitions", + objects: objects, + totalPartitions: -1, + partitionIndex: 0, + expectedErr: "job partitions/count must be greater than 0", + }, + { + name: "zero total partitions", + objects: objects, + totalPartitions: 0, + partitionIndex: 0, + expectedErr: "job partitions/count must be greater than 0", + }, + + // Invalid partition indices + { + name: "negative partition index", + objects: objects, + totalPartitions: 2, + partitionIndex: -1, + expectedErr: "partition index must be between 0 and", + }, + { + name: "partition index equals total partitions", + objects: objects, + totalPartitions: 2, + partitionIndex: 2, + expectedErr: "partition index must be between 0 and", + }, + { + name: "partition index exceeds total partitions", + objects: objects, + totalPartitions: 2, + partitionIndex: 3, + expectedErr: "partition index must be between 0 and", + }, + + // Object list variations + { + name: "nil object list", + objects: nil, + totalPartitions: 2, + partitionIndex: 0, + expectedErr: "", // should handle nil list gracefully + }, + } + + for _, tt := range tests { + s.Run(tt.name, func() { + _, err := PartitionObjects(tt.objects, tt.totalPartitions, tt.partitionIndex, source) + if tt.expectedErr != "" { + s.Require().Error(err) + s.Contains(err.Error(), tt.expectedErr) + } else { + s.Require().NoError(err) + } + }) + } +} + +func (s *PartitionTestSuite) TestPartitionByObject() { + tests := []struct { + name string + paths []string + prefix string + totalPartitions int + expected [][]string + }{ + { + name: "basic file distribution", + paths: []string{ + "file1.txt", + "file2.txt", + "dir/", // directory + "file3.txt", + "file4.txt", + }, + prefix: "", + totalPartitions: 2, + expected: [][]string{ + {"file1.txt", "file3.txt"}, // partition 0 + {"file2.txt", "file4.txt"}, // partition 1 + }, + }, + { + name: "nested paths", + paths: []string{ + "dir1/file1.txt", + "dir1/dir2/file2.txt", + "dir1/dir2/dir3/", // directory + "dir1/dir2/dir3/file3.txt", + "other/file4.txt", + }, + prefix: "", + totalPartitions: 2, + expected: [][]string{ + {"dir1/dir2/file2.txt", "dir1/dir2/dir3/file3.txt", "other/file4.txt"}, + {"dir1/file1.txt"}, + }, + }, + { + name: "with prefix trimming", + paths: []string{ + "prefix/subdir/file1.txt", + "prefix/subdir/file2.txt", + "prefix/other/file3.txt", + "prefix/dir/", // directory + }, + prefix: "prefix/", + totalPartitions: 2, + expected: [][]string{ + {"prefix/subdir/file2.txt", "prefix/other/file3.txt"}, + {"prefix/subdir/file1.txt"}, + }, + }, + { + name: "mixed depth paths", + paths: []string{ + "short.txt", + "dir/medium.txt", + "dir/subdir/long.txt", + "very/long/path/file.txt", + "dir/", // directory + }, + prefix: "", + totalPartitions: 2, + expected: [][]string{ + {"dir/medium.txt"}, + {"short.txt", "dir/subdir/long.txt", "very/long/path/file.txt"}, + }, + }, + { + name: "single partition", + paths: []string{ + "file1.txt", + "file2.txt", + "dir/", // directory + }, + prefix: "", + totalPartitions: 1, 
+ expected: [][]string{ + {"file1.txt", "file2.txt"}, // all files in single partition + }, + }, + { + name: "empty list", + paths: []string{}, + prefix: "", + totalPartitions: 2, + expected: [][]string{ + {}, // partition 0 + {}, // partition 1 + }, + }, + { + name: "only directories", + paths: []string{ + "dir1/", + "dir2/", + "dir1/dir3/", + }, + prefix: "", + totalPartitions: 2, + expected: [][]string{ + {}, // partition 0 + {}, // partition 1 + }, + }, + { + name: "with special characters", + paths: []string{ + "prefix/!@#.txt", + "prefix/spaces in name.txt", + "prefix/tabs\there.txt", + "prefix/special/", // directory + }, + prefix: "prefix/", + totalPartitions: 2, + expected: [][]string{ + {"prefix/!@#.txt"}, + {"prefix/spaces in name.txt", "prefix/tabs\there.txt"}, + }, + }, + { + name: "unicode filenames", + paths: []string{ + "prefix/文件1.txt", + "prefix/文件2.txt", + "prefix/文件dir/", // directory + }, + prefix: "prefix/", + totalPartitions: 2, + expected: [][]string{ + {"prefix/文件2.txt"}, + {"prefix/文件1.txt"}, + }, + }, + { + name: "many partitions", + paths: []string{ + "file1.txt", + "file2.txt", + "file3.txt", + }, + prefix: "", + totalPartitions: 5, + expected: [][]string{ + {}, // partition 0 + {"file1.txt"}, // partition 1 + {}, // partition 2 + {"file2.txt"}, // partition 3 + {"file3.txt"}, // partition 4 + }, + }, + } + + for _, tt := range tests { + s.Run(tt.name, func() { + objects := createObjectsFromStrings(tt.paths) + spec := SourceSpec{ + Key: tt.prefix, + Partition: PartitionConfig{ + Type: PartitionKeyTypeObject, + }, + } + s.verifyPartitioning(spec, objects, tt.totalPartitions, tt.expected) + }) + } +} + +func (s *PartitionTestSuite) TestPartitionByRegex() { + tests := []struct { + name string + paths []string + prefix string + pattern string + totalPartitions int + expected [][]string + expectError bool + errorContains string + }{ + { + name: "basic pattern matching with capture groups", + paths: []string{ + "prefix/user123/file1.txt", + "prefix/user123/file2.txt", + "prefix/user123/another.txt", + "prefix/user456/file3.txt", + "prefix/user789/file4.txt", + "prefix/invalid/file5.txt", // no match + "prefix/userdir/", // directory + }, + prefix: "prefix/", + pattern: `user(\d+)`, + totalPartitions: 4, + expected: [][]string{ + {"prefix/user456/file3.txt", "prefix/invalid/file5.txt"}, // partition 0 + fallback + {"prefix/user789/file4.txt"}, + {}, + {"prefix/user123/file1.txt", "prefix/user123/file2.txt", "prefix/user123/another.txt"}, + }, + }, + { + name: "all non-matching paths", + paths: []string{ + "prefix/abc.txt", + "prefix/def.txt", + "prefix/ghi.txt", + "prefix/dir/", // directory + }, + prefix: "prefix/", + pattern: `(\d{4}-\d{2})`, // looking for dates + totalPartitions: 2, + expected: [][]string{ + {"prefix/abc.txt", "prefix/def.txt", "prefix/ghi.txt"}, // all in fallback partition + {}, // empty partition + }, + }, + { + name: "unicode in pattern", + paths: []string{ + "prefix/用户123-数据.txt", + "prefix/用户123-目录.txt", + "prefix/用户456-数据.txt", + "prefix/无效.txt", + "prefix/目录/", // directory + }, + prefix: "prefix/", + pattern: `用户(\d+)`, + totalPartitions: 2, + expected: [][]string{ + {"prefix/用户456-数据.txt", "prefix/无效.txt"}, + {"prefix/用户123-数据.txt", "prefix/用户123-目录.txt"}, + }, + }, + { + name: "nested paths", + paths: []string{ + "prefix/a/user123/file.txt", + "prefix/a/user123/another.txt", + "prefix/b/user456/file.txt", + "prefix/c/invalid/file.txt", + "prefix/d/user789/", // directory + }, + prefix: "prefix/", + pattern: `user(\d+)`, + 
totalPartitions: 2, + expected: [][]string{ + {"prefix/b/user456/file.txt", "prefix/c/invalid/file.txt"}, // partition 0 + {"prefix/a/user123/file.txt", "prefix/a/user123/another.txt"}, // partition 1 + }, + }, + { + name: "multiple capture groups", + paths: []string{ + "prefix/user123-group456.txt", + "prefix/user789-group012.txt", + "prefix/user123-group999.txt", + "prefix/user123-group999.log", + "prefix/invalid.txt", + }, + prefix: "prefix/", + pattern: `user(\d+)-group(\d+)`, + totalPartitions: 3, + expected: [][]string{ + {"prefix/invalid.txt"}, + {"prefix/user123-group999.txt", "prefix/user123-group999.log"}, + {"prefix/user123-group456.txt", "prefix/user789-group012.txt"}, + }, + }, + { + name: "pattern without capture groups", + paths: []string{ + "prefix/001.txt", + "prefix/002.txt", + "prefix/003.txt", + "prefix/abc.txt", // no match + }, + prefix: "prefix/", + pattern: `00\d`, + totalPartitions: 3, + expected: [][]string{ + {"prefix/abc.txt", "prefix/001.txt", "prefix/002.txt"}, // fallback for no match + {"prefix/003.txt"}, + {}, + }, + }, + { + name: "mix of full match and no match", + paths: []string{ + "prefix/log-2024.txt", + "prefix/log-2025.txt", + "prefix/other.txt", + }, + prefix: "prefix/", + pattern: `log-20\d{2}`, + totalPartitions: 2, + expected: [][]string{ + {"prefix/other.txt", "prefix/log-2024.txt"}, // fallback + hash of "log-2024" + {"prefix/log-2025.txt"}, // hash of "log-2025" + }, + }, + { + name: "simple numeric pattern", + paths: []string{ + "prefix/1.txt", + "prefix/2.txt", + "prefix/3.txt", + "prefix/a.txt", + }, + prefix: "prefix/", + pattern: `\d`, + totalPartitions: 2, + expected: [][]string{ + {"prefix/a.txt", "prefix/1.txt", "prefix/3.txt"}, // fallback + hash distribution + {"prefix/2.txt"}, // hash distribution + }, + }, + { + name: "empty capture group", + paths: []string{ + "prefix/data1.txt", + "prefix/data2.txt", + }, + prefix: "prefix/", + pattern: "()", // empty capture group + totalPartitions: 2, + expected: [][]string{ + {"prefix/data1.txt", "prefix/data2.txt"}, + {}, + }, + }, + { + name: "empty input", + paths: []string{}, + prefix: "prefix/", + pattern: `\d+`, + totalPartitions: 2, + expected: [][]string{ + {}, + {}, + }, + }, + { + name: "all directories", + paths: []string{ + "prefix/user123/", + "prefix/user456/", + }, + prefix: "prefix/", + pattern: `user\d+`, + totalPartitions: 2, + expected: [][]string{ + {}, + {}, + }, + }, + { + name: "invalid regex", + paths: []string{ + "test.txt", + }, + prefix: "", + pattern: "[", // invalid regex pattern + totalPartitions: 2, + expectError: true, + errorContains: "invalid regex pattern", + }, + } + + for _, tt := range tests { + s.Run(tt.name, func() { + objects := createObjectsFromStrings(tt.paths) + spec := SourceSpec{ + Key: tt.prefix, + Partition: PartitionConfig{ + Type: PartitionKeyTypeRegex, + Pattern: tt.pattern, + }, + } + if tt.expectError { + _, err := PartitionObjects(objects, tt.totalPartitions, 0, spec) + s.Require().Error(err) + s.Contains(err.Error(), tt.errorContains) + return + } + s.verifyPartitioning(spec, objects, tt.totalPartitions, tt.expected) + }) + } +} + +func (s *PartitionTestSuite) TestPartitionBySubstring() { + tests := []struct { + name string + paths []string + prefix string + startIndex int + endIndex int + totalPartitions int + expected [][]string + }{ + { + name: "basic substring extraction", + paths: []string{ + "prefix/abc123.txt", + "prefix/def456.txt", + "prefix/def999.txt", + "prefix/def999-more-ignored-chars.txt", + "prefix/ghi789.txt", + 
"prefix/dir/", // directory + }, + prefix: "prefix/", + startIndex: 0, + endIndex: 3, + totalPartitions: 4, + expected: [][]string{ + {"prefix/def456.txt", "prefix/def999.txt", "prefix/def999-more-ignored-chars.txt"}, + {"prefix/ghi789.txt"}, // no match + {}, // no match + {"prefix/abc123.txt"}, + }, + }, + { + name: "substring with short key fallback", + paths: []string{ + "prefix/short.txt", + "prefix/medium-name.txt", + "prefix/very-long-name.txt", + "prefix/dir/", // directory + }, + prefix: "prefix/", + startIndex: 0, + endIndex: 10, // longer than some keys + totalPartitions: 3, + expected: [][]string{ + {"prefix/short.txt", "prefix/medium-name.txt"}, // fallback partition + {"prefix/very-long-name.txt"}, + {}, // no match + }, + }, + { + name: "mid-string extraction", + paths: []string{ + "prefix/user-123-abc.txt", + "prefix/user-456-def.txt", + "prefix/user-456-another.txt", + "prefix/user-789-ghi.txt", + "prefix/dir/", // directory + }, + prefix: "prefix/user-", + startIndex: 0, + endIndex: 3, + totalPartitions: 2, + expected: [][]string{ + {"prefix/user-456-def.txt", "prefix/user-456-another.txt"}, + {"prefix/user-123-abc.txt", "prefix/user-789-ghi.txt"}, + }, + }, + { + name: "unicode substring", + paths: []string{ + "prefix/用户123.txt", + "prefix/户用456.txt", + "prefix/用户789.txt", + "prefix/dir/", // directory + }, + prefix: "prefix/", + startIndex: 0, + endIndex: 1, + totalPartitions: 3, + expected: [][]string{ + {"prefix/户用456.txt"}, + {}, // no match + {"prefix/用户123.txt", "prefix/用户789.txt"}, + }, + }, + { + name: "with special characters", + paths: []string{ + "prefix/abc!@#.txt", + "prefix/def$%^.txt", + "prefix/xyz$%^.txt", + "prefix/ghi&*(.txt", + "prefix/dir/", // directory + }, + prefix: "prefix/", + startIndex: 3, + endIndex: 6, + totalPartitions: 2, + expected: [][]string{ + {"prefix/def$%^.txt", "prefix/xyz$%^.txt"}, + {"prefix/abc!@#.txt", "prefix/ghi&*(.txt"}, + }, + }, + { + name: "nested paths", + paths: []string{ + "prefix/a/deep/file1.txt", + "prefix/b/deep/file2.txt", + "prefix/b/another/file2.txt", + "prefix/c/deep/file3.txt", + "prefix/d/dir/", // directory + }, + prefix: "prefix/", + startIndex: 0, + endIndex: 1, // first character after prefix + totalPartitions: 2, + expected: [][]string{ + {"prefix/a/deep/file1.txt", "prefix/c/deep/file3.txt"}, + {"prefix/b/deep/file2.txt", "prefix/b/another/file2.txt"}, + }, + }, + } + + for _, tt := range tests { + s.Run(tt.name, func() { + objects := createObjectsFromStrings(tt.paths) + spec := SourceSpec{ + Key: tt.prefix, + Partition: PartitionConfig{ + Type: PartitionKeyTypeSubstring, + StartIndex: tt.startIndex, + EndIndex: tt.endIndex, + }, + } + s.verifyPartitioning(spec, objects, tt.totalPartitions, tt.expected) + }) + } +} + +func (s *PartitionTestSuite) TestPartitionByDate() { + tests := []struct { + name string + paths []string + prefix string + dateFormat string + totalPartitions int + expected [][]string + }{ + { + name: "daily grouping", + paths: []string{ + "prefix/2023-01-01-data.txt", + "prefix/2023-01-02-data.txt", + "prefix/2023-01-02-another.txt", + "prefix/2023-01-02-more.txt", + "prefix/2023-01-03-data.txt", + "prefix/invalid-date.txt", + "prefix/dates/", // directory + }, + prefix: "prefix/", + dateFormat: "2006-01-02", + totalPartitions: 2, + expected: [][]string{ + {"prefix/2023-01-01-data.txt", "prefix/2023-01-03-data.txt", "prefix/invalid-date.txt"}, + {"prefix/2023-01-02-data.txt", "prefix/2023-01-02-another.txt", "prefix/2023-01-02-more.txt"}, + }, + }, + { + name: "monthly grouping", + 
paths: []string{ + "prefix/2023-01/data1.txt", + "prefix/2023-02/data2.txt", + "prefix/2023-02/another.txt", + "prefix/2023-02/more.txt", + "prefix/2023-03/data3.txt", + "prefix/invalid/data.txt", + "prefix/months/", // directory + }, + prefix: "prefix/", + dateFormat: "2006-01", + totalPartitions: 2, + expected: [][]string{ + {"prefix/2023-01/data1.txt", "prefix/2023-03/data3.txt", "prefix/invalid/data.txt"}, + {"prefix/2023-02/data2.txt", "prefix/2023-02/another.txt", "prefix/2023-02/more.txt"}, + }, + }, + { + name: "yearly grouping", + paths: []string{ + "prefix/2021/data.txt", + "prefix/2022/data.txt", + "prefix/2022/another.txt", + "prefix/2022/more.txt", + "prefix/2023/data.txt", + "prefix/invalid/data.txt", + "prefix/years/", // directory + }, + prefix: "prefix/", + dateFormat: "2006", + totalPartitions: 2, + expected: [][]string{ + {"prefix/2021/data.txt", "prefix/2023/data.txt", "prefix/invalid/data.txt"}, + {"prefix/2022/data.txt", "prefix/2022/another.txt", "prefix/2022/more.txt"}, + }, + }, + { + name: "with timezone", + paths: []string{ + "prefix/2023-01-01T10:00:00Z.txt", + "prefix/2023-01-01T15:30:00-07:00.txt", + "prefix/2023-01-02T01:00:00+09:00.txt", + "prefix/2023-01-02T02:00:00+09:00.txt", + "prefix/2023-01-02T05:00:00+09:00.txt", + "prefix/invalid.txt", + "prefix/tz/", // directory + }, + prefix: "prefix/", + dateFormat: "2006-01-02T1", + totalPartitions: 2, + expected: [][]string{ + {"prefix/2023-01-02T01:00:00+09:00.txt", "prefix/2023-01-02T02:00:00+09:00.txt", "prefix/2023-01-02T05:00:00+09:00.txt", "prefix/invalid.txt"}, + {"prefix/2023-01-01T15:30:00-07:00.txt", "prefix/2023-01-01T10:00:00Z.txt"}, + }, + }, + { + name: "mixed date formats", + paths: []string{ + "prefix/20230101.txt", + "prefix/2023-01-02.txt", + "prefix/20230103.txt", + "prefix/invalid.txt", + "prefix/mixed/", // directory + }, + prefix: "prefix/", + dateFormat: "200601", + totalPartitions: 3, + expected: [][]string{ + {"prefix/2023-01-02.txt", "prefix/invalid.txt"}, + {"prefix/20230103.txt", "prefix/20230101.txt"}, + {}, // no match + }, + }, + { + name: "all invalid dates", + paths: []string{ + "prefix/notadate1.txt", + "prefix/notadate2.txt", + "prefix/notadate3.txt", + "prefix/dir/", // directory + }, + prefix: "prefix/", + dateFormat: "2006-01-02", + totalPartitions: 2, + expected: [][]string{ + {"prefix/notadate1.txt", "prefix/notadate2.txt", "prefix/notadate3.txt"}, // all in fallback partition + {}, + }, + }, + { + name: "nested date paths", + paths: []string{ + "prefix/region/2023-01-01/data.txt", + "prefix/region/2023-01-02/data.txt", + "prefix/region/invalid/data.txt", + "prefix/backup/", // directory + }, + prefix: "prefix/region/", + dateFormat: "2006-01-02", + totalPartitions: 3, + expected: [][]string{ + {"prefix/region/invalid/data.txt"}, + {"prefix/region/2023-01-02/data.txt"}, + {"prefix/region/2023-01-01/data.txt"}, + }, + }, + { + name: "date ranges across partition boundaries", + paths: []string{ + "prefix/2022-12-31-data.txt", + "prefix/2023-01-01-data.txt", + "prefix/2023-01-02-data.txt", + "prefix/dir/", // directory + }, + prefix: "prefix/", + dateFormat: "2006-01-02", + totalPartitions: 2, + expected: [][]string{ + {"prefix/2023-01-01-data.txt", "prefix/2022-12-31-data.txt"}, + {"prefix/2023-01-02-data.txt"}, + }, + }, + { + name: "incomplete date formats", + paths: []string{ + "prefix/2023-01.txt", // missing day + "prefix/2023.txt", // missing month and day + "prefix/2023-01-01.txt", // complete date + "prefix/dir/", // directory + }, + prefix: "prefix/", + dateFormat: 
"2006-01-02", + totalPartitions: 3, + expected: [][]string{ + {"prefix/2023-01.txt", "prefix/2023.txt"}, // fallback partition + {}, // no match + {"prefix/2023-01-01.txt"}, + }, + }, + { + name: "different date separators", + paths: []string{ + "prefix/2023.01.01.txt", + "prefix/2023.01.02.txt", + "prefix/2023.01.02.log", + "prefix/2023.01.03.txt", + "prefix/dir/", // directory + }, + prefix: "prefix/", + dateFormat: "2006.01.02", + totalPartitions: 2, + expected: [][]string{ + {"prefix/2023.01.01.txt", "prefix/2023.01.03.txt"}, + {"prefix/2023.01.02.txt", "prefix/2023.01.02.log"}, + }, + }, + } + + for _, tt := range tests { + s.Run(tt.name, func() { + objects := createObjectsFromStrings(tt.paths) + spec := SourceSpec{ + Key: tt.prefix, + Partition: PartitionConfig{ + Type: PartitionKeyTypeDate, + DateFormat: tt.dateFormat, + }, + } + s.verifyPartitioning(spec, objects, tt.totalPartitions, tt.expected) + }) + } +} + +func (s *PartitionTestSuite) TestEdgeCases() { + tests := []struct { + name string + paths []string + spec SourceSpec + totalPartitions int + expected [][]string + expectError bool + errorContains string + }{ + { + name: "empty object list", + paths: []string{}, + spec: SourceSpec{ + Key: "prefix/", + Partition: PartitionConfig{ + Type: PartitionKeyTypeObject, + }, + }, + totalPartitions: 2, + expected: [][]string{ + {}, // partition 0 + {}, // partition 1 + }, + }, + { + name: "all directories", + paths: []string{ + "prefix/dir1/", + "prefix/dir2/", + "prefix/dir3/", + }, + spec: SourceSpec{ + Key: "prefix/", + Partition: PartitionConfig{ + Type: PartitionKeyTypeObject, + }, + }, + totalPartitions: 2, + expected: [][]string{ + {}, // partition 0 + {}, // partition 1 + }, + }, + { + name: "paths with special characters", + paths: []string{ + "prefix/!@#$%^&*.txt", + "prefix/spaces in name.txt", + "prefix/tab\tin name.txt", + "prefix/newline\nin name.txt", + "prefix/escaped\\slash.txt", + }, + spec: SourceSpec{ + Key: "prefix/", + Partition: PartitionConfig{ + Type: PartitionKeyTypeObject, + }, + }, + totalPartitions: 2, + expected: [][]string{ + {"prefix/escaped\\slash.txt", "prefix/newline\nin name.txt", "prefix/tab\tin name.txt"}, + {"prefix/spaces in name.txt", "prefix/!@#$%^&*.txt"}, + }, + }, + { + name: "very long keys", + paths: []string{ + "prefix/" + strings.Repeat("a", 1000) + ".txt", + "prefix/" + strings.Repeat("b", 2000) + ".txt", + "prefix/" + strings.Repeat("c", 3000) + ".txt", + }, + spec: SourceSpec{ + Key: "prefix/", + Partition: PartitionConfig{ + Type: PartitionKeyTypeObject, + }, + }, + totalPartitions: 3, + expected: [][]string{ + {"prefix/" + strings.Repeat("c", 3000) + ".txt"}, + {"prefix/" + strings.Repeat("b", 2000) + ".txt"}, + {"prefix/" + strings.Repeat("a", 1000) + ".txt"}, + }, + }, + { + name: "mixed unicode scripts", + paths: []string{ + "prefix/文件1.txt", // Chinese + "prefix/файл2.txt", // Russian + "prefix/파일3.txt", // Korean + "prefix/ファイル4.txt", // Japanese + "prefix/ملف5.txt", // Arabic + }, + spec: SourceSpec{ + Key: "prefix/", + Partition: PartitionConfig{ + Type: PartitionKeyTypeObject, + }, + }, + totalPartitions: 3, + expected: [][]string{ + {"prefix/файл2.txt"}, + {"prefix/ファイル4.txt"}, + {"prefix/文件1.txt", "prefix/파일3.txt", "prefix/ملف5.txt"}, + }, + }, + { + name: "paths with empty segments", + paths: []string{ + "prefix//file1.txt", // double slash + "prefix/./file2.txt", // current dir + "prefix/../file3.txt", // parent dir + "prefix/.file4.txt", // hidden file + "prefix/ file5.txt", // leading space + "prefix/file6.txt ", // 
trailing space
+			},
+			spec: SourceSpec{
+				Key: "prefix/",
+				Partition: PartitionConfig{
+					Type: PartitionKeyTypeObject,
+				},
+			},
+			totalPartitions: 2,
+			expected: [][]string{
+				{"prefix//file1.txt", "prefix/../file3.txt", "prefix/file6.txt ", "prefix/.file4.txt"},
+				{"prefix/./file2.txt", "prefix/ file5.txt"},
+			},
+		},
+		{
+			name: "mixed path separators",
+			paths: []string{
+				"prefix\\file1.txt",
+				"prefix/file2.txt",
+				"prefix\\sub\\file3.txt",
+				"prefix/sub/file4.txt",
+			},
+			spec: SourceSpec{
+				Key: "prefix/",
+				Partition: PartitionConfig{
+					Type: PartitionKeyTypeObject,
+				},
+			},
+			totalPartitions: 2,
+			expected: [][]string{
+				{"prefix\\file1.txt", "prefix/file2.txt", "prefix\\sub\\file3.txt"},
+				{"prefix/sub/file4.txt"},
+			},
+		},
+		{
+			name: "substring with zero-width characters",
+			paths: []string{
+				"prefix/\u200Bfile1.txt", // zero-width space
+				"prefix/\uFEFFfile2.txt", // byte order mark
+				"prefix/\u200Efile3.txt", // left-to-right mark
+			},
+			spec: SourceSpec{
+				Key: "prefix/",
+				Partition: PartitionConfig{
+					Type:       PartitionKeyTypeSubstring,
+					StartIndex: 0,
+					EndIndex:   4,
+				},
+			},
+			totalPartitions: 2,
+			expected: [][]string{
+				{"prefix/\u200Efile3.txt"},
+				{"prefix/\u200Bfile1.txt", "prefix/\uFEFFfile2.txt"},
+			},
+		},
+	}
+
+	for _, tt := range tests {
+		s.Run(tt.name, func() {
+			objects := createObjectsFromStrings(tt.paths)
+			if tt.expectError {
+				_, err := PartitionObjects(objects, tt.totalPartitions, 0, tt.spec)
+				s.Require().Error(err)
+				if tt.errorContains != "" {
+					s.Contains(err.Error(), tt.errorContains)
+				}
+				return
+			}
+			s.verifyPartitioning(tt.spec, objects, tt.totalPartitions, tt.expected)
+		})
+	}
+}
+
+func createObjectSummary(key string, isDir bool) ObjectSummary {
+	return ObjectSummary{
+		Key:   &key,
+		IsDir: isDir,
+	}
+}
+
+// createObjectsFromStrings creates ObjectSummary slices from strings, treating paths with / suffix as directories
+func createObjectsFromStrings(paths []string) []ObjectSummary {
+	objects := make([]ObjectSummary, len(paths))
+	for i, path := range paths {
+		isDir := strings.HasSuffix(path, "/")
+		objects[i] = createObjectSummary(path, isDir)
+	}
+	return objects
+}
+
+// verifyPartitionContents checks if actual partitions match expected content
+func (s *PartitionTestSuite) verifyPartitionContents(actualPartitions [][]ObjectSummary, expectedPartitions [][]string) {
+	s.Require().Equal(len(expectedPartitions), len(actualPartitions), "partition count mismatch")
+
+	for i := range expectedPartitions {
+		actualPaths := make([]string, len(actualPartitions[i]))
+		for j, obj := range actualPartitions[i] {
+			actualPaths[j] = *obj.Key
+		}
+
+		s.ElementsMatch(expectedPartitions[i], actualPaths,
+			"partition %d content mismatch. 
Expected %s but found %s", i, expectedPartitions[i], actualPaths) + } +} + +// Helper methods for verification +func (s *PartitionTestSuite) verifyNoDirectories(partitions [][]ObjectSummary) { + for _, partition := range partitions { + for _, obj := range partition { + s.False(obj.IsDir, "Directory found in partition: %s", *obj.Key) + } + } +} + +func (s *PartitionTestSuite) verifyComplete(objects []ObjectSummary, partitions [][]ObjectSummary) { + expectedCount := 0 + for _, obj := range objects { + if !obj.IsDir { + expectedCount++ + } + } + + actualCount := 0 + for _, partition := range partitions { + actualCount += len(partition) + } + + s.Equal(expectedCount, actualCount, "Not all objects were distributed to partitions") +} + +func (s *PartitionTestSuite) verifyConsistency(spec SourceSpec, objects []ObjectSummary, totalPartitions int) { + initialPartitions := make([][]ObjectSummary, totalPartitions) + for i := 0; i < totalPartitions; i++ { + partition, err := PartitionObjects(objects, totalPartitions, i, spec) + s.Require().NoError(err) + initialPartitions[i] = partition + } + + // Run multiple times to verify consistent distribution + for run := 0; run < 3; run++ { + for i := 0; i < totalPartitions; i++ { + partition, err := PartitionObjects(objects, totalPartitions, i, spec) + s.Require().NoError(err) + s.Equal(len(initialPartitions[i]), len(partition), "Partition size changed on subsequent run") + for j := range initialPartitions[i] { + s.Equal(*initialPartitions[i][j].Key, *partition[j].Key, "Object distribution changed on subsequent run") + } + } + } +} + +func (s *PartitionTestSuite) verifyPartitioning(spec SourceSpec, objects []ObjectSummary, totalPartitions int, expected [][]string) { + s.Require().NotNil(expected, "expected partition contents must not be nil") + s.Require().Equal(totalPartitions, len(expected), "expected partition count must match totalPartitions") + + partitions := make([][]ObjectSummary, totalPartitions) + for i := 0; i < totalPartitions; i++ { + partition, err := PartitionObjects(objects, totalPartitions, i, spec) + s.Require().NoError(err) + partitions[i] = partition + } + + s.verifyNoDirectories(partitions) + s.verifyComplete(objects, partitions) + s.verifyConsistency(spec, objects, totalPartitions) + if expected != nil { + s.verifyPartitionContents(partitions, expected) + } +} diff --git a/pkg/s3/types.go b/pkg/s3/types.go index 26b49e9d67..544d5d38d8 100644 --- a/pkg/s3/types.go +++ b/pkg/s3/types.go @@ -9,6 +9,14 @@ import ( "github.com/bacalhau-project/bacalhau/pkg/models" ) +type ObjectSummary struct { + Key *string + ETag *string + VersionID *string + Size int64 + IsDir bool +} + type SourceSpec struct { Bucket string Key string @@ -17,13 +25,14 @@ type SourceSpec struct { Endpoint string VersionID string ChecksumSHA256 string + Partition PartitionConfig } func (c SourceSpec) Validate() error { if c.Bucket == "" { return NewS3InputSourceError(BadRequestErrorCode, "invalid s3 storage params: bucket cannot be empty") } - return nil + return c.Partition.Validate() } func (c SourceSpec) ToMap() map[string]interface{} { @@ -50,8 +59,7 @@ func DecodeSourceSpec(spec *models.SpecConfig) (SourceSpec, error) { if !spec.IsType(models.StorageSourceS3) { return SourceSpec{}, NewS3InputSourceError( BadRequestErrorCode, - fmt.Sprintf("invalid storage source type. expected %s but received: %s", models.StorageSourceS3, spec.Type), - ) + "invalid storage source type. 
expected %s but received: %s", models.StorageSourceS3, spec.Type) } inputParams := spec.Params if inputParams == nil { @@ -69,7 +77,8 @@ func DecodeSourceSpec(spec *models.SpecConfig) (SourceSpec, error) { func DecodePreSignedResultSpec(spec *models.SpecConfig) (PreSignedResultSpec, error) { if !spec.IsType(models.StorageSourceS3PreSigned) { return PreSignedResultSpec{}, NewS3InputSourceError(BadRequestErrorCode, - "invalid storage source type. expected "+models.StorageSourceS3PreSigned+" but received: "+spec.Type) + "invalid storage source type. expected %s but received: %s", + models.StorageSourceS3PreSigned, spec.Type) } inputParams := spec.Params diff --git a/pkg/storage/inline/storage.go b/pkg/storage/inline/storage.go index 130e7f1c8f..d5b9ac3678 100644 --- a/pkg/storage/inline/storage.go +++ b/pkg/storage/inline/storage.go @@ -64,7 +64,7 @@ func (i *InlineStorage) CleanupStorage(_ context.Context, _ models.InputSource, // For an inline storage, we define the volume size as uncompressed data size, // as this is how much resource using the storage will take up. -func (i *InlineStorage) GetVolumeSize(_ context.Context, spec models.InputSource) (uint64, error) { +func (i *InlineStorage) GetVolumeSize(_ context.Context, _ *models.Execution, spec models.InputSource) (uint64, error) { source, err := DecodeSpec(spec.Source) if err != nil { return 0, err diff --git a/pkg/storage/inline/storage_test.go b/pkg/storage/inline/storage_test.go index 924c405816..cd68070549 100644 --- a/pkg/storage/inline/storage_test.go +++ b/pkg/storage/inline/storage_test.go @@ -28,7 +28,7 @@ func TestPlaintextInlineStorage(t *testing.T) { inputSource := models.InputSource{Source: &spec, Target: "target"} - size, err := storage.GetVolumeSize(context.Background(), inputSource) + size, err := storage.GetVolumeSize(context.Background(), mock.Execution(), inputSource) require.NoError(t, err) require.Equal(t, uint64(len("test")), size) @@ -55,7 +55,7 @@ func TestDirectoryInlineStorage(t *testing.T) { inputSource := models.InputSource{Source: &spec, Target: "target"} - size, err := storage.GetVolumeSize(context.Background(), inputSource) + size, err := storage.GetVolumeSize(context.Background(), mock.Execution(), inputSource) require.NoError(t, err) require.Equal(t, uint64(len("test")+len("more")), size) diff --git a/pkg/storage/ipfs/storage.go b/pkg/storage/ipfs/storage.go index f91a5476f1..0f5c65be3a 100644 --- a/pkg/storage/ipfs/storage.go +++ b/pkg/storage/ipfs/storage.go @@ -49,7 +49,7 @@ func (s *StorageProvider) HasStorageLocally(ctx context.Context, volume models.I return s.ipfsClient.HasCID(ctx, source.CID) } -func (s *StorageProvider) GetVolumeSize(ctx context.Context, volume models.InputSource) (uint64, error) { +func (s *StorageProvider) GetVolumeSize(ctx context.Context, _ *models.Execution, volume models.InputSource) (uint64, error) { // we wrap this in a timeout because if the CID is not present on the network this seems to hang // TODO(forrest) [correctness] this timeout should be passed in as a param or set on the context by the method caller. 
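For reviewers, here is a minimal sketch of how the pieces above fit together: the exported `ObjectSummary`, the new `Partition` field on `SourceSpec`, and the `PartitionObjects` helper exercised by the tests. The object listing and bucket name below are made up for illustration, and the `s3helper` import alias mirrors the one used in `pkg/storage/s3`; this is a sketch, not code from the patch:

```go
package main

import (
	"fmt"

	"github.com/aws/aws-sdk-go-v2/aws"
	s3helper "github.com/bacalhau-project/bacalhau/pkg/s3"
)

func main() {
	// Hypothetical listing; in the storage provider these come from explodeKey.
	objects := []s3helper.ObjectSummary{
		{Key: aws.String("data/2024/01/a.csv"), Size: 4},
		{Key: aws.String("data/2024/02/b.csv"), Size: 4},
		{Key: aws.String("data/2024/"), IsDir: true}, // directories are filtered out, never partitioned
	}

	spec := s3helper.SourceSpec{
		Bucket: "mybucket",
		Key:    "data/",
		Partition: s3helper.PartitionConfig{
			Type:    s3helper.PartitionKeyTypeRegex,
			Pattern: `data/(\d{4})/(\d{2})/`,
		},
	}

	// Each execution asks only for its own slice: partitionIndex in [0, totalPartitions).
	const totalPartitions = 2
	for partitionIndex := 0; partitionIndex < totalPartitions; partitionIndex++ {
		part, err := s3helper.PartitionObjects(objects, totalPartitions, partitionIndex, spec)
		if err != nil {
			panic(err)
		}
		for _, obj := range part {
			fmt.Printf("partition %d -> %s\n", partitionIndex, aws.ToString(obj.Key))
		}
	}
}
```

Because the assignment is hash-based and deterministic (the property `verifyConsistency` asserts above), repeated runs print the same distribution.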
diff --git a/pkg/storage/ipfs/storage_test.go b/pkg/storage/ipfs/storage_test.go
index 471feaea1b..f1884974b5 100644
--- a/pkg/storage/ipfs/storage_test.go
+++ b/pkg/storage/ipfs/storage_test.go
@@ -52,7 +52,7 @@ func (s *StorageSuite) TestGetVolumeSize() {
 	cid, err := ipfs.AddTextToNodes(ctx, []byte(testString), *s.ipfsClient)
 	s.Require().NoError(err)
 
-	result, err := s.storage.GetVolumeSize(ctx, models.InputSource{
+	result, err := s.storage.GetVolumeSize(ctx, mock.Execution(), models.InputSource{
 		Source: &models.SpecConfig{
 			Type: models.StorageSourceIPFS,
 			Params: Source{
@@ -105,7 +105,7 @@ func (s *StorageSuite) TestGetVolumeSizeRespectsTimeout() {
 	cid, err := ipfs.AddTextToNodes(ctx, []byte("testString"), *s.ipfsClient)
 	s.Require().NoError(err)
 
-	_, err = s.storage.GetVolumeSize(ctx, models.InputSource{
+	_, err = s.storage.GetVolumeSize(ctx, mock.Execution(), models.InputSource{
 		Source: &models.SpecConfig{
 			Type: models.StorageSourceIPFS,
 			Params: Source{
diff --git a/pkg/storage/local_directory/storage.go b/pkg/storage/local_directory/storage.go
index a4803e923b..824ec2a3ac 100644
--- a/pkg/storage/local_directory/storage.go
+++ b/pkg/storage/local_directory/storage.go
@@ -48,7 +48,7 @@ func (driver *StorageProvider) HasStorageLocally(_ context.Context, volume model
 	return true, nil
 }
 
-func (driver *StorageProvider) GetVolumeSize(_ context.Context, volume models.InputSource) (uint64, error) {
+func (driver *StorageProvider) GetVolumeSize(_ context.Context, _ *models.Execution, volume models.InputSource) (uint64, error) {
 	source, err := DecodeSpec(volume.Source)
 	if err != nil {
 		return 0, err
diff --git a/pkg/storage/local_directory/storage_test.go b/pkg/storage/local_directory/storage_test.go
index 64b7e63d3d..c5db87d7e2 100644
--- a/pkg/storage/local_directory/storage_test.go
+++ b/pkg/storage/local_directory/storage_test.go
@@ -206,7 +206,7 @@ func (s *LocalDirectorySuite) TestGetVolumeSize() {
 			storageProvider, err := NewStorageProvider(StorageProviderParams{AllowedPaths: ParseAllowPaths(tc.allowedPaths)})
 			require.NoError(s.T(), err)
 
-			volumeSize, err := storageProvider.GetVolumeSize(context.Background(), s.prepareStorageSpec(tc.sourcePath))
+			volumeSize, err := storageProvider.GetVolumeSize(context.Background(), mock.Execution(), s.prepareStorageSpec(tc.sourcePath))
 			if tc.shouldFail {
 				require.Error(s.T(), err)
 				return
diff --git a/pkg/storage/noop/noop.go b/pkg/storage/noop/noop.go
index 4ed9e54e54..f4136c7544 100644
--- a/pkg/storage/noop/noop.go
+++ b/pkg/storage/noop/noop.go
@@ -69,7 +69,7 @@ func (s *NoopStorage) HasStorageLocally(ctx context.Context, volume models.Input
 }
 
 // we wrap this in a timeout because if the CID is not present on the network this seems to hang
-func (s *NoopStorage) GetVolumeSize(ctx context.Context, volume models.InputSource) (uint64, error) {
+func (s *NoopStorage) GetVolumeSize(ctx context.Context, _ *models.Execution, volume models.InputSource) (uint64, error) {
 	if s.Config.ExternalHooks.GetVolumeSize != nil {
 		handler := s.Config.ExternalHooks.GetVolumeSize
 		return handler(ctx, volume)
diff --git a/pkg/storage/s3/storage.go b/pkg/storage/s3/storage.go
index 6286318e29..a7c953d45a 100644
--- a/pkg/storage/s3/storage.go
+++ b/pkg/storage/s3/storage.go
@@ -31,14 +31,6 @@ The storage provider supports downloading:
 - a prefix and all objects matching the prefix: s3://myBucket/dir/file-*
 */
 
-type s3ObjectSummary struct {
-	key       *string
-	eTag      *string
-	versionID *string
-	size      int64
-	isDir     bool
-}
-
 type StorageProviderParams struct {
 	ClientProvider *s3helper.ClientProvider
 }
@@ -70,7 +62,7 @@ func (s *StorageProvider) HasStorageLocally(_ context.Context, _ models.InputSou
 	return false, nil
 }
 
-func (s *StorageProvider) GetVolumeSize(ctx context.Context, volume models.InputSource) (uint64, error) {
+func (s *StorageProvider) GetVolumeSize(ctx context.Context, execution *models.Execution, volume models.InputSource) (uint64, error) {
 	ctx, cancel := context.WithTimeout(ctx, s.timeout)
 	defer cancel()
 
@@ -84,22 +76,28 @@ func (s *StorageProvider) GetVolumeSize(ctx context.Context, volume models.Input
 	if err != nil {
 		return 0, err
 	}
+
+	objects, err = s3helper.PartitionObjects(objects, execution.Job.Count, execution.PartitionIndex, source)
+	if err != nil {
+		return 0, err
+	}
+
 	var size uint64
 	for _, object := range objects {
 		// Check for negative size
-		if object.size < 0 {
-			return 0, fmt.Errorf("invalid negative size for object: %d", object.size)
+		if object.Size < 0 {
+			return 0, fmt.Errorf("invalid negative size for object: %d", object.Size)
 		}
 		// Check for overflow
 		// MaxUint64 - size = remaining space before overflow
 		//nolint:gosec // G115: negative values already checked
-		if object.size > 0 && uint64(object.size) > math.MaxUint64-size {
+		if object.Size > 0 && uint64(object.Size) > math.MaxUint64-size {
 			return 0, fmt.Errorf("total size exceeds uint64 maximum")
 		}
 		//nolint:gosec // G115: Already checked above
-		size += uint64(object.size)
+		size += uint64(object.Size)
 	}
 	return size, nil
 }
@@ -130,6 +128,11 @@ func (s *StorageProvider) PrepareStorage(
 		return storage.StorageVolume{}, err
 	}
 
+	objects, err = s3helper.PartitionObjects(objects, execution.Job.Count, execution.PartitionIndex, source)
+	if err != nil {
+		return storage.StorageVolume{}, err
+	}
+
 	prefixTokens := strings.Split(s.sanitizeKey(source.Key), "/")
 
 	for _, object := range objects {
@@ -152,11 +155,11 @@ func (s *StorageProvider) PrepareStorage(
 func (s *StorageProvider) downloadObject(ctx context.Context,
 	client *s3helper.ClientWrapper,
 	source s3helper.SourceSpec,
-	object s3ObjectSummary,
+	object s3helper.ObjectSummary,
 	parentDir string,
 	prefixTokens []string) error {
 	// trim the user supplied prefix from the object local path
-	objectTokens := strings.Split(*object.key, "/")
+	objectTokens := strings.Split(*object.Key, "/")
 	startingIndex := 0
 	for i := 0; i < len(prefixTokens)-1; i++ {
 		if prefixTokens[i] == objectTokens[i] {
@@ -169,10 +172,6 @@ func (s *StorageProvider) downloadObject(ctx context.Context,
 	// relative output path to the supplied prefix
 	outputPath := filepath.Join(parentDir, filepath.Join(objectTokens[startingIndex:]...))
 
-	if object.isDir {
-		return os.MkdirAll(outputPath, models.DownloadFolderPerm)
-	}
-
 	// create all parent directories if needed
 	err := os.MkdirAll(filepath.Dir(outputPath), models.DownloadFolderPerm)
 	if err != nil {
@@ -187,12 +186,12 @@ func (s *StorageProvider) downloadObject(ctx context.Context,
 	defer outputFile.Close() //nolint:errcheck
 
 	log.Debug().Msgf("Downloading s3://%s/%s versionID:%s, eTag:%s to %s.",
-		source.Bucket, aws.ToString(object.key), aws.ToString(object.versionID), aws.ToString(object.eTag), outputFile.Name())
+		source.Bucket, aws.ToString(object.Key), aws.ToString(object.VersionID), aws.ToString(object.ETag), outputFile.Name())
 	_, err = client.Downloader.Download(ctx, outputFile, &s3.GetObjectInput{
 		Bucket:    aws.String(source.Bucket),
-		Key:       object.key,
-		VersionId: object.versionID,
-		IfMatch:   object.eTag,
+		Key:       object.Key,
+		VersionId: object.VersionID,
+		IfMatch:   object.ETag,
 	})
 	if err != nil {
 		return s3helper.NewS3InputSourceServiceError(err)
@@ -219,7 +218,8 @@ func (s *StorageProvider) Upload(_ context.Context, _ string) (models.SpecConfig
 
 //nolint:gocyclo
 func (s *StorageProvider) explodeKey(
-	ctx context.Context, client *s3helper.ClientWrapper, storageSpec s3helper.SourceSpec) ([]s3ObjectSummary, error) {
+	ctx context.Context, client *s3helper.ClientWrapper, storageSpec s3helper.SourceSpec) (
+	[]s3helper.ObjectSummary, error) {
 	if storageSpec.Key != "" && !strings.HasSuffix(storageSpec.Key, "*") && !strings.HasSuffix(storageSpec.Key, "/") {
 		request := &s3.HeadObjectInput{
 			Bucket: aws.String(storageSpec.Bucket),
@@ -242,15 +242,15 @@ func (s *StorageProvider) explodeKey(
 				storageSpec.Bucket, storageSpec.Key, storageSpec.ChecksumSHA256, aws.ToString(headResp.ChecksumSHA256))
 		}
 		if headResp.ContentType != nil && !strings.HasPrefix(*headResp.ContentType, "application/x-directory") {
-			objectSummary := s3ObjectSummary{
-				key:  aws.String(storageSpec.Key),
-				size: *headResp.ContentLength,
-				eTag: headResp.ETag,
+			objectSummary := s3helper.ObjectSummary{
+				Key:  aws.String(storageSpec.Key),
+				Size: *headResp.ContentLength,
+				ETag: headResp.ETag,
 			}
 			if storageSpec.VersionID != "" {
-				objectSummary.versionID = aws.String(storageSpec.VersionID)
+				objectSummary.VersionID = aws.String(storageSpec.VersionID)
 			}
-			return []s3ObjectSummary{objectSummary}, nil
+			return []s3helper.ObjectSummary{objectSummary}, nil
 		}
 	}
 
@@ -262,7 +262,7 @@ func (s *StorageProvider) explodeKey(
 
 	// if the key is a directory, or ends with a wildcard, we need to list the objects starting with the key
 	sanitizedKey := s.sanitizeKey(storageSpec.Key)
-	res := make([]s3ObjectSummary, 0)
+	res := make([]s3helper.ObjectSummary, 0)
 	var continuationToken *string
 	for {
 		resp, err := client.S3.ListObjectsV2(ctx, &s3.ListObjectsV2Input{
@@ -280,10 +280,10 @@ func (s *StorageProvider) explodeKey(
 				continue
 			}
 		}
-		res = append(res, s3ObjectSummary{
-			key:   object.Key,
-			size:  *object.Size,
-			isDir: strings.HasSuffix(*object.Key, "/"),
+		res = append(res, s3helper.ObjectSummary{
+			Key:   object.Key,
+			Size:  *object.Size,
+			IsDir: strings.HasSuffix(*object.Key, "/"),
 		})
 	}
 	if !*resp.IsTruncated {
diff --git a/pkg/storage/s3/storage_test.go b/pkg/storage/s3/storage_test.go
index 15a510756a..90ce4093e9 100644
--- a/pkg/storage/s3/storage_test.go
+++ b/pkg/storage/s3/storage_test.go
@@ -60,6 +60,9 @@ func (s *StorageTestSuite) TestStorage() {
 		checksum        string
 		versionID       string
 		shouldFail      bool
+		partitionConfig s3helper.PartitionConfig
+		jobCount        int
+		partitionIndex  int
 	}{
 		{
 			name: "single object",
@@ -68,6 +71,28 @@ func (s *StorageTestSuite) TestStorage() {
 				{"001", "001.txt"},
 			},
 		},
+		{
+			name:            "single object - part 0",
+			key:             prefix1 + "001.txt",
+			expectedOutputs: []expectedOutput{},
+			partitionConfig: s3helper.PartitionConfig{
+				Type: s3helper.PartitionKeyTypeObject,
+			},
+			jobCount:       2,
+			partitionIndex: 0,
+		},
+		{
+			name: "single object - part 1",
+			key:  prefix1 + "001.txt",
+			expectedOutputs: []expectedOutput{
+				{"001", "001.txt"},
+			},
+			partitionConfig: s3helper.PartitionConfig{
+				Type: s3helper.PartitionKeyTypeObject,
+			},
+			jobCount:       2,
+			partitionIndex: 1,
+		},
 		{
 			name: "single directory",
 			key:  prefix1,
@@ -78,6 +103,90 @@ func (s *StorageTestSuite) TestStorage() {
 				{"102", "102.txt"},
 			},
 		},
+		{
+			name: "single directory with object partitioning - part 0",
+			key:  prefix1,
+			expectedOutputs: []expectedOutput{
+				{"002", "002.txt"},
+				{"101", "101.txt"},
+			},
+			partitionConfig: s3helper.PartitionConfig{
+				Type: s3helper.PartitionKeyTypeObject,
+			},
+			jobCount:       2,
+			partitionIndex: 0,
+		},
+		{
+			name: "single directory with object partitioning - part 1",
+			key:  prefix1,
+			expectedOutputs: []expectedOutput{
+				{"001", "001.txt"},
+				{"102", "102.txt"},
+			},
+			partitionConfig: s3helper.PartitionConfig{
+				Type: s3helper.PartitionKeyTypeObject,
+			},
+			jobCount:       2,
+			partitionIndex: 1,
+		},
+		{
+			name: "single directory with regex partitioning - part 0",
+			key:  prefix1,
+			expectedOutputs: []expectedOutput{
+				{"101", "101.txt"},
+				{"102", "102.txt"},
+			},
+			partitionConfig: s3helper.PartitionConfig{
+				Type:    s3helper.PartitionKeyTypeRegex,
+				Pattern: `\d0`,
+			},
+			jobCount:       2,
+			partitionIndex: 0,
+		},
+		{
+			name: "single directory with regex partitioning - part 1",
+			key:  prefix1,
+			expectedOutputs: []expectedOutput{
+				{"001", "001.txt"},
+				{"002", "002.txt"},
+			},
+			partitionConfig: s3helper.PartitionConfig{
+				Type:    s3helper.PartitionKeyTypeRegex,
+				Pattern: `\d0`,
+			},
+			jobCount:       2,
+			partitionIndex: 1,
+		},
+		{
+			name: "single directory with substring partitioning - part 0",
+			key:  prefix1,
+			expectedOutputs: []expectedOutput{
+				{"001", "001.txt"},
+				{"101", "101.txt"},
+			},
+			partitionConfig: s3helper.PartitionConfig{
+				Type:       s3helper.PartitionKeyTypeSubstring,
+				StartIndex: 2,
+				EndIndex:   3,
+			},
+			jobCount:       2,
+			partitionIndex: 0,
+		},
+		{
+			name: "single directory with substring partitioning - part 1",
+			key:  prefix1,
+			expectedOutputs: []expectedOutput{
+				{"002", "002.txt"},
+				{"102", "102.txt"},
+			},
+			partitionConfig: s3helper.PartitionConfig{
+				Type:       s3helper.PartitionKeyTypeSubstring,
+				StartIndex: 2,
+				EndIndex:   3,
+			},
+			jobCount:       2,
+			partitionIndex: 1,
+		},
 		{
 			name: "single directory trailing asterisk",
 			key:  prefix1 + "*",
@@ -88,6 +197,19 @@ func (s *StorageTestSuite) TestStorage() {
 				{"102", "102.txt"},
 			},
 		},
+		{
+			name: "single directory trailing asterisk partitioning",
+			key:  prefix1 + "*",
+			expectedOutputs: []expectedOutput{
+				{"002", "002.txt"},
+				{"101", "101.txt"},
+			},
+			partitionConfig: s3helper.PartitionConfig{
+				Type: s3helper.PartitionKeyTypeObject,
+			},
+			jobCount:       2,
+			partitionIndex: 0,
+		},
 		{
 			name: "nested directory",
 			key:  prefix2,
@@ -98,6 +220,20 @@ func (s *StorageTestSuite) TestStorage() {
 				{"302", "nested/302.txt"},
 			},
 		},
+		{
+			name: "nested directory with regex partitioning",
+			key:  prefix2,
+			expectedOutputs: []expectedOutput{
+				{"301", "nested/301.txt"},
+				{"302", "nested/302.txt"},
+			},
+			partitionConfig: s3helper.PartitionConfig{
+				Type:    s3helper.PartitionKeyTypeRegex,
+				Pattern: `nested/\d0`,
+			},
+			jobCount:       3,
+			partitionIndex: 2,
+		},
 		{
 			name: "file pattern",
 			key:  prefix1 + "00*",
@@ -106,6 +242,34 @@ func (s *StorageTestSuite) TestStorage() {
 				{"002", "002.txt"},
 			},
 		},
+		{
+			name: "file pattern - part 0",
+			key:  prefix1 + "00*",
+			expectedOutputs: []expectedOutput{
+				{"001", "001.txt"},
+			},
+			partitionConfig: s3helper.PartitionConfig{
+				Type:       s3helper.PartitionKeyTypeSubstring,
+				StartIndex: 0,
+				EndIndex:   1,
+			},
+			jobCount:       2,
+			partitionIndex: 0,
+		},
+		{
+			name: "file pattern - part 1",
+			key:  prefix1 + "00*",
+			expectedOutputs: []expectedOutput{
+				{"002", "002.txt"},
+			},
+			partitionConfig: s3helper.PartitionConfig{
+				Type:       s3helper.PartitionKeyTypeSubstring,
+				StartIndex: 0,
+				EndIndex:   1,
+			},
+			jobCount:       2,
+			partitionIndex: 1,
+		},
 		{
 			name: "directory pattern",
 			key:  root + "set*",
@@ -120,6 +284,38 @@ func (s *StorageTestSuite) TestStorage() {
 				{"302", "set2/nested/302.txt"},
 			},
 		},
+		{
+			name: "directory pattern - part 0",
+			key:  root + "set*",
+			expectedOutputs: []expectedOutput{
+				{"001", "set1/001.txt"},
+				{"101", "set1/101.txt"},
+				{"201", "set2/201.txt"},
+				{"301", "set2/nested/301.txt"},
+			},
+			partitionConfig: s3helper.PartitionConfig{
+				Type:    s3helper.PartitionKeyTypeRegex,
+				Pattern: `\d\.txt`,
+			},
+			jobCount:       2,
+			partitionIndex: 0,
+		},
+		{
+			name: "directory pattern - part 1",
+			key:  root + "set*",
+			expectedOutputs: []expectedOutput{
+				{"002", "set1/002.txt"},
+				{"102", "set1/102.txt"},
+				{"202", "set2/202.txt"},
+				{"302", "set2/nested/302.txt"},
+			},
+			partitionConfig: s3helper.PartitionConfig{
+				Type:    s3helper.PartitionKeyTypeRegex,
+				Pattern: `\d\.txt`,
+			},
+			jobCount:       2,
+			partitionIndex: 1,
+		},
 		{
 			name: "single directory filter",
 			key:  prefix1 + "*",
@@ -215,6 +411,12 @@ func (s *StorageTestSuite) TestStorage() {
 	} {
 		s.Run(tc.name, func() {
 			ctx := context.Background()
+
+			// Set default job count and partition index if not specified
+			if tc.jobCount == 0 {
+				tc.jobCount = 1
+			}
+
 			storageSpec := models.InputSource{
 				Source: &models.SpecConfig{
 					Type: models.StorageSourceS3,
@@ -225,10 +427,16 @@ func (s *StorageTestSuite) TestStorage() {
 						Region:         s.Region,
 						ChecksumSHA256: tc.checksum,
 						VersionID:      tc.versionID,
+						Partition:      tc.partitionConfig,
 					}.ToMap(),
 				},
 			}
-			size, err := s.Storage.GetVolumeSize(ctx, storageSpec)
+
+			execution := mock.Execution()
+			execution.Job.Count = tc.jobCount
+			execution.PartitionIndex = tc.partitionIndex
+
+			size, err := s.Storage.GetVolumeSize(ctx, execution, storageSpec)
 			if tc.shouldFail {
 				s.Error(err)
 				return
@@ -236,7 +444,7 @@ func (s *StorageTestSuite) TestStorage() {
 			s.Require().NoError(err)
 			s.Equal(uint64(len(tc.expectedOutputs)*4), size) // each file is 4 bytes long
 
-			volume, err := s.Storage.PrepareStorage(ctx, s.T().TempDir(), mock.Execution(), storageSpec)
+			volume, err := s.Storage.PrepareStorage(ctx, s.T().TempDir(), execution, storageSpec)
 			s.Require().NoError(err)
 
 			// check that the files are there
@@ -271,7 +479,7 @@ func (s *StorageTestSuite) TestNotFound() {
 		},
 	}
 
-	_, err := s.Storage.GetVolumeSize(ctx, storageSpec)
+	_, err := s.Storage.GetVolumeSize(ctx, mock.Execution(), storageSpec)
 	s.Require().Error(err)
 
 	_, err = s.Storage.PrepareStorage(ctx, s.T().TempDir(), mock.Execution(), storageSpec)
diff --git a/pkg/storage/s3/types.go b/pkg/storage/s3/types.go
index 5c6b52b4c7..03d83c2bd7 100644
--- a/pkg/storage/s3/types.go
+++ b/pkg/storage/s3/types.go
@@ -2,7 +2,6 @@ package s3
 
 import (
 	"errors"
-	"fmt"
 
 	"github.com/fatih/structs"
 	"github.com/mitchellh/mapstructure"
@@ -36,10 +35,8 @@ func DecodeSourceSpec(spec *models.SpecConfig) (SourceSpec, error) {
 	if !spec.IsType(models.StorageSourceS3) {
 		return SourceSpec{}, s3.NewS3InputSourceError(
-			s3.BadRequestErrorCode,
-			fmt.Sprintf("invalid storage source type. expected %s, but received: %s",
-				models.StorageSourceS3, spec.Type,
-			),
+			s3.BadRequestErrorCode, "invalid storage source type. expected %s, but received: %s",
+			models.StorageSourceS3, spec.Type,
 		)
 	}
 	inputParams := spec.Params
@@ -75,10 +72,8 @@ func DecodePreSignedResultSpec(spec *models.SpecConfig) (PreSignedResultSpec, er
 	if !spec.IsType(models.StorageSourceS3PreSigned) {
 		return PreSignedResultSpec{}, s3.NewS3InputSourceError(
-			s3.BadRequestErrorCode,
-			fmt.Sprintf("invalid storage source type. expected %s, but received: %s",
-				models.StorageSourceS3PreSigned, spec.Type,
-			),
+			s3.BadRequestErrorCode, "invalid storage source type. expected %s, but received: %s",
+			models.StorageSourceS3PreSigned, spec.Type,
 		)
 	}
diff --git a/pkg/storage/tracing/tracing.go b/pkg/storage/tracing/tracing.go
index 2e7036b3c4..67f3fff19d 100644
--- a/pkg/storage/tracing/tracing.go
+++ b/pkg/storage/tracing/tracing.go
@@ -38,11 +38,11 @@ func (t *tracingStorage) HasStorageLocally(ctx context.Context, spec models.Inpu
 	return t.delegate.HasStorageLocally(ctx, spec)
 }
 
-func (t *tracingStorage) GetVolumeSize(ctx context.Context, spec models.InputSource) (uint64, error) {
+func (t *tracingStorage) GetVolumeSize(ctx context.Context, execution *models.Execution, spec models.InputSource) (uint64, error) {
 	ctx, span := telemetry.NewSpan(ctx, telemetry.GetTracer(), fmt.Sprintf("%s.GetVolumeSize", t.name))
 	defer span.End()
 
-	return t.delegate.GetVolumeSize(ctx, spec)
+	return t.delegate.GetVolumeSize(ctx, execution, spec)
 }
 
 func (t *tracingStorage) PrepareStorage(
diff --git a/pkg/storage/types.go b/pkg/storage/types.go
index 8c56ae550b..92c0ceb14b 100644
--- a/pkg/storage/types.go
+++ b/pkg/storage/types.go
@@ -16,7 +16,7 @@ type Storage interface {
 	HasStorageLocally(context.Context, models.InputSource) (bool, error)
 
 	// how big is the given volume in terms of resource consumption?
-	GetVolumeSize(context.Context, models.InputSource) (uint64, error)
+	GetVolumeSize(context.Context, *models.Execution, models.InputSource) (uint64, error)
 
 	// PrepareStorage is provided an output directory, and an InputSource and
 	// is expected to retrieve the InputSource into the output directory.
diff --git a/pkg/storage/url/urldownload/storage.go b/pkg/storage/url/urldownload/storage.go
index b214178d47..2d3558ffd7 100644
--- a/pkg/storage/url/urldownload/storage.go
+++ b/pkg/storage/url/urldownload/storage.go
@@ -80,7 +80,7 @@ func (sp *StorageProvider) HasStorageLocally(context.Context, models.InputSource
 	return false, nil
 }
 
-func (sp *StorageProvider) GetVolumeSize(ctx context.Context, storageSpec models.InputSource) (uint64, error) {
+func (sp *StorageProvider) GetVolumeSize(ctx context.Context, _ *models.Execution, storageSpec models.InputSource) (uint64, error) {
 	source, err := DecodeSpec(storageSpec.Source)
 	if err != nil {
 		return 0, err
diff --git a/pkg/storage/url/urldownload/storage_test.go b/pkg/storage/url/urldownload/storage_test.go
index c365d27f45..2843a27680 100644
--- a/pkg/storage/url/urldownload/storage_test.go
+++ b/pkg/storage/url/urldownload/storage_test.go
@@ -387,7 +387,7 @@ func (s *StorageSuite) TestGetVolumeSize_WithServerReturningValidSize() {
 		Target: "/inputs",
 	}
 
-	vs, err := subject.GetVolumeSize(context.Background(), spec)
+	vs, err := subject.GetVolumeSize(context.Background(), mock.Execution(), spec)
 	s.Require().NoError(err)
 
 	s.Equal(uint64(500), vs, "content-length does not match")
@@ -428,7 +428,7 @@ func (s *StorageSuite) TestGetVolumeSize_WithServerReturningInvalidSize() {
 		Target: "/inputs",
 	}
 
-	_, err := subject.GetVolumeSize(context.Background(), spec)
+	_, err := subject.GetVolumeSize(context.Background(), mock.Execution(), spec)
 	s.Require().ErrorIs(err, ErrNoContentLengthFound)
 }
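Taken together, these hunks thread the execution through every `Storage` implementation: `GetVolumeSize` now receives `*models.Execution`, so a partition-aware source like S3 can size (and later fetch) only the slice belonging to `execution.PartitionIndex` out of `execution.Job.Count`, while the other providers simply ignore the argument (`_ *models.Execution`). A sketch of a caller mirroring the test setup follows; the `mock` package's import path is an assumption here, as it is not shown in this diff:

```go
package main

import (
	"context"
	"fmt"

	"github.com/bacalhau-project/bacalhau/pkg/models"
	"github.com/bacalhau-project/bacalhau/pkg/storage"
	"github.com/bacalhau-project/bacalhau/pkg/test/mock" // assumed path for the mock.Execution() helper used in the tests
)

// volumeSizeForPartition sizes one execution's share of an input source
// through the updated Storage interface.
func volumeSizeForPartition(
	ctx context.Context,
	s storage.Storage,
	src models.InputSource,
	jobCount, partitionIndex int,
) (uint64, error) {
	execution := mock.Execution()
	execution.Job.Count = jobCount            // N parallel executions
	execution.PartitionIndex = partitionIndex // this execution's slice, 0..N-1
	return s.GetVolumeSize(ctx, execution, src)
}

func main() {
	fmt.Println("pass a concrete storage.Storage (s3, ipfs, inline, ...) to volumeSizeForPartition")
}
```

For non-partitioned sources the result is the full volume size regardless of the partition arguments, which matches the behavior the URL, IPFS, inline, and local-directory tests above continue to assert.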