Skip to content

Commit 9571f1a

Browse files
committed
try to get semantic details of transforming channels right
1 parent 327e346 commit 9571f1a

File tree

1 file changed

+20
-10
lines changed

1 file changed

+20
-10
lines changed

src/transducers.es6.js

Lines changed: 20 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -3,13 +3,21 @@
33
var core = require('./core');
44

55

6-
function isReduced(x) {
6+
var isReduced = function(x) {
77
return x && x.__transducers_reduced__;
8-
}
8+
};
99

10-
function deref(x) {
10+
var deref = function(x) {
1111
return x.value;
12-
}
12+
};
13+
14+
var forward = function(from, to) {
15+
from.then(
16+
function(val) { to.resolve(val); },
17+
function(err) { to.reject(err); }
18+
);
19+
return to;
20+
};
1321

1422

1523
var channelReducer = function(ch) {
@@ -39,28 +47,30 @@ module.exports = function(ch, xform) {
3947

4048
return {
4149
push: function(val, handler) {
42-
return core.go(function*() {
50+
var deferred = core.go(function*() {
51+
var success = open;
4352
if (open) {
4453
var result = yield xf.step(null, val);
4554
if (isReduced(result)) {
46-
yield deref(yield xf.result(yield deref(result)));
4755
open = false;
56+
yield deref(yield xf.result(yield deref(result)));
4857
ch.close();
4958
};
50-
return true;
51-
} else
52-
return false;
59+
}
60+
return success;
5361
});
62+
63+
return handler ? forward(deferred, handler) : deferred;
5464
},
5565

5666
pull: function(handler) {
5767
return ch.pull(handler);
5868
},
5969

6070
close: function() {
71+
open = false;
6172
core.top(core.go(function*() {
6273
yield deref(yield xf.result());
63-
open = false;
6474
ch.close();
6575
}));
6676
}

0 commit comments

Comments
 (0)