From c37181c7e86b741cbee23135dd6fcf5d11f60a2d Mon Sep 17 00:00:00 2001 From: Fons van der Plas Date: Sat, 20 Feb 2021 00:00:39 +0100 Subject: [PATCH] Backport #932 --- src/evaluation/Run.jl | 5 +-- src/evaluation/WorkspaceManager.jl | 50 ++++++++++++++++++++---------- src/notebook/Notebook.jl | 4 ++- src/runner/PlutoRunner.jl | 8 ++++- src/webserver/Dynamic.jl | 11 ++++++- 5 files changed, 56 insertions(+), 22 deletions(-) diff --git a/src/evaluation/Run.jl b/src/evaluation/Run.jl index 47f5a6bfe7..b9bc3bf870 100644 --- a/src/evaluation/Run.jl +++ b/src/evaluation/Run.jl @@ -72,7 +72,7 @@ function run_reactive!(session::ServerSession, notebook::Notebook, old_topology: cell.persist_js_state = persist_js_state || cell ∉ cells putnotebookupdates!(session, notebook, clientupdate_cell_output(notebook, cell)) - if any_interrupted + if any_interrupted || notebook.wants_to_interrupt relay_reactivity_error!(cell, InterruptException()) else run = run_single!((session, notebook), cell, new_topology[cell]) @@ -82,7 +82,8 @@ function run_reactive!(session::ServerSession, notebook::Notebook, old_topology: cell.running = false putnotebookupdates!(session, notebook, clientupdate_cell_output(notebook, cell)) end - + + notebook.wants_to_interrupt = false # allow other `run_reactive!` calls to be executed put!(notebook.executetoken) return new_order diff --git a/src/evaluation/WorkspaceManager.jl b/src/evaluation/WorkspaceManager.jl index 8fe35ec76c..0c7d6a3b6f 100644 --- a/src/evaluation/WorkspaceManager.jl +++ b/src/evaluation/WorkspaceManager.jl @@ -38,7 +38,7 @@ const workspaces = Dict{UUID,Promise{Workspace}}() Only workspaces on a separate process can be stopped during execution. Windows currently supports `true` only partially: you can't stop cells on Windows. """ -function make_workspace((session, notebook)::Tuple{ServerSession, Notebook})::Workspace +function make_workspace((session, notebook)::Tuple{ServerSession,Notebook})::Workspace pid = if session.options.evaluation.workspace_use_distributed create_workspaceprocess(;compiler_options=_merge_notebook_compiler_options(notebook, session.options.compiler)) else @@ -64,7 +64,7 @@ function make_workspace((session, notebook)::Tuple{ServerSession, Notebook})::Wo return workspace end -function start_relaying_logs((session, notebook)::Tuple{ServerSession, Notebook}, log_channel::Distributed.RemoteChannel) +function start_relaying_logs((session, notebook)::Tuple{ServerSession,Notebook}, log_channel::Distributed.RemoteChannel) while true try next_log = take!(log_channel) @@ -107,7 +107,7 @@ function _merge_notebook_compiler_options(notebook::Notebook, options::CompilerO return options end - kwargs = Dict{Symbol, Any}() + kwargs = Dict{Symbol,Any}() for each in fieldnames(CompilerOptions) # 1. not specified by notebook options # 2. notebook specified project options @@ -181,7 +181,7 @@ function create_workspaceprocess(;compiler_options=CompilerOptions())::Integer end "Return the `Workspace` of `notebook`; will be created if none exists yet." -function get_workspace(session_notebook::Tuple{ServerSession, Notebook})::Workspace +function get_workspace(session_notebook::Tuple{ServerSession,Notebook})::Workspace session, notebook = session_notebook promise = get!(workspaces, notebook.notebook_id) do Promise{Workspace}(() -> make_workspace(session_notebook)) @@ -219,10 +219,11 @@ function eval_format_fetch_in_workspace(session_notebook::Union{Tuple{ServerSess # a try block (on this process) to catch an InterruptException take!(workspace.dowork_token) - try + early_result = try # we use [pid] instead of pid to prevent fetching output Distributed.remotecall_eval(Main, [workspace.pid], :(PlutoRunner.run_expression($(QuoteNode(expr)), $cell_id, $function_wrapped_info))) put!(workspace.dowork_token) + nothing catch exs # We don't use a `finally` because the token needs to be back asap put!(workspace.dowork_token) @@ -240,7 +241,9 @@ function eval_format_fetch_in_workspace(session_notebook::Union{Tuple{ServerSess end end - format_fetch_in_workspace(workspace, cell_id, ends_with_semicolon) + early_result === nothing ? + format_fetch_in_workspace(workspace, cell_id, ends_with_semicolon) : + early_result end "Evaluate expression inside the workspace - output is not fetched, errors are rethrown. For internal use." @@ -251,7 +254,7 @@ function eval_in_workspace(session_notebook::Union{Tuple{ServerSession,Notebook} nothing end -function format_fetch_in_workspace(session_notebook::Union{Tuple{ServerSession,Notebook},Workspace}, cell_id, ends_with_semicolon, showmore_id::Union{PlutoRunner.ObjectDimPair, Nothing}=nothing) +function format_fetch_in_workspace(session_notebook::Union{Tuple{ServerSession,Notebook},Workspace}, cell_id, ends_with_semicolon, showmore_id::Union{PlutoRunner.ObjectDimPair,Nothing}=nothing) workspace = get_workspace(session_notebook) # instead of fetching the output value (which might not make sense in our context, since the user can define structs, types, functions, etc), we format the cell output on the worker, and fetch the formatted output. @@ -280,10 +283,26 @@ function delete_vars(session_notebook::Union{Tuple{ServerSession,Notebook},Works Distributed.remotecall_eval(Main, [workspace.pid], :(PlutoRunner.move_vars($(old_workspace_name |> QuoteNode), $(new_workspace_name |> QuoteNode), $to_delete, $funcs_to_delete, $module_imports_to_move))) end +function poll(query::Function, timeout::Real=Inf64, interval::Real=1/20) + start = time() + while time() < start + timeout + if query() + return true + end + sleep(interval) + end + return false +end + "Force interrupt (SIGINT) a workspace, return whether successful" function interrupt_workspace(session_notebook::Union{Tuple{ServerSession,Notebook},Workspace}; verbose=true)::Bool workspace = get_workspace(session_notebook) + if poll(() -> isready(workspace.dowork_token), 2.0, 5/100) + verbose && println("Cell finished, other cells cancelled!") + return true + end + if Sys.iswindows() verbose && @warn "Unfortunately, stopping cells is currently not supported on Windows :( Maybe the Windows Subsystem for Linux is right for you: @@ -307,15 +326,9 @@ function interrupt_workspace(session_notebook::Union{Tuple{ServerSession,Noteboo verbose && @info "Sending interrupt to process $(workspace.pid)" Distributed.interrupt(workspace.pid) - delay = 5.0 # seconds - parts = 100 - - for _ in 1:parts - sleep(delay / parts) - if isready(workspace.dowork_token) - verbose && println("Cell interrupted!") - return true - end + if poll(() -> isready(workspace.dowork_token), 5.0, 5/100) + verbose && println("Cell interrupted!") + return true end verbose && println("Still running... starting sequence") @@ -323,7 +336,10 @@ function interrupt_workspace(session_notebook::Union{Tuple{ServerSession,Noteboo for _ in 1:5 verbose && print(" 🔥 ") Distributed.interrupt(workspace.pid) - sleep(0.2) + sleep(0.18) + if isready(workspace.dowork_token) + break + end end sleep(1.5) end diff --git a/src/notebook/Notebook.jl b/src/notebook/Notebook.jl index fc2a0a1182..4852571e70 100644 --- a/src/notebook/Notebook.jl +++ b/src/notebook/Notebook.jl @@ -32,11 +32,13 @@ mutable struct Notebook # per notebook compiler options # nothing means to use global session compiler options compiler_options::Union{Nothing,Configuration.CompilerOptions} + + wants_to_interrupt::Bool end # We can keep 128 updates pending. After this, any put! calls (i.e. calls that push an update to the notebook) will simply block, which is fine. # This does mean that the Notebook can't be used if nothing is clearing the update channel. Notebook(cells::Array{Cell,1}, path::AbstractString, notebook_id::UUID) = - Notebook(cells, path, notebook_id, NotebookTopology(), Channel(1024), Token(), nothing) + Notebook(cells, path, notebook_id, NotebookTopology(), Channel(1024), Token(), nothing, false) Notebook(cells::Array{Cell,1}, path::AbstractString=numbered_until_new(joinpath(new_notebooks_directory(), cutename()))) = Notebook(cells, path, uuid1()) diff --git a/src/runner/PlutoRunner.jl b/src/runner/PlutoRunner.jl index acfc1c312a..28082f8d7f 100644 --- a/src/runner/PlutoRunner.jl +++ b/src/runner/PlutoRunner.jl @@ -190,7 +190,7 @@ If the third argument is a `Tuple{Set{Symbol}, Set{Symbol}}` containing the refe This function is memoized: running the same expression a second time will simply call the same generated function again. This is much faster than evaluating the expression, because the function only needs to be Julia-compiled once. See https://github.com/fonsp/Pluto.jl/pull/720 """ function run_expression(expr::Any, cell_id::UUID, function_wrapped_info::Union{Nothing,Tuple{Set{Symbol},Set{Symbol}}}=nothing) - cell_results[cell_id], cell_runtimes[cell_id] = if function_wrapped_info === nothing + result, runtime = if function_wrapped_info === nothing proof = ReturnProof() wrapped = timed_expr(expr, proof) run_inside_trycatch(wrapped, cell_id, proof) @@ -217,6 +217,12 @@ function run_expression(expr::Any, cell_id::UUID, function_wrapped_info::Union{N ans, runtime end end + + if (result isa CapturedException) && (result.ex isa InterruptException) + throw(result.ex) + end + + cell_results[cell_id], cell_runtimes[cell_id] = result, runtime end diff --git a/src/webserver/Dynamic.jl b/src/webserver/Dynamic.jl index 5b036363b6..07125050ef 100644 --- a/src/webserver/Dynamic.jl +++ b/src/webserver/Dynamic.jl @@ -143,7 +143,16 @@ responses[:move_notebook_file] = (session::ServerSession, body, notebook::Notebo end responses[:interrupt_all] = (session::ServerSession, body, notebook::Notebook; initiator::Union{Initiator,Missing}=missing) -> let - success = WorkspaceManager.interrupt_workspace((session, notebook)) + session_notebook = (session, notebook) + workspace = WorkspaceManager.get_workspace(session_notebook) + + already_interrupting = notebook.wants_to_interrupt + anything_running = !isready(workspace.dowork_token) + if !already_interrupting && anything_running + notebook.wants_to_interrupt = true + WorkspaceManager.interrupt_workspace(session_notebook) + end + # TODO: notify user whether interrupt was successful (i.e. whether they are using a `ProcessWorkspace`) end