Navigating the stream graph upstream #85
Comments
The reason this doesn't already exist is that it would introduce memory [...] If you can give me more detail about your use case, though, I may be able [...]
Actually, I wrote the following thing as a crutch:

    ;; Follow wrapper streams (callbacks and source/sink proxies) to the stream they wrap.
    (defn- unwrap [stream]
      (loop [s stream]
        (cond (instance? manifold.stream.Callback s)
              (recur (first (.downstream s)))
              (instance? manifold.stream.SourceProxy s)
              (recur (.source s))
              (instance? manifold.stream.SinkProxy s)
              (recur (.sink s))
              :else s)))

    ;; Scan the graph's handle->downstreams map for sources that have an edge into sink.
    (defn upstream [sink]
      (let [sink (unwrap sink)
            is-sink? (fn [^manifold.stream.graph.Downstream d]
                       (= (unwrap (.sink d)) sink))
            ;; when-let skips entries whose key handle no longer resolves to a stream
            source #(when (some is-sink? (val %))
                      (when-let [k (.get (key %))]
                        [k]))]
        (->> (.entrySet manifold.stream.graph/handle->downstreams)
             (mapcat source))))

which serves our purpose. This will produce inconsistent results if the streams are connected/disconnected concurrently, but we have our topology finalized by the time we need this.

EDIT: some unwrapping needed to make the [...]

EDIT2: relying on the graph being preserved through all the transformations is error-prone. Consider [...]
Would it make sense to add a manifold.stream/upstream analogous to the already existing manifold.stream/downstream? I don't think it would be needed for any functionality currently, but it would be great for graph navigation purposes.

In our use case we need to reconstruct the stream topology starting from the sink. It seems limiting to be able to get the graph from the source but not from the sink.
Would you accept such a PR?
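
To make the asymmetry concrete, here is a minimal sketch rather than a proposed implementation. It uses only the public manifold.stream/connect and manifold.stream/downstream, and keeps the reverse edges in user code. The names upstream-index, connect-tracked! and tracked-upstream are hypothetical helpers invented for this illustration; they are not part of manifold.

```clojure
(require '[manifold.stream :as s])

;; Hypothetical registry (not part of manifold): sink -> set of sources.
;; A plain atom holds strong references to the streams, so entries have to
;; be removed explicitly when a topology is torn down.
(defonce upstream-index (atom {}))

(defn connect-tracked!
  "Hypothetical wrapper: records the reverse edge, then connects source to
  sink with manifold.stream/connect."
  [source sink]
  (swap! upstream-index update sink (fnil conj #{}) source)
  (s/connect source sink))

(defn tracked-upstream
  "Returns the set of sources recorded for sink via connect-tracked!."
  [sink]
  (get @upstream-index sink #{}))

(def a (s/stream))
(def b (s/stream))
(connect-tracked! a b)

;; Forward navigation with the existing public API:
;; downstream returns [description sink] pairs for a source.
(map second (s/downstream a))

;; Reverse navigation through the hand-maintained index.
(tracked-upstream b)
```

This only sees connections made through the wrapper, so streams wired up by manifold's own transformation functions are not recorded; a built-in upstream would not have that gap.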