Skip to content

automatically wait for worker threads #833

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Feb 14, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion crates/ra_batch/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,6 @@ impl BatchDatabase {
.collect();

let db = BatchDatabase::load(crate_graph, &mut vfs);
let _ = vfs.shutdown();
Ok((db, local_roots))
}
}
Expand Down
26 changes: 12 additions & 14 deletions crates/ra_lsp_server/src/main_loop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,19 +54,20 @@ pub fn main_loop(
) -> Result<()> {
let pool = ThreadPool::new(THREADPOOL_SIZE);
let (task_sender, task_receiver) = unbounded::<Task>();
let (ws_worker, ws_watcher) = workspace_loader();

ws_worker.send(ws_root.clone()).unwrap();
// FIXME: support dynamic workspace loading.
let workspaces = match ws_worker.recv().unwrap() {
Ok(ws) => vec![ws],
Err(e) => {
log::error!("loading workspace failed: {}", e);
Vec::new()
let workspaces = {
let ws_worker = workspace_loader();
ws_worker.sender().send(ws_root.clone()).unwrap();
match ws_worker.receiver().recv().unwrap() {
Ok(ws) => vec![ws],
Err(e) => {
log::error!("loading workspace failed: {}", e);
Vec::new()
}
}
};
ws_worker.shutdown();
ws_watcher.shutdown().map_err(|_| format_err!("ws watcher died"))?;

let mut state = ServerWorldState::new(ws_root.clone(), workspaces);

log::info!("server initialized, serving requests");
Expand Down Expand Up @@ -94,12 +95,9 @@ pub fn main_loop(
log::info!("...threadpool has finished");

let vfs = Arc::try_unwrap(state.vfs).expect("all snapshots should be dead");
let vfs_res = vfs.into_inner().shutdown();
drop(vfs);

main_res?;
vfs_res.map_err(|_| format_err!("fs watcher died"))?;

Ok(())
main_res
}

enum Event {
Expand Down
6 changes: 3 additions & 3 deletions crates/ra_lsp_server/src/project_model.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,15 @@
use std::path::PathBuf;

use thread_worker::{WorkerHandle, Worker};
use thread_worker::Worker;

use crate::Result;

pub use ra_project_model::{
ProjectWorkspace, CargoWorkspace, Package, Target, TargetKind, Sysroot,
};

pub fn workspace_loader() -> (Worker<PathBuf, Result<ProjectWorkspace>>, WorkerHandle) {
thread_worker::spawn::<PathBuf, Result<ProjectWorkspace>, _>(
pub fn workspace_loader() -> Worker<PathBuf, Result<ProjectWorkspace>> {
Worker::<PathBuf, Result<ProjectWorkspace>>::spawn(
"workspace loader",
1,
|input_receiver, output_sender| {
Expand Down
17 changes: 5 additions & 12 deletions crates/ra_lsp_server/tests/heavy_tests/support.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use lsp_types::{
use serde::Serialize;
use serde_json::{to_string_pretty, Value};
use tempfile::TempDir;
use thread_worker::{WorkerHandle, Worker};
use thread_worker::Worker;
use test_utils::{parse_fixture, find_mismatch};

use ra_lsp_server::{
Expand Down Expand Up @@ -45,13 +45,12 @@ pub struct Server {
messages: RefCell<Vec<RawMessage>>,
dir: TempDir,
worker: Option<Worker<RawMessage, RawMessage>>,
watcher: Option<WorkerHandle>,
}

impl Server {
fn new(dir: TempDir, files: Vec<(PathBuf, String)>) -> Server {
let path = dir.path().to_path_buf();
let (worker, watcher) = thread_worker::spawn::<RawMessage, RawMessage, _>(
let worker = Worker::<RawMessage, RawMessage>::spawn(
"test server",
128,
move |mut msg_receiver, mut msg_sender| {
Expand All @@ -63,7 +62,6 @@ impl Server {
dir,
messages: Default::default(),
worker: Some(worker),
watcher: Some(watcher),
};

for (path, text) in files {
Expand Down Expand Up @@ -117,7 +115,7 @@ impl Server {
}
fn send_request_(&self, r: RawRequest) -> Value {
let id = r.id;
self.worker.as_ref().unwrap().send(RawMessage::Request(r)).unwrap();
self.worker.as_ref().unwrap().sender().send(RawMessage::Request(r)).unwrap();
while let Some(msg) = self.recv() {
match msg {
RawMessage::Request(req) => panic!("unexpected request: {:?}", req),
Expand Down Expand Up @@ -157,24 +155,19 @@ impl Server {
}
}
fn recv(&self) -> Option<RawMessage> {
recv_timeout(&self.worker.as_ref().unwrap().out).map(|msg| {
recv_timeout(&self.worker.as_ref().unwrap().receiver()).map(|msg| {
self.messages.borrow_mut().push(msg.clone());
msg
})
}
fn send_notification(&self, not: RawNotification) {
self.worker.as_ref().unwrap().send(RawMessage::Notification(not)).unwrap();
self.worker.as_ref().unwrap().sender().send(RawMessage::Notification(not)).unwrap();
}
}

impl Drop for Server {
fn drop(&mut self) {
self.send_request::<Shutdown>(());
let receiver = self.worker.take().unwrap().shutdown();
while let Some(msg) = recv_timeout(&receiver) {
drop(msg);
}
self.watcher.take().unwrap().shutdown().unwrap();
}
}

Expand Down
91 changes: 35 additions & 56 deletions crates/ra_vfs/src/io.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,11 @@
use std::{
fs,
thread,
path::{Path, PathBuf},
sync::{mpsc, Arc},
time::Duration,
};
use crossbeam_channel::{Receiver, Sender, unbounded, RecvError, select};
use crossbeam_channel::{Sender, unbounded, RecvError, select};
use relative_path::RelativePathBuf;
use thread_worker::WorkerHandle;
use walkdir::WalkDir;
use notify::{DebouncedEvent, RecommendedWatcher, RecursiveMode, Watcher as _Watcher};

Expand Down Expand Up @@ -48,37 +46,42 @@ enum ChangeKind {

const WATCHER_DELAY: Duration = Duration::from_millis(250);

pub(crate) struct Worker {
worker: thread_worker::Worker<Task, TaskResult>,
worker_handle: WorkerHandle,
}

impl Worker {
pub(crate) fn start(roots: Arc<Roots>) -> Worker {
// This is a pretty elaborate setup of threads & channels! It is
// explained by the following concerns:
// * we need to burn a thread translating from notify's mpsc to
// crossbeam_channel.
// * we want to read all files from a single thread, to guarantee that
// we always get fresher versions and never go back in time.
// * we want to tear down everything neatly during shutdown.
let (worker, worker_handle) = thread_worker::spawn(
"vfs",
128,
// This are the channels we use to communicate with outside world.
// If `input_receiver` is closed we need to tear ourselves down.
// `output_sender` should not be closed unless the parent died.
move |input_receiver, output_sender| {
pub(crate) type Worker = thread_worker::Worker<Task, TaskResult>;
pub(crate) fn start(roots: Arc<Roots>) -> Worker {
// This is a pretty elaborate setup of threads & channels! It is
// explained by the following concerns:
// * we need to burn a thread translating from notify's mpsc to
// crossbeam_channel.
// * we want to read all files from a single thread, to guarantee that
// we always get fresher versions and never go back in time.
// * we want to tear down everything neatly during shutdown.
Worker::spawn(
"vfs",
128,
// This are the channels we use to communicate with outside world.
// If `input_receiver` is closed we need to tear ourselves down.
// `output_sender` should not be closed unless the parent died.
move |input_receiver, output_sender| {
// Make sure that the destruction order is
//
// * notify_sender
// * _thread
// * watcher_sender
//
// this is required to avoid deadlocks.

// These are the corresponding crossbeam channels
let (watcher_sender, watcher_receiver) = unbounded();
let _thread;
{
// These are `std` channels notify will send events to
let (notify_sender, notify_receiver) = mpsc::channel();
// These are the corresponding crossbeam channels
let (watcher_sender, watcher_receiver) = unbounded();

let mut watcher = notify::watcher(notify_sender, WATCHER_DELAY)
.map_err(|e| log::error!("failed to spawn notify {}", e))
.ok();
// Start a silly thread to transform between two channels
let thread = thread::spawn(move || {
_thread = thread_worker::ScopedThread::spawn("notify-convertor", move || {
notify_receiver
.into_iter()
.for_each(|event| convert_notify_event(event, &watcher_sender))
Expand Down Expand Up @@ -110,35 +113,11 @@ impl Worker {
},
}
}
// Stopped the watcher
drop(watcher.take());
// Drain pending events: we are not interested in them anyways!
watcher_receiver.into_iter().for_each(|_| ());

let res = thread.join();
match &res {
Ok(()) => log::info!("... Watcher terminated with ok"),
Err(_) => log::error!("... Watcher terminated with err"),
}
res.unwrap();
},
);

Worker { worker, worker_handle }
}

pub(crate) fn sender(&self) -> &Sender<Task> {
&self.worker.inp
}

pub(crate) fn receiver(&self) -> &Receiver<TaskResult> {
&self.worker.out
}

pub(crate) fn shutdown(self) -> thread::Result<()> {
let _ = self.worker.shutdown();
self.worker_handle.shutdown()
}
}
// Drain pending events: we are not interested in them anyways!
watcher_receiver.into_iter().for_each(|_| ());
},
)
}

fn watch_root(
Expand Down
8 changes: 1 addition & 7 deletions crates/ra_vfs/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ use std::{
fmt, fs, mem,
path::{Path, PathBuf},
sync::Arc,
thread,
};

use crossbeam_channel::Receiver;
Expand Down Expand Up @@ -160,7 +159,7 @@ impl fmt::Debug for Vfs {
impl Vfs {
pub fn new(roots: Vec<PathBuf>) -> (Vfs, Vec<VfsRoot>) {
let roots = Arc::new(Roots::new(roots));
let worker = io::Worker::start(Arc::clone(&roots));
let worker = io::start(Arc::clone(&roots));
let mut root2files = ArenaMap::default();

for (root, config) in roots.iter() {
Expand Down Expand Up @@ -337,11 +336,6 @@ impl Vfs {
mem::replace(&mut self.pending_changes, Vec::new())
}

/// Shutdown the VFS and terminate the background watching thread.
pub fn shutdown(self) -> thread::Result<()> {
self.worker.shutdown()
}

fn add_file(
&mut self,
root: VfsRoot,
Expand Down
1 change: 0 additions & 1 deletion crates/ra_vfs/tests/vfs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,5 @@ fn test_vfs_works() -> std::io::Result<()> {
Err(RecvTimeoutError::Timeout)
);

vfs.shutdown().unwrap();
Ok(())
}
1 change: 0 additions & 1 deletion crates/thread_worker/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ version = "0.1.0"
authors = ["rust-analyzer developers"]

[dependencies]
drop_bomb = "0.1.0"
crossbeam-channel = "0.3.5"
log = "0.4.3"

Loading