Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

switchOnNext #8

Open
ccorcos opened this issue Apr 22, 2015 · 29 comments
Open

switchOnNext #8

ccorcos opened this issue Apr 22, 2015 · 29 comments

Comments

@ccorcos
Copy link
Contributor

ccorcos commented Apr 22, 2015

I must say, I like the simplicity of this library.

Some functions that I think are missing are takeUntil and switchOnNext

Suppose you are dragging an element. Its helpful to cleanup streams when they're done being used with take and takeUntil.

mouseDowns = flyd.stream()
$('body').addEventListener('mousedown', mouseDowns);
mouseDowns.map (e) ->
  elem - $(e.target)
  mouseMoves = flyd.stream()
  $('body').addEventListener('mousemove', mouseMoves);
  mouseUps = flyd.stream().take(1)
  $('body').addEventListener('mouseup', mouseUps);
  mouseMoves.takeUntil(mouseUps).map (e) ->
    elem.css('translateX', e.pageX)
@paldepind
Copy link
Owner

I must say, I like the simplicity of this library.

Thank you!

These are great suggestions for Flyd modules and I'll be happy to create them. Would you please open a separate issue for takeUntil?

I'm however, not too fond of you example. I think a simpler implementation could be made with keepWhen.

mouseClicks = flyd.stream()
$('body').addEventListener('click', mouseClicks)
mouseDown = flyd.reduce(((acc, e) -> !acc), false, mouseClicks)
mouseMoves = flyd.stream()
$('body').addEventListener('mousemove', mouseMoves)
mouseDrags = keepWhen(mouseDown, mouseMoves)
flyd.map((e) ->
  elem.css('translateX', e.pageX)
, mouseDrags)

But that is obviously a matter of personal preference.

@ccorcos
Copy link
Contributor Author

ccorcos commented Apr 23, 2015

I like that. Its actually better than adding and removing event listeners on the fly.

@paldepind
Copy link
Owner

Yes. That is what I think as well. You also get the advantage that the mouseMoves stream can be used other places in the app as well.

@ccorcos
Copy link
Contributor Author

ccorcos commented Apr 24, 2015

I realized I didn't make the best case for switchOnNext so let me elaborate.

I was playing around with highland in this project integrating with React and Meteor.

Meteor has its own way of doing reactivity. Basically, something like this Messages.find({username:'chet'}).fetch(), is actually reactive. With Flyd, I might do something like this:

c = Tracker.autorun(function() {
  messages = Messages.find({username:'chet'}).fetch()
  messagesStream(messages)
})

messagesStream = flyd.stream()

Whenever a message with username "chet" is added, removed, or changes, this function reruns and adds all the messages to the messagesStream. Then I cause the messagesStream to render the messages accordingly. Now, the c returned from Tracker has a c.stop() to cleanup. So it would be good to have something like messageStream.onStop(function() { c.stop() }) for cleanup. To integrate Meteor with Flyd, I'd have something like this:

autorunStream = function(f) {
  s = flyd.stream()
  c = Tracker.autorun(R.compose(s, f))
  s.onStop(c.stop)
}

(P.S. I'm a huge Meteor fan if you couldn't tell)

So, back to the point. switchOnNext. Suppose I want to search for messages:

searchStream = floyd.stream('')

searchStream.map(function(query){
  autorunStream(function() {
    Messages.find({text:query}).fetch()
  })
}).map(R.map(renderMessages))

Now the problem with the following code is that the map returns a stream of streams. We could obviously flatten which would make sense.

searchStream.fmap(function(query){
  autorunStream(function() {
    Messages.find({text:query}).fetch()
  })
}).map(renderMessages)

But when a new search query comes in, we create a new stream and the last query autorunStream doesnt get cleaned up! So we can do something like this:

searchStream.fmap(function(query){
  autorunStream(function() {
    Messages.find({text:query}).fetch()
  }).takeUntil(searchStream)
}).map(renderMessages)

But now this is looking ugly. They way I had to do it with Highland and you'd have to do with RxJS is

searchStream.map(function(query){
  autorunStream(function() {
    Messages.find({text:query}).fetch()
  })
}).switchOnNext().map(renderMessages)

That looks alright. But I think the syntax would be nicer like this:

searchStream.switchMap(function(query){
  autorunStream(function() {
    Messages.find({text:query}).fetch()
  })
}).map(renderMessages)

Here, switchMap is basically fmap with takeUntil on the returned streams. Just like .map(f).switchOnNext()

Anyways, I hope that provides some inspiration for this function.

@paldepind
Copy link
Owner

Now, the c returned from Tracker has a c.stop() to cleanup. So it would be good to have something like messageStream.onStop(function() { c.stop() }) for cleanup. To integrate Meteor with Flyd, I'd have something like this

This is not documented yet but stream in Flyd has an end stream attached that emit a value when the stream ends. So you could do something like this:

flyd.map(function() {
  c.stop();
}, messageStream.end);

Thanks for the write up. I'm not sure I understand all of your examples though. Flyd should definitely have switchMap as well.

I'm currently considering what to name the different functions. I'm not sure I like switch or switchOnNext. switchMap however is also called flatMapLatest (both by Rx and Bacon). The name flatMapLatest makes a lot of sense to me and it's consistent since Flyd has flyd-flatmap. But then shouldn't switchOnNext just be named latest? It seems descriptive and quite consistent since latest(map(fn, stream)) is the same as flatMapLatest(fn, stream)?

@ccorcos
Copy link
Contributor Author

ccorcos commented Apr 25, 2015

In other libraries, streams are more like events that are consumed. Thus, latest typically just grabs the latest value from the stream. In flyd, you can just call the stream again.

As for naming, I think switch helps to imply that the previous streams are ended. In a way, we are creating a new stream that switches from stream to stream.

------
   xxxxx
     *********
---xx*********

On Apr 25, 2015, at 03:40, Simon Friis Vindum notifications@github.com wrote:

Now, the c returned from Tracker has a c.stop() to cleanup. So it would be good to have something like messageStream.onStop(function() { c.stop() }) for cleanup. To integrate Meteor with Flyd, I'd have something like this

This is not documented yet but stream in Flyd has an end stream attached that emit a value when the stream ends. So you could do something like this:

flyd.map(function() {
c.stop();
}, messageStream.end);
Thanks for the write up. I'm not sure I understand all of your examples though. Flyd should definitely have switchMap as well.

I'm currently considering what to name the different functions. I'm not sure I like switch or switchOnNext. switchMap however is also called flatMapLatest (both by Rx and Bacon). The name flatMapLatest makes a lot of sense to me and it's consistent since Flyd has flyd-flatmap. But then shouldn't switchOnNext just be named latest? It seems descriptive and quite consistent since latest(map(fn, stream)) is the same as flatMapLatest(fn, stream)?


Reply to this email directly or view it on GitHub.

@paldepind
Copy link
Owner

Yes, I see what you mean. That is a good point. latest would be confusing.

I just had another idea. What about calling switch flatLatest. That seems pretty coherent:

flatMap - Map function over a stream and flatten the result into one stream.
flatMapLatest - Map function over a stream and only emit from the latest stream.
flatLatest - Flatten stream and only emit from the latest stream.

I'm not opposed to switch though but I think that name looses the relationship to flatMap and flatMapLatest.

@ccorcos
Copy link
Contributor Author

ccorcos commented Apr 28, 2015

Ok. I had to think on this for a while.

Suppose f returns a stream. Then what are the equivalences or the nuanced differences between he following

a = flyd.stream()

// 1)
b = map(f, a)
c1 = flatten(b)
// versus
c2 = flatmap(f,a)

// 2)
b = map(f,a)
c1 = flatLatest(b)
// versus
c2 = flatMapLatest(f, a)

In 1), does c1 end when a ends? Do all the streams produced by f end when a ends? If so, then c1 and c2 are the same right?

In 2), I believe c1 and c2 are equivalent as well. But I think its also an interested case for thinking about the terminology. flatmap = compose(flat, map). That looks right. flatMapLatest = compose(flatLatest, map). Thats odd because map gets wedged in between. Its also off because flatLatest = compose(flatten, latest) does mean anything. What is latest without flatten?

So here's my proposal. switch is very similar to flatten. flatten combines all streams into one, while switch switches the stream to the most recent one. They're similar in that there is some concept of flattening going on here. But switch will end the previous stream and emit from the latest stream when it comes about.

Thus flatmap = compose(flatten, map) and switchMap = compose(switch, map). I see what you're saying that the name "switch" doesnt allude to flatten at all. But the reality it that, although they are similar, they are not compositions of each other...

P.S. How would you actually write flatmap = compose(flatten, map)? This is obviously incorrect, but its conceptually there. The first argument is a function that partially applies to map, then the second argument goes through the rest. I suppose we could write:

function flatMap(f,x) {
  return compose(flatten, map(f))(x)
}

But is there a more formal way of doing this?

@paldepind
Copy link
Owner

In 1), does c1 end when a ends?

Yes. c1 depends on b which depends on a. So when a ends c1 ends.

Do all the streams produced by f end when a ends?

I don't think so. f creates streams based on values from a. f decides the end condition for these streams.

I agree that c1 and c2 are equivalent in both 1) and 2).

In 2), I believe c1 and c2 are equivalent as well. But I think its also an interested case for thinking about the terminology. flatmap = compose(flat, map). That looks right. flatMapLatest = compose(flatLatest, map). Thats odd because map gets wedged in between. Its also off because flatLatest = compose(flatten, latest) does mean anything. What is latest without flatten?

That is a very good point. You're right that the naming in that case doesn't properly reflect which functions the functions a composed of. flatLatest isn't composed of flat and latest – it's a function on it's own, even though the name could indicate otherwise.

So here's my proposal. switch is very similar to flatten. flatten combines all streams into one, while switch switches the stream to the most recent one. They're similar in that there is some concept of flattening going on here. But switch will end the previous stream and emit from the latest stream when it comes about.

Thus flatmap = compose(flatten, map) and switchMap = compose(switch, map). I see what you're saying that the name "switch" doesnt allude to flatten at all. But the reality it that, although they are similar, they are not compositions of each other...

You've convinced me. You make some splendid arguments. I agree that flatMapLatest and flatLatest are misguided names and that including flat is actually wrong.

So instead of what I previously suggested we'd instead have this:

flatMap - Map function over a stream and flatten the result into one stream.
switch - Take a stream of streams and only emit from the latest stream.
switchMap - Map function over a stream and switch on it. I.e. it's conceptually equal to compose(switch, map).

But switch will end the previous stream

That is not my understanding of switch based on the Rx documentation. It only unsubscribes from the previously emitted stream. It doesn't end it.

How would you actually write flatmap = compose(flatten, map)? This is obviously incorrect, but its conceptually there.

How is that incorrect? With a proper composed and flatten you could write it like that. Consider your code:

function flatMap(f,x) {
  return compose(flatten, map(f))(x)
}

That is equivalent to:

function flatMap(f,x) {
  return compose(flatten, map)(f, x)
}

And then we can make it point free:

var flatMap = compose(flatten, map);

@ccorcos
Copy link
Contributor Author

ccorcos commented Apr 29, 2015

Do all the streams produced by f end when a ends?

I don't think so. f creates streams based on values from a. f decides the end condition for these streams.

Hmm. I think this will likely be a source of memory leaks for people. I'm trying to think of a case when you'd use flatten but wouldn't want it to end the underlying streams when the original stream ends...

But switch will end the previous stream

That is not my understanding of switch based on the Rx documentation. It only unsubscribes from the previously emitted stream. It doesn't end it.

Same thing here... Conceptually, I could see how you might use flatten and switch without ending the inner streams when the outer stream ends, but I don't see that as a common use. At least for me if I were to flatMap(f, x) or switchMap(f, x), then f is creating a stream that typically depends on x. If x ends, then I have some streams in limbo that haven't been ended.

I believe you're correct though... I see no mention of ending the inner streams in the RxJS documentation. In switch it seems that they are explicitly not ending the inner stream when the outer stream ends according to the diagram.

Well that said, maybe we need something to handle that as well? Something like endOnUnsubscribe or something like that which can handle both the flatten and switch situations.

flatMap - Map function over a stream and flatten the result into one stream.
switch - Take a stream of streams and only emit from the latest stream.
switchMap - Map function over a stream and switch on it. I.e. it's conceptually equal to compose(switch, map).

I like it 👍

var flatMap = compose(flatten, map);

So this is what I'm a little confused about. I guess compose looks at the function.length and curries appropriately? It sees that map takes 2 arguments, then passed the result of map into flatten... that makes sense. So I could see this working so long as only the right-most function has more than one argument. What if we compose multiple functions with multiple arguments? How would this work?

var doubleMap = compose(map, map);

doubleMap(f1, s, f2) = compose(map(f2), map(f1))(s) // ???
// or 
doubleMap(f1, f2) =  compose(map(f2), map(f1)) // ?

@paldepind
Copy link
Owner

Hmm. I think this will likely be a source of memory leaks for people. I'm trying to think of a case when you'd use flatten but wouldn't want it to end the underlying streams when the original stream ends...

This is an example from "The introduction to Reactive Programming you've been missing":

var requestStream = Rx.Observable.just('https://api.github.com/users');

var responseStream = requestStream
  .flatMap(function(requestUrl) {
    return Rx.Observable.fromPromise(jQuery.getJSON(requestUrl));
  });

responseStream.subscribe(function(response) {
  // render `response` to the DOM however you wish
});

Here the streams/observables created in the flatMap emit one value and then ends.

At least for me if I were to flatMap(f, x) or switchMap(f, x), then f is creating a stream that typically depends on x. If x ends, then I have some streams in limbo that haven't been ended.

I don't think f should create streams that depends on x when x isn't being passed to f (only the values emitted by x are passed to f). That being said, if that is actually what you're doing then the streams created by f will actually end when f ends since streams in Flyd by default ends when their dependencies ends. I think you'd have a hard time creating leaks in Flyd unless you use flyd.endsOn in some odd ways (but then it's certainly your own fault).

There is only one, albeit huge, problem left. People would want to import switch like this var switch = require('flyd-switch') but that is impossible since switch is a reserved word in JS. So, maybe it should be called latest after all or flatLatest together with flatLatestMap.

So this is what I'm a little confused about. I guess compose looks at the function.length and curries appropriately?

It could do. The compose in Ramda doesn't. It returns an uncurried function.

It sees that map takes 2 arguments, then passed the result of map into flatten... that makes sense. So I could see this working so long as only the right-most function has more than one argument. What if we compose multiple functions with multiple arguments? How would this work?

Yes, that is exactly how it is. Only the right-most function can take more than one argument. Ramda has useWith and converg. They can be used as alternatives in some cases. But otherwise you'd have to give up on point free and just create a "normal" function.

@ccorcos
Copy link
Contributor Author

ccorcos commented Apr 29, 2015

This is an example from "The introduction to Reactive Programming you've been missing":

The problem with this example is that the stream ends itself after it emits once. Suppose the internet is slow and you want to cancel the request or something like that. You may want to end the request stream. The response stream will be ended, but the underlying streams will be orphaned and cannot be ended. Try to imagine a case that doesnt end itself...

Here's an example. Suppose we have a stream of chatroom names. When we get a new chatroom, we want to clear the messages rendered on the screen. Then we want to subscribe to messages from that chatroom -- creating a stream and using switchMap -- then we want to render all those message on screen.

chatroom = flyd.stream('flyd');
compose(map(renderMessages), switchMap(createMessagesStream), map(clearMessages))(chatroom)

If we close the chatroom window and end the chatroom stream, then the individual message streams for each chatroom visited will be orphaned and not ended! This doesn't seem like a good functionality...

I don't think f should create streams that depends on x when x isn't being passed to f (only the values emitted by x are passed to f).

I meant to say that f depend on the values emitted from x. Just as the messagesStream depends on which chatroom.

People would want to import switch like this var switch = require('flyd-switch')

Hmm. Well wouldn't it be namespaced by flyd.switch when you call var flyd = require('flyd')?

@paldepind
Copy link
Owner

If createMessageStream does something like this, then the problem is solved:

function(roomName) {
  return flyd.endsOn(chatroom, doCreateMessageStream(roomName));
}

Now the streams created by createMessageStream ends whenever a new room name appears.

But I can see what you mean. This case might be common enough that having a version of switchMap that does end the previous stream would be in order. So the function would not have to worry about setting up the end stream properly. But then that function needs a name as well :)

Hmm. Well wouldn't it be namespaced by flyd.switch when you call var flyd = require('flyd')?

It would be a module, and people can import then how they want. I've often imported them directly for convenience. You could do flyd.switch = require('flyd-switch') but I don't like it since the other modules leaves you free to decide.

What about naming it switchLast or switchLatest? It's slightly longer but in my opinion also more descriptive.

@ccorcos
Copy link
Contributor Author

ccorcos commented May 1, 2015

If switch is going to end streams when it moves on to the next, then its almost just "chaining" streams together. Maybe "stitch", "weave", "chain", "daisyChain", or something like that? I'm a big fan of single word names for these most basic functions though.

I think it would be a good general practice that if a stream is created within another stream, then it ends when the outer stream ends...

@paldepind
Copy link
Owner

If switch is going to end streams when it moves on to the next

Consider this example. You have a stream that emits a stream of messages from current visible chatroom. I.e. whenever the user switches between a chat room tab the stream changes.

var currentChatroomMessages = /* something */

You render these messages like in your last example. But the message streams in currentChatroomMessages are also used elsewhere to show an unread count next to the tabs for the non-focused chat rooms. Does that makes sense? In that case you do not want switch to end the last streams.

The switch that do not ends stream is the most low level version – it gives you more control. So I consider that function to be the most important one. But I see that the other variant is useful as well so we should have that too.

I'm a big fan of single word names for these most basic functions though.

Then the only thing I can think of is latest. It can be misunderstood as you pointed out earlier but it might not be too bad. I do prefer switchLatest though.

@davidchase
Copy link

+1 for switchLatest thats what i thought of immediately when seeing the switch or switchOnNext.. i think it makes sense when dealing with streams of streams calling switchLatest will return a new stream with the behavior of the most recent one

@ccorcos
Copy link
Contributor Author

ccorcos commented May 4, 2015

Ok. So there are two difference scenarios here.

  1. You create a bunch of streams. Then you have a switchLatest between the streams, but you also use those streams elsewhere.
a = flyd.stream()
b = flyd.stream()
c = flyd.stream()
streams = [a,b,c]
which = flyd.stream(0)
current = switchLatest(function(i) {
  return streams[i]
}, which)
all = lift(args2array, a,b,c)
// do whatever you want because a, b, and c are never ended

This makes sense not to end those streams, and yes, this is a more general case.

  1. You create a stream within a stream. That stream is then orphaned and will never be ended, thus creating a memory leak. Lets assume the message stream will not be used elsewhere, thus the intention is to create a stream of data from the server and end it when we go to another chatroom.
chatrooms = flyd.stream('1')
messages = switchLatest(function(id) {
  return createMessageStream(id)
})

createMessageStream will create a stream. Sure we could use takeUntil to end it when messages ends, but my point here is that if you create a stream within a stream without calling takeUntil, it will be orphaned and you'll have a memory leak.

So why can't we safeguard against that? Here's an interesting idea -- check to see if you are creating a stream within a stream and call takeUntil if you are...

Whenever you are running the function of a stream (not sure the lingo here, but by that I mean, the second argument of the stream function), then keep track of the current stream that is running. Something like this happens:

prevStream = flyd.currentStream
flyd.currentStream = s
s.f()
flyd.currentStream = prevStream

Then whenever we get a constructor for a stream, we check to see if there is a current stream (checking to see if we created a stream within a stream), and make sure it ends when its parent stream ends.

if (flyd.currentStream) {
   takeUntil(flyd.currentStream, this)
}

This may seem like overkill. But on the other hand, it seems like a reasonable functionality to me. It actually reminds me of how Tracker works...

@ccorcos
Copy link
Contributor Author

ccorcos commented May 4, 2015

switchLatest is fine by me if thats what you guys want. Makes sense to me.

@paldepind
Copy link
Owner

@davidchase Thanks for jumping in! switchLatest it is. I like switchLatest it is more descriptive without being cumbersome and should still be recognizable to people from Rx. I'll probably get the module up tommorow :)

@ccorcos I can totally see where your coming from and what you suggests makes sense. But, in your example

chatrooms = flyd.stream('1')
messages = switchLatest(function(id) {
  return createMessageStream(id)
})

No one but switchLatest will subscribe to the streams created by createMessageStream. Thus, whenever switchLatest unsubscribes no one will have reference to it and it will be garbage collected. An orphaned object is exactly not a memory leak – it is an object ready for garbage collection.

@ccorcos
Copy link
Contributor Author

ccorcos commented May 5, 2015

Hmm. Good point... But if a stream is garbage collected, then its not necessarily ended... It seems like that could have some undesireable effects somehow. I see your point, just makes me feel a little uncomfortable.

@paldepind
Copy link
Owner

I think of it this way: I stream can either end, in which case it signals to all it's dependents that it's done. Or, it can become unnecessary in which case the garbage collector will get rid of it. It's two different things. I don't think a stream needs to end unless it actually really ends.

@paldepind
Copy link
Owner

flyd-switchlatest is here.

@ccorcos
Copy link
Contributor Author

ccorcos commented May 6, 2015

I see. Sweet! Gotta love how simple it is in the end...

@ccorcos ccorcos closed this as completed May 6, 2015
@davidchase
Copy link

👍

@ccorcos
Copy link
Contributor Author

ccorcos commented May 8, 2015

I just realized the problem with not ending the stream and letting garbage collection take care of it... In createMessageStream, you are likely creating a subscription to the server that ends when the stream ends (remember the stuff with autorun?). If we don't end the stream and let the garbage collector cleanup the stream, then we are left with a subscription to the server that is never stopped...

@ccorcos ccorcos reopened this May 8, 2015
@paldepind
Copy link
Owner

@ccorcos

Sorry for the late reply :( That is a very valid problem. You could set up the end conditions on the created stream so that it ends when the source emits a new value.

But the better solution would probably be for Flyd to introduce laziness. Laziness has disadvantages though so this is something I'll have to think about.

@ccorcos
Copy link
Contributor Author

ccorcos commented May 13, 2015

You could set up the end conditions on the created stream so that it ends when the source emits a new value.

I'm going to end up doing this everywhere:

chatrooms = flyd.stream('1')
messages = switchLatest(function(id) {
  return createMessageStream(id).takeUntil(chatrooms)
}, chatrooms)

But the better solution would probably be for Flyd to introduce laziness. Laziness has disadvantages though so this is something I'll have to think about.

Hmm. Not sure I follow how laziness is relevant here... Also, I think introducing laziness would ruin the KISS aspect of Flyd.

@paldepind
Copy link
Owner

Well, creating a version of switchLatest that does end the previous stream would be easy to make. But what should it be called?

Hmm. Not sure I follow how laziness is relevant here... Also, I think introducing laziness would ruin the KISS aspect of Flyd.

With lazyness a stream would only refer to its dependencies when it has active subscribers. So when switchLatest unsubscribes from a stream it would become inactive, detach itself from its dependency and then automatically be available for garbage collection.

Implementing laziness would be quite easy. But it definitely has some negatives impacts on user friendliness.

@paldepind paldepind mentioned this issue May 13, 2015
@ccorcos
Copy link
Contributor Author

ccorcos commented May 13, 2015

Hmm. This sounds confusing. I would consider the example I explained earlier where the constructor checks to see if the stream is being created inside another stream and will endOn the parent accordingly...

// when mapping over a stream
prevStream = flyd.currentStream
flyd.currentStream = s
s.f()
flyd.currentStream = prevStream

// in the constructor
if (flyd.currentStream) {
   takeUntil(flyd.currentStream, this)
}

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

No branches or pull requests

3 participants