Skip to content

Commit 4876c14

Browse files
committed
added parallel fold
1 parent f8856dc commit 4876c14

File tree

1 file changed

+72
-3
lines changed

1 file changed

+72
-3
lines changed

src/jvm/clojure/lang/PersistentHashMap.java

Lines changed: 72 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -11,9 +11,8 @@
1111
package clojure.lang;
1212

1313
import java.io.Serializable;
14-
import java.util.Iterator;
15-
import java.util.List;
16-
import java.util.Map;
14+
import java.util.*;
15+
import java.util.concurrent.Callable;
1716
import java.util.concurrent.atomic.AtomicReference;
1817

1918
/*
@@ -188,6 +187,22 @@ public Object kvreduce(IFn f, Object init){
188187
return init;
189188
}
190189

190+
public Object fold(long n, final IFn combinef, final IFn reducef,
191+
IFn fjinvoke, final IFn fjtask, final IFn fjfork, final IFn fjjoin){
192+
//we are ignoring n for now
193+
Callable top = new Callable(){
194+
public Object call() throws Exception{
195+
Object ret = combinef.invoke();
196+
if(root != null)
197+
ret = combinef.invoke(ret, root.fold(combinef,reducef,fjtask,fjfork,fjjoin));
198+
return hasNull?
199+
combinef.invoke(ret,reducef.invoke(combinef.invoke(),null,nullValue))
200+
:ret;
201+
}
202+
};
203+
return fjinvoke.invoke(top);
204+
}
205+
191206
public int count(){
192207
return count;
193208
}
@@ -324,6 +339,7 @@ static interface INode extends Serializable {
324339

325340
public Object kvreduce(IFn f, Object init);
326341

342+
Object fold(IFn combinef, IFn reducef, IFn fjtask, IFn fjfork, IFn fjjoin);
327343
}
328344

329345
final static class ArrayNode implements INode{
@@ -395,6 +411,52 @@ public Object kvreduce(IFn f, Object init){
395411
return init;
396412
}
397413

414+
public Object fold(final IFn combinef, final IFn reducef,
415+
final IFn fjtask, final IFn fjfork, final IFn fjjoin){
416+
List<Callable> tasks = new ArrayList();
417+
for(final INode node : array){
418+
if(node != null){
419+
tasks.add(new Callable(){
420+
public Object call() throws Exception{
421+
return node.fold(combinef, reducef, fjtask, fjfork, fjjoin);
422+
}
423+
});
424+
}
425+
}
426+
427+
return foldTasks(tasks,combinef,fjtask,fjfork,fjjoin);
428+
}
429+
430+
static public Object foldTasks(List<Callable> tasks, final IFn combinef,
431+
final IFn fjtask, final IFn fjfork, final IFn fjjoin){
432+
433+
if(tasks.isEmpty())
434+
return combinef.invoke();
435+
436+
if(tasks.size() == 1){
437+
Object ret = null;
438+
try
439+
{
440+
return tasks.get(0).call();
441+
}
442+
catch(Exception e)
443+
{
444+
//aargh
445+
}
446+
}
447+
448+
List<Callable> t1 = tasks.subList(0,tasks.size()/2);
449+
final List<Callable> t2 = tasks.subList(tasks.size()/2, tasks.size());
450+
451+
Object forked = fjfork.invoke(fjtask.invoke(new Callable() {
452+
public Object call() throws Exception{
453+
return foldTasks(t2,combinef,fjtask,fjfork,fjjoin);
454+
}
455+
}));
456+
457+
return combinef.invoke(foldTasks(t1,combinef,fjtask,fjfork,fjjoin),fjjoin.invoke(forked));
458+
}
459+
398460

399461
private ArrayNode ensureEditable(AtomicReference<Thread> edit){
400462
if(this.edit == edit)
@@ -629,6 +691,9 @@ public Object kvreduce(IFn f, Object init){
629691
return NodeSeq.kvreduce(array,f,init);
630692
}
631693

694+
public Object fold(IFn combinef, IFn reducef, IFn fjtask, IFn fjfork, IFn fjjoin){
695+
return NodeSeq.kvreduce(array, reducef, combinef.invoke());
696+
}
632697

633698
private BitmapIndexedNode ensureEditable(AtomicReference<Thread> edit){
634699
if(this.edit == edit)
@@ -818,6 +883,10 @@ public Object kvreduce(IFn f, Object init){
818883
return NodeSeq.kvreduce(array,f,init);
819884
}
820885

886+
public Object fold(IFn combinef, IFn reducef, IFn fjtask, IFn fjfork, IFn fjjoin){
887+
return NodeSeq.kvreduce(array, reducef, combinef.invoke());
888+
}
889+
821890
public int findIndex(Object key){
822891
for(int i = 0; i < 2*count; i+=2)
823892
{

0 commit comments

Comments
 (0)