@@ -12,7 +12,7 @@ use crate::reboot::*;
1212use crate :: scheduler:: * ;
1313use crate :: setup:: * ;
1414use crate :: work:: IWorkQueue ;
15- use crate :: worker:: IWorkerRunner ;
15+ use crate :: worker:: { IWorkerRunner , WorkerEvent } ;
1616
1717const PENDING_COMMANDS_DELAY : time:: Duration = time:: Duration :: from_secs ( 10 ) ;
1818const BUSY_DELAY : time:: Duration = time:: Duration :: from_secs ( 1 ) ;
@@ -62,7 +62,7 @@ impl Agent {
6262 }
6363 }
6464
65- pub async fn run ( & mut self ) -> Result < ( ) > {
65+ pub async fn run ( self ) -> Result < ( ) > {
6666 let mut instant = time:: Instant :: now ( ) ;
6767
6868 // Tell the service that the agent has started.
@@ -78,42 +78,39 @@ impl Agent {
7878 let event = StateUpdateEvent :: Init . into ( ) ;
7979 self . coordinator . emit_event ( event) . await ?;
8080 }
81-
82- loop {
83- self . heartbeat . alive ( ) ;
81+ let mut state = self ;
82+ let mut done = false ;
83+ while !done {
84+ state. heartbeat . alive ( ) ;
8485 if instant. elapsed ( ) >= PENDING_COMMANDS_DELAY {
85- self . execute_pending_commands ( ) . await ?;
86+ state = state . execute_pending_commands ( ) . await ?;
8687 instant = time:: Instant :: now ( ) ;
8788 }
8889
89- let done = self . update ( ) . await ?;
90-
91- if done {
92- debug ! ( "agent done, exiting loop" ) ;
93- break ;
94- }
90+ ( state, done) = state. update ( ) . await ?;
9591 }
9692
93+ info ! ( "agent done, exiting loop" ) ;
9794 Ok ( ( ) )
9895 }
9996
100- async fn update ( & mut self ) -> Result < bool > {
97+ async fn update ( mut self ) -> Result < ( Self , bool ) > {
10198 let last = self . scheduler . take ( ) . ok_or_else ( scheduler_error) ?;
10299 let previous_state = NodeState :: from ( & last) ;
103100 let ( next, done) = match last {
104- Scheduler :: Free ( s) => ( self . free ( s) . await ?, false ) ,
105- Scheduler :: SettingUp ( s) => ( self . setting_up ( s) . await ?, false ) ,
106- Scheduler :: PendingReboot ( s) => ( self . pending_reboot ( s) . await ?, false ) ,
107- Scheduler :: Ready ( s) => ( self . ready ( s) . await ?, false ) ,
108- Scheduler :: Busy ( s) => ( self . busy ( s) . await ?, false ) ,
109- Scheduler :: Done ( s) => ( self . done ( s) . await ?, true ) ,
101+ Scheduler :: Free ( s) => ( self . free ( s, previous_state) . await ?, false ) ,
102+ Scheduler :: SettingUp ( s) => ( self . setting_up ( s, previous_state) . await ?, false ) ,
103+ Scheduler :: PendingReboot ( s) => ( self . pending_reboot ( s, previous_state) . await ?, false ) ,
104+ Scheduler :: Ready ( s) => ( self . ready ( s, previous_state) . await ?, false ) ,
105+ Scheduler :: Busy ( s) => ( self . busy ( s, previous_state) . await ?, false ) ,
106+ //todo: introduce a new prameter to allow the agent to restart after this point
107+ Scheduler :: Done ( s) => ( self . done ( s, previous_state) . await ?, true ) ,
110108 } ;
111- self . previous_state = previous_state;
112- self . scheduler = Some ( next) ;
113- Ok ( done)
109+
110+ Ok ( ( next, done) )
114111 }
115112
116- async fn emit_state_update_if_changed ( & mut self , event : StateUpdateEvent ) -> Result < ( ) > {
113+ async fn emit_state_update_if_changed ( & self , event : StateUpdateEvent ) -> Result < ( ) > {
117114 match ( & event, self . previous_state ) {
118115 ( StateUpdateEvent :: Free , NodeState :: Free )
119116 | ( StateUpdateEvent :: Busy , NodeState :: Busy )
@@ -129,7 +126,7 @@ impl Agent {
129126 Ok ( ( ) )
130127 }
131128
132- async fn free ( & mut self , state : State < Free > ) -> Result < Scheduler > {
129+ async fn free ( mut self , state : State < Free > , previous : NodeState ) -> Result < Self > {
133130 self . emit_state_update_if_changed ( StateUpdateEvent :: Free )
134131 . await ?;
135132
@@ -190,7 +187,7 @@ impl Agent {
190187 // Otherwise, the work was not stopped, but we still should not execute it. This is likely
191188 // our because agent version is out of date. Do nothing, so another node can see the work.
192189 // The service will eventually send us a stop command and reimage our node, if appropriate.
193- debug ! (
190+ info ! (
194191 "not scheduling active work set, not dropping: {:?}" ,
195192 msg. work_set
196193 ) ;
@@ -205,11 +202,15 @@ impl Agent {
205202 state. into ( )
206203 } ;
207204
208- Ok ( next)
205+ Ok ( Self {
206+ previous_state : previous,
207+ scheduler : Some ( next) ,
208+ ..self
209+ } )
209210 }
210211
211- async fn setting_up ( & mut self , state : State < SettingUp > ) -> Result < Scheduler > {
212- debug ! ( "agent setting up" ) ;
212+ async fn setting_up ( mut self , state : State < SettingUp > , previous : NodeState ) -> Result < Self > {
213+ info ! ( "agent setting up" ) ;
213214
214215 let tasks = state. work_set ( ) . task_ids ( ) ;
215216 self . emit_state_update_if_changed ( StateUpdateEvent :: SettingUp { tasks } )
@@ -221,11 +222,19 @@ impl Agent {
221222 SetupDone :: Done ( s) => s. into ( ) ,
222223 } ;
223224
224- Ok ( scheduler)
225+ Ok ( Self {
226+ previous_state : previous,
227+ scheduler : Some ( scheduler) ,
228+ ..self
229+ } )
225230 }
226231
227- async fn pending_reboot ( & mut self , state : State < PendingReboot > ) -> Result < Scheduler > {
228- debug ! ( "agent pending reboot" ) ;
232+ async fn pending_reboot (
233+ self ,
234+ state : State < PendingReboot > ,
235+ _previous : NodeState ,
236+ ) -> Result < Self > {
237+ info ! ( "agent pending reboot" ) ;
229238 self . emit_state_update_if_changed ( StateUpdateEvent :: Rebooting )
230239 . await ?;
231240
@@ -236,14 +245,18 @@ impl Agent {
236245 unreachable ! ( )
237246 }
238247
239- async fn ready ( & mut self , state : State < Ready > ) -> Result < Scheduler > {
240- debug ! ( "agent ready" ) ;
248+ async fn ready ( self , state : State < Ready > , previous : NodeState ) -> Result < Self > {
249+ info ! ( "agent ready" ) ;
241250 self . emit_state_update_if_changed ( StateUpdateEvent :: Ready )
242251 . await ?;
243- Ok ( state. run ( ) . await ?. into ( ) )
252+ Ok ( Self {
253+ previous_state : previous,
254+ scheduler : Some ( state. run ( ) . await ?. into ( ) ) ,
255+ ..self
256+ } )
244257 }
245258
246- async fn busy ( & mut self , state : State < Busy > ) -> Result < Scheduler > {
259+ async fn busy ( mut self , state : State < Busy > , previous : NodeState ) -> Result < Self > {
247260 self . emit_state_update_if_changed ( StateUpdateEvent :: Busy )
248261 . await ?;
249262
@@ -255,7 +268,7 @@ impl Agent {
255268 // that is done, this sleep should be removed.
256269 time:: sleep ( BUSY_DELAY ) . await ;
257270
258- let mut events = vec ! [ ] ;
271+ let mut events: Vec < WorkerEvent > = vec ! [ ] ;
259272 let updated = state
260273 . update ( & mut events, self . worker_runner . as_mut ( ) )
261274 . await ?;
@@ -264,11 +277,15 @@ impl Agent {
264277 self . coordinator . emit_event ( event. into ( ) ) . await ?;
265278 }
266279
267- Ok ( updated. into ( ) )
280+ Ok ( Self {
281+ previous_state : previous,
282+ scheduler : Some ( updated. into ( ) ) ,
283+ ..self
284+ } )
268285 }
269286
270- async fn done ( & mut self , state : State < Done > ) -> Result < Scheduler > {
271- debug ! ( "agent done" ) ;
287+ async fn done ( self , state : State < Done > , previous : NodeState ) -> Result < Self > {
288+ info ! ( "agent done" ) ;
272289 set_done_lock ( self . machine_id ) . await ?;
273290
274291 let event = match state. cause ( ) {
@@ -287,23 +304,41 @@ impl Agent {
287304
288305 self . emit_state_update_if_changed ( event) . await ?;
289306 // `Done` is a final state.
290- Ok ( state. into ( ) )
307+ Ok ( Self {
308+ previous_state : previous,
309+ scheduler : Some ( state. into ( ) ) ,
310+ ..self
311+ } )
291312 }
292313
293- async fn execute_pending_commands ( & mut self ) -> Result < ( ) > {
314+ async fn execute_pending_commands ( mut self ) -> Result < Self > {
294315 let result = self . coordinator . poll_commands ( ) . await ;
295316
296317 match & result {
297- Ok ( None ) => { }
318+ Ok ( None ) => Ok ( Self {
319+ last_poll_command : result,
320+ ..self
321+ } ) ,
298322 Ok ( Some ( cmd) ) => {
299323 info ! ( "agent received node command: {:?}" , cmd) ;
300324 let managed = self . managed ;
301- self . scheduler ( ) ?. execute_command ( cmd, managed) . await ?;
325+ let scheduler = self . scheduler . take ( ) . ok_or_else ( scheduler_error) ?;
326+ let new_scheduler = scheduler. execute_command ( cmd. clone ( ) , managed) . await ?;
327+
328+ Ok ( Self {
329+ last_poll_command : result,
330+ scheduler : Some ( new_scheduler) ,
331+ ..self
332+ } )
302333 }
303334 Err ( PollCommandError :: RequestFailed ( err) ) => {
304335 // If we failed to request commands, this could be the service
305336 // could be down. Log it, but keep going.
306337 error ! ( "error polling the service for commands: {:?}" , err) ;
338+ Ok ( Self {
339+ last_poll_command : result,
340+ ..self
341+ } )
307342 }
308343 Err ( PollCommandError :: RequestParseFailed ( err) ) => {
309344 bail ! ( "poll commands failed: {:?}" , err) ;
@@ -321,22 +356,18 @@ impl Agent {
321356 bail ! ( "repeated command claim attempt failures: {:?}" , err) ;
322357 }
323358 error ! ( "error claiming command from the service: {:?}" , err) ;
359+ Ok ( Self {
360+ last_poll_command : result,
361+ ..self
362+ } )
324363 }
325364 }
326-
327- self . last_poll_command = result;
328-
329- Ok ( ( ) )
330365 }
331366
332- async fn sleep ( & mut self ) {
367+ async fn sleep ( & self ) {
333368 let delay = time:: Duration :: from_secs ( 30 ) ;
334369 time:: sleep ( delay) . await ;
335370 }
336-
337- fn scheduler ( & mut self ) -> Result < & mut Scheduler > {
338- self . scheduler . as_mut ( ) . ok_or_else ( scheduler_error)
339- }
340371}
341372
342373// The agent owns a `Scheduler`, which it must consume when driving its state
0 commit comments