From 4c273dc2263c02a8a59ddb6ebc65d00eed2630de Mon Sep 17 00:00:00 2001 From: Alessandro Liparoti Date: Mon, 23 Oct 2023 06:09:52 -0700 Subject: [PATCH] Back out "Back out "[buck] upload Action to CAS when uploading ActionResult to Action Cache"" Summary: Fixes D50501135 which was breaking due to some land conflicts. Rebasing and testing latest version fixes it Original commit changeset: 365209684319 Original Phabricator Diff: D50501135 Reviewed By: stepancheg Differential Revision: D50502202 fbshipit-source-id: aa683f3856bfadeb3c03646d22e8034d3977db8c --- .../src/actions/impls/run/mod.rs | 7 ++++- .../src/actions/execute/action_executor.rs | 3 ++ app/buck2_build_api/src/actions/mod.rs | 2 ++ app/buck2_execute/src/execute/blobs.rs | 6 ++++ .../src/execute/cache_uploader.rs | 3 ++ .../src/execute/command_executor.rs | 3 +- .../src/executors/caching.rs | 31 +++++++++++++++++++ 7 files changed, 53 insertions(+), 2 deletions(-) diff --git a/app/buck2_action_impl/src/actions/impls/run/mod.rs b/app/buck2_action_impl/src/actions/impls/run/mod.rs index 919aba80ecb9..e2e2b3b9c563 100644 --- a/app/buck2_action_impl/src/actions/impls/run/mod.rs +++ b/app/buck2_action_impl/src/actions/impls/run/mod.rs @@ -721,7 +721,12 @@ impl IncrementalActionExecutable for RunAction { _ => None, }; let upload_result = ctx - .cache_upload(prepared_action.action.dupe(), &result, dep_file_entry) + .cache_upload( + prepared_action.action.dupe(), + &result, + dep_file_entry, + &prepared_action.blobs, + ) .await?; result.did_cache_upload = upload_result.did_cache_upload; diff --git a/app/buck2_build_api/src/actions/execute/action_executor.rs b/app/buck2_build_api/src/actions/execute/action_executor.rs index 633fbdc0dd8e..44894def6c1d 100644 --- a/app/buck2_build_api/src/actions/execute/action_executor.rs +++ b/app/buck2_build_api/src/actions/execute/action_executor.rs @@ -28,6 +28,7 @@ use buck2_execute::artifact_value::ArtifactValue; use buck2_execute::digest_config::DigestConfig; use buck2_execute::digest_config::HasDigestConfig; use buck2_execute::execute::action_digest::ActionDigest; +use buck2_execute::execute::blobs::ActionBlobs; use buck2_execute::execute::blocking::BlockingExecutor; use buck2_execute::execute::blocking::HasBlockingExecutor; use buck2_execute::execute::cache_uploader::CacheUploadInfo; @@ -498,6 +499,7 @@ impl ActionExecutionCtx for BuckActionExecutionContext<'_> { action_digest: ActionDigest, execution_result: &CommandExecutionResult, dep_file_entry: Option, + action_blobs: &ActionBlobs, ) -> anyhow::Result { let action = self.target(); self.executor @@ -510,6 +512,7 @@ impl ActionExecutionCtx for BuckActionExecutionContext<'_> { }, execution_result, dep_file_entry, + action_blobs, ) .await } diff --git a/app/buck2_build_api/src/actions/mod.rs b/app/buck2_build_api/src/actions/mod.rs index b614e78719b6..b24305f71f16 100644 --- a/app/buck2_build_api/src/actions/mod.rs +++ b/app/buck2_build_api/src/actions/mod.rs @@ -54,6 +54,7 @@ use buck2_events::dispatch::EventDispatcher; use buck2_execute::artifact::fs::ExecutorFs; use buck2_execute::digest_config::DigestConfig; use buck2_execute::execute::action_digest::ActionDigest; +use buck2_execute::execute::blobs::ActionBlobs; use buck2_execute::execute::blocking::BlockingExecutor; use buck2_execute::execute::cache_uploader::CacheUploadResult; use buck2_execute::execute::cache_uploader::DepFileEntry; @@ -218,6 +219,7 @@ pub trait ActionExecutionCtx: Send + Sync { action_digest: ActionDigest, execution_result: &CommandExecutionResult, dep_file_entry: Option, + action_blobs: &ActionBlobs, ) -> anyhow::Result; /// Executes a command diff --git a/app/buck2_execute/src/execute/blobs.rs b/app/buck2_execute/src/execute/blobs.rs index 32fb71607478..e66581551f43 100644 --- a/app/buck2_execute/src/execute/blobs.rs +++ b/app/buck2_execute/src/execute/blobs.rs @@ -55,4 +55,10 @@ impl ActionBlobs { pub fn get(&self, digest: &TrackedFileDigest) -> Option<&ActionMetadataBlobData> { self.0.get(digest) } + + pub fn iter( + &self, + ) -> std::collections::hash_map::Iter<'_, TrackedFileDigest, ActionMetadataBlobData> { + self.0.iter() + } } diff --git a/app/buck2_execute/src/execute/cache_uploader.rs b/app/buck2_execute/src/execute/cache_uploader.rs index df2672d4398b..60f8ee01898d 100644 --- a/app/buck2_execute/src/execute/cache_uploader.rs +++ b/app/buck2_execute/src/execute/cache_uploader.rs @@ -12,6 +12,7 @@ use buck2_action_metadata_proto::RemoteDepFile; use crate::digest_config::DigestConfig; use crate::execute::action_digest::ActionDigest; +use crate::execute::blobs::ActionBlobs; use crate::execute::dep_file_digest::DepFileDigest; use crate::execute::result::CommandExecutionResult; use crate::execute::target::CommandExecutionTarget; @@ -43,6 +44,7 @@ pub trait UploadCache: Send + Sync { info: &CacheUploadInfo<'_>, execution_result: &CommandExecutionResult, dep_file_entry: Option, + action_blobs: &ActionBlobs, ) -> anyhow::Result; } @@ -56,6 +58,7 @@ impl UploadCache for NoOpCacheUploader { _info: &CacheUploadInfo<'_>, _execution_result: &CommandExecutionResult, _dep_file_entry: Option, + _action_blobs: &ActionBlobs, ) -> anyhow::Result { Ok(CacheUploadResult { did_cache_upload: false, diff --git a/app/buck2_execute/src/execute/command_executor.rs b/app/buck2_execute/src/execute/command_executor.rs index facc822030e9..8e9756b4b243 100644 --- a/app/buck2_execute/src/execute/command_executor.rs +++ b/app/buck2_execute/src/execute/command_executor.rs @@ -123,10 +123,11 @@ impl CommandExecutor { info: &CacheUploadInfo<'_>, execution_result: &CommandExecutionResult, dep_file_entry: Option, + action_blobs: &ActionBlobs, ) -> anyhow::Result { self.0 .cache_uploader - .upload(info, execution_result, dep_file_entry) + .upload(info, execution_result, dep_file_entry, action_blobs) .await } diff --git a/app/buck2_execute_impl/src/executors/caching.rs b/app/buck2_execute_impl/src/executors/caching.rs index 06a30b9aba56..cacf93382b16 100644 --- a/app/buck2_execute_impl/src/executors/caching.rs +++ b/app/buck2_execute_impl/src/executors/caching.rs @@ -53,6 +53,7 @@ use remote_execution::TExecutedActionMetadata; use remote_execution::TFile; use remote_execution::TStatus; use remote_execution::TTimestamp; +use RE::InlinedBlobWithDigest; // Whether to throw errors when cache uploads fail (primarily for tests). static ERROR_ON_CACHE_UPLOAD: EnvHelper = EnvHelper::new("BUCK2_TEST_ERROR_ON_CACHE_UPLOAD"); @@ -95,6 +96,7 @@ impl CacheUploader { result: &CommandExecutionResult, digest_config: DigestConfig, error_on_cache_upload: bool, + action_blobs: &ActionBlobs, ) -> anyhow::Result { tracing::debug!("Uploading action result for `{}`", action_digest); let result = self @@ -105,6 +107,7 @@ impl CacheUploader { action_digest.to_re(), vec![], buck2_data::CacheUploadReason::LocalExecution, + action_blobs, ) .await; Self::modify_upload_result(action_digest, result, error_on_cache_upload) @@ -121,6 +124,7 @@ impl CacheUploader { digest_config: DigestConfig, dep_file_entry: DepFileEntry, error_on_cache_upload: bool, + action_blobs: &ActionBlobs, ) -> anyhow::Result { tracing::debug!( "Uploading dep file entry for action `{}` with dep file key `{}`", @@ -141,6 +145,7 @@ impl CacheUploader { digest_re, vec![dep_file_tany], buck2_data::CacheUploadReason::DepFile, + action_blobs, ) .await; @@ -159,6 +164,7 @@ impl CacheUploader { digest: TDigest, metadata: Vec, reason: buck2_data::CacheUploadReason, + action_blobs: &ActionBlobs, ) -> anyhow::Result { let digest_str = digest.to_string(); let output_bytes = result.calc_output_size_bytes(); @@ -183,6 +189,28 @@ impl CacheUploader { } } + // upload Action to CAS. + // This is necessary when writing to the ActionCache through CAS, since CAS needs to inspect the Action related to the ActionResult. + // Without storing the Action itself to CAS, ActionCache writes would fail. + let inlined_blobs = action_blobs + .iter() + .map(|(digest, blob)| InlinedBlobWithDigest { + digest: digest.to_re(), + blob: blob.0.to_vec(), + ..Default::default() + }) + .collect(); + + self.re_client + .upload_files_and_directories( + vec![], + vec![], + inlined_blobs, + self.re_use_case, + ) + .await?; + + // upload ActionResult to ActionCache let result: TActionResult2 = match self .upload_files_and_directories( result, @@ -428,6 +456,7 @@ impl UploadCache for CacheUploader { info: &CacheUploadInfo<'_>, res: &CommandExecutionResult, dep_file_entry: Option, + action_blobs: &ActionBlobs, ) -> anyhow::Result { let error_on_cache_upload = match ERROR_ON_CACHE_UPLOAD.get_copied() { Ok(r) => r.unwrap_or_default(), @@ -443,6 +472,7 @@ impl UploadCache for CacheUploader { res, info.digest_config, error_on_cache_upload, + action_blobs, ) .await? } else { @@ -460,6 +490,7 @@ impl UploadCache for CacheUploader { info.digest_config, dep_file_entry, error_on_cache_upload, + action_blobs, ) .await? }