Skip to content

Commit da07d1c

Browse files
committed
Update to Java commit 87e9a57 (2013.08.19): added limits to pending put! and take! queues ... AND fix adding to buffer pending lists
1 parent d06c19d commit da07d1c

File tree

3 files changed

+35
-2
lines changed

3 files changed

+35
-2
lines changed

src/clojure/clojure/core/async/impl/channels.clj

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,11 @@
1616

1717
(set! *warn-on-reflection* true)
1818

19+
(defmacro assert-unlock [lock test msg]
20+
`(when-not ~test
21+
(.unlock ~lock)
22+
(throw (new Exception (str "Assert failed: " ~msg "\n" (pr-str '~test)))))) ;;; AssertionError
23+
1924
(defn box [val]
2025
(reify clojure.lang.IDeref
2126
(deref [_] val)))
@@ -118,7 +123,12 @@
118123
nil))))
119124
(do
120125
(when (impl/active? handler)
121-
(.AddFirst puts [handler val])) ;;; .add
126+
(assert-unlock mutex
127+
(< (.Count puts) impl/MAX-QUEUE-SIZE) ;;; .size
128+
(str "No more than " impl/MAX-QUEUE-SIZE
129+
" pending puts are allowed on a single channel."
130+
" Consider using a windowed buffer."))
131+
(.AddLast puts [handler val])) ;;; .add
122132
(.unlock mutex)
123133
nil))))))
124134

@@ -216,7 +226,11 @@
216226
(box nil)
217227
nil))
218228
(do
219-
(.AddFirst takes handler) ;;; .add
229+
(assert-unlock mutex
230+
(< (.Count takes) impl/MAX-QUEUE-SIZE) ;;; .size
231+
(str "No more than " impl/MAX-QUEUE-SIZE
232+
" pending takes are allowed on a single channel."))
233+
(.AddLast takes handler) ;;; .add
220234
(.unlock mutex)
221235
nil)))))))
222236

src/clojure/clojure/core/async/impl/protocols.clj

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,9 @@
1111
(ns ^{:skip-wiki true}
1212
clojure.core.async.impl.protocols)
1313

14+
15+
(def ^:const ^int MAX-QUEUE-SIZE 1024)
16+
1417
(defprotocol ReadPort
1518
(take! [port fn1-handler] "derefable val if taken, nil if take was enqueued"))
1619

test/clojure/clojure/core/async_test.clj

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -109,6 +109,22 @@
109109
"When on-caller? requested, but no reader can consume the value,
110110
put!'s callback executes on a different thread."))
111111

112+
(deftest limit-async-take!-put!
113+
(testing "async put! limit"
114+
(let [c (chan)]
115+
(dotimes [x 1024]
116+
(put! c x))
117+
(is (thrown? Exception ;;;AssertionError
118+
(put! c 42)))
119+
(is (= (<!! c) 0)))) ;; make sure the channel unlocks
120+
(testing "async take! limit"
121+
(let [c (chan)]
122+
(dotimes [x 1024]
123+
(take! c (fn [x])))
124+
(is (thrown? Exception ;;;AssertionError
125+
(take! c (fn [x]))))
126+
(is (nil? (>!! c 42)))))) ;; make sure the channel unlocks
127+
112128
(deftest puts-fulfill-when-buffer-available
113129
(is (= :proceeded
114130
(let [c (chan 1)

0 commit comments

Comments
 (0)