Skip to content

Commit 89e5dce

Browse files
committed
added reducers
1 parent ce2d1d3 commit 89e5dce

File tree

2 files changed

+323
-0
lines changed

2 files changed

+323
-0
lines changed

build.xml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@
5353
<!-- <sysproperty key="clojure.compile.warn-on-reflection" value="true"/> -->
5454
<arg value="clojure.core"/>
5555
<arg value="clojure.core.protocols"/>
56+
<arg value="clojure.core.reducers"/>
5657
<arg value="clojure.main"/>
5758
<arg value="clojure.set"/>
5859
<arg value="clojure.xml"/>

src/clj/clojure/core/reducers.clj

Lines changed: 322 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,322 @@
1+
; Copyright (c) Rich Hickey. All rights reserved.
2+
; The use and distribution terms for this software are covered by the
3+
; Eclipse Public License 1.0 (http://opensource.org/licenses/eclipse-1.0.php)
4+
; which can be found in the file epl-v10.html at the root of this distribution.
5+
; By using this software in any fashion, you are agreeing to be bound by
6+
; the terms of this license.
7+
; You must not remove this notice, or any other, from this software.
8+
9+
(ns ^{:doc
10+
"A library for reduction and parallel folding. Alpha and subject
11+
to change. Note that fold and its derivatives require
12+
jsr166y.jar for fork/join support. See Clojure's pom.xml for the
13+
dependency info."
14+
:author "Rich Hickey"}
15+
clojure.core.reducers
16+
(:refer-clojure :exclude [reduce map filter remove take take-while drop flatten])
17+
(:require [clojure.walk :as walk]))
18+
19+
(alias 'core 'clojure.core)
20+
(set! *warn-on-reflection* true)
21+
22+
;;;;;;;;;;;;;; some fj stuff ;;;;;;;;;;
23+
;;todo - dynamic java 7+ detection and use
24+
25+
(def pool (delay (jsr166y.ForkJoinPool.)))
26+
27+
(defn fjtask [^Callable f]
28+
(jsr166y.ForkJoinTask/adapt f))
29+
30+
(defn- fjinvoke [f]
31+
(if (jsr166y.ForkJoinTask/inForkJoinPool)
32+
(f)
33+
(.invoke ^jsr166y.ForkJoinPool @pool ^jsr166y.ForkJoinTask (fjtask f))))
34+
35+
(defn- fjfork [task] (.fork ^jsr166y.ForkJoinTask task))
36+
37+
(defn- fjjoin [task] (.join ^jsr166y.ForkJoinTask task))
38+
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
39+
40+
(defn reduce
41+
"Like core/reduce except:
42+
When init is not provided, (f) is used.
43+
Maps are reduced with reduce-kv"
44+
([f coll] (reduce f (f) coll))
45+
([f init coll]
46+
(if (instance? java.util.Map coll)
47+
(clojure.core.protocols/kv-reduce coll f init)
48+
(clojure.core.protocols/coll-reduce coll f init))))
49+
50+
(defprotocol CollFold
51+
(coll-fold [coll n combinef reducef]))
52+
53+
(defn fold
54+
"Reduces a collection using a (potentially parallel) reduce-combine
55+
strategy. The collection is partitioned into groups of approximately
56+
n (default 512), each of which is reduced with reducef (with a seed
57+
value obtained by calling (combinef) with no arguments). The results
58+
of these reductions are then reduced with combinef (default
59+
reducef). combinef must be associative, and, when called with no
60+
arguments, (combinef) must produce its identity element. These
61+
operations may be performed in parallel, but the results will
62+
preserve order."
63+
{:added "1.5"}
64+
([reducef coll] (fold reducef reducef coll))
65+
([combinef reducef coll] (fold 512 combinef reducef coll))
66+
([n combinef reducef coll]
67+
(coll-fold coll n combinef reducef)))
68+
69+
(defn reducer
70+
"Given a reducible collection, and a transformation function xf,
71+
returns a reducible collection, where any supplied reducing
72+
fn will be transformed by xf. xf is a function of reducing fn to
73+
reducing fn."
74+
{:added "1.5"}
75+
([coll xf]
76+
(reify
77+
clojure.core.protocols/CollReduce
78+
(coll-reduce [this f1]
79+
(clojure.core.protocols/coll-reduce this f1 (f1)))
80+
(coll-reduce [_ f1 init]
81+
(clojure.core.protocols/coll-reduce coll (xf f1) init)))))
82+
83+
(defn folder
84+
"Given a foldable collection, and a transformation function xf,
85+
returns a foldable collection, where any supplied reducing
86+
fn will be transformed by xf. xf is a function of reducing fn to
87+
reducing fn."
88+
{:added "1.5"}
89+
([coll xf]
90+
(reify
91+
clojure.core.protocols/CollReduce
92+
(coll-reduce [_ f1]
93+
(clojure.core.protocols/coll-reduce coll (xf f1) (f1)))
94+
(coll-reduce [_ f1 init]
95+
(clojure.core.protocols/coll-reduce coll (xf f1) init))
96+
97+
CollFold
98+
(coll-fold [_ n combinef reducef]
99+
(coll-fold coll n combinef (xf reducef))))))
100+
101+
(defn- do-curried
102+
[name doc meta args body]
103+
(let [cargs (vec (butlast args))]
104+
`(defn ~name ~doc ~meta
105+
(~cargs (fn [x#] (~name ~@cargs x#)))
106+
(~args ~@body))))
107+
108+
(defmacro ^:private defcurried
109+
"Builds another arity of the fn that returns a fn awaiting the last
110+
param"
111+
[name doc meta args & body]
112+
(do-curried name doc meta args body))
113+
114+
(defn- do-rfn [f1 k fkv]
115+
`(fn
116+
([] (~f1))
117+
~(clojure.walk/postwalk
118+
#(if (sequential? %)
119+
((if (vector? %) vec identity)
120+
(core/remove #{k} %))
121+
%)
122+
fkv)
123+
~fkv))
124+
125+
(defmacro ^:private rfn
126+
"Builds 3-arity reducing fn given names of wrapped fn and key, and k/v impl."
127+
[[f1 k] fkv]
128+
(do-rfn f1 k fkv))
129+
130+
(defcurried map
131+
"Applies f to every value in the reduction of coll. Foldable."
132+
{:added "1.5"}
133+
[f coll]
134+
(folder coll
135+
(fn [f1]
136+
(rfn [f1 k]
137+
([ret k v]
138+
(f1 ret (f k v)))))))
139+
140+
(defcurried filter
141+
"Retains values in the reduction of coll for which (pred val)
142+
returns logical true. Foldable."
143+
{:added "1.5"}
144+
[pred coll]
145+
(folder coll
146+
(fn [f1]
147+
(rfn [f1 k]
148+
([ret k v]
149+
(if (pred k v)
150+
(f1 ret k v)
151+
ret))))))
152+
153+
(defcurried remove
154+
"Removes values in the reduction of coll for which (pred val)
155+
returns logical true. Foldable."
156+
{:added "1.5"}
157+
[pred coll]
158+
(filter (complement pred) coll))
159+
160+
(defcurried take-while
161+
"Ends the reduction of coll when (pred val) returns logical false."
162+
{:added "1.5"}
163+
[pred coll]
164+
(reducer coll
165+
(fn [f1]
166+
(rfn [f1 k]
167+
([ret k v]
168+
(if (pred k v)
169+
(f1 ret k v)
170+
(reduced ret)))))))
171+
172+
(defcurried take
173+
"Ends the reduction of coll after consuming n values."
174+
{:added "1.5"}
175+
[n coll]
176+
(reducer coll
177+
(fn [f1]
178+
(let [cnt (atom n)]
179+
(rfn [f1 k]
180+
([ret k v]
181+
(swap! cnt dec)
182+
(if (neg? @cnt)
183+
(reduced ret)
184+
(f1 ret k v))))))))
185+
186+
(defcurried drop
187+
"Elides the first n values from the reduction of coll."
188+
{:added "1.5"}
189+
[n coll]
190+
(reducer coll
191+
(fn [f1]
192+
(let [cnt (atom n)]
193+
(rfn [f1 k]
194+
([ret k v]
195+
(swap! cnt dec)
196+
(if (neg? @cnt)
197+
(f1 ret k v)
198+
ret)))))))
199+
200+
(defcurried flatten
201+
"Takes any nested combination of sequential things (lists, vectors,
202+
etc.) and returns their contents as a single, flat foldable
203+
collection."
204+
{:added "1.5"}
205+
[coll]
206+
(let [rf (fn [f1]
207+
(fn
208+
([] (f1))
209+
([ret v]
210+
(if (sequential? v)
211+
(clojure.core.protocols/coll-reduce (flatten v) f1 ret)
212+
(f1 ret v)))))]
213+
(reify
214+
clojure.core.protocols/CollReduce
215+
(coll-reduce [this f1] (clojure.core.protocols/coll-reduce this f1 (f1)))
216+
(coll-reduce [_ f1 init] (clojure.core.protocols/coll-reduce coll (rf f1) init))
217+
218+
CollFold
219+
(coll-fold [_ n combinef reducef] (coll-fold coll n combinef (rf reducef))))))
220+
221+
;;do not construct this directly, use cat
222+
(deftype Cat [cnt left right]
223+
clojure.lang.Counted
224+
(count [_] cnt)
225+
226+
clojure.lang.Seqable
227+
(seq [_] (concat (seq left) (seq right)))
228+
229+
clojure.core.protocols/CollReduce
230+
(coll-reduce [this f1] (clojure.core.protocols/coll-reduce this f1 (f1)))
231+
(coll-reduce
232+
[_ f1 init]
233+
(clojure.core.protocols/coll-reduce
234+
right f1
235+
(clojure.core.protocols/coll-reduce left f1 init)))
236+
237+
CollFold
238+
(coll-fold
239+
[_ n combinef reducef]
240+
(fjinvoke
241+
(fn []
242+
(let [rt (fjfork (fjtask #(coll-fold right n combinef reducef)))]
243+
(combinef
244+
(coll-fold left n combinef reducef)
245+
(fjjoin rt)))))))
246+
247+
(defn cat
248+
"A high-performance combining fn that yields the catenation of the
249+
reduced values. The result is reducible, foldable, seqable and
250+
counted, providing the identity collections are reducible, seqable
251+
and counted. The single argument version will build a combining fn
252+
with the supplied identity constructor. Tests for identity
253+
with (zero? (count x)). See also foldcat."
254+
{:added "1.5"}
255+
([] (java.util.ArrayList.))
256+
([ctor]
257+
(fn
258+
([] (ctor))
259+
([left right] (cat left right))))
260+
([left right]
261+
(cond
262+
(zero? (count left)) right
263+
(zero? (count right)) left
264+
:else
265+
(Cat. (+ (count left) (count right)) left right))))
266+
267+
(defn append!
268+
".adds x to acc and returns acc"
269+
{:added "1.5"}
270+
[^java.util.Collection acc x]
271+
(doto acc (.add x)))
272+
273+
(defn foldcat
274+
"Equivalent to (fold cat append! coll)"
275+
{:added "1.5"}
276+
[coll]
277+
(fold cat append! coll))
278+
279+
(defn monoid
280+
"Builds a combining fn out of the supplied operator and identity
281+
constructor. op must be associative and ctor called with no args
282+
must return an identity value for it."
283+
{:added "1.5"}
284+
[op ctor]
285+
(fn m
286+
([] (ctor))
287+
([a b] (op a b))))
288+
289+
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; fold impls ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
290+
(defn- foldvec
291+
[v n combinef reducef]
292+
(cond
293+
(empty? v) (combinef)
294+
(<= (count v) n) (reduce reducef (combinef) v)
295+
:else
296+
(let [split (quot (count v) 2)
297+
v1 (subvec v 0 split)
298+
v2 (subvec v split (count v))
299+
fc (fn [child] #(foldvec child n combinef reducef))]
300+
(fjinvoke
301+
#(let [f1 (fc v1)
302+
t2 (fjtask (fc v2))]
303+
(fjfork t2)
304+
(combinef (f1) (fjjoin t2)))))))
305+
306+
(extend-protocol CollFold
307+
Object
308+
(coll-fold
309+
[coll n combinef reducef]
310+
;;can't fold, single reduce
311+
(reduce reducef (combinef) coll))
312+
313+
clojure.lang.IPersistentVector
314+
(coll-fold
315+
[v n combinef reducef]
316+
(foldvec v n combinef reducef))
317+
318+
clojure.lang.PersistentHashMap
319+
(coll-fold
320+
[m n combinef reducef]
321+
(.fold m n combinef reducef fjinvoke fjtask fjfork fjjoin)))
322+

0 commit comments

Comments
 (0)