Skip to content

Commit

Permalink
Fix a possible infinite loop in RedisStore::update (#1269)
Browse files Browse the repository at this point in the history
Previously, there were certain conditions that would lead to
`RedisStore::update` looping forever, causing the program to
hang — see https://reviewable.io/reviews/TraceMachina/nativelink/1188#-O2pu9LV5ux4ILuT6MND
  • Loading branch information
caass authored Sep 17, 2024
1 parent 0ccaa15 commit 8d957a5
Show file tree
Hide file tree
Showing 4 changed files with 243 additions and 105 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
bazel-*
target/
.vscode/
.zed
.cache
.terraform*
.config
Expand Down
100 changes: 71 additions & 29 deletions nativelink-store/src/redis_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -298,48 +298,90 @@ impl StoreDriver for RedisStore {
// As a result of this, there will be a span of time where a key in Redis has only partial data. We want other
// observers to notice atomic updates to keys, rather than partial updates, so we first write to a temporary key
// and then rename that key once we're done appending data.
//
// TODO(caass): Remove potential for infinite loop (https://reviewable.io/reviews/TraceMachina/nativelink/1188#-O2pu9LV5ux4ILuT6MND)
'outer: loop {
let mut expecting_first_chunk = true;
let pipe = client.pipeline();

while expecting_first_chunk || !reader.is_empty() {
let chunk = reader
.recv()
.await
.err_tip(|| "Failed to reach chunk in update in redis store")?;
let mut is_first_chunk = true;
let mut eof_reached = false;
let mut pipe = client.pipeline();

if chunk.is_empty() {
if is_zero_digest(key.borrow()) {
return Ok(());
}
while !eof_reached {
pipe = client.pipeline();
let mut pipe_size = 0;
const MAX_PIPE_SIZE: usize = 5 * 1024 * 1024; // 5 MB

// Reader sent empty chunk, we're done here.
break 'outer;
}
let chunk = reader
.recv()
.await
.err_tip(|| "Failed to reach chunk in update in redis store")?;

if chunk.is_empty() {
// There are three cases where we receive an empty chunk:
// 1. The first chunk of a zero-digest key. We're required to treat all zero-digest keys as if they exist
// and are empty per the RBE spec, so we can just return early as if we've pushed it -- any attempts to
// read the value later will similarly avoid the network trip.
// 2. This is an empty first chunk of a non-zero-digest key. In this case, we _do_ need to push up an
// empty key, but can skip the rest of the process around renaming since there's only the one operation.
// 3. This is the last chunk (EOF) of a regular key. In that case we can skip pushing this chunk.
//
// In all three cases, we're done pushing data and can move it from the temporary key to the final key.
if is_first_chunk && is_zero_digest(key.borrow()) {
// Case 1, a zero-digest key.
return Ok(());
} else if is_first_chunk {
// Case 2, an empty non-zero-digest key.
pipe.append::<(), _, _>(&temp_key, "")
.await
.err_tip(|| "While appending to temp key in RedisStore::update")?;
};

// Queue the append, but don't execute until we've received all the chunks.
pipe.append::<(), _, _>(&temp_key, chunk)
.await
.err_tip(|| "Failed to append to temp key in RedisStore::update")?;
expecting_first_chunk = false;
// Note: setting `eof_reached = true` and calling `continue` is semantically equivalent to `break`.
// Since we need to use the `eof_reached` flag in the inner loop, we do the same here
// for consistency.
eof_reached = true;
continue;
} else {
// Not EOF, but we've now received our first chunk.
is_first_chunk = false;
}

// Give other tasks a chance to run to populate the reader's
// buffer if possible.
tokio::task::yield_now().await;
// Queue the append, but don't execute until we've received all the chunks.
pipe_size += chunk.len();
pipe.append::<(), _, _>(&temp_key, chunk)
.await
.err_tip(|| "Failed to append to temp key in RedisStore::update")?;

// Opportunistically grab any other chunks already in the reader.
while let Some(chunk) = reader
.try_recv()
.transpose()
.err_tip(|| "Failed to reach chunk in update in redis store")?
{
if chunk.is_empty() {
eof_reached = true;
break;
} else {
pipe_size += chunk.len();
pipe.append::<(), _, _>(&temp_key, chunk)
.await
.err_tip(|| "Failed to append to temp key in RedisStore::update")?;
}

// Stop appending if the pipeline is already holding 5MB of data.
if pipe_size >= MAX_PIPE_SIZE {
break;
}
}

// Here the reader is empty but more data is expected.
// We've exhausted the reader (or hit the 5MB cap), but more data is expected.
// Executing the queued commands appends the data we just received to the temp key.
pipe.all::<()>()
.await
.err_tip(|| "Failed to append to temporary key in RedisStore::update")?;
}

// Rename the temp key so that the data appears under the real key. Any data already present in the real key is lost.
client
.rename::<(), _, _>(&temp_key, final_key.as_ref())
pipe.rename::<(), _, _>(&temp_key, final_key.as_ref())
.await
.err_tip(|| "While queueing key rename in RedisStore::update()")?;
pipe.all::<()>()
.await
.err_tip(|| "While renaming key in RedisStore::update()")?;

Expand Down
Loading

0 comments on commit 8d957a5

Please sign in to comment.