Skip to content
This repository was archived by the owner on Nov 1, 2023. It is now read-only.

Commit 4dbc9c0

Browse files
authored
Reduce mutation in the agent state machine (#2710)
* reduce unnecessary mutation in the agent * cleanup and fix test * cleanup * reduce log some levels * desambiguate self call in the coordinator
1 parent 3a5101b commit 4dbc9c0

File tree

17 files changed

+235
-149
lines changed

17 files changed

+235
-149
lines changed

src/agent/onefuzz-agent/src/agent.rs

Lines changed: 83 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ use crate::reboot::*;
1212
use crate::scheduler::*;
1313
use crate::setup::*;
1414
use crate::work::IWorkQueue;
15-
use crate::worker::IWorkerRunner;
15+
use crate::worker::{IWorkerRunner, WorkerEvent};
1616

1717
const PENDING_COMMANDS_DELAY: time::Duration = time::Duration::from_secs(10);
1818
const BUSY_DELAY: time::Duration = time::Duration::from_secs(1);
@@ -62,7 +62,7 @@ impl Agent {
6262
}
6363
}
6464

65-
pub async fn run(&mut self) -> Result<()> {
65+
pub async fn run(self) -> Result<()> {
6666
let mut instant = time::Instant::now();
6767

6868
// Tell the service that the agent has started.
@@ -78,42 +78,39 @@ impl Agent {
7878
let event = StateUpdateEvent::Init.into();
7979
self.coordinator.emit_event(event).await?;
8080
}
81-
82-
loop {
83-
self.heartbeat.alive();
81+
let mut state = self;
82+
let mut done = false;
83+
while !done {
84+
state.heartbeat.alive();
8485
if instant.elapsed() >= PENDING_COMMANDS_DELAY {
85-
self.execute_pending_commands().await?;
86+
state = state.execute_pending_commands().await?;
8687
instant = time::Instant::now();
8788
}
8889

89-
let done = self.update().await?;
90-
91-
if done {
92-
debug!("agent done, exiting loop");
93-
break;
94-
}
90+
(state, done) = state.update().await?;
9591
}
9692

93+
info!("agent done, exiting loop");
9794
Ok(())
9895
}
9996

100-
async fn update(&mut self) -> Result<bool> {
97+
async fn update(mut self) -> Result<(Self, bool)> {
10198
let last = self.scheduler.take().ok_or_else(scheduler_error)?;
10299
let previous_state = NodeState::from(&last);
103100
let (next, done) = match last {
104-
Scheduler::Free(s) => (self.free(s).await?, false),
105-
Scheduler::SettingUp(s) => (self.setting_up(s).await?, false),
106-
Scheduler::PendingReboot(s) => (self.pending_reboot(s).await?, false),
107-
Scheduler::Ready(s) => (self.ready(s).await?, false),
108-
Scheduler::Busy(s) => (self.busy(s).await?, false),
109-
Scheduler::Done(s) => (self.done(s).await?, true),
101+
Scheduler::Free(s) => (self.free(s, previous_state).await?, false),
102+
Scheduler::SettingUp(s) => (self.setting_up(s, previous_state).await?, false),
103+
Scheduler::PendingReboot(s) => (self.pending_reboot(s, previous_state).await?, false),
104+
Scheduler::Ready(s) => (self.ready(s, previous_state).await?, false),
105+
Scheduler::Busy(s) => (self.busy(s, previous_state).await?, false),
106+
//todo: introduce a new prameter to allow the agent to restart after this point
107+
Scheduler::Done(s) => (self.done(s, previous_state).await?, true),
110108
};
111-
self.previous_state = previous_state;
112-
self.scheduler = Some(next);
113-
Ok(done)
109+
110+
Ok((next, done))
114111
}
115112

116-
async fn emit_state_update_if_changed(&mut self, event: StateUpdateEvent) -> Result<()> {
113+
async fn emit_state_update_if_changed(&self, event: StateUpdateEvent) -> Result<()> {
117114
match (&event, self.previous_state) {
118115
(StateUpdateEvent::Free, NodeState::Free)
119116
| (StateUpdateEvent::Busy, NodeState::Busy)
@@ -129,7 +126,7 @@ impl Agent {
129126
Ok(())
130127
}
131128

132-
async fn free(&mut self, state: State<Free>) -> Result<Scheduler> {
129+
async fn free(mut self, state: State<Free>, previous: NodeState) -> Result<Self> {
133130
self.emit_state_update_if_changed(StateUpdateEvent::Free)
134131
.await?;
135132

@@ -190,7 +187,7 @@ impl Agent {
190187
// Otherwise, the work was not stopped, but we still should not execute it. This is likely
191188
// our because agent version is out of date. Do nothing, so another node can see the work.
192189
// The service will eventually send us a stop command and reimage our node, if appropriate.
193-
debug!(
190+
info!(
194191
"not scheduling active work set, not dropping: {:?}",
195192
msg.work_set
196193
);
@@ -205,11 +202,15 @@ impl Agent {
205202
state.into()
206203
};
207204

208-
Ok(next)
205+
Ok(Self {
206+
previous_state: previous,
207+
scheduler: Some(next),
208+
..self
209+
})
209210
}
210211

211-
async fn setting_up(&mut self, state: State<SettingUp>) -> Result<Scheduler> {
212-
debug!("agent setting up");
212+
async fn setting_up(mut self, state: State<SettingUp>, previous: NodeState) -> Result<Self> {
213+
info!("agent setting up");
213214

214215
let tasks = state.work_set().task_ids();
215216
self.emit_state_update_if_changed(StateUpdateEvent::SettingUp { tasks })
@@ -221,11 +222,19 @@ impl Agent {
221222
SetupDone::Done(s) => s.into(),
222223
};
223224

224-
Ok(scheduler)
225+
Ok(Self {
226+
previous_state: previous,
227+
scheduler: Some(scheduler),
228+
..self
229+
})
225230
}
226231

227-
async fn pending_reboot(&mut self, state: State<PendingReboot>) -> Result<Scheduler> {
228-
debug!("agent pending reboot");
232+
async fn pending_reboot(
233+
self,
234+
state: State<PendingReboot>,
235+
_previous: NodeState,
236+
) -> Result<Self> {
237+
info!("agent pending reboot");
229238
self.emit_state_update_if_changed(StateUpdateEvent::Rebooting)
230239
.await?;
231240

@@ -236,14 +245,18 @@ impl Agent {
236245
unreachable!()
237246
}
238247

239-
async fn ready(&mut self, state: State<Ready>) -> Result<Scheduler> {
240-
debug!("agent ready");
248+
async fn ready(self, state: State<Ready>, previous: NodeState) -> Result<Self> {
249+
info!("agent ready");
241250
self.emit_state_update_if_changed(StateUpdateEvent::Ready)
242251
.await?;
243-
Ok(state.run().await?.into())
252+
Ok(Self {
253+
previous_state: previous,
254+
scheduler: Some(state.run().await?.into()),
255+
..self
256+
})
244257
}
245258

246-
async fn busy(&mut self, state: State<Busy>) -> Result<Scheduler> {
259+
async fn busy(mut self, state: State<Busy>, previous: NodeState) -> Result<Self> {
247260
self.emit_state_update_if_changed(StateUpdateEvent::Busy)
248261
.await?;
249262

@@ -255,7 +268,7 @@ impl Agent {
255268
// that is done, this sleep should be removed.
256269
time::sleep(BUSY_DELAY).await;
257270

258-
let mut events = vec![];
271+
let mut events: Vec<WorkerEvent> = vec![];
259272
let updated = state
260273
.update(&mut events, self.worker_runner.as_mut())
261274
.await?;
@@ -264,11 +277,15 @@ impl Agent {
264277
self.coordinator.emit_event(event.into()).await?;
265278
}
266279

267-
Ok(updated.into())
280+
Ok(Self {
281+
previous_state: previous,
282+
scheduler: Some(updated.into()),
283+
..self
284+
})
268285
}
269286

270-
async fn done(&mut self, state: State<Done>) -> Result<Scheduler> {
271-
debug!("agent done");
287+
async fn done(self, state: State<Done>, previous: NodeState) -> Result<Self> {
288+
info!("agent done");
272289
set_done_lock(self.machine_id).await?;
273290

274291
let event = match state.cause() {
@@ -287,23 +304,41 @@ impl Agent {
287304

288305
self.emit_state_update_if_changed(event).await?;
289306
// `Done` is a final state.
290-
Ok(state.into())
307+
Ok(Self {
308+
previous_state: previous,
309+
scheduler: Some(state.into()),
310+
..self
311+
})
291312
}
292313

293-
async fn execute_pending_commands(&mut self) -> Result<()> {
314+
async fn execute_pending_commands(mut self) -> Result<Self> {
294315
let result = self.coordinator.poll_commands().await;
295316

296317
match &result {
297-
Ok(None) => {}
318+
Ok(None) => Ok(Self {
319+
last_poll_command: result,
320+
..self
321+
}),
298322
Ok(Some(cmd)) => {
299323
info!("agent received node command: {:?}", cmd);
300324
let managed = self.managed;
301-
self.scheduler()?.execute_command(cmd, managed).await?;
325+
let scheduler = self.scheduler.take().ok_or_else(scheduler_error)?;
326+
let new_scheduler = scheduler.execute_command(cmd.clone(), managed).await?;
327+
328+
Ok(Self {
329+
last_poll_command: result,
330+
scheduler: Some(new_scheduler),
331+
..self
332+
})
302333
}
303334
Err(PollCommandError::RequestFailed(err)) => {
304335
// If we failed to request commands, this could be the service
305336
// could be down. Log it, but keep going.
306337
error!("error polling the service for commands: {:?}", err);
338+
Ok(Self {
339+
last_poll_command: result,
340+
..self
341+
})
307342
}
308343
Err(PollCommandError::RequestParseFailed(err)) => {
309344
bail!("poll commands failed: {:?}", err);
@@ -321,22 +356,18 @@ impl Agent {
321356
bail!("repeated command claim attempt failures: {:?}", err);
322357
}
323358
error!("error claiming command from the service: {:?}", err);
359+
Ok(Self {
360+
last_poll_command: result,
361+
..self
362+
})
324363
}
325364
}
326-
327-
self.last_poll_command = result;
328-
329-
Ok(())
330365
}
331366

332-
async fn sleep(&mut self) {
367+
async fn sleep(&self) {
333368
let delay = time::Duration::from_secs(30);
334369
time::sleep(delay).await;
335370
}
336-
337-
fn scheduler(&mut self) -> Result<&mut Scheduler> {
338-
self.scheduler.as_mut().ok_or_else(scheduler_error)
339-
}
340371
}
341372

342373
// The agent owns a `Scheduler`, which it must consume when driving its state

src/agent/onefuzz-agent/src/agent/tests.rs

Lines changed: 14 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -83,12 +83,12 @@ impl Fixture {
8383

8484
#[tokio::test]
8585
async fn test_update_free_no_work() {
86-
let mut agent = Fixture.agent();
86+
let agent = Fixture.agent();
8787

88-
let done = agent.update().await.unwrap();
88+
let (agent, done) = agent.update().await.unwrap();
8989
assert!(!done);
9090

91-
assert!(matches!(agent.scheduler().unwrap(), Scheduler::Free(..)));
91+
assert!(matches!(agent.scheduler.unwrap(), Scheduler::Free(..)));
9292

9393
let double: &WorkQueueDouble = agent.work_queue.downcast_ref().unwrap();
9494
let claimed_worksets = double
@@ -109,13 +109,9 @@ async fn test_update_free_has_work() {
109109
.available
110110
.push(Fixture.message());
111111

112-
let done = agent.update().await.unwrap();
112+
let (agent, done) = agent.update().await.unwrap();
113113
assert!(!done);
114-
115-
assert!(matches!(
116-
agent.scheduler().unwrap(),
117-
Scheduler::SettingUp(..)
118-
));
114+
assert!(matches!(agent.scheduler.unwrap(), Scheduler::SettingUp(..)));
119115

120116
let double: &WorkQueueDouble = agent.work_queue.downcast_ref().unwrap();
121117
let claimed_worksets = double
@@ -149,8 +145,10 @@ async fn test_emitted_state() {
149145
.available
150146
.push(Fixture.message());
151147

148+
let mut done;
152149
for _i in 0..10 {
153-
if agent.update().await.unwrap() {
150+
(agent, done) = agent.update().await.unwrap();
151+
if done {
154152
break;
155153
}
156154
}
@@ -181,8 +179,8 @@ async fn test_emitted_state() {
181179
}),
182180
];
183181
let coordinator: &CoordinatorDouble = agent.coordinator.downcast_ref().unwrap();
184-
let events = &coordinator.events;
185-
assert_eq!(events, &expected_events);
182+
let events = &coordinator.events.read().await;
183+
assert_eq!(&events.to_vec(), &expected_events);
186184
}
187185

188186
#[tokio::test]
@@ -206,8 +204,10 @@ async fn test_emitted_state_failed_setup() {
206204
.available
207205
.push(Fixture.message());
208206

207+
let mut done;
209208
for _i in 0..10 {
210-
if agent.update().await.unwrap() {
209+
(agent, done) = agent.update().await.unwrap();
210+
if done {
211211
break;
212212
}
213213
}
@@ -223,7 +223,7 @@ async fn test_emitted_state_failed_setup() {
223223
}),
224224
];
225225
let coordinator: &CoordinatorDouble = agent.coordinator.downcast_ref().unwrap();
226-
let events = &coordinator.events;
226+
let events = &coordinator.events.read().await.to_vec();
227227
assert_eq!(events, &expected_events);
228228

229229
// TODO: at some point, the underlying tests should be updated to not write

src/agent/onefuzz-agent/src/commands.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ const ONEFUZZ_SERVICE_USER: &str = "onefuzz";
2525
#[cfg(target_family = "windows")]
2626
static SET_PERMISSION_ONCE: OnceCell<()> = OnceCell::const_new();
2727

28-
#[derive(Debug, Deserialize, Eq, PartialEq, Serialize)]
28+
#[derive(Debug, Deserialize, Eq, PartialEq, Serialize, Clone)]
2929
pub struct SshKeyInfo {
3030
pub public_key: Secret<String>,
3131
}

src/agent/onefuzz-agent/src/config.rs

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -319,13 +319,12 @@ impl Registration {
319319
pub async fn load_existing(config: StaticConfig) -> Result<Self> {
320320
let dynamic_config = DynamicConfig::load().await?;
321321
let machine_id = config.machine_identity.machine_id;
322-
let mut registration = Self {
322+
let registration = Self {
323323
config,
324324
dynamic_config,
325325
machine_id,
326326
};
327-
registration.renew().await?;
328-
Ok(registration)
327+
registration.renew().await
329328
}
330329

331330
pub async fn create_managed(config: StaticConfig) -> Result<Self> {
@@ -336,7 +335,7 @@ impl Registration {
336335
Self::create(config, false, DEFAULT_REGISTRATION_CREATE_TIMEOUT).await
337336
}
338337

339-
pub async fn renew(&mut self) -> Result<()> {
338+
pub async fn renew(&self) -> Result<Self> {
340339
info!("renewing registration");
341340
let token = self.config.credentials.access_token().await?;
342341

@@ -355,9 +354,13 @@ impl Registration {
355354
.await
356355
.context("Registration.renew request body")?;
357356

358-
self.dynamic_config = response.json().await?;
359-
self.dynamic_config.save().await?;
357+
let dynamic_config: DynamicConfig = response.json().await?;
358+
dynamic_config.save().await?;
360359

361-
Ok(())
360+
Ok(Self {
361+
dynamic_config,
362+
config: self.config.clone(),
363+
machine_id: self.machine_id,
364+
})
362365
}
363366
}

0 commit comments

Comments
 (0)