Scheduled jobs should be in a queue, not a stack.
chrisstaite-menlo authored and allada committed Jul 2, 2023
1 parent: b320de5, commit: 497446b
Showing 4 changed files with 162 additions and 4 deletions.
cas/scheduler/BUILD (1 change: 1 addition & 0 deletions)
@@ -76,6 +76,7 @@ rust_test(
srcs = ["tests/action_messages_test.rs"],
deps = [
":action_messages",
+ ":platform_property_manager",
"//proto",
"//util:common",
"//util:error",
cas/scheduler/action_messages.rs (5 changes: 3 additions & 2 deletions)
@@ -187,9 +187,10 @@ impl Eq for ActionInfo {}

impl Ord for ActionInfo {
fn cmp(&self, other: &Self) -> Ordering {
+ // Want the highest priority on top, but the lowest insert_timestamp.
self.priority
.cmp(&other.priority)
- .then_with(|| self.insert_timestamp.cmp(&other.insert_timestamp))
+ .then_with(|| other.insert_timestamp.cmp(&self.insert_timestamp))
.then_with(|| self.salt().cmp(&other.salt()))
.then_with(|| self.digest().size_bytes.cmp(&other.digest().size_bytes))
.then_with(|| self.digest().packed_hash.cmp(&other.digest().packed_hash))
@@ -201,7 +202,7 @@ impl PartialOrd for ActionInfo {
let cmp = self
.priority
.cmp(&other.priority)
- .then_with(|| self.insert_timestamp.cmp(&other.insert_timestamp))
+ .then_with(|| other.insert_timestamp.cmp(&self.insert_timestamp))
.then_with(|| self.salt().cmp(&other.salt()));
if cmp == Ordering::Equal {
return None;
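
The heart of the change is the flipped insert_timestamp comparison above. The tests below model the pending queue as a BTreeMap<Arc<ActionInfo>, ()> and read it with .keys().rev(), so the greatest key is taken first: under the old comparison the newest equal-priority action sorted greatest and ran first (a stack), while the new one puts the oldest equal-priority action on top (a queue). A minimal, self-contained sketch of the new ordering, using a simplified stand-in Key type rather than the real ActionInfo:

use std::cmp::Ordering;
use std::collections::BTreeMap;

// Simplified stand-in for the (priority, insert_timestamp) part of ActionInfo's ordering.
#[derive(PartialEq, Eq)]
struct Key {
    priority: i32,
    insert_timestamp: u64,
}

impl Ord for Key {
    fn cmp(&self, other: &Self) -> Ordering {
        // Highest priority sorts greatest; for equal priority the *earliest*
        // insert_timestamp sorts greatest (note the flipped operands).
        self.priority
            .cmp(&other.priority)
            .then_with(|| other.insert_timestamp.cmp(&self.insert_timestamp))
    }
}

impl PartialOrd for Key {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}

fn main() {
    let mut queue = BTreeMap::new();
    queue.insert(Key { priority: 0, insert_timestamp: 1 }, "first");
    queue.insert(Key { priority: 0, insert_timestamp: 2 }, "second");
    queue.insert(Key { priority: 1, insert_timestamp: 3 }, "urgent");

    // Reading the map in reverse yields highest priority first and, within
    // equal priority, the oldest insertion first: a queue, not a stack.
    let order: Vec<_> = queue.values().rev().cloned().collect();
    assert_eq!(order, vec!["urgent", "first", "second"]);
}

With the old comparison (self.insert_timestamp.cmp(&other.insert_timestamp)) the same reverse walk would yield "urgent", "second", "first", i.e. newest first.
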
cas/scheduler/tests/action_messages_test.rs (111 changes: 109 additions & 2 deletions)
@@ -12,15 +12,26 @@
// See the License for the specific language governing permissions and
// limitations under the License.

- use std::time::SystemTime;
+ use std::collections::{BTreeMap, HashMap};
+ use std::sync::Arc;
+ use std::time::{Duration, SystemTime};

- use action_messages::{ActionResult, ActionStage, ActionState, ExecutionMetadata};
+ use action_messages::{ActionInfo, ActionInfoHashKey, ActionResult, ActionStage, ActionState, ExecutionMetadata};
use common::DigestInfo;
use error::Error;
+ use platform_property_manager::PlatformProperties;
use proto::build::bazel::remote::execution::v2::ExecuteResponse;
use proto::google::longrunning::{operation, Operation};
use proto::google::rpc::Status;

const NOW_TIME: u64 = 10000;

fn make_system_time(add_time: u64) -> SystemTime {
SystemTime::UNIX_EPOCH
.checked_add(Duration::from_secs(NOW_TIME + add_time))
.unwrap()
}

#[cfg(test)]
mod action_messages_tests {
use super::*;
@@ -77,4 +88,100 @@ mod action_messages_tests {

Ok(())
}

#[tokio::test]
async fn highest_priority_action_first() -> Result<(), Error> {
const INSTANCE_NAME: &str = "foobar_instance_name";

let high_priority_action = Arc::new(ActionInfo {
instance_name: INSTANCE_NAME.to_string(),
command_digest: DigestInfo::new([0u8; 32], 0),
input_root_digest: DigestInfo::new([0u8; 32], 0),
timeout: Duration::MAX,
platform_properties: PlatformProperties {
properties: HashMap::new(),
},
priority: 1000,
load_timestamp: SystemTime::UNIX_EPOCH,
insert_timestamp: SystemTime::UNIX_EPOCH,
unique_qualifier: ActionInfoHashKey {
digest: DigestInfo::new([0u8; 32], 0),
salt: 0,
},
});
let lowest_priority_action = Arc::new(ActionInfo {
instance_name: INSTANCE_NAME.to_string(),
command_digest: DigestInfo::new([0u8; 32], 0),
input_root_digest: DigestInfo::new([0u8; 32], 0),
timeout: Duration::MAX,
platform_properties: PlatformProperties {
properties: HashMap::new(),
},
priority: 0,
load_timestamp: SystemTime::UNIX_EPOCH,
insert_timestamp: SystemTime::UNIX_EPOCH,
unique_qualifier: ActionInfoHashKey {
digest: DigestInfo::new([1u8; 32], 0),
salt: 0,
},
});
let mut action_map = BTreeMap::<Arc<ActionInfo>, ()>::new();
action_map.insert(lowest_priority_action.clone(), ());
action_map.insert(high_priority_action.clone(), ());

assert_eq!(
vec![high_priority_action, lowest_priority_action],
action_map.keys().rev().cloned().collect::<Vec<Arc<ActionInfo>>>()
);

Ok(())
}

#[tokio::test]
async fn equal_priority_earliest_first() -> Result<(), Error> {
const INSTANCE_NAME: &str = "foobar_instance_name";

let first_action = Arc::new(ActionInfo {
instance_name: INSTANCE_NAME.to_string(),
command_digest: DigestInfo::new([0u8; 32], 0),
input_root_digest: DigestInfo::new([0u8; 32], 0),
timeout: Duration::MAX,
platform_properties: PlatformProperties {
properties: HashMap::new(),
},
priority: 0,
load_timestamp: SystemTime::UNIX_EPOCH,
insert_timestamp: SystemTime::UNIX_EPOCH,
unique_qualifier: ActionInfoHashKey {
digest: DigestInfo::new([0u8; 32], 0),
salt: 0,
},
});
let current_action = Arc::new(ActionInfo {
instance_name: INSTANCE_NAME.to_string(),
command_digest: DigestInfo::new([0u8; 32], 0),
input_root_digest: DigestInfo::new([0u8; 32], 0),
timeout: Duration::MAX,
platform_properties: PlatformProperties {
properties: HashMap::new(),
},
priority: 0,
load_timestamp: SystemTime::UNIX_EPOCH,
insert_timestamp: make_system_time(0),
unique_qualifier: ActionInfoHashKey {
digest: DigestInfo::new([1u8; 32], 0),
salt: 0,
},
});
let mut action_map = BTreeMap::<Arc<ActionInfo>, ()>::new();
action_map.insert(current_action.clone(), ());
action_map.insert(first_action.clone(), ());

assert_eq!(
vec![first_action, current_action],
action_map.keys().rev().cloned().collect::<Vec<Arc<ActionInfo>>>()
);

Ok(())
}
}
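
Both new tests assert the expected order by walking the map keys in reverse, which mirrors how a consumer would pull the next job from such a structure. As a rough sketch only (the scheduler's actual dequeue code is not part of this diff), taking the next job could look like the hypothetical helper below; on Rust 1.66+, BTreeMap::pop_last removes and returns the greatest entry in one call.

use std::collections::BTreeMap;

// Hypothetical helper, not taken from the repository: remove and return the
// greatest key, which under the new ActionInfo ordering is the
// highest-priority, longest-waiting action.
fn dequeue_next<K: Ord + Clone, V>(queue: &mut BTreeMap<K, V>) -> Option<K> {
    let next = queue.keys().next_back().cloned()?;
    queue.remove(&next);
    Some(next)
}
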
cas/scheduler/tests/scheduler_test.rs (49 changes: 49 additions & 0 deletions)
@@ -898,6 +898,55 @@ mod scheduler_tests {
Ok(())
}

/// This tests that actions are performed in the order they were queued.
#[tokio::test]
async fn run_jobs_in_the_order_they_were_queued() -> Result<(), Error> {
const WORKER_ID: WorkerId = WorkerId(0x123456789111);

let scheduler = Scheduler::new(&SchedulerConfig::default());
let action_digest1 = DigestInfo::new([11u8; 32], 512);
let action_digest2 = DigestInfo::new([99u8; 32], 512);

// Use property to restrict the worker to a single action at a time.
let mut properties = HashMap::new();
properties.insert("prop1".to_string(), PlatformPropertyValue::Minimum(1));
let platform_properties = PlatformProperties { properties };
// This is queued after the next one (even though it's placed in the map
// first), so it should execute second.
let insert_timestamp2 = make_system_time(2);
let mut client2_rx = setup_action(
&scheduler,
action_digest2.clone(),
platform_properties.clone(),
insert_timestamp2,
)
.await?;
let insert_timestamp1 = make_system_time(1);
let mut client1_rx = setup_action(
&scheduler,
action_digest1.clone(),
platform_properties.clone(),
insert_timestamp1,
)
.await?;

// Add the worker after the queue has been set up.
let mut rx_from_worker = setup_new_worker(&scheduler, WORKER_ID, platform_properties).await?;

match rx_from_worker.recv().await.unwrap().update {
Some(update_for_worker::Update::StartAction(_)) => { /* Success */ }
v => assert!(false, "Expected StartAction, got : {:?}", v),
}
{
// First client should be in an Executing state.
assert_eq!(client1_rx.borrow_and_update().stage, ActionStage::Executing);
// Second client should be in a queued state.
assert_eq!(client2_rx.borrow_and_update().stage, ActionStage::Queued);
}

Ok(())
}

#[tokio::test]
async fn worker_retries_on_internal_error_and_fails_test() -> Result<(), Error> {
const WORKER_ID: WorkerId = WorkerId(0x123456789111);
