Skip to content

Commit fa0c227

Browse files
committed
fix: support resume weak executor
1 parent ef545a0 commit fa0c227

File tree

3 files changed

+129
-39
lines changed

3 files changed

+129
-39
lines changed

src/executor.rs

Lines changed: 38 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -8,8 +8,9 @@ use alloc::alloc::{Allocator, Global, Layout};
88
use bit_iter::BitIter;
99
use core::pin::Pin;
1010
use core::task::Waker;
11-
use trapframe::TrapFrame;
1211
use riscv::register::sstatus;
12+
use trapframe::TrapFrame;
13+
use core::matches;
1314
use {
1415
alloc::boxed::Box,
1516
alloc::sync::Arc,
@@ -21,12 +22,20 @@ use {
2122
use crate::executor_entry;
2223
use crate::runtime::PriorityInner;
2324

25+
enum ExecutorState {
26+
STRONG,
27+
WEAK, // 执行完一次future后就需要被drop
28+
KILLED,
29+
UNUSED,
30+
}
31+
2432
pub struct Executor<F: Future<Output = ()> + Unpin> {
2533
priority_inner: Arc<PriorityInner<F>>,
2634
stack_base: usize,
2735
pub context: ExecuterContext,
2836
pub trapfrmae: TrapFrame,
2937
is_running_future: bool,
38+
state: ExecutorState,
3039
}
3140

3241
const STACK_SIZE: usize = 4096 * 16;
@@ -44,15 +53,16 @@ impl<F: Future<Output = ()> + Unpin> Executor<F> {
4453
context: ExecuterContext::default(),
4554
trapfrmae: TrapFrame::default(),
4655
is_running_future: false,
56+
state: ExecutorState::UNUSED,
4757
}));
4858

4959
// 栈是从高地址向低地址生长的
5060
let stack_top = pin_executor.stack_base + STACK_SIZE;
5161
let executer_addr = pin_executor.as_ref().get_ref() as *const _ as usize;
5262

5363
// 发生trap时, 会将trapframe储存在stack中, 在trap_return时
54-
// 会loadtrapframe恢复trap前的context. 对于新创建的executor,
55-
// 需要手动在栈上保存trapframe(主要是sepc和sstatus寄存器),
64+
// 会loadtrapframe恢复trap前的context. 对于新创建的executor,
65+
// 需要手动在栈上保存trapframe(主要是sepc和sstatus寄存器),
5666
// 并将sp寄存器设置为改地址.
5767
pin_executor.context.sp = Self::generate_stack(executer_addr, stack_top);
5868

@@ -95,14 +105,24 @@ impl<F: Future<Output = ()> + Unpin> Executor<F> {
95105
let pinned_ref = unsafe { Pin::new_unchecked(&mut *pinned_ptr) };
96106
drop(inner); // 本次运行的coroutine可能会用到GLOBAL_EXCUTOR.inner(e.g. spawn())
97107

98-
unsafe { sstatus::set_sie(); } // poll future时允许中断
99-
log::warn!("enable interrupt future={}", idx);
108+
unsafe {
109+
sstatus::set_sie();
110+
} // poll future时允许中断
100111
self.is_running_future = true;
112+
101113
let ret = { Future::poll(pinned_ref, &mut cx) };
102-
log::warn!("disable interrupt");
103-
unsafe { sstatus::clear_sie(); } // 禁用中断
114+
115+
unsafe {
116+
sstatus::clear_sie();
117+
} // 禁用中断
104118
self.is_running_future = false;
105119

120+
if let ExecutorState::WEAK = self.state {
121+
log::info!("weak executor");
122+
self.state = ExecutorState::KILLED;
123+
return;
124+
}
125+
106126
inner = self.priority_inner.get_mut_inner(priority);
107127
match ret {
108128
Poll::Ready(()) => inner.pages[page_idx].mark_dropped(subpage_idx),
@@ -113,6 +133,8 @@ impl<F: Future<Output = ()> + Unpin> Executor<F> {
113133
}
114134
}
115135
if !found {
136+
log::warn!("yield");
137+
crate::runtime::yeild();
116138
unsafe {
117139
crate::wait_for_interrupt();
118140
}
@@ -122,7 +144,7 @@ impl<F: Future<Output = ()> + Unpin> Executor<F> {
122144

123145
// stack layout: [executor_addr|32个寄存器|sstatus|sepc]
124146
// 发生中断时trapframe保存在栈中
125-
pub fn generate_stack(executor_addr: usize, stack_top: usize) -> usize{
147+
pub fn generate_stack(executor_addr: usize, stack_top: usize) -> usize {
126148
// 将executor的地址放置在栈底, 以便于新的executor执行线程获取到
127149
// executor实例。
128150
let mut stack_top = stack_top;
@@ -147,6 +169,14 @@ impl<F: Future<Output = ()> + Unpin> Executor<F> {
147169
pub fn is_running_future(&self) -> bool {
148170
return self.is_running_future;
149171
}
172+
173+
pub fn killed(&self) -> bool {
174+
return matches!(self.state, ExecutorState::KILLED);
175+
}
176+
177+
pub fn mark_weak(&mut self) {
178+
self.state = ExecutorState::WEAK;
179+
}
150180
}
151181
unsafe impl Send for Executor<Task> {}
152182
unsafe impl Sync for Executor<Task> {}

src/lib.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ global_asm!(include_str!("executor_entry.S"));
99
extern "C" {
1010
pub(crate) fn wait_for_interrupt();
1111
pub(crate) fn executor_entry();
12-
pub(crate) fn switch(cx1: usize, cx2: usize);
12+
pub(crate) fn switch(save_cx: usize, load_cx: usize);
1313
pub(crate) fn trap_return();
1414
}
1515

src/runtime.rs

Lines changed: 90 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,14 @@
11
extern crate alloc;
22
use crate::context::Context as ExecutorContext;
33
use crate::executor::Executor;
4+
use crate::switch;
45
use crate::waker_page::{WakerPage, WakerPageRef, WAKER_PAGE_SIZE};
56
use alloc::sync::Arc;
67
use alloc::vec;
78
use alloc::vec::Vec;
89
use core::cell::RefCell;
9-
use core::ops::DerefMut;
1010
use lazy_static::*;
11+
use riscv::register::sstatus;
1112
use log::warn;
1213
use spin::Mutex;
1314
use unicycle::pin_slab::PinSlab;
@@ -136,8 +137,10 @@ pub struct ExecutorRuntime<F: Future<Output = ()> + Unpin> {
136137
strong_executor: Arc<Pin<Box<Executor<F>>>>,
137138

138139
// 该executor在执行完一次后就会被drop
139-
weak_executor_vec: Vec<Arc<Pin<Box<Executor<F>>>>>,
140+
weak_executor_vec: Vec<Option<Arc<Pin<Box<Executor<F>>>>>>,
140141

142+
// 当前正在执行的executor
143+
current_executor: Option<Arc<Pin<Box<Executor<F>>>>>,
141144
// runtime context
142145
context: ExecutorContext,
143146
}
@@ -150,6 +153,7 @@ impl<F: Future<Output = ()> + Unpin> ExecutorRuntime<F> {
150153
priority_inner: priority_inner,
151154
strong_executor: Arc::new(Executor::new(priority_inner_cloned)),
152155
weak_executor_vec: vec![],
156+
current_executor: None,
153157
context: ExecutorContext::default(),
154158
};
155159
e
@@ -165,11 +169,14 @@ impl<F: Future<Output = ()> + Unpin> ExecutorRuntime<F> {
165169
}
166170

167171
fn add_weak_executor(&mut self, weak_executor: Arc<Pin<Box<Executor<F>>>>) {
168-
self.weak_executor_vec.push(weak_executor);
172+
self.weak_executor_vec.push(Some(weak_executor));
169173
}
170174

171175
fn downgrade_strong_executor(&mut self) {
172-
let old = self.strong_executor.clone();
176+
let mut old = self.strong_executor.clone();
177+
unsafe {
178+
Arc::get_mut_unchecked(&mut old).mark_weak();
179+
}
173180
self.add_weak_executor(old);
174181
self.strong_executor = Arc::new(Executor::new(self.priority_inner.clone()));
175182
}
@@ -181,24 +188,9 @@ impl<F: Future<Output = ()> + Unpin> Drop for ExecutorRuntime<F> {
181188
}
182189
}
183190

184-
// 运行executor.run()
185-
#[no_mangle]
186-
pub(crate) fn run_executor(executor_addr: usize) {
187-
log::warn!("executor addr {:x}", executor_addr);
188-
unsafe {
189-
let mut p = Box::from_raw(executor_addr as *mut Executor<Task>);
190-
p.run();
191-
}
192-
}
193-
194191
unsafe impl Send for ExecutorRuntime<Task> {}
195192
unsafe impl Sync for ExecutorRuntime<Task> {}
196193

197-
// lazy_static! {
198-
// pub static ref GLOBAL_RUNTIME: Mutex<RefCell<ExecutorRuntime<Task>>> =
199-
// Mutex::new(RefCell::new(ExecutorRuntime::new()));
200-
// }
201-
202194
lazy_static! {
203195
pub static ref GLOBAL_RUNTIME: Mutex<ExecutorRuntime<Task>> =
204196
Mutex::new(ExecutorRuntime::new());
@@ -209,24 +201,63 @@ pub fn spawn(future: impl Future<Output = ()> + Send + 'static) {
209201
return priority_spawn(future, DEFAULT_PRIORITY);
210202
}
211203

204+
//
212205
pub fn run() {
213206
log::warn!("GLOBAL_RUNTIME.run()");
214207
loop {
215-
let runtime = GLOBAL_RUNTIME.lock();
208+
let mut runtime = GLOBAL_RUNTIME.lock();
216209
let cx_ref = &runtime.context;
217-
let old_ctx = cx_ref as *const ExecutorContext as usize;
218-
let new_ctx = &(runtime.strong_executor.context) as *const ExecutorContext as usize;
210+
let runtime_cx = cx_ref as *const ExecutorContext as usize;
211+
let executor_cx = &(runtime.strong_executor.context) as *const ExecutorContext as usize;
219212
// 释放保护global_runtime的锁
213+
runtime.current_executor = Some(runtime.strong_executor.clone());
220214
drop(runtime);
221215
unsafe {
222-
crate::switch(old_ctx, new_ctx);
216+
log::warn!("run strong executor");
217+
crate::switch(runtime_cx, executor_cx);
223218
// 该函数返回说明当前strong_executor执行的future超时了,
224219
// 需要重新创建一个executor执行后续的future, 并且将
225220
// 新的executor作为strong_executor,旧的executor添
226221
// 加到weak_exector中。
227222
}
223+
log::warn!("switch return");
228224
let mut runtime = GLOBAL_RUNTIME.lock();
229-
runtime.downgrade_strong_executor();
225+
// 只有strong_executor主动yield时, 才会执行运行weak_executor;
226+
227+
if runtime.strong_executor.is_running_future() {
228+
runtime.downgrade_strong_executor();
229+
log::warn!("continued");
230+
continue;
231+
}
232+
233+
// 遍历全部的weak_executor
234+
let weak_executor_num = runtime.weak_executor_vec.len();
235+
log::warn!("run weak executor size={}", weak_executor_num);
236+
for i in 0..weak_executor_num {
237+
if runtime.weak_executor_vec[i].is_none() {
238+
continue;
239+
}
240+
let weak_executor = runtime.weak_executor_vec[i].as_ref().unwrap();
241+
if weak_executor.killed() {
242+
// TODO: 回收资源
243+
continue;
244+
}
245+
let executor_ctx = &weak_executor.context as *const ExecutorContext as usize;
246+
247+
runtime.current_executor = Some(weak_executor.clone());
248+
drop(runtime);
249+
unsafe {
250+
sstatus::set_sie();
251+
log::warn!("switch weak executor");
252+
switch(runtime_cx, executor_ctx);
253+
log::warn!("switch weak executor return");
254+
sstatus::clear_sie();
255+
}
256+
log::warn!("global locking");
257+
runtime = GLOBAL_RUNTIME.lock();
258+
log::warn!("global locking finish");
259+
}
260+
log::warn!("run weak executor finish");
230261
}
231262
}
232263

@@ -246,19 +277,48 @@ pub fn priority_spawn(future: impl Future<Output = ()> + Send + 'static, priorit
246277
// 切换到runtime的context
247278
pub fn handle_timeout() {
248279
let runtime = GLOBAL_RUNTIME.lock();
249-
if !runtime.strong_executor.is_running_future() {
280+
if !runtime.current_executor.as_ref().unwrap().is_running_future() {
250281
return;
251282
}
252283
let cx_ref = &runtime.context;
253-
let old_ctx = &(runtime.strong_executor.context) as *const ExecutorContext as usize;
254-
let new_ctx = cx_ref as *const ExecutorContext as usize;
284+
let executor_cx = &(runtime.strong_executor.context) as *const ExecutorContext as usize;
285+
let runtime_cx = cx_ref as *const ExecutorContext as usize;
255286
drop(runtime);
256-
log::warn!("switch to executor runtime");
257-
unsafe { crate::switch(old_ctx, new_ctx) };
287+
log::warn!("switching to executor runtime");
288+
unsafe { crate::switch(executor_cx, runtime_cx) };
258289
}
259290

260291
pub fn test_borrow() {
261292
let runtime = GLOBAL_RUNTIME.lock();
262293
let inner = runtime.priority_inner.get_mut_inner(DEFAULT_PRIORITY);
263-
log::warn!("borrow successfully {:?}", inner.deref() as *const _ as usize);
294+
log::warn!(
295+
"borrow successfully {:?}",
296+
inner.deref() as *const _ as usize
297+
);
298+
}
299+
300+
// 运行executor.run()
301+
#[no_mangle]
302+
pub(crate) fn run_executor(executor_addr: usize) {
303+
log::warn!("executor addr {:x}", executor_addr);
304+
unsafe {
305+
let mut p = Box::from_raw(executor_addr as *mut Executor<Task>);
306+
p.run();
307+
}
308+
let runtime = GLOBAL_RUNTIME.lock();
309+
let cx_ref = &runtime.context;
310+
let executor_cx = &(runtime.strong_executor.context) as *const ExecutorContext as usize;
311+
let runtime_cx = cx_ref as *const ExecutorContext as usize;
312+
drop(runtime);
313+
unsafe { crate::switch(executor_cx, runtime_cx) }
314+
unreachable!();
315+
}
316+
317+
pub(crate) fn yeild() {
318+
let runtime = GLOBAL_RUNTIME.lock();
319+
let cx_ref = &runtime.context;
320+
let executor_cx = &(runtime.strong_executor.context) as *const ExecutorContext as usize;
321+
let runtime_cx = cx_ref as *const ExecutorContext as usize;
322+
drop(runtime);
323+
unsafe { crate::switch(executor_cx, runtime_cx) }
264324
}

0 commit comments

Comments
 (0)