-
Notifications
You must be signed in to change notification settings - Fork 1
/
test.plj
180 lines (157 loc) · 8.82 KB
/
test.plj
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
defn flosures [flow]
"Produces fns for operating on the state of the given flow."
letfn [
; ===================================
; = Destroying state after a change =
; ===================================
(eager? [key] (= (-> key flow :timing) :eager))
(obliv? [key] (= (-> key flow :timing) :oblivious))
(key-parents [key] (-> key flow :parents))
(key-children [state key] (filter state (-> key flow :children)))
(compute [state key] ((:fn (flow key)) state))
update-or-forget-children [protected-keys state key]
"Utility fn used to propagate update messages to children."
(reduce (update-or-forget protected-keys) state (key-children state key))
forget [state & keys]
"User-callable fn used to dissoc certain keys from a state."
let [k (set (filter state keys))]
(reduce (partial update-or-forget-children k) (apply dissoc state k) k)
update-or-forget [protected-keys]
"Utility fn that decides what to do with a key when one of its parents
has updated."
fn [state key]
cond
; If the key is already being dealt with, do nothing.
(protected-keys key) state
; If the key is oblivious, do nothing.
(obliv? key) state
; If the key is eager, try to update it.
(eager? key) (try-eager-update protected-keys state key)
; Otherwise, get rid of it and its children.
true (update-or-forget-children protected-keys (dissoc state key) (key-children state key))
change [state new-substate]
"User-callable fn used to compute a state's value at certain keys."
let [k (set (filter state (keys new-substate)))]
(reduce (partial update-or-forget-children k) (merge state new-substate) k)
; ======================
; = Sequential updates =
; ======================
try-eager-update [protected-keys state key]
"Utility fn used to attempt to update an eager node."
if (has-keys? state (key-parents key))
; If the parents are all there, try the computation.
let [new-val (compute state key)]
if (= new-val (state key))
; If the value is unchanged, do nothing.
state
; If the value has changed, update the value but dissoc the children.
(update-or-forget-children protected-keys (assoc state key new-val) key)
; If the parents are not there, dissoc the node and the children.
(update-or-forget-children protected-keys state key)
update-node [state key]
"Updates the state with the value corresponding to key,
and any ancestral values necessary to compute it."
if (state key)
state
(assoc state key (compute (reduce update-node state (:parents (flow key))) key))
update-nodes [state & keys]
"Evaluates the state at given keys. Propagates message of
recomputation to parents."
(reduce update-node state keys)
; ======================
; = Concurrent updates =
; ======================
send-fn [state msg val collating-agent]
"Creates a fn to be mapped over keys, which sends messages to
the keys' agents."
(fn [key] (send (state key) msg state key val collating-agent))
send-notify-return [state msg val key collating-agent children]
"Wrapped by msg-err and update-and-notify"
do
(if collating-agent (send collating-agent msg-record {key val}))
(map-now (send-fn state msg {key val} collating-agent) children)
val
msg-err [parent-vals state key err collating-agent children]
"Action taken by state-managing agents upon encountering an error.
The error is stored in the agent and propagated to children. It is
currently not possible to re-raise the error."
(send-notify-return state msg-err err key collating-agent children)
update-and-notify [state collating-agent key node new-vals children]
"Used by msg-update."
let [new-val ((:fn node) new-vals)]
(send-notify-return state msg-update new-val key collating-agent children)
msg-update [parent-vals state key new-pv collating-agent]
"A message that a parent can send to a child when it has
updated."
let [new-vals (merge parent-vals new-pv)
node (flow key)
children (:children node)]
if (same-number? new-vals (:parents node))
; If the value can be computed, do it.
try (update-and-notify state collating-agent key node new-vals children)
(catch Exception err (msg-err parent-vals state key err collating-agent children))
; Otherwise just record the parent's value.
new-vals
msg-record [[state keys-remaining] new-vals]
"A message agents send to the collating agent when they update.
Its value consists of a [state keys-remaining] couple. When
keys-remaining is zero, the requested update has been made."
let [new-state (merge state new-vals)
new-keys (apply (partial disj keys-remaining) (keys new-vals))]
[new-state new-keys]
create-agent [orig-state [state roots] key]
"Creates an agent at the given key, which is responsible for
computing the value at that key. Creates agents for parents
if necessary."
if (state key) [state roots]
let [parents (:parents (flow key))
parent-vals (select-keys orig-state parents)
[state roots] (reduce (partial create-agent orig-state) [state roots] parents)]
; Update the state with new agents at this node and at the parent nodes
[(assoc state key (agent parent-vals))
; If this is a root node, add it to the roots.
(if (same-number? parent-vals parents) (conj roots key) roots)]
create-agents [state & keys-to-update]
"Returns a state with agents in the keys that need to be updated,
and the set of 'root keys' corresponding to agents that can start
updating immediately."
(reduce (partial create-agent state) [state []] keys-to-update)
start-c-update [state roots collating-agent]
"Used by the concurrent updates."
(fn [] (map-now (send-fn state msg-update {} collating-agent) roots))
concurrent-update [state & keys-to-update]
"Returns a fn that starts the update, and the new state with agents
corresponding to all the keys whose values are requested but not
known."
let [[new-state roots] (apply create-agents state keys-to-update)
start (start-c-update new-state roots nil)]
[start new-state]
agent-update [state & keys-to-update]
"Does a concurrent update of the given keys. Returns two things: a fn
to start the update and an agent whose state will eventually change to
[requested state []]."
let [[new-state roots] (apply create-agents state keys-to-update)
; An agent whose value will eventually be the up-to-date state
collating-agent (agent [state (set (filter (comp not state) (keys new-state)))])
start (start-c-update new-state roots collating-agent)]
[start collating-agent]
future-update [state & keys-to-update]
"Does a concurrent update of the given keys. Returns a delay which, when
forced, returns the updated state."
let [[s a] (apply agent-update state keys-to-update)
; Create a countdown latch and a watcher that opens the latch when the state is ready.
latch (java.util.concurrent.CountDownLatch. 1)
w
add-watch a latch
fn [#^java.util.concurrent.CountDownLatch latch r old-v new-v]
(if (= (count (new-v 1)) 0) (.countDown latch))
; Start the update.
nothing (s)]
(delay (do (.await latch) (@a 0)))
]
{:update update-nodes
:forget forget
:change change
:c-update concurrent-update
:a-update agent-update
:f-update future-update}