-
-
Notifications
You must be signed in to change notification settings - Fork 297
/
Run.jl
205 lines (166 loc) · 7.69 KB
/
Run.jl
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
import REPL: ends_with_semicolon
import .Configuration
import .ExpressionExplorer: FunctionNameSignaturePair, is_joined_funcname
# Zero-item `push!` on a `Set{Cell}`: there is nothing to insert, so the set is returned unchanged.
# NOTE(review): presumably here so that `push!(set, items...)` also works when `items` is empty — confirm.
function Base.push!(x::Set{Cell})
    return x
end
"""
    @asynclog expr

Like `@async`, except that it prints errors to the terminal.

`expr` is evaluated in the caller's scope inside a new task. If it throws, the exception
and its stack trace are written to `stderr` and the exception is then rethrown, so the
task still ends up failed.
"""
macro asynclog(expr)
    return quote
        # the body runs asynchronously, so exceptions have to be caught (and printed) by hand
        @async try
            $(esc(expr))
        catch err
            trace = stacktrace(catch_backtrace())
            showerror(stderr, err, trace)
            rethrow(err)
        end
    end
end
"""
Run given cells and all the cells that depend on them, based on the topology information before and after the changes.

# Arguments
- `session`, `notebook`: the session and notebook the run belongs to.
- `old_topology`, `new_topology`: the notebook topology before and after the change.
- `cells`: the cells that were explicitly requested to run.

# Keywords
- `deletion_hook`: called as `deletion_hook((session, notebook), to_delete_vars, to_delete_funcs, to_reimport; to_run=to_run)` before any cell is run. Defaults to `WorkspaceManager.delete_vars`.
- `persist_js_state`: when `true`, every cell that is run gets `persist_js_state = true`.

# Returns
The new `TopologicalOrder`.

Only one `run_reactive!` per notebook executes at a time: `notebook.executetoken` is taken
on entry and is always given back on exit, also when an exception is thrown.
"""
function run_reactive!(session::ServerSession, notebook::Notebook, old_topology::NotebookTopology, new_topology::NotebookTopology, cells::Array{Cell,1}; deletion_hook::Function=WorkspaceManager.delete_vars, persist_js_state::Bool=false)::TopologicalOrder
    # make sure that we're the only `run_reactive!` being executed - like a semaphore
    take!(notebook.executetoken)
    try
        # cells that exist in the old topology but not in the new one: reset their code-derived state
        removed_cells = setdiff(keys(old_topology.nodes), keys(new_topology.nodes))
        for cell::Cell in removed_cells
            cell.code = ""
            cell.parsedcode = parse_custom(notebook, cell)
            cell.module_usings = Set{Expr}()
            cell.rootassignee = nothing
        end

        # the removed cells take part in this run too, each with an empty node in the new topology
        requested_cells = Cell[cells..., removed_cells...]
        new_topology = NotebookTopology(merge(
            new_topology.nodes,
            Dict(cell => ReactiveNode() for cell in removed_cells),
        ))

        # save the old topological order - we'll delete variables assigned from it and re-evaluate its cells
        old_order = topological_order(notebook, old_topology, requested_cells)

        old_runnable = old_order.runnable
        to_delete_vars = union!(Set{Symbol}(), defined_variables(old_topology, old_runnable)...)
        to_delete_funcs = union!(Set{Tuple{UUID,FunctionName}}(), defined_functions(old_topology, old_runnable)...)

        # get the new topological order
        new_order = topological_order(notebook, new_topology, union(requested_cells, keys(old_order.errable)))
        to_run = setdiff(union(new_order.runnable, old_order.runnable), keys(new_order.errable))::Array{Cell,1} # TODO: think if old error cell order matters

        # change the bar on the sides of cells to "queued"
        for cell in to_run
            cell.queued = true
        end
        for (cell, err) in new_order.errable
            cell.running = false
            cell.queued = false
            relay_reactivity_error!(cell, err)
        end

        # Send intermediate updates to the clients at most 20 times / second during a reactive run. (The effective speed of a slider is still unbounded, because the last update is not throttled.)
        send_notebook_changes_throttled = throttled(1.0/20, 0.0/20) do
            send_notebook_changes!(ClientRequest(session=session, notebook=notebook))
        end
        send_notebook_changes_throttled()

        # delete new variables that will be defined by a cell
        new_runnable = new_order.runnable
        to_delete_vars = union!(to_delete_vars, defined_variables(new_topology, new_runnable)...)
        to_delete_funcs = union!(to_delete_funcs, defined_functions(new_topology, new_runnable)...)

        # delete new variables in case a cell errors (then the later cells show an UndefVarError)
        new_errable = keys(new_order.errable)
        to_delete_vars = union!(to_delete_vars, defined_variables(new_topology, new_errable)...)
        to_delete_funcs = union!(to_delete_funcs, defined_functions(new_topology, new_errable)...)

        # `using`/`import` expressions of the cells that are *not* going to run
        to_reimport = union(Set{Expr}(), map(c -> c.module_usings, setdiff(notebook.cells, to_run))...)
        deletion_hook((session, notebook), to_delete_vars, to_delete_funcs, to_reimport; to_run=to_run) # `deletion_hook` defaults to `WorkspaceManager.delete_vars`

        for name in to_delete_vars
            delete!(notebook.bonds, name)
        end

        any_interrupted = false
        for cell in to_run
            cell.queued = false
            cell.running = true
            # NOTE(review): the operator on this line was mis-encoded in the source; restored as "not in"
            # (cells that were not explicitly requested keep their JS state) — confirm against upstream.
            cell.persist_js_state = persist_js_state || !(cell in requested_cells)
            send_notebook_changes_throttled()

            if any_interrupted || notebook.wants_to_interrupt
                # once a run was interrupted, all remaining cells get an interrupt error instead of being run
                relay_reactivity_error!(cell, InterruptException())
            else
                result = run_single!((session, notebook), cell, new_topology[cell])
                any_interrupted |= result.interrupted
            end

            cell.running = false
        end

        notebook.wants_to_interrupt = false
        # the final update is sent unthrottled
        send_notebook_changes!(ClientRequest(session=session, notebook=notebook))
        return new_order
    finally
        # allow other `run_reactive!` calls to be executed - also when something above threw
        put!(notebook.executetoken)
    end
end
# Short alias for `Base.Generator`: `lazymap(f, iter)` (also usable with a `do` block) wraps `iter`
# in a generator that applies `f` while iterating, instead of building an array up front.
const lazymap = Base.Generator
"Return a lazy iterator that yields, for every cell in `cells`, the `definitions` of that cell's node in `topology`."
function defined_variables(topology::NotebookTopology, cells)
    return (topology[cell].definitions for cell in cells)
end
"""
Return a lazy iterator that yields, for every cell in `cells`, a lazy iterator of
`(cell.cell_id, name)` tuples - one for each entry of `funcdefs_with_signatures` of that
cell's node in `topology`.
"""
function defined_functions(topology::NotebookTopology, cells)
    # all `(cell_id, function name)` pairs contributed by a single cell
    id_name_pairs(cell) = ((cell.cell_id, namesig.name) for namesig in topology[cell].funcdefs_with_signatures)
    return Base.Generator(id_name_pairs, cells)
end
"""
Run a single cell non-reactively, set its output, return run information.

The cell's parsed code is evaluated through `WorkspaceManager.eval_format_fetch_in_workspace`,
the result is stored on `cell` with `set_output!`, and that same result is returned.
"""
function run_single!(session_notebook::Union{Tuple{ServerSession,Notebook},WorkspaceManager.Workspace}, cell::Cell, reactive_node::ReactiveNode)
    parsed = cell.parsedcode
    id = cell.cell_id
    ends_semicolon = ends_with_semicolon(cell.code)

    # only function-wrapped cells pass on their references (minus joined function names) and definitions
    wrapper_info = if cell.function_wrapped
        (filter(!is_joined_funcname, reactive_node.references), reactive_node.definitions)
    else
        nothing
    end

    result = WorkspaceManager.eval_format_fetch_in_workspace(session_notebook, parsed, id, ends_semicolon, wrapper_info)
    set_output!(cell, result)
    return result
end
"""
Store the outcome of a run on `cell`: the current time as `last_run_timestamp`, plus the
run's `runtime`, the two entries of `output_formatted` (as `output_repr` and `repr_mime`)
and its `errored` flag.
"""
function set_output!(cell::Cell, run)
    cell.last_run_timestamp = time()
    cell.runtime = run.runtime

    # `output_formatted` is indexed as (repr, mime)
    formatted = run.output_formatted
    cell.output_repr = formatted[1]
    cell.repr_mime = formatted[2]

    cell.errored = run.errored
end
###
# CONVENIENCE FUNCTIONS
###
"""
Do all the things: update the caches of `cells`, update the notebook topology, optionally
save the notebook, and run the cells reactively.

# Keywords
- `save`: when `true` (default), `save_notebook(notebook)` is called after the topology update.
- `run_async`: when `true`, the reactive run is started with `@asynclog` and its task is returned; otherwise the run happens synchronously and the result of `run_reactive!` is returned.
- `prerender_text`: when `true`, cells that only contain text (in both the old and the new topology) and that are not currently running are first run in a workspace on the server process, and are left out of the reactive run.
- `kwargs...`: forwarded to `run_reactive!`.
"""
function update_save_run!(session::ServerSession, notebook::Notebook, cells::Array{Cell,1}; save::Bool=true, run_async::Bool=false, prerender_text::Bool=false, kwargs...)
    update_caches!(notebook, cells)

    old = notebook.topology
    new = notebook.topology = updated_topology(old, notebook, cells)
    save && save_notebook(notebook)

    # _assume `prerender_text == false` if you want to skip some details_
    to_run_online = if !prerender_text
        cells
    else
        # this code block will run cells that only contain text offline, i.e. on the server process, before doing anything else
        # this makes the notebook load a lot faster - the front-end does not have to wait for each output, and perform costly reflows whenever one updates
        original_pwd = pwd()
        to_run_offline = try
            # "A Workspace on the main process, used to prerender markdown before starting a notebook process for speedy UI."
            offline_workspace = WorkspaceManager.make_workspace(
                (ServerSession(options=Configuration.Options(evaluation=Configuration.EvaluationOptions(workspace_use_distributed=false))),
                notebook,)
            )

            text_cells = filter(c -> !c.running && is_just_text(new, c) && is_just_text(old, c), cells)
            for cell in text_cells
                run_single!(offline_workspace, cell, new[cell])
            end
            text_cells
        finally
            # restore the working directory of the server process, also when prerendering threw
            cd(original_pwd)
        end

        setdiff(cells, to_run_offline)
    end

    if run_async
        @asynclog run_reactive!(session, notebook, old, new, to_run_online; kwargs...)
    else
        run_reactive!(session, notebook, old, new, to_run_online; kwargs...)
    end
end
# Only used in tests!
# Single-cell convenience method: wraps `cell` in a vector and forwards everything to the multi-cell method.
function update_save_run!(session::ServerSession, notebook::Notebook, cell::Cell; kwargs...)
    return update_save_run!(session, notebook, [cell]; kwargs...)
end
# Same as `update_save_run!` with the given positional arguments, but with `save=false`.
function update_run!(args...)
    return update_save_run!(args...; save=false)
end
"""
    throttled(f::Function, max_delay::Real, initial_offset::Real=0)

Create a throttled function, which calls the given function `f` at most once per given interval `max_delay`.

It is _leading_ (`f` is invoked immediately) and _not trailing_ (calls during a cooldown period are ignored).

An optional third argument sets an initial cooldown period, default is `0`. With a non-zero value, the throttle is no longer _leading_.

The returned function takes no arguments and always returns `nothing`.
"""
function throttled(f::Function, max_delay::Real, initial_offset::Real=0)
    # pretend the previous call happened `max_delay - initial_offset` seconds ago
    previous_call = time() - max_delay + initial_offset

    return function ()
        current = time()
        # still cooling down: ignore this call
        current - previous_call >= max_delay || return nothing

        f()
        # the cooldown is measured from the moment just before `f` was invoked
        previous_call = current
        return nothing
    end
end