Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -4,3 +4,5 @@ gradle.properties
.classpath
.project
.settings/
.idea
**/*.iml
84 changes: 82 additions & 2 deletions reactfx/src/main/java/org/reactfx/Subscription.java
Original file line number Diff line number Diff line change
@@ -1,11 +1,15 @@
package org.reactfx;

import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.stream.Stream;

import javafx.collections.ObservableList;
import javafx.collections.ObservableSet;

@FunctionalInterface
Expand Down Expand Up @@ -45,7 +49,7 @@ static <T> Subscription multi(
Function<? super T, ? extends Subscription> f,
T... elems) {
return multi(Stream.of(elems).map(f)
.<Subscription>toArray(n -> new Subscription[n]));
.<Subscription>toArray(Subscription[]::new));
}

/**
Expand All @@ -57,7 +61,7 @@ static <T> Subscription multi(
Function<? super T, ? extends Subscription> f,
Collection<T> elems) {
return multi(elems.stream().map(f)
.<Subscription>toArray(n -> new Subscription[n]));
.<Subscription>toArray(Subscription[]::new));
}

/**
Expand Down Expand Up @@ -97,6 +101,82 @@ static <T> Subscription dynamic(
elemSubs.forEach((t, sub) -> sub.unsubscribe());
};
}


/**
* Dynamically subscribes to all elements of the given observable list.
* When an element is added to the list, it is automatically subscribed to.
* When an element is removed from the list, it is automatically unsubscribed
* from.
*
* @param elems Observable list of elements that will be subscribed to
* @param f Function to subscribe to an element of the list. The first parameter
* is the element, the second is its index in the new source list
* @param <T> Type of elements
*
* @return An aggregate subscription that tracks elementary subscriptions.
* When the returned subscription is unsubscribed, all elementary
* subscriptions are unsubscribed as well, and no new elementary
* subscriptions will be created.
*/
static <T> Subscription dynamic(ObservableList<? extends T> elems,
BiFunction<? super T, Integer, ? extends Subscription> f) {

List<Subscription> elemSubs = new ArrayList<>(elems.size());

for (int i = 0; i < elems.size(); i++) {
elemSubs.add(f.apply(elems.get(i), i));
}

Subscription lstSub = EventStreams.changesOf(elems).subscribe(ch -> {
while (ch.next()) {
if (ch.wasPermutated()) {
Subscription left = elemSubs.get(ch.getFrom());
Subscription right = elemSubs.set(ch.getTo(), left);
elemSubs.set(ch.getFrom(), right);
} else {
if (ch.wasRemoved()) {
// oldList[from : from + removed.size] === removed
int from = ch.getFrom();
for (T ignored : ch.getRemoved()) {
elemSubs.remove(from).unsubscribe();
}
}
if (ch.wasAdded()) {
// newList[from : to] === addedSubList
int i = ch.getFrom();
for (T added : ch.getAddedSubList()) {
elemSubs.add(i, f.apply(added, i));
i++;
}
}
}
}
});

return () -> {
lstSub.unsubscribe();
elemSubs.forEach(Subscription::unsubscribe);
};
}


/**
* An overload of {@link #dynamic(ObservableList, BiFunction)} that can be used when the
* subscribe function does not use the index of the element in the list.
*
* @param elems Observable list of elements that will be subscribed to
* @param f Function to subscribe to an element of the list
* @param <T> Type of elements
*
* @return An aggregate subscription that tracks elementary subscriptions.
* When the returned subscription is unsubscribed, all elementary
* subscriptions are unsubscribed as well, and no new elementary
* subscriptions will be created.
*/
static <T> Subscription dynamic(ObservableList<? extends T> elems, Function<? super T, ? extends Subscription> f) {
return dynamic(elems, (e, i) -> f.apply(e));
}
}

class BiSubscription implements Subscription {
Expand Down
65 changes: 65 additions & 0 deletions reactfx/src/main/java/org/reactfx/collection/FlatValList.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
package org.reactfx.collection;

import static java.util.Collections.singletonList;

import org.reactfx.Subscription;
import org.reactfx.value.Val;

import javafx.beans.value.ObservableValue;
import javafx.collections.ObservableList;


/**
* Wrapper around an ObservableList&lt;ObservableValue&lt;T&gt;&gt; that presents
* the interface of a LiveList&lt;T&gt; and subscribes to the changes of its individual
* elements. See {@link LiveList#flattenVals(ObservableList)}.
*
* @author Clément Fournier
*/
class FlatValList<T> extends LiveListBase<T> implements UnmodifiableByDefaultLiveList<T> {

private final ObservableList<? extends ObservableValue<? extends T>> mySource;
private final LiveList<T> mapped;


FlatValList(ObservableList<? extends ObservableValue<? extends T>> source) {
this.mySource = source;
this.mapped = LiveList.map(source, ObservableValue::getValue);
}


@Override
public int size() {
return mySource.size();
}


@Override
public T get(int index) {
return mySource.get(index).getValue();
}


@Override
protected Subscription observeInputs() {

return Subscription.multi(
LiveList.observeQuasiChanges(mapped, this::notifyObservers),
Subscription.dynamic(mySource,
(element, i) -> Val.observeChanges(element,
(obs, oldV, newV) -> componentChanged(i, oldV, newV)))
);
}


private void componentChanged(int idx, T oldV, T newV) {
notifyObservers(componentChange(idx, oldV, newV));
}


private static <T> QuasiListChange<T> componentChange(int sourceIdx, T oldV, T newV) {
return () -> singletonList(
QuasiListModification.create(sourceIdx, singletonList(oldV), 1)
);
}
}
73 changes: 67 additions & 6 deletions reactfx/src/main/java/org/reactfx/collection/LiveList.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,11 @@
import java.util.Collections;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Objects;
import java.util.function.BinaryOperator;
import java.util.function.Consumer;
import java.util.function.Function;

import javafx.beans.InvalidationListener;
import javafx.beans.value.ObservableValue;
import javafx.collections.ListChangeListener;
import javafx.collections.ObservableList;
import javafx.scene.control.IndexRange;

import org.reactfx.EventStream;
import org.reactfx.EventStreamBase;
import org.reactfx.Observable;
Expand All @@ -24,6 +19,12 @@
import org.reactfx.util.WrapperBase;
import org.reactfx.value.Val;

import javafx.beans.InvalidationListener;
import javafx.beans.value.ObservableValue;
import javafx.collections.ListChangeListener;
import javafx.collections.ObservableList;
import javafx.scene.control.IndexRange;

/**
* Adds additional methods to {@link ObservableList}.
*
Expand Down Expand Up @@ -175,6 +176,23 @@ default Val<Integer> sizeProperty() {
return sizeOf(this);
}


/**
* Creates a new LiveList that reflects the values of the elements of the
* given list of observables. See {@link #flattenVals(ObservableList)}.
*
* <p>The returned list is unmodifiable but can be observed.
*
* @param f Maps the elements of this list to observable values
* @param <F> Type of the returned list
*
* @return A new LiveList
*/
default <F> LiveList<F> flattenVals(Function<? super E, ? extends ObservableValue<? extends F>> f) {
return flattenVals(this, f);
}


default <F> LiveList<F> map(Function<? super E, ? extends F> f) {
return map(this, f);
}
Expand Down Expand Up @@ -357,6 +375,49 @@ static <E, T> Val<T> collapseDynamic(
static <E> LiveList<E> wrapVal(ObservableValue<E> obs) {
return new ValAsList<>(obs);
}


/**
* Creates a new LiveList that reflects the values of the elements of the
* given list of observables. If any of the observable values contained in
* the source list change, a list change is pushed (lazily). Additions or
* removals made to the source collection are also reflected by the returned
* list. It kind of behaves like {@code source.map(ObservableValue::getValue)},
* except the list is also updated when the individual elements change.
*
* <p>The returned list is unmodifiable but can be observed.
*
* @param source List of observables
* @param <E> Type of values of the returned list
*
* @return A new live list
*
* @throws NullPointerException If the source collection is null
*/
static <E> LiveList<E> flattenVals(ObservableList<? extends ObservableValue<? extends E>> source) {
return new FlatValList<>(Objects.requireNonNull(source));
}


/**
* Creates a new LiveList on top of a list of observables that reflects the values
* of the elements of the source list.
*
* <p>The returned list is unmodifiable but can be observed.
*
* @param source Observable list
* @param f Maps the elements of the source list to observable values
* @param <E> Type of values of the input list
* @param <F> Type of values of the returned list
*
* @return A new live list
*
* @throws NullPointerException If the source collection is null
*/
static <E, F> LiveList<F> flattenVals(ObservableList<? extends E> source,
Function<? super E, ? extends ObservableValue<? extends F>> f) {
return flattenVals(map(source, f));
}
}


Expand Down
Loading