-
Notifications
You must be signed in to change notification settings - Fork 92
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Expose execution to storage providers #4803
Conversation
Caution Review failedThe pull request is closed. WalkthroughThe pull request introduces a significant refactoring of storage preparation methods across multiple packages. The primary change involves modifying the Changes
Assessment against linked issues
Possibly related PRs
Poem
📜 Recent review detailsConfiguration used: CodeRabbit UI 📒 Files selected for processing (1)
Finishing Touches
🪧 TipsChatThere are 3 ways to chat with CodeRabbit:
Note: Be mindful of the bot's finite context window. It's strongly recommended to break down tasks such as reading entire modules into smaller chunks. For a focused discussion, use review comments to chat about specific files and their changes, instead of using the PR comments. CodeRabbit Commands (Invoked using PR comments)
Other keywords and placeholders
CodeRabbit Configuration File (
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 2
🔭 Outside diff range comments (1)
pkg/storage/noop/noop.go (1)
Line range hint
13-14
: Update handler type to match new signature.The
StorageHandlerPrepareStorage
type definition doesn't match the newPrepareStorage
method signature. This mismatch could cause runtime issues.Update the type definition:
type StorageHandlerPrepareStorage func( - ctx context.Context, storageDir string, storageSpec models.InputSource) (storage.StorageVolume, error) + ctx context.Context, storageDir string, execution *models.Execution, input models.InputSource) (storage.StorageVolume, error)
🧹 Nitpick comments (11)
pkg/s3/test/suite.go (2)
Line range hint
199-209
: Handle potential errors when creating mock executionThe
MockExecution
method creates a mock execution for testing purposes. While the current implementation uses predictable IDs forJobID
andExecutionID
, consider handling potential errors that might arise during mock object creation to enhance test robustness.Apply this diff to handle errors:
func (s *HelperSuite) MockExecution(publisherConfig s3helper.PublisherSpec) *models.Execution { job := mock.Job() job.ID = s.JobID // to get predictable published key job.Task().Publisher = &models.SpecConfig{ Type: models.PublisherS3, Params: publisherConfig.ToMap(), } + if job.Task().Publisher == nil { + s.T().Fatal("Failed to set publisher in mock job") + } execution := mock.ExecutionForJob(job) execution.ID = s.ExecutionID // to get predictable published key return execution }
217-219
: Improve error handling inPublishResultSilently
In the
PublishResultSilently
method, the current error handling skips the test only for an "AccessDenied" error. Consider logging or handling other errors to aid in debugging and to prevent silent failures in tests.Apply this diff to enhance error handling:
func (s *HelperSuite) PublishResultSilently(execution *models.Execution, resultPath string) models.SpecConfig { // publish result to S3 storageSpec, err := s.PublishResult(execution, resultPath) if err != nil { var ae smithy.APIError if errors.As(err, &ae) && ae.ErrorCode() == "AccessDenied" { s.T().Skip("No access to S3 bucket " + s.Bucket) + } else { + s.T().Fatalf("Failed to publish result: %v", err) } } s.Require().NoError(err) return storageSpec }pkg/storage/tracing/tracing.go (1)
56-66
: Consider enhancing metrics with execution context.While the changes are correct, consider enriching the metrics with execution context information for better observability. This could help track storage preparation across different execution types or contexts.
- stopwatch := telemetry.Timer(ctx, jobStoragePrepareDurationMilliseconds, input.Source.MetricAttributes()...) + attributes := append( + input.Source.MetricAttributes(), + attribute.String("execution_id", execution.ID), + attribute.String("execution_type", execution.Type), + ) + stopwatch := telemetry.Timer(ctx, jobStoragePrepareDurationMilliseconds, attributes...)pkg/storage/local_directory/storage.go (1)
71-72
: LGTM! Consider documenting the execution parameter.The changes look good. The parameter rename from
storageSpec
toinput
improves clarity. However, since the execution parameter is currently unused (marked with_
), it would be helpful to document its intended future use.Add a comment explaining the purpose of the execution parameter:
func (driver *StorageProvider) PrepareStorage( _ context.Context, _ string, + // execution provides context about the job being executed, which may be used + // in future implementations for enhanced storage preparation _ *models.Execution, input models.InputSource, ) (storage.StorageVolume, error) {Also applies to: 74-74, 85-85
pkg/storage/ipfs/storage.go (1)
81-82
: LGTM! Consider documenting the execution parameter.The changes are consistent with other storage providers. The error messages have been correctly updated to reflect the new parameter name.
Add a comment explaining the purpose of the execution parameter:
func (s *StorageProvider) PrepareStorage( ctx context.Context, storageDirectory string, + // execution provides context about the job being executed, which may be used + // in future implementations for enhanced storage preparation _ *models.Execution, input models.InputSource) (storage.StorageVolume, error) {Also applies to: 83-83, 97-97, 99-99
pkg/storage/inline/storage.go (1)
98-101
: LGTM! Method signature updated to support execution context.The change aligns with the PR objective of exposing execution to storage providers, while maintaining backward compatibility by ignoring the unused execution parameter.
Consider documenting the intended future use of the execution parameter to help other developers understand its purpose.
pkg/storage/s3/storage.go (1)
109-112
: LGTM! Method signature updated consistently.The change aligns with other storage providers and the PR objective of exposing execution context.
Consider documenting how the execution context will be used in future implementations, particularly for S3-specific features like partitioned inputs.
pkg/storage/url/urldownload/storage.go (2)
123-124
: Consider utilizing the execution context.The
execution
parameter is currently unused. Consider leveraging it to:
- Implement rate limiting based on execution priority
- Track download progress per execution
- Add execution metadata to HTTP headers for better tracing
201-204
: Enhance logging with execution context.Add execution metadata to the log entry for better observability.
log.Ctx(ctx).Debug(). + Str("execution_id", execution.ID). + Str("job_id", execution.Job.ID). Stringer("url", u). Stringer("final-url", res.Request.URL). Str("file", filePath). Str("targetFile", targetPath). Msg("Downloaded file")pkg/storage/url/urldownload/storage_test.go (1)
324-324
: Add test cases for execution-specific behaviors.Consider adding test cases to verify:
- Rate limiting based on execution priority
- Download progress tracking per execution
- Proper handling of execution metadata in logs
pkg/compute/executor.go (1)
87-94
: Consider consolidating duplicate storage preparation calls.The code makes two separate calls to
ParallelPrepareStorage
with similar parameters. Consider consolidating these calls to reduce code duplication.- importModuleVolumes, err := storage.ParallelPrepareStorage( - ctx, storageProvider, storageDirectory, execution, wasmEngine.ImportModules...) - if err != nil { - return nil, nil, err - } - - entryModuleVolumes, err := storage.ParallelPrepareStorage( - ctx, storageProvider, storageDirectory, execution, wasmEngine.EntryModule) + allModules := append([]models.InputSource{wasmEngine.EntryModule}, wasmEngine.ImportModules...) + allVolumes, err := storage.ParallelPrepareStorage( + ctx, storageProvider, storageDirectory, execution, allModules...) if err != nil { return nil, nil, err } + + importModuleVolumes := allVolumes[:len(wasmEngine.ImportModules)] + entryModuleVolumes := allVolumes[len(wasmEngine.ImportModules):]
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (18)
pkg/compute/executor.go
(4 hunks)pkg/publisher/s3/publisher_test.go
(2 hunks)pkg/s3/test/suite.go
(3 hunks)pkg/storage/inline/storage.go
(3 hunks)pkg/storage/inline/storage_test.go
(3 hunks)pkg/storage/ipfs/storage.go
(2 hunks)pkg/storage/ipfs/storage_test.go
(2 hunks)pkg/storage/local_directory/storage.go
(2 hunks)pkg/storage/local_directory/storage_test.go
(2 hunks)pkg/storage/noop/noop.go
(1 hunks)pkg/storage/parallel.go
(4 hunks)pkg/storage/parallel_test.go
(3 hunks)pkg/storage/s3/storage.go
(2 hunks)pkg/storage/s3/storage_test.go
(3 hunks)pkg/storage/tracing/tracing.go
(1 hunks)pkg/storage/types.go
(1 hunks)pkg/storage/url/urldownload/storage.go
(2 hunks)pkg/storage/url/urldownload/storage_test.go
(2 hunks)
🔇 Additional comments (21)
pkg/storage/parallel.go (2)
48-50
: Ensureexecution.Job
andexecution.Job.Task()
are not nilWhen defaulting
inputs
toexecution.Job.Task().InputSources
, there's an implicit assumption that bothexecution.Job
andexecution.Job.Task()
are not nil. To prevent unexpected panics, verify that these fields are properly initialized before accessing them.
36-36
: Confirm all storage providers handle the newexecution
parameterUpdating the call to
storageProvider.PrepareStorage
to includeexecution
aligns with the updated method signature. Ensure that all storage providers correctly handle the newexecution
parameter and that any necessary adjustments are made within their implementations.pkg/s3/test/suite.go (3)
212-212
: UpdatePublishResult
to acceptexecution
parameterThe
PublishResult
method signature now acceptsexecution *models.Execution
, which is consistent with the updated publishing process. This change ensures that the execution context is passed correctly during result publishing.
234-235
: Passexecution
correctly inPrepareAndPublish
By obtaining
execution
froms.MockExecution(publisherConfig)
and passing it toPublishResultSilently
, you ensure that the execution context used during publishing matches the mock execution created for the test.
240-244
: Verify proper use ofexecution
inGetResult
The
GetResult
method now includesexecution
when callings.Storage.PrepareStorage
. Confirm that theexecution
object is correctly utilized within the storage provider and that any required fields are properly initialized to prevent issues during storage preparation.pkg/storage/types.go (1)
23-26
:⚠️ Potential issueUpdate
Storage
interface implementations with newexecution
parameterThe
PrepareStorage
method in theStorage
interface now includes anexecution *models.Execution
parameter. Ensure that all implementations of this interface are updated accordingly to handle the new parameter. This could involve utilizing theexecution
context within storage preparation logic or making necessary adjustments to comply with the new method signature.pkg/storage/inline/storage_test.go (2)
13-13
: LGTM! Import added for mock package.The mock package import is necessary for providing execution context in tests.
35-35
: LGTM! PrepareStorage calls updated with mock execution context.The test functions have been correctly updated to pass mock execution context to PrepareStorage calls, maintaining test coverage while accommodating the new parameter.
Also applies to: 62-62
pkg/storage/tracing/tracing.go (1)
51-52
: LGTM! PrepareStorage signature updated.The method signature has been correctly updated to include execution context, maintaining consistency with other storage providers.
pkg/storage/ipfs/storage_test.go (2)
17-17
: LGTM! Import added for mock package.The mock package import is necessary for providing execution context in tests.
83-83
: LGTM! PrepareStorage call updated with mock execution context.The timeout test has been correctly updated to pass mock execution context while maintaining its original timeout verification logic.
pkg/storage/parallel_test.go (2)
23-23
: LGTM! Import added for mock package.The mock package import is necessary for providing execution context in tests.
65-65
: LGTM! ParallelPrepareStorage calls updated with mock execution context.Both test functions have been correctly updated to pass mock execution context while maintaining their cleanup verification logic.
Also applies to: 102-102
pkg/publisher/s3/publisher_test.go (1)
202-203
: LGTM! Test changes correctly reflect new execution-aware approach.The test has been properly updated to use the execution context instead of direct parameter usage, maintaining test coverage while adapting to the new interface.
Also applies to: 228-228
pkg/storage/inline/storage.go (1)
127-127
: LGTM! Consistent parameter naming.The change from
spec.Target
toinput.Target
improves code consistency with the new parameter naming.Also applies to: 140-140
pkg/storage/s3/storage_test.go (2)
239-239
: LGTM! Test updated to include mock execution context.The test correctly uses
mock.Execution()
to provide the new required parameter.
277-277
: LGTM! Error case test updated.The error case test is properly updated to include the mock execution parameter.
pkg/storage/local_directory/storage_test.go (1)
240-240
: LGTM! Test updated consistently with other storage providers.The test follows the same pattern of using
mock.Execution()
as seen in other storage provider tests.pkg/storage/s3/storage.go (1)
145-145
: LGTM! Consistent parameter naming.The change from
storageSpec.Target
toinput.Target
maintains consistency with the new parameter naming convention.pkg/compute/executor.go (2)
66-70
: Clean and focused refactor.The changes maintain a clear separation of concerns while properly propagating the execution context.
82-84
: Consistent parameter ordering.The parameter ordering is consistent with other functions in the codebase, maintaining good readability.
execution *models.Execution, | ||
inputs ...*models.InputSource, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Validate execution
before use to prevent nil pointer dereference
The function ParallelPrepareStorage
now accepts execution *models.Execution
as a parameter. In the subsequent code, execution
is used without a nil check (e.g., line 49: execution.Job.Task().InputSources
). To prevent potential nil pointer dereference errors, ensure that execution
is not nil before accessing its fields.
Apply this diff to add a nil check:
if len(inputs) == 0 {
+ if execution == nil {
+ return nil, errors.New("execution is nil")
+ }
inputs = execution.Job.Task().InputSources
}
Committable suggestion skipped: line range outside the PR's diff.
execution *models.Execution, | ||
input models.InputSource) (storage.StorageVolume, error) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fix handler invocation to match new signature.
The handler invocation needs to be updated to pass all parameters correctly.
Update the handler invocation:
if s.Config.ExternalHooks.PrepareStorage != nil {
handler := s.Config.ExternalHooks.PrepareStorage
- return handler(ctx, storageDir, input)
+ return handler(ctx, storageDir, execution, input)
}
Also applies to: 87-87
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 0
🧹 Nitpick comments (1)
pkg/test/ipfs/ipfs_host_storage_test.go (1)
94-98
: Consider adding test cases for execution context usage.While the tests verify basic storage preparation functionality, consider adding test cases that specifically validate how the execution context is used by the storage provider. For example:
- Test with different execution configurations
- Verify error cases related to execution context
- Test any execution-specific storage provider behaviors
This would help ensure the robustness of the new execution context feature.
Also applies to: 135-139
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (1)
pkg/test/ipfs/ipfs_host_storage_test.go
(3 hunks)
⏰ Context from checks skipped due to timeout of 90000ms (1)
- GitHub Check: testcontainers-suite / tests
🔇 Additional comments (2)
pkg/test/ipfs/ipfs_host_storage_test.go (2)
22-22
: LGTM!The mock package import is correctly added to support the new execution parameter in storage preparation tests.
96-96
: Verify mock execution data completeness.Both test methods now use
mock.Execution()
to provide execution context to storage providers. Ensure that the mock execution data includes all necessary fields that storage providers might need for the partitioned inputs use case (ENG-520).Let's verify the mock execution implementation:
Also applies to: 137-137
✅ Verification successful
Mock execution implementation is complete and suitable
The mock.Execution() implementation provides a comprehensive execution context with all necessary fields including identifiers, state tracking, and job references. The implementation also ensures data integrity through normalization and validation.
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash # Description: Check the mock execution implementation to ensure it provides necessary fields. ast-grep --pattern 'func Execution() *models.Execution { $$$ }'Length of output: 218
Script:
#!/bin/bash # Check the ExecutionForJob and Job implementations ast-grep --pattern 'func ExecutionForJob($_) *models.Execution { $$$ }' ast-grep --pattern 'func Job() *models.Job { $$$ }'Length of output: 2314
This PR makes execution information available to storage providers in preparation for enabling partitioned inputs https://linear.app/expanso/issue/ENG-520/partitioned-s3-input-source
Summary by CodeRabbit
Release Notes
Refactor
Testing
These changes represent a significant architectural refinement in how storage and execution contexts are managed throughout the system, focusing on more robust and context-aware storage preparation mechanisms.