Skip to content

Commit

Permalink
Backport #932
Browse files Browse the repository at this point in the history
  • Loading branch information
fonsp committed Feb 21, 2021
1 parent 532168a commit c37181c
Show file tree
Hide file tree
Showing 5 changed files with 56 additions and 22 deletions.
5 changes: 3 additions & 2 deletions src/evaluation/Run.jl
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ function run_reactive!(session::ServerSession, notebook::Notebook, old_topology:
cell.persist_js_state = persist_js_state || cell cells
putnotebookupdates!(session, notebook, clientupdate_cell_output(notebook, cell))

if any_interrupted
if any_interrupted || notebook.wants_to_interrupt
relay_reactivity_error!(cell, InterruptException())
else
run = run_single!((session, notebook), cell, new_topology[cell])
Expand All @@ -82,7 +82,8 @@ function run_reactive!(session::ServerSession, notebook::Notebook, old_topology:
cell.running = false
putnotebookupdates!(session, notebook, clientupdate_cell_output(notebook, cell))
end


notebook.wants_to_interrupt = false
# allow other `run_reactive!` calls to be executed
put!(notebook.executetoken)
return new_order
Expand Down
50 changes: 33 additions & 17 deletions src/evaluation/WorkspaceManager.jl
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ const workspaces = Dict{UUID,Promise{Workspace}}()
Only workspaces on a separate process can be stopped during execution. Windows currently supports `true`
only partially: you can't stop cells on Windows.
"""
function make_workspace((session, notebook)::Tuple{ServerSession, Notebook})::Workspace
function make_workspace((session, notebook)::Tuple{ServerSession,Notebook})::Workspace
pid = if session.options.evaluation.workspace_use_distributed
create_workspaceprocess(;compiler_options=_merge_notebook_compiler_options(notebook, session.options.compiler))
else
Expand All @@ -64,7 +64,7 @@ function make_workspace((session, notebook)::Tuple{ServerSession, Notebook})::Wo
return workspace
end

function start_relaying_logs((session, notebook)::Tuple{ServerSession, Notebook}, log_channel::Distributed.RemoteChannel)
function start_relaying_logs((session, notebook)::Tuple{ServerSession,Notebook}, log_channel::Distributed.RemoteChannel)
while true
try
next_log = take!(log_channel)
Expand Down Expand Up @@ -107,7 +107,7 @@ function _merge_notebook_compiler_options(notebook::Notebook, options::CompilerO
return options
end

kwargs = Dict{Symbol, Any}()
kwargs = Dict{Symbol,Any}()
for each in fieldnames(CompilerOptions)
# 1. not specified by notebook options
# 2. notebook specified project options
Expand Down Expand Up @@ -181,7 +181,7 @@ function create_workspaceprocess(;compiler_options=CompilerOptions())::Integer
end

"Return the `Workspace` of `notebook`; will be created if none exists yet."
function get_workspace(session_notebook::Tuple{ServerSession, Notebook})::Workspace
function get_workspace(session_notebook::Tuple{ServerSession,Notebook})::Workspace
session, notebook = session_notebook
promise = get!(workspaces, notebook.notebook_id) do
Promise{Workspace}(() -> make_workspace(session_notebook))
Expand Down Expand Up @@ -219,10 +219,11 @@ function eval_format_fetch_in_workspace(session_notebook::Union{Tuple{ServerSess

# a try block (on this process) to catch an InterruptException
take!(workspace.dowork_token)
try
early_result = try
# we use [pid] instead of pid to prevent fetching output
Distributed.remotecall_eval(Main, [workspace.pid], :(PlutoRunner.run_expression($(QuoteNode(expr)), $cell_id, $function_wrapped_info)))
put!(workspace.dowork_token)
nothing
catch exs
# We don't use a `finally` because the token needs to be back asap
put!(workspace.dowork_token)
Expand All @@ -240,7 +241,9 @@ function eval_format_fetch_in_workspace(session_notebook::Union{Tuple{ServerSess
end
end

format_fetch_in_workspace(workspace, cell_id, ends_with_semicolon)
early_result === nothing ?
format_fetch_in_workspace(workspace, cell_id, ends_with_semicolon) :
early_result
end

"Evaluate expression inside the workspace - output is not fetched, errors are rethrown. For internal use."
Expand All @@ -251,7 +254,7 @@ function eval_in_workspace(session_notebook::Union{Tuple{ServerSession,Notebook}
nothing
end

function format_fetch_in_workspace(session_notebook::Union{Tuple{ServerSession,Notebook},Workspace}, cell_id, ends_with_semicolon, showmore_id::Union{PlutoRunner.ObjectDimPair, Nothing}=nothing)
function format_fetch_in_workspace(session_notebook::Union{Tuple{ServerSession,Notebook},Workspace}, cell_id, ends_with_semicolon, showmore_id::Union{PlutoRunner.ObjectDimPair,Nothing}=nothing)
workspace = get_workspace(session_notebook)

# instead of fetching the output value (which might not make sense in our context, since the user can define structs, types, functions, etc), we format the cell output on the worker, and fetch the formatted output.
Expand Down Expand Up @@ -280,10 +283,26 @@ function delete_vars(session_notebook::Union{Tuple{ServerSession,Notebook},Works
Distributed.remotecall_eval(Main, [workspace.pid], :(PlutoRunner.move_vars($(old_workspace_name |> QuoteNode), $(new_workspace_name |> QuoteNode), $to_delete, $funcs_to_delete, $module_imports_to_move)))
end

function poll(query::Function, timeout::Real=Inf64, interval::Real=1/20)
start = time()
while time() < start + timeout
if query()
return true
end
sleep(interval)
end
return false
end

"Force interrupt (SIGINT) a workspace, return whether successful"
function interrupt_workspace(session_notebook::Union{Tuple{ServerSession,Notebook},Workspace}; verbose=true)::Bool
workspace = get_workspace(session_notebook)

if poll(() -> isready(workspace.dowork_token), 2.0, 5/100)
verbose && println("Cell finished, other cells cancelled!")
return true
end

if Sys.iswindows()
verbose && @warn "Unfortunately, stopping cells is currently not supported on Windows :(
Maybe the Windows Subsystem for Linux is right for you:
Expand All @@ -307,23 +326,20 @@ function interrupt_workspace(session_notebook::Union{Tuple{ServerSession,Noteboo
verbose && @info "Sending interrupt to process $(workspace.pid)"
Distributed.interrupt(workspace.pid)

delay = 5.0 # seconds
parts = 100

for _ in 1:parts
sleep(delay / parts)
if isready(workspace.dowork_token)
verbose && println("Cell interrupted!")
return true
end
if poll(() -> isready(workspace.dowork_token), 5.0, 5/100)
verbose && println("Cell interrupted!")
return true
end

verbose && println("Still running... starting sequence")
while !isready(workspace.dowork_token)
for _ in 1:5
verbose && print(" 🔥 ")
Distributed.interrupt(workspace.pid)
sleep(0.2)
sleep(0.18)
if isready(workspace.dowork_token)
break
end
end
sleep(1.5)
end
Expand Down
4 changes: 3 additions & 1 deletion src/notebook/Notebook.jl
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,13 @@ mutable struct Notebook
# per notebook compiler options
# nothing means to use global session compiler options
compiler_options::Union{Nothing,Configuration.CompilerOptions}

wants_to_interrupt::Bool
end
# We can keep 128 updates pending. After this, any put! calls (i.e. calls that push an update to the notebook) will simply block, which is fine.
# This does mean that the Notebook can't be used if nothing is clearing the update channel.
Notebook(cells::Array{Cell,1}, path::AbstractString, notebook_id::UUID) =
Notebook(cells, path, notebook_id, NotebookTopology(), Channel(1024), Token(), nothing)
Notebook(cells, path, notebook_id, NotebookTopology(), Channel(1024), Token(), nothing, false)

Notebook(cells::Array{Cell,1}, path::AbstractString=numbered_until_new(joinpath(new_notebooks_directory(), cutename()))) = Notebook(cells, path, uuid1())

Expand Down
8 changes: 7 additions & 1 deletion src/runner/PlutoRunner.jl
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,7 @@ If the third argument is a `Tuple{Set{Symbol}, Set{Symbol}}` containing the refe
This function is memoized: running the same expression a second time will simply call the same generated function again. This is much faster than evaluating the expression, because the function only needs to be Julia-compiled once. See https://github.com/fonsp/Pluto.jl/pull/720
"""
function run_expression(expr::Any, cell_id::UUID, function_wrapped_info::Union{Nothing,Tuple{Set{Symbol},Set{Symbol}}}=nothing)
cell_results[cell_id], cell_runtimes[cell_id] = if function_wrapped_info === nothing
result, runtime = if function_wrapped_info === nothing
proof = ReturnProof()
wrapped = timed_expr(expr, proof)
run_inside_trycatch(wrapped, cell_id, proof)
Expand All @@ -217,6 +217,12 @@ function run_expression(expr::Any, cell_id::UUID, function_wrapped_info::Union{N
ans, runtime
end
end

if (result isa CapturedException) && (result.ex isa InterruptException)
throw(result.ex)
end

cell_results[cell_id], cell_runtimes[cell_id] = result, runtime
end


Expand Down
11 changes: 10 additions & 1 deletion src/webserver/Dynamic.jl
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,16 @@ responses[:move_notebook_file] = (session::ServerSession, body, notebook::Notebo
end

responses[:interrupt_all] = (session::ServerSession, body, notebook::Notebook; initiator::Union{Initiator,Missing}=missing) -> let
success = WorkspaceManager.interrupt_workspace((session, notebook))
session_notebook = (session, notebook)
workspace = WorkspaceManager.get_workspace(session_notebook)

already_interrupting = notebook.wants_to_interrupt
anything_running = !isready(workspace.dowork_token)
if !already_interrupting && anything_running
notebook.wants_to_interrupt = true
WorkspaceManager.interrupt_workspace(session_notebook)
end

# TODO: notify user whether interrupt was successful (i.e. whether they are using a `ProcessWorkspace`)
end

Expand Down

2 comments on commit c37181c

@fonsp
Copy link
Owner Author

@fonsp fonsp commented on c37181c Feb 21, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@JuliaRegistrator register()

@JuliaRegistrator
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Registration pull request created: JuliaRegistries/General/30517

After the above pull request is merged, it is recommended that a tag is created on this repository for the registered package version.

This will be done automatically if the Julia TagBot GitHub Action is installed, or can be done manually through the github interface, or via:

git tag -a v0.12.21 -m "<description of version>" c37181c7e86b741cbee23135dd6fcf5d11f60a2d
git push origin v0.12.21

Please sign in to comment.