Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Expose execution to storage providers #4803

Merged
merged 5 commits into from
Jan 10, 2025

Conversation

wdbaruni
Copy link
Member

@wdbaruni wdbaruni commented Jan 9, 2025

This PR makes execution information available to storage providers in preparation for enabling partitioned inputs https://linear.app/expanso/issue/ENG-520/partitioned-s3-input-source

Summary by CodeRabbit

Release Notes

  • Refactor

    • Updated storage preparation methods across multiple packages to include execution context.
    • Modified method signatures to support more comprehensive input handling.
    • Enhanced flexibility in the storage preparation process.
  • Testing

    • Updated test suites to incorporate mock execution contexts.
    • Improved test coverage for storage-related functionality.

These changes represent a significant architectural refinement in how storage and execution contexts are managed throughout the system, focusing on more robust and context-aware storage preparation mechanisms.

Copy link

linear bot commented Jan 9, 2025

Copy link
Contributor

coderabbitai bot commented Jan 9, 2025

Caution

Review failed

The pull request is closed.

Walkthrough

The pull request introduces a significant refactoring of storage preparation methods across multiple packages. The primary change involves modifying the PrepareStorage method signatures to include an additional *models.Execution parameter. This modification affects storage providers for various sources like inline, IPFS, local directory, S3, URL downloads, and others. The changes aim to provide more context during storage preparation by passing the entire execution context alongside input sources.

Changes

File Change Summary
pkg/compute/executor.go Updated function signatures for prepareInputVolumes, prepareWasmVolumes, and PrepareRunArguments to accept *models.Execution
pkg/storage/*/storage.go Modified PrepareStorage method signatures across multiple storage providers to include *models.Execution
pkg/storage/parallel.go Updated ParallelPrepareStorage to accept *models.Execution and derive input sources from execution context
pkg/storage/types.go Updated Storage interface method signature
pkg/publisher/s3/publisher_test.go Updated TestPublish to utilize execution context in publishing operations
pkg/s3/test/suite.go Introduced MockExecution method and updated PublishResult methods to accept execution
pkg/storage/*/storage_test.go Modified test methods to include mock.Execution() in PrepareStorage calls

Assessment against linked issues

Objective Addressed Explanation
Add support for partitioned input in S3 (ENG-520) Structural changes suggest preparation for partitioned input, but specific implementation details are not clear

Possibly related PRs

Poem

🐰 Hop, hop, through code's domain,
Execution context, a new refrain!
Parameters dance, signatures shift,
Storage providers get a coding gift.
Refactoring magic, rabbit's delight! 🚀


📜 Recent review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between fd4159d and 1f035a8.

📒 Files selected for processing (1)
  • pkg/downloader/download_test.go (1 hunks)

Finishing Touches

  • 📝 Generate Docstrings (Beta)

🪧 Tips

Chat

There are 3 ways to chat with CodeRabbit:

  • Review comments: Directly reply to a review comment made by CodeRabbit. Example:
    • I pushed a fix in commit <commit_id>, please review it.
    • Generate unit testing code for this file.
    • Open a follow-up GitHub issue for this discussion.
  • Files and specific lines of code (under the "Files changed" tab): Tag @coderabbitai in a new review comment at the desired location with your query. Examples:
    • @coderabbitai generate unit testing code for this file.
    • @coderabbitai modularize this function.
  • PR comments: Tag @coderabbitai in a new PR comment to ask questions about the PR branch. For the best results, please provide a very specific query, as very limited context is provided in this mode. Examples:
    • @coderabbitai gather interesting stats about this repository and render them as a table. Additionally, render a pie chart showing the language distribution in the codebase.
    • @coderabbitai read src/utils.ts and generate unit testing code.
    • @coderabbitai read the files in the src/scheduler package and generate a class diagram using mermaid and a README in the markdown format.
    • @coderabbitai help me debug CodeRabbit configuration file.

Note: Be mindful of the bot's finite context window. It's strongly recommended to break down tasks such as reading entire modules into smaller chunks. For a focused discussion, use review comments to chat about specific files and their changes, instead of using the PR comments.

CodeRabbit Commands (Invoked using PR comments)

  • @coderabbitai pause to pause the reviews on a PR.
  • @coderabbitai resume to resume the paused reviews.
  • @coderabbitai review to trigger an incremental review. This is useful when automatic reviews are disabled for the repository.
  • @coderabbitai full review to do a full review from scratch and review all the files again.
  • @coderabbitai summary to regenerate the summary of the PR.
  • @coderabbitai generate docstrings to generate docstrings for this PR. (Beta)
  • @coderabbitai resolve resolve all the CodeRabbit review comments.
  • @coderabbitai configuration to show the current CodeRabbit configuration for the repository.
  • @coderabbitai help to get help.

Other keywords and placeholders

  • Add @coderabbitai ignore anywhere in the PR description to prevent this PR from being reviewed.
  • Add @coderabbitai summary to generate the high-level summary at a specific location in the PR description.
  • Add @coderabbitai anywhere in the PR title to generate the title automatically.

CodeRabbit Configuration File (.coderabbit.yaml)

  • You can programmatically configure CodeRabbit by adding a .coderabbit.yaml file to the root of your repository.
  • Please see the configuration documentation for more information.
  • If your editor has YAML language server enabled, you can add the path at the top of this file to enable auto-completion and validation: # yaml-language-server: $schema=https://coderabbit.ai/integrations/schema.v2.json

Documentation and Community

  • Visit our Documentation for detailed information on how to use CodeRabbit.
  • Join our Discord Community to get help, request features, and share feedback.
  • Follow us on X/Twitter for updates and announcements.

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 2

🔭 Outside diff range comments (1)
pkg/storage/noop/noop.go (1)

Line range hint 13-14: Update handler type to match new signature.

The StorageHandlerPrepareStorage type definition doesn't match the new PrepareStorage method signature. This mismatch could cause runtime issues.

Update the type definition:

 type StorageHandlerPrepareStorage func(
-	ctx context.Context, storageDir string, storageSpec models.InputSource) (storage.StorageVolume, error)
+	ctx context.Context, storageDir string, execution *models.Execution, input models.InputSource) (storage.StorageVolume, error)
🧹 Nitpick comments (11)
pkg/s3/test/suite.go (2)

Line range hint 199-209: Handle potential errors when creating mock execution

The MockExecution method creates a mock execution for testing purposes. While the current implementation uses predictable IDs for JobID and ExecutionID, consider handling potential errors that might arise during mock object creation to enhance test robustness.

Apply this diff to handle errors:

 func (s *HelperSuite) MockExecution(publisherConfig s3helper.PublisherSpec) *models.Execution {
     job := mock.Job()
     job.ID = s.JobID // to get predictable published key
     job.Task().Publisher = &models.SpecConfig{
         Type:   models.PublisherS3,
         Params: publisherConfig.ToMap(),
     }
+    if job.Task().Publisher == nil {
+        s.T().Fatal("Failed to set publisher in mock job")
+    }
     execution := mock.ExecutionForJob(job)
     execution.ID = s.ExecutionID // to get predictable published key
     return execution
 }

217-219: Improve error handling in PublishResultSilently

In the PublishResultSilently method, the current error handling skips the test only for an "AccessDenied" error. Consider logging or handling other errors to aid in debugging and to prevent silent failures in tests.

Apply this diff to enhance error handling:

 func (s *HelperSuite) PublishResultSilently(execution *models.Execution, resultPath string) models.SpecConfig {
     // publish result to S3
     storageSpec, err := s.PublishResult(execution, resultPath)
     if err != nil {
         var ae smithy.APIError
         if errors.As(err, &ae) && ae.ErrorCode() == "AccessDenied" {
             s.T().Skip("No access to S3 bucket " + s.Bucket)
+        } else {
+            s.T().Fatalf("Failed to publish result: %v", err)
         }
     }
     s.Require().NoError(err)
     return storageSpec
 }
pkg/storage/tracing/tracing.go (1)

56-66: Consider enhancing metrics with execution context.

While the changes are correct, consider enriching the metrics with execution context information for better observability. This could help track storage preparation across different execution types or contexts.

-	stopwatch := telemetry.Timer(ctx, jobStoragePrepareDurationMilliseconds, input.Source.MetricAttributes()...)
+	attributes := append(
+		input.Source.MetricAttributes(),
+		attribute.String("execution_id", execution.ID),
+		attribute.String("execution_type", execution.Type),
+	)
+	stopwatch := telemetry.Timer(ctx, jobStoragePrepareDurationMilliseconds, attributes...)
pkg/storage/local_directory/storage.go (1)

71-72: LGTM! Consider documenting the execution parameter.

The changes look good. The parameter rename from storageSpec to input improves clarity. However, since the execution parameter is currently unused (marked with _), it would be helpful to document its intended future use.

Add a comment explaining the purpose of the execution parameter:

 func (driver *StorageProvider) PrepareStorage(
 	_ context.Context,
 	_ string,
+	// execution provides context about the job being executed, which may be used
+	// in future implementations for enhanced storage preparation
 	_ *models.Execution,
 	input models.InputSource,
 ) (storage.StorageVolume, error) {

Also applies to: 74-74, 85-85

pkg/storage/ipfs/storage.go (1)

81-82: LGTM! Consider documenting the execution parameter.

The changes are consistent with other storage providers. The error messages have been correctly updated to reflect the new parameter name.

Add a comment explaining the purpose of the execution parameter:

 func (s *StorageProvider) PrepareStorage(
 	ctx context.Context,
 	storageDirectory string,
+	// execution provides context about the job being executed, which may be used
+	// in future implementations for enhanced storage preparation
 	_ *models.Execution,
 	input models.InputSource) (storage.StorageVolume, error) {

Also applies to: 83-83, 97-97, 99-99

pkg/storage/inline/storage.go (1)

98-101: LGTM! Method signature updated to support execution context.

The change aligns with the PR objective of exposing execution to storage providers, while maintaining backward compatibility by ignoring the unused execution parameter.

Consider documenting the intended future use of the execution parameter to help other developers understand its purpose.

pkg/storage/s3/storage.go (1)

109-112: LGTM! Method signature updated consistently.

The change aligns with other storage providers and the PR objective of exposing execution context.

Consider documenting how the execution context will be used in future implementations, particularly for S3-specific features like partitioned inputs.

pkg/storage/url/urldownload/storage.go (2)

123-124: Consider utilizing the execution context.

The execution parameter is currently unused. Consider leveraging it to:

  • Implement rate limiting based on execution priority
  • Track download progress per execution
  • Add execution metadata to HTTP headers for better tracing

201-204: Enhance logging with execution context.

Add execution metadata to the log entry for better observability.

 log.Ctx(ctx).Debug().
+        Str("execution_id", execution.ID).
+        Str("job_id", execution.Job.ID).
         Stringer("url", u).
         Stringer("final-url", res.Request.URL).
         Str("file", filePath).
         Str("targetFile", targetPath).
         Msg("Downloaded file")
pkg/storage/url/urldownload/storage_test.go (1)

324-324: Add test cases for execution-specific behaviors.

Consider adding test cases to verify:

  • Rate limiting based on execution priority
  • Download progress tracking per execution
  • Proper handling of execution metadata in logs
pkg/compute/executor.go (1)

87-94: Consider consolidating duplicate storage preparation calls.

The code makes two separate calls to ParallelPrepareStorage with similar parameters. Consider consolidating these calls to reduce code duplication.

-	importModuleVolumes, err := storage.ParallelPrepareStorage(
-		ctx, storageProvider, storageDirectory, execution, wasmEngine.ImportModules...)
-	if err != nil {
-		return nil, nil, err
-	}
-
-	entryModuleVolumes, err := storage.ParallelPrepareStorage(
-		ctx, storageProvider, storageDirectory, execution, wasmEngine.EntryModule)
+	allModules := append([]models.InputSource{wasmEngine.EntryModule}, wasmEngine.ImportModules...)
+	allVolumes, err := storage.ParallelPrepareStorage(
+		ctx, storageProvider, storageDirectory, execution, allModules...)
 	if err != nil {
 		return nil, nil, err
 	}
+
+	importModuleVolumes := allVolumes[:len(wasmEngine.ImportModules)]
+	entryModuleVolumes := allVolumes[len(wasmEngine.ImportModules):]
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 3acac95 and d25260a.

📒 Files selected for processing (18)
  • pkg/compute/executor.go (4 hunks)
  • pkg/publisher/s3/publisher_test.go (2 hunks)
  • pkg/s3/test/suite.go (3 hunks)
  • pkg/storage/inline/storage.go (3 hunks)
  • pkg/storage/inline/storage_test.go (3 hunks)
  • pkg/storage/ipfs/storage.go (2 hunks)
  • pkg/storage/ipfs/storage_test.go (2 hunks)
  • pkg/storage/local_directory/storage.go (2 hunks)
  • pkg/storage/local_directory/storage_test.go (2 hunks)
  • pkg/storage/noop/noop.go (1 hunks)
  • pkg/storage/parallel.go (4 hunks)
  • pkg/storage/parallel_test.go (3 hunks)
  • pkg/storage/s3/storage.go (2 hunks)
  • pkg/storage/s3/storage_test.go (3 hunks)
  • pkg/storage/tracing/tracing.go (1 hunks)
  • pkg/storage/types.go (1 hunks)
  • pkg/storage/url/urldownload/storage.go (2 hunks)
  • pkg/storage/url/urldownload/storage_test.go (2 hunks)
🔇 Additional comments (21)
pkg/storage/parallel.go (2)

48-50: Ensure execution.Job and execution.Job.Task() are not nil

When defaulting inputs to execution.Job.Task().InputSources, there's an implicit assumption that both execution.Job and execution.Job.Task() are not nil. To prevent unexpected panics, verify that these fields are properly initialized before accessing them.


36-36: Confirm all storage providers handle the new execution parameter

Updating the call to storageProvider.PrepareStorage to include execution aligns with the updated method signature. Ensure that all storage providers correctly handle the new execution parameter and that any necessary adjustments are made within their implementations.

pkg/s3/test/suite.go (3)

212-212: Update PublishResult to accept execution parameter

The PublishResult method signature now accepts execution *models.Execution, which is consistent with the updated publishing process. This change ensures that the execution context is passed correctly during result publishing.


234-235: Pass execution correctly in PrepareAndPublish

By obtaining execution from s.MockExecution(publisherConfig) and passing it to PublishResultSilently, you ensure that the execution context used during publishing matches the mock execution created for the test.


240-244: Verify proper use of execution in GetResult

The GetResult method now includes execution when calling s.Storage.PrepareStorage. Confirm that the execution object is correctly utilized within the storage provider and that any required fields are properly initialized to prevent issues during storage preparation.

pkg/storage/types.go (1)

23-26: ⚠️ Potential issue

Update Storage interface implementations with new execution parameter

The PrepareStorage method in the Storage interface now includes an execution *models.Execution parameter. Ensure that all implementations of this interface are updated accordingly to handle the new parameter. This could involve utilizing the execution context within storage preparation logic or making necessary adjustments to comply with the new method signature.

pkg/storage/inline/storage_test.go (2)

13-13: LGTM! Import added for mock package.

The mock package import is necessary for providing execution context in tests.


35-35: LGTM! PrepareStorage calls updated with mock execution context.

The test functions have been correctly updated to pass mock execution context to PrepareStorage calls, maintaining test coverage while accommodating the new parameter.

Also applies to: 62-62

pkg/storage/tracing/tracing.go (1)

51-52: LGTM! PrepareStorage signature updated.

The method signature has been correctly updated to include execution context, maintaining consistency with other storage providers.

pkg/storage/ipfs/storage_test.go (2)

17-17: LGTM! Import added for mock package.

The mock package import is necessary for providing execution context in tests.


83-83: LGTM! PrepareStorage call updated with mock execution context.

The timeout test has been correctly updated to pass mock execution context while maintaining its original timeout verification logic.

pkg/storage/parallel_test.go (2)

23-23: LGTM! Import added for mock package.

The mock package import is necessary for providing execution context in tests.


65-65: LGTM! ParallelPrepareStorage calls updated with mock execution context.

Both test functions have been correctly updated to pass mock execution context while maintaining their cleanup verification logic.

Also applies to: 102-102

pkg/publisher/s3/publisher_test.go (1)

202-203: LGTM! Test changes correctly reflect new execution-aware approach.

The test has been properly updated to use the execution context instead of direct parameter usage, maintaining test coverage while adapting to the new interface.

Also applies to: 228-228

pkg/storage/inline/storage.go (1)

127-127: LGTM! Consistent parameter naming.

The change from spec.Target to input.Target improves code consistency with the new parameter naming.

Also applies to: 140-140

pkg/storage/s3/storage_test.go (2)

239-239: LGTM! Test updated to include mock execution context.

The test correctly uses mock.Execution() to provide the new required parameter.


277-277: LGTM! Error case test updated.

The error case test is properly updated to include the mock execution parameter.

pkg/storage/local_directory/storage_test.go (1)

240-240: LGTM! Test updated consistently with other storage providers.

The test follows the same pattern of using mock.Execution() as seen in other storage provider tests.

pkg/storage/s3/storage.go (1)

145-145: LGTM! Consistent parameter naming.

The change from storageSpec.Target to input.Target maintains consistency with the new parameter naming convention.

pkg/compute/executor.go (2)

66-70: Clean and focused refactor.

The changes maintain a clear separation of concerns while properly propagating the execution context.


82-84: Consistent parameter ordering.

The parameter ordering is consistent with other functions in the codebase, maintaining good readability.

Comment on lines +25 to +26
execution *models.Execution,
inputs ...*models.InputSource,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Validate execution before use to prevent nil pointer dereference

The function ParallelPrepareStorage now accepts execution *models.Execution as a parameter. In the subsequent code, execution is used without a nil check (e.g., line 49: execution.Job.Task().InputSources). To prevent potential nil pointer dereference errors, ensure that execution is not nil before accessing its fields.

Apply this diff to add a nil check:

 if len(inputs) == 0 {
+    if execution == nil {
+        return nil, errors.New("execution is nil")
+    }
     inputs = execution.Job.Task().InputSources
 }

Committable suggestion skipped: line range outside the PR's diff.

Comment on lines +83 to +84
execution *models.Execution,
input models.InputSource) (storage.StorageVolume, error) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Fix handler invocation to match new signature.

The handler invocation needs to be updated to pass all parameters correctly.

Update the handler invocation:

 	if s.Config.ExternalHooks.PrepareStorage != nil {
 		handler := s.Config.ExternalHooks.PrepareStorage
-		return handler(ctx, storageDir, input)
+		return handler(ctx, storageDir, execution, input)
 	}

Also applies to: 87-87

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 0

🧹 Nitpick comments (1)
pkg/test/ipfs/ipfs_host_storage_test.go (1)

94-98: Consider adding test cases for execution context usage.

While the tests verify basic storage preparation functionality, consider adding test cases that specifically validate how the execution context is used by the storage provider. For example:

  • Test with different execution configurations
  • Verify error cases related to execution context
  • Test any execution-specific storage provider behaviors

This would help ensure the robustness of the new execution context feature.

Also applies to: 135-139

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between a7158d2 and fd4159d.

📒 Files selected for processing (1)
  • pkg/test/ipfs/ipfs_host_storage_test.go (3 hunks)
⏰ Context from checks skipped due to timeout of 90000ms (1)
  • GitHub Check: testcontainers-suite / tests
🔇 Additional comments (2)
pkg/test/ipfs/ipfs_host_storage_test.go (2)

22-22: LGTM!

The mock package import is correctly added to support the new execution parameter in storage preparation tests.


96-96: Verify mock execution data completeness.

Both test methods now use mock.Execution() to provide execution context to storage providers. Ensure that the mock execution data includes all necessary fields that storage providers might need for the partitioned inputs use case (ENG-520).

Let's verify the mock execution implementation:

Also applies to: 137-137

✅ Verification successful

Mock execution implementation is complete and suitable

The mock.Execution() implementation provides a comprehensive execution context with all necessary fields including identifiers, state tracking, and job references. The implementation also ensures data integrity through normalization and validation.

🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Description: Check the mock execution implementation to ensure it provides necessary fields.
ast-grep --pattern 'func Execution() *models.Execution {
  $$$
}'

Length of output: 218


Script:

#!/bin/bash
# Check the ExecutionForJob and Job implementations
ast-grep --pattern 'func ExecutionForJob($_) *models.Execution {
  $$$
}'

ast-grep --pattern 'func Job() *models.Job {
  $$$
}'

Length of output: 2314

@wdbaruni wdbaruni merged commit 00b8981 into main Jan 10, 2025
13 of 14 checks passed
@wdbaruni wdbaruni deleted the eng-529-expose-execution-to-storage-providers branch January 10, 2025 18:09
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

1 participant