Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

CellIterator: add "lastContributionTimestamp" to OSMEntitySnapshot & decouple from Grid implementation #495

Merged
merged 10 commits into from
Apr 6, 2023
13 changes: 13 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,19 @@
Changelog
=========

## 1.2.0-SNAPSHOT (current master)

### new features

* `OSMEntitySnapshot` now also returns the `lastContributionTimestamp` for each snapshot ([#495])

### other changes

* `CellIterator` is now decoupled from implementation of the "Grid" ([#495])

[#495]: https://github.com/GIScience/oshdb/pull/495


## 1.1.1

* update ignite dependency to [2.14.0-heigit1] ([#491])
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
import java.util.Optional;
import java.util.TreeMap;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.LongStream;
import java.util.stream.Stream;
import javax.annotation.Nonnull;
Expand All @@ -40,6 +39,7 @@
import org.heigit.ohsome.oshdb.util.CellId;
import org.heigit.ohsome.oshdb.util.TableNames;
import org.heigit.ohsome.oshdb.util.celliterator.CellIterator;
import org.heigit.ohsome.oshdb.util.celliterator.OSHEntitySource;
import org.heigit.ohsome.oshdb.util.exceptions.OSHDBTimeoutException;
import org.heigit.ohsome.oshdb.util.function.SerializableBiFunction;
import org.heigit.ohsome.oshdb.util.function.SerializableBinaryOperator;
Expand Down Expand Up @@ -139,8 +139,8 @@ private static <T> T asyncGetHandleTimeouts(IgniteFuture<T> async, Long timeout)
// When a timeout happens remotely, the exception might be burried in (few) layers of
// "ignite exceptions". This recursively unwinds these and throws the original exception.
Throwable unwrapped = unwrapNestedIgniteException(e);
if (unwrapped instanceof OSHDBTimeoutException) {
throw (OSHDBTimeoutException) unwrapped;
if (unwrapped instanceof OSHDBTimeoutException timeoutException) {
throw timeoutException;
} else {
throw e;
}
Expand All @@ -150,8 +150,8 @@ private static <T> T asyncGetHandleTimeouts(IgniteFuture<T> async, Long timeout)
/** Recursively unwinds nested ignite exceptions. */
private static Throwable unwrapNestedIgniteException(IgniteException e) {
Throwable cause = e.getCause();
if (cause instanceof IgniteException && e != cause) {
return unwrapNestedIgniteException((IgniteException) cause);
if (cause instanceof IgniteException igniteException && e != cause) {
return unwrapNestedIgniteException(igniteException);
}
return cause;
}
Expand Down Expand Up @@ -193,13 +193,13 @@ private <S> S reduce(
.mapToObj(cellLongId -> asyncGetHandleTimeouts(
compute.affinityCallAsync(cacheName, cellLongId, () -> {
@SuppressWarnings("SerializableStoresNonSerializable")
GridOSHEntity oshEntityCell = cache.localPeek(cellLongId);
GridOSHEntity cell = cache.localPeek(cellLongId);
S ret;
if (oshEntityCell == null) {
if (cell == null) {
ret = identitySupplier.get();

} else {
ret = cellProcessor.apply(oshEntityCell, cellIterator);
ret = cellProcessor.apply(OSHEntitySource.fromGridOSHEntity(cell), cellIterator);
}
onClose.run();
return ret;
Expand Down Expand Up @@ -263,19 +263,19 @@ cacheName, cellIdRangeToCellIds(), cellIdRanges, cellProcessor, cellIterator
this.timeout
).stream()
.flatMap(Collection::stream)
.collect(Collectors.toList());
.toList();
Collections.shuffle(cellsWithData);
Stream<X> resultForType = cellsWithData.parallelStream()
.filter(ignored -> this.isActive())
.map(cellLongId -> asyncGetHandleTimeouts(
compute.affinityCallAsync(cacheName, cellLongId, () -> {
GridOSHEntity oshEntityCell = cache.localPeek(cellLongId);
GridOSHEntity cell = cache.localPeek(cellLongId);
Collection<X> ret;
if (oshEntityCell == null) {
if (cell == null) {
ret = Collections.<X>emptyList();
} else {
ret = cellProcessor.apply(oshEntityCell, cellIterator)
.collect(Collectors.toList());
ret = cellProcessor.apply(OSHEntitySource.fromGridOSHEntity(cell), cellIterator)
.toList();
}
onClose.run();
return ret;
Expand Down Expand Up @@ -444,10 +444,11 @@ public Collection<Long> call() {
// test if cell exists and contains any relevant data
GridOSHEntity cell = localCache.localPeek(cellLongId);
return cell != null
&& cellProcessor.apply(cell, cellIterator).anyMatch(ignored -> true);
&& cellProcessor.apply(OSHEntitySource.fromGridOSHEntity(cell), cellIterator)
.anyMatch(ignored -> true);
})
.boxed()
.collect(Collectors.toList());
.toList();
}
}

Expand Down Expand Up @@ -488,13 +489,14 @@ public Collection<Long> call() {
MapReducerIgniteScanQuery.cellKeyInRange(key, cellIdRangesByLevel)
).setPartition(part), cacheEntry -> {
Object data = cacheEntry.getValue();
GridOSHEntity oshEntityCell;
if (data instanceof BinaryObject) {
oshEntityCell = ((BinaryObject) data).deserialize();
GridOSHEntity cell;
if (data instanceof BinaryObject binaryData) {
cell = binaryData.deserialize();
} else {
oshEntityCell = (GridOSHEntity) data;
cell = (GridOSHEntity) data;
}
Stream<?> cellStream = cellProcessor.apply(oshEntityCell, this.cellIterator);
Stream<?> cellStream = cellProcessor.apply(
OSHEntitySource.fromGridOSHEntity(cell), this.cellIterator);
if (cellStream.anyMatch(ignored -> true)) {
return Optional.of(cacheEntry.getKey());
} else {
Expand All @@ -510,7 +512,7 @@ public Collection<Long> call() {
}
})
.flatMap(Collection::stream)
.collect(Collectors.toList());
.toList();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.heigit.ohsome.oshdb.util.CellId;
import org.heigit.ohsome.oshdb.util.TableNames;
import org.heigit.ohsome.oshdb.util.celliterator.CellIterator;
import org.heigit.ohsome.oshdb.util.celliterator.OSHEntitySource;
import org.heigit.ohsome.oshdb.util.exceptions.OSHDBTimeoutException;
import org.heigit.ohsome.oshdb.util.function.OSHEntityFilter;
import org.heigit.ohsome.oshdb.util.function.OSMEntityFilter;
Expand Down Expand Up @@ -105,7 +106,7 @@ protected Stream<X> flatMapStreamCellsOSMEntitySnapshotGroupedById(

private List<String> cacheNames(String prefix) {
return this.typeFilter.stream().map(TableNames::forOSMType).filter(Optional::isPresent)
.map(Optional::get).map(tn -> tn.toString(prefix)).collect(Collectors.toList());
.map(Optional::get).map(tn -> tn.toString(prefix)).toList();
}

@Override
Expand Down Expand Up @@ -281,7 +282,8 @@ S execute(Ignite node, CellProcessor<S> cellProcessor) {
// filter out cache misses === empty oshdb cells or not "local" data
.filter(Objects::nonNull)
.filter(ignored -> this.isActive())
.map(cell -> cellProcessor.apply(cell, this.cellIterator))
.map(cell ->
cellProcessor.apply(OSHEntitySource.fromGridOSHEntity(cell), this.cellIterator))
.reduce(identitySupplier.get(), combiner);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
import java.util.TreeMap;
import java.util.UUID;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
Expand All @@ -39,6 +38,7 @@
import org.heigit.ohsome.oshdb.util.CellId;
import org.heigit.ohsome.oshdb.util.TableNames;
import org.heigit.ohsome.oshdb.util.celliterator.CellIterator;
import org.heigit.ohsome.oshdb.util.celliterator.OSHEntitySource;
import org.heigit.ohsome.oshdb.util.exceptions.OSHDBTimeoutException;
import org.heigit.ohsome.oshdb.util.function.OSHEntityFilter;
import org.heigit.ohsome.oshdb.util.function.OSMEntityFilter;
Expand Down Expand Up @@ -342,13 +342,14 @@ S execute(Ignite node, CellProcessor<S> cellProcessor) {
}
// iterate over the history of all OSM objects in the current cell
Object data = cacheEntry.getValue();
GridOSHEntity oshEntityCell;
if (data instanceof BinaryObject) {
oshEntityCell = ((BinaryObject) data).deserialize();
GridOSHEntity cell;
if (data instanceof BinaryObject binaryData) {
cell = binaryData.deserialize();
} else {
oshEntityCell = (GridOSHEntity) data;
cell = (GridOSHEntity) data;
}
return cellProcessor.apply(oshEntityCell, this.cellIterator);
return cellProcessor.apply(
OSHEntitySource.fromGridOSHEntity(cell), this.cellIterator);
}
)
) {
Expand Down Expand Up @@ -503,7 +504,7 @@ private static <V, R, M, S, P extends Geometry & Polygonal> S mapReduceOnIgniteC
null
);
S ret;
if (!oshdb.timeoutInMilliseconds().isPresent()) {
if (oshdb.timeoutInMilliseconds().isEmpty()) {
ret = result.get();
} else {
try {
Expand Down Expand Up @@ -534,13 +535,14 @@ private static <X> Stream<X> mapStreamOnIgniteCache(
).setPageSize(SCAN_QUERY_PAGE_SIZE), cacheEntry -> {
// iterate over the history of all OSM objects in the current cell
Object data = cacheEntry.getValue();
GridOSHEntity oshEntityCell;
if (data instanceof BinaryObject) {
oshEntityCell = ((BinaryObject) data).deserialize();
GridOSHEntity cell;
if (data instanceof BinaryObject binaryData) {
cell = binaryData.deserialize();
} else {
oshEntityCell = (GridOSHEntity) data;
cell = (GridOSHEntity) data;
}
return cellProcessor.apply(oshEntityCell, cellIterator).collect(Collectors.toList());
return cellProcessor.apply(OSHEntitySource.fromGridOSHEntity(cell), cellIterator)
.toList();
}
);
// todo: ignite scan query doesn't support timeouts -> implement ourself?
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,16 +10,16 @@
import javax.annotation.Nonnull;
import org.heigit.ohsome.oshdb.api.object.OSMContributionImpl;
import org.heigit.ohsome.oshdb.api.object.OSMEntitySnapshotImpl;
import org.heigit.ohsome.oshdb.grid.GridOSHEntity;
import org.heigit.ohsome.oshdb.util.celliterator.CellIterator;
import org.heigit.ohsome.oshdb.util.celliterator.OSHEntitySource;
import org.heigit.ohsome.oshdb.util.function.SerializableBiFunction;
import org.heigit.ohsome.oshdb.util.function.SerializableFunction;
import org.heigit.ohsome.oshdb.util.function.SerializableSupplier;
import org.heigit.ohsome.oshdb.util.mappable.OSMContribution;
import org.heigit.ohsome.oshdb.util.mappable.OSMEntitySnapshot;

class Kernels implements Serializable {
interface CellProcessor<S> extends SerializableBiFunction<GridOSHEntity, CellIterator, S> {}
interface CellProcessor<S> extends SerializableBiFunction<OSHEntitySource, CellIterator, S> {}

interface CancelableProcessStatus {
default <T> boolean isActive(T ignored) {
Expand Down Expand Up @@ -57,10 +57,10 @@ static <R, S> CellProcessor<S> getOSMContributionCellReducer(
SerializableBiFunction<S, R, S> accumulator,
CancelableProcessStatus process
) {
return (oshEntityCell, cellIterator) -> {
return (source, cellIterator) -> {
// iterate over the history of all OSM objects in the current cell
AtomicReference<S> accInternal = new AtomicReference<>(identitySupplier.get());
cellIterator.iterateByContribution(oshEntityCell)
cellIterator.iterateByContribution(source)
.takeWhile(process::isActive)
.forEach(contribution -> {
OSMContribution osmContribution = new OSMContributionImpl(contribution);
Expand All @@ -86,11 +86,11 @@ static <R, S> CellProcessor<S> getOSMContributionGroupingCellReducer(
SerializableBiFunction<S, R, S> accumulator,
CancelableProcessStatus process
) {
return (oshEntityCell, cellIterator) -> {
return (source, cellIterator) -> {
AtomicReference<S> accInternal = new AtomicReference<>(identitySupplier.get());
// iterate over the history of all OSM objects in the current cell
List<OSMContribution> contributions = new ArrayList<>();
cellIterator.iterateByContribution(oshEntityCell)
cellIterator.iterateByContribution(source)
.takeWhile(process::isActive)
.forEach(contribution -> {
OSMContribution thisContribution = new OSMContributionImpl(contribution);
Expand Down Expand Up @@ -131,10 +131,10 @@ static <R, S> CellProcessor<S> getOSMEntitySnapshotCellReducer(
SerializableBiFunction<S, R, S> accumulator,
CancelableProcessStatus process
) {
return (oshEntityCell, cellIterator) -> {
return (source, cellIterator) -> {
// iterate over the history of all OSM objects in the current cell
AtomicReference<S> accInternal = new AtomicReference<>(identitySupplier.get());
cellIterator.iterateByTimestamps(oshEntityCell)
cellIterator.iterateByTimestamps(source)
.takeWhile(process::isActive)
.forEach(data -> {
OSMEntitySnapshot snapshot = new OSMEntitySnapshotImpl(data);
Expand All @@ -161,11 +161,11 @@ static <R, S> CellProcessor<S> getOSMEntitySnapshotGroupingCellReducer(
SerializableBiFunction<S, R, S> accumulator,
CancelableProcessStatus process
) {
return (oshEntityCell, cellIterator) -> {
return (source, cellIterator) -> {
// iterate over the history of all OSM objects in the current cell
AtomicReference<S> accInternal = new AtomicReference<>(identitySupplier.get());
List<OSMEntitySnapshot> osmEntitySnapshots = new ArrayList<>();
cellIterator.iterateByTimestamps(oshEntityCell)
cellIterator.iterateByTimestamps(source)
.takeWhile(process::isActive)
.forEach(data -> {
OSMEntitySnapshot thisSnapshot = new OSMEntitySnapshotImpl(data);
Expand Down Expand Up @@ -204,9 +204,9 @@ static <S> CellProcessor<Stream<S>> getOSMContributionCellStreamer(
SerializableFunction<OSMContribution, S> mapper,
CancelableProcessStatus process
) {
return (oshEntityCell, cellIterator) -> {
return (source, cellIterator) -> {
// iterate over the history of all OSM objects in the current cell
return cellIterator.iterateByContribution(oshEntityCell)
return cellIterator.iterateByContribution(source)
.takeWhile(process::isActive)
.map(OSMContributionImpl::new)
.map(mapper);
Expand All @@ -225,11 +225,11 @@ static <S> CellProcessor<Stream<S>> getOSMContributionGroupingCellStreamer(
SerializableFunction<List<OSMContribution>, Iterable<S>> mapper,
CancelableProcessStatus process
) {
return (oshEntityCell, cellIterator) -> {
return (source, cellIterator) -> {
// iterate over the history of all OSM objects in the current cell
List<OSMContribution> contributions = new ArrayList<>();
List<S> result = new LinkedList<>();
cellIterator.iterateByContribution(oshEntityCell)
cellIterator.iterateByContribution(source)
.takeWhile(process::isActive)
.map(OSMContributionImpl::new)
.forEach(contribution -> {
Expand Down Expand Up @@ -261,9 +261,9 @@ static <S> CellProcessor<Stream<S>> getOSMEntitySnapshotCellStreamer(
SerializableFunction<OSMEntitySnapshot, S> mapper,
CancelableProcessStatus process
) {
return (oshEntityCell, cellIterator) -> {
return (source, cellIterator) -> {
// iterate over the history of all OSM objects in the current cell
return cellIterator.iterateByTimestamps(oshEntityCell)
return cellIterator.iterateByTimestamps(source)
.takeWhile(process::isActive)
.map(OSMEntitySnapshotImpl::new)
.map(mapper);
Expand All @@ -282,11 +282,11 @@ static <S> CellProcessor<Stream<S>> getOSMEntitySnapshotGroupingCellStreamer(
SerializableFunction<List<OSMEntitySnapshot>, Iterable<S>> mapper,
CancelableProcessStatus process
) {
return (oshEntityCell, cellIterator) -> {
return (source, cellIterator) -> {
// iterate over the history of all OSM objects in the current cell
List<OSMEntitySnapshot> snapshots = new ArrayList<>();
List<S> result = new LinkedList<>();
cellIterator.iterateByTimestamps(oshEntityCell)
cellIterator.iterateByTimestamps(source)
.takeWhile(process::isActive)
.map(OSMEntitySnapshotImpl::new)
.forEach(contribution -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import org.heigit.ohsome.oshdb.api.mapreducer.backend.Kernels.CellProcessor;
import org.heigit.ohsome.oshdb.index.XYGridTree.CellIdRange;
import org.heigit.ohsome.oshdb.util.celliterator.CellIterator;
import org.heigit.ohsome.oshdb.util.celliterator.OSHEntitySource;
import org.heigit.ohsome.oshdb.util.function.SerializableBiFunction;
import org.heigit.ohsome.oshdb.util.function.SerializableBinaryOperator;
import org.heigit.ohsome.oshdb.util.function.SerializableFunction;
Expand Down Expand Up @@ -69,7 +70,7 @@ private <S> S reduce(
.filter(ignored -> this.isActive())
.flatMap(this::getOshCellsStream)
.filter(ignored -> this.isActive())
.map(oshCell -> processor.apply(oshCell, cellIterator))
.map(cell -> processor.apply(OSHEntitySource.fromGridOSHEntity(cell), cellIterator))
.reduce(identitySupplier.get(), combiner);
}

Expand All @@ -91,7 +92,7 @@ private Stream<X> stream(
.filter(ignored -> this.isActive())
.flatMap(this::getOshCellsStream)
.filter(ignored -> this.isActive())
.flatMap(oshCell -> processor.apply(oshCell, cellIterator));
.flatMap(cell -> processor.apply(OSHEntitySource.fromGridOSHEntity(cell), cellIterator));
}

// === map-reduce operations ===
Expand Down
Loading