Broadcast spatial join #9474
@mbasmanova Can you please give a short explanation of what a spatial join is? Just a link could be good enough. Is it SQL standard compliant, or is it an enhancement to SQL?
@kokosing Grzegorz, a spatial join combines relations with geometries using spatial relationships such as contains, intersects, or distance. A point-in-polygon join is an example of a spatial join. To express a spatial relationship, one can use the ST_Contains, ST_Intersects, or ST_Distance functions defined in ISO/IEC 13249 SQL/MM Part 3 Spatial: The Standard to Manage Spatial Data in Relational Database Systems - https://www.iso.org/standard/60343.html
Presto supports a subset of the spatial functions from the standard mentioned above: https://prestodb.io/docs/current/functions/geospatial.html
Thank you. That was a functional description. From the point of view of the technical implementation, it seems that the simplest approach would be to run a cross join and evaluate the join condition. Do you follow this, or do you have a smarter idea? Do you somehow partition the geospatial space so that you could then run a hash join?
@kokosing Grzegorz, you are correct. The simplest way is to run a cross join and evaluate the join condition. Presto already supports this, and no changes are necessary to use it for spatial joins. However, for large datasets this is quite inefficient and takes a very long time. http://www.cs.umd.edu/~hjs/pubs/jacoxtrjoin07.pdf provides a nice overview of the spatial partitioning techniques that make spatial joins a lot more efficient.
In the long term, I'd like to implement a spatial join using KDB-tree partitioning of the sources and an R-Tree index on individual join nodes, similar to how this is done in GeoSpark (https://github.com/DataSystemsLab/GeoSpark). In the short term, however, I'm looking to implement (1) a broadcast spatial join using an R-Tree and (2) a tile-based generic spatial join.
(1) is useful when one side of the join is quite small and can be replicated to all the join nodes; in this case, creating an R-Tree from the smaller side and streaming the larger side through it is fast and efficient. This is what this PR is about.
(2) converts a spatial join into an equi-join using Bing Tiles (see the Bing Tiles section in https://prestodb.io/docs/current/functions/geospatial.html). For example, a point-in-polygon join can be converted like so:
spatial join:
an equivalent equi-join:
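A minimal Java sketch of the idea behind (2). It is not the SQL from the original comment and rests on simplifying assumptions: a uniform grid stands in for Bing Tiles, axis-aligned rectangles stand in for polygons, and every name (`TileJoinSketch`, `Rect`, `TILE_SIZE`, `tileKey`, `buildTileIndex`, `findContaining`) is hypothetical. Each rectangle is registered under every tile it overlaps, each point is matched by its single tile key (the equi-join), and the exact containment test then removes false positives.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: a spatial join rewritten as an equi-join on grid tiles.
final class TileJoinSketch
{
    static final double TILE_SIZE = 10.0; // assumed tile width and height

    // Axis-aligned rectangle standing in for a polygon.
    static final class Rect
    {
        final String name;
        final double minX, minY, maxX, maxY;

        Rect(String name, double minX, double minY, double maxX, double maxY)
        {
            this.name = name;
            this.minX = minX;
            this.minY = minY;
            this.maxX = maxX;
            this.maxY = maxY;
        }

        boolean contains(double x, double y)
        {
            return x >= minX && x <= maxX && y >= minY && y <= maxY;
        }
    }

    static long tileIndex(double coordinate)
    {
        return (long) Math.floor(coordinate / TILE_SIZE);
    }

    // The equi-join key: one string per grid cell.
    static String tileKey(long tileX, long tileY)
    {
        return tileX + ":" + tileY;
    }

    // Build side: register each rectangle under every tile it overlaps.
    static Map<String, List<Rect>> buildTileIndex(List<Rect> rects)
    {
        Map<String, List<Rect>> tiles = new HashMap<>();
        for (Rect rect : rects) {
            for (long tx = tileIndex(rect.minX); tx <= tileIndex(rect.maxX); tx++) {
                for (long ty = tileIndex(rect.minY); ty <= tileIndex(rect.maxY); ty++) {
                    tiles.computeIfAbsent(tileKey(tx, ty), key -> new ArrayList<>()).add(rect);
                }
            }
        }
        return tiles;
    }

    // Probe side: look up the point's tile, then run the exact test on the candidates.
    static List<String> findContaining(Map<String, List<Rect>> tiles, double x, double y)
    {
        List<String> matches = new ArrayList<>();
        List<Rect> candidates = tiles.get(tileKey(tileIndex(x), tileIndex(y)));
        if (candidates != null) {
            for (Rect rect : candidates) {
                if (rect.contains(x, y)) {
                    matches.add(rect.name);
                }
            }
        }
        return matches;
    }

    public static void main(String[] args)
    {
        Map<String, List<Rect>> tiles = buildTileIndex(Arrays.asList(
                new Rect("a", 0, 0, 25, 25),
                new Rect("b", 20, 20, 40, 40)));
        // Both rectangles share the point's tile, but only "b" passes the exact test.
        System.out.println(findContaining(tiles, 26, 26)); // prints [b]
    }
}
```

The tile lookup only narrows the candidate set; the final `contains` call is still needed because a tile can overlap a rectangle that does not contain the point.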
do you plan to have a |
@findepi Piotr, I'm still thinking about it, but tentatively I'm planning to write a rule to convert a spatial join to a tile-based equi-join.
@mbasmanova Maria, thank you for the explanation. It is a really interesting topic.
@mbasmanova I am sorry if I ask dumb questions. I have not read the spatial-related papers, so I might be missing some basic issues. Anyway, I wanted to ask: have you considered any alternative approaches to supporting spatial queries efficiently, other than modifying the join execution code? Have you had a chance to read #9585 (comment)? To me more natural approach with introducing an
Here, for reference, I copy-paste the relevant part of the comment mentioned above:
The biggest concern I have is that join execution is already very complicated, and adding more logic there does not help. You have probably already researched alternative solutions, and what you propose may be the best one, but let me try an idea that came to my mind.
Auxiliary questions:
Do you think that using R-Tree for queries like
A bit closer, but still no. What if number of distinct values of
What if we introduced an Index node? This node would mean that we create an index that is used later by some expressions to make them more performant. In this case it would be used for the spatial use case, but in general it could be used for other things too, such as text-related expressions, bioinformatics, or ML. It uses a kind of aggregation function that takes all the values of the given columns and produces a single value, but without doing any grouping. Then plan for
Having this:
would produce
    return NOT_BLOCKED;
}

List<SettableFuture<PagesSpatialIndex>> settableFutures;
synchronized (this) {
    this.pagesSpatialIndex = pagesSpatialIndex;
nit: also verify(this.pagesSpatialIndex == null)
private final List<Type> probeTypes;
private final List<Type> outputTypes;
private final List<Integer> probeOutputChannels;
private final int probeGeometryChannel;
private final PagesSpatialIndexFactory pagesSpatialIndexFactory;

private ListenableFuture<Supplier<PagesSpatialIndex>> pagesSpatialIndexFuture;
private PagesSpatialIndex pagesSpatialIndex;
private ListenableFuture<PagesSpatialIndex> pagesSpatialIndexFuture;
I know you're nulling this out when finishing. Also null this out in close(), in case this operator is closed without finishing.
DriverYieldSignal yieldSignal = operatorContext.getDriverContext().getYieldSignal();
while (probePosition < probe.getPositionCount()) {
    if (joinAddresses == null) {
        joinAddresses = pagesSpatialIndex.findJoinAddresses(probePosition, probe, probeGeometryChannel);
        localUserMemoryContext.setBytes(SizeOf.sizeOfLongArray(joinAddresses.length));
Just SizeOf.sizeOf(joinAddresses) would do. + static import
@findepi @nezihyigitbasi @dain @highker Nezih, thanks for helping me to figure out how to properly account for memory used by |
Minor comments for size calculations.
// <header-size-of-ImmutableList> +
// <instance-size-of-ObjectArrayList> * <length-of-outer-list> +
// <sum-of-all-Blocks>
return INSTANCE_SIZE + IMMUTABLE_LIST_HEADER_SIZE + OBJECT_ARRAY_LIST_INSTANCE_SIZE * channels.size() +
You can format this as below to categorize different types of overhead.
return INSTANCE_SIZE +
        IMMUTABLE_LIST_HEADER_SIZE +
        OBJECT_ARRAY_LIST_INSTANCE_SIZE * channels.size() +
        channels.stream()
                .flatMap(List::stream)
                .mapToLong(Block::getRetainedSizeInBytes)
                .sum();
@@ -77,6 +81,13 @@ public PagesSpatialIndexSupplier(
this.memorySizeInBytes = INSTANCE_SIZE +
you can extract a method for this complex instance size calculation, which will simplify the constructor.
A few minor comments.
// The line below is the same as newCachedThreadPool(daemonThreadsNamed(...)) except RejectionExecutionHandler.
// RejectionExecutionHandler is set to DiscardPolicy (instead of the default AbortPolicy) here.
// Otherwise, a large number of RejectedExecutionException will flood logging, resulting in Travis failure.
executor = new ThreadPoolExecutor(
executor and scheduledExecutor are shared across the test methods, so I think we should mark this test as single-threaded (@Test(singleThreaded = true)). That will also help cap the memory usage and the number of active threads, since we create a thread pool with potentially unlimited size in setup.
.row(POINT_W, "w");
OperatorFactory joinOperatorFactory = new SpatialJoinOperatorFactory(2, new PlanNodeId("test"), probePages.getTypes(), Ints.asList(1), 0, pagesSpatialIndexFactory);

// expected
unnecessary comment
.row(POINT_W, "w");
OperatorFactory joinOperatorFactory = new SpatialJoinOperatorFactory(2, new PlanNodeId("test"), probePages.getTypes(), Ints.asList(1), 0, pagesSpatialIndexFactory);

// expected
unnecessary comment
RowPagesBuilder probePages = rowPagesBuilder(ImmutableList.of(GEOMETRY, VARCHAR));
OperatorFactory joinOperatorFactory = new SpatialJoinOperatorFactory(2, new PlanNodeId("test"), probePages.getTypes(), Ints.asList(1), 0, pagesSpatialIndexFactory);

// expected
unnecessary comment
public long getAddress()
{
    return address;
This seems to be unused. Below you directly access the address field.
if (rtree.isEmpty()) {
    return EMPTY_INDEX;
}
else {
else not needed.
if (root.getLevel() == 0) {
    return ABSTRACT_NODE_INSTANCE_SIZE + ENVELOPE_INSTANCE_SIZE + root.getChildBoundables().stream().mapToLong(child -> computeMemorySizeInBytes((ItemBoundable) child)).sum();
}
else {
else not needed.
private boolean finishing;
private boolean finished;

public SpatialIndexBuilderOperator(
this class can be private
/**
 * Called by {@link SpatialIndexBuilderOperator} to provide a
 * {@link Supplier} of spatial indexes for {@link SpatialJoinOperator}S to use.
{@link SpatialJoinOperator}S -> {@link SpatialJoinOperator}s
        int expectedPositions,
        PagesIndex.Factory pagesIndexFactory)
{
    this.operatorContext = operatorContext;
you can add requireNonNull checks here.
@nezihyigitbasi Nezih, thank you for the review. I made the changes you suggested and updated the PR.
@@ -33,6 +34,9 @@
        implements PagesHashStrategy
{
    private static final int INSTANCE_SIZE = ClassLayout.parseClass(SimplePagesHashStrategy.class).instanceSize();
    private static final int OBJECT_ARRAY_LIST_INSTANCE_SIZE = ClassLayout.parseClass(ObjectArrayList.class).instanceSize();
    private static final int IMMUTABLE_LIST_HEADER_SIZE = ClassLayout.parseClass(ImmutableList.class).headerSize();
ImmutableList is abstract, so is this better than parseClass(Object.class).headerSize()? (except for documentation purposes)
(no change requested)
// Each element of the List is an ObjectArrayList<Block>.
// Hence, the memory used by channels is estimated as
// <header-size-of-ImmutableList> +
// <instance-size-of-ObjectArrayList> * <length-of-outer-list> +
This still has some error. Do we need to include the size of ObjectArrayList#a too?
(Imagine channels has a single element and channels[0] is a very long list of empty blocks. We would calculate an O(1) size, ignoring the size of channels[0].a.)
}

if (finishing && indexNotNeeded.isDone()) {
    index.clear();
Add a close() method and have index.clear() there too.
Like in #10039
You can, for example, move
    index.clear();
    localUserMemoryContext.setBytes(index.getEstimatedSize().toBytes());
to close() and just call it here.
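One way to read the two suggestions above is an idempotent close() that clears the index and refreshes the memory accounting, with the finishing path simply delegating to it. The sketch below uses stand-in types only: `IndexBuilderSketch`, `MemoryContext`, and the `List<long[]>` index are hypothetical and far simpler than the operator, `PagesIndex`, and memory context in the PR.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of an operator-like class whose cleanup lives in close().
final class IndexBuilderSketch
        implements AutoCloseable
{
    // Stand-in for a memory accounting context (assumption, not Presto's class).
    static final class MemoryContext
    {
        private long bytes;

        void setBytes(long bytes)
        {
            this.bytes = bytes;
        }

        long getBytes()
        {
            return bytes;
        }
    }

    private final MemoryContext memoryContext = new MemoryContext();
    private final List<long[]> index = new ArrayList<>();
    private boolean closed;

    void addPage(long[] page)
    {
        if (closed) {
            throw new IllegalStateException("operator is closed");
        }
        index.add(page);
        memoryContext.setBytes(estimatedSize());
    }

    // Finishing path: the index is no longer needed, so reuse the cleanup in close().
    void finish()
    {
        close();
    }

    // Safe to call more than once, and safe to call without finish().
    @Override
    public void close()
    {
        if (closed) {
            return;
        }
        closed = true;
        index.clear();
        memoryContext.setBytes(estimatedSize());
    }

    // Rough size estimate: payload bytes of all retained pages.
    long estimatedSize()
    {
        long size = 0;
        for (long[] page : index) {
            size += (long) page.length * Long.BYTES;
        }
        return size;
    }

    long reservedBytes()
    {
        return memoryContext.getBytes();
    }
}
```

Keeping the cleanup in one place means an operator that is closed without ever finishing still releases its index and reports zero reserved bytes.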
    return new PagesSpatialIndexSupplier(session, valueAddresses, types, outputChannels, channels, sizeOfChannels(), geometryChannel, spatialRelationshipTest, filterFunctionFactory);
}

private long sizeOfChannels()
Why don't we put this logic inside PagesSpatialIndexSupplier, given it will take channels anyway?
private long sizeOfChannels()
{
    return IMMUTABLE_LIST_HEADER_SIZE +
            stream(channels)
IIRC, stream is much slower than for. @nezihyigitbasi can provide more context.
@highker this is different from @nezihyigitbasi's bd06b0b because, AFAIU, this is called once in the operator's lifetime.
@highker @nezihyigitbasi @findepi
To fix that I made the following changes:
Hope this works.
But then this is pretty different than in hash join, isn't it?
Why not simply improve
Furthermore, Hence, changing
I decided that fixing memory accounting for the hash join is out of scope for this PR. The straightforward approach I tried earlier suffers from having to assume the implementations of the outer and inner lists.
Benchmark                                        (pointCount)  Mode  Cnt      Score     Error  Units
BenchmarkSpatialJoin.benchmarkJoin                         10  avgt   30     38.842 ±   2.867  ms/op
BenchmarkSpatialJoin.benchmarkJoin                        100  avgt   30    212.509 ±   9.965  ms/op
BenchmarkSpatialJoin.benchmarkJoin                       1000  avgt   30   1937.329 ±  79.988  ms/op
BenchmarkSpatialJoin.benchmarkJoin                      10000  avgt   30  18822.191 ± 460.088  ms/op
BenchmarkSpatialJoin.benchmarkUserOptimizedJoin            10  avgt   30     15.621 ±   1.221  ms/op
BenchmarkSpatialJoin.benchmarkUserOptimizedJoin           100  avgt   30     16.939 ±   1.209  ms/op
BenchmarkSpatialJoin.benchmarkUserOptimizedJoin          1000  avgt   30     29.448 ±   1.990  ms/op
BenchmarkSpatialJoin.benchmarkUserOptimizedJoin         10000  avgt   30    102.185 ±   4.111  ms/op
import static com.facebook.presto.testing.TestingSession.testSessionBuilder;
import static java.util.Collections.emptyList;

public class TestSpatialJoinPlanning
❤️
public BaseRuleTest(Plugin... plugins)
{
    this.plugins = plugins;
ImmutableList.copyOf()
An optimizer rule to rewrite a cross join with a spatial filter on top into a spatial join and custom operators to execute spatial joins efficiently (broadcast joins only).

For example, the plan for the following query

SELECT ... FROM points, polygons WHERE ST_Contains(ST_GeometryFromText(wkt), ST_Point(longitude, latitude))

is rewritten from

- FilterProject[filterPredicate = "st_contains"("st_geometryfromtext"("wkt"), "st_point"("longitude", "latitude"))] => []
    - CrossJoin => [latitude:double, longitude:double, wkt:varchar]

into

- SpatialJoin["st_contains"("st_geometryfromtext", "st_point")] => []
    - ScanProject[table = ...
            st_point := "st_point"("longitude", "latitude")
    - LocalExchange[SINGLE] () => st_geometryfromtext:Geometry
        - RemoteExchange[REPLICATE] => st_geometryfromtext:Geometry
            - Project[] => [st_geometryfromtext:Geometry]
                    st_geometryfromtext := "st_geometryfromtext"("wkt")
                - ScanFilterProject[table = ...

Benchmark results:

Benchmark                                        (pointCount)  Mode  Cnt   Score   Error  Units
BenchmarkSpatialJoin.benchmarkJoin                         10  avgt   30  15.163 ± 1.610  ms/op
BenchmarkSpatialJoin.benchmarkJoin                        100  avgt   30  13.837 ± 0.919  ms/op
BenchmarkSpatialJoin.benchmarkJoin                       1000  avgt   30  16.205 ± 1.360  ms/op
BenchmarkSpatialJoin.benchmarkJoin                      10000  avgt   30  22.915 ± 1.731  ms/op
BenchmarkSpatialJoin.benchmarkUserOptimizedJoin            10  avgt   30  14.426 ± 1.048  ms/op
BenchmarkSpatialJoin.benchmarkUserOptimizedJoin           100  avgt   30  14.507 ± 0.518  ms/op
BenchmarkSpatialJoin.benchmarkUserOptimizedJoin          1000  avgt   30  16.265 ± 1.447  ms/op
BenchmarkSpatialJoin.benchmarkUserOptimizedJoin         10000  avgt   30  22.076 ± 1.547  ms/op
Congrats!
Implements broadcast spatial join using an R-Tree as described in #9890.
Supports spatial joins defined using ST_Contains and ST_Intersects functions.
The implementation consists of an optimizer rule and two custom operators. The new rule changes a cross join node with a spatial filter on top into a spatial join node which is executed using new operators. SpatialIndexBuilderOperator builds an R-Tree from build side geometries (relation on the right side of the join). SpatialLookupJoinOperator processes probe geometries one record at a time looking up matching geometries in the R-Tree.
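The build/probe split described above can be sketched as follows. This is a simplified illustration rather than the PR's code: a flat list of bounding-box nodes stands in for the JTS R-Tree, axis-aligned rectangles stand in for geometries, and all names (`BroadcastSpatialJoinSketch`, `Envelope`, `Node`, `NODE_CAPACITY`, `build`, `probe`) are hypothetical.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Hypothetical sketch: index the small (build) side once, then probe it one record at a time.
final class BroadcastSpatialJoinSketch
{
    static final int NODE_CAPACITY = 4; // assumed number of build rows per node

    static final class Envelope
    {
        final double minX, minY, maxX, maxY;

        Envelope(double minX, double minY, double maxX, double maxY)
        {
            this.minX = minX;
            this.minY = minY;
            this.maxX = maxX;
            this.maxY = maxY;
        }

        boolean contains(double x, double y)
        {
            return x >= minX && x <= maxX && y >= minY && y <= maxY;
        }

        Envelope union(Envelope other)
        {
            return new Envelope(
                    Math.min(minX, other.minX), Math.min(minY, other.minY),
                    Math.max(maxX, other.maxX), Math.max(maxY, other.maxY));
        }
    }

    // A node groups a few build rows under one bounding envelope.
    static final class Node
    {
        final Envelope bounds;
        final List<Integer> buildPositions;

        Node(Envelope bounds, List<Integer> buildPositions)
        {
            this.bounds = bounds;
            this.buildPositions = buildPositions;
        }
    }

    private final List<Envelope> buildSide;
    private final List<Node> nodes;

    private BroadcastSpatialJoinSketch(List<Envelope> buildSide, List<Node> nodes)
    {
        this.buildSide = buildSide;
        this.nodes = nodes;
    }

    // Build phase: sort build rows by minX and pack them into fixed-size nodes.
    static BroadcastSpatialJoinSketch build(List<Envelope> buildSide)
    {
        List<Integer> order = new ArrayList<>();
        for (int i = 0; i < buildSide.size(); i++) {
            order.add(i);
        }
        order.sort(Comparator.comparingDouble((Integer position) -> buildSide.get(position).minX));

        List<Node> nodes = new ArrayList<>();
        for (int start = 0; start < order.size(); start += NODE_CAPACITY) {
            int end = Math.min(start + NODE_CAPACITY, order.size());
            List<Integer> positions = new ArrayList<>(order.subList(start, end));
            Envelope bounds = buildSide.get(positions.get(0));
            for (int position : positions) {
                bounds = bounds.union(buildSide.get(position));
            }
            nodes.add(new Node(bounds, positions));
        }
        return new BroadcastSpatialJoinSketch(new ArrayList<>(buildSide), nodes);
    }

    // Probe phase: skip nodes whose bounds miss the point, then test the remaining rows exactly.
    List<Integer> probe(double x, double y)
    {
        List<Integer> matches = new ArrayList<>();
        for (Node node : nodes) {
            if (!node.bounds.contains(x, y)) {
                continue;
            }
            for (int position : node.buildPositions) {
                if (buildSide.get(position).contains(x, y)) {
                    matches.add(position);
                }
            }
        }
        return matches;
    }
}
```

A real R-Tree nests such nodes into a hierarchy so that a probe visits only a logarithmic number of them; the single level here is kept only to show how the build side is indexed once and then reused for every probe record.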
For example, the plan for the following query
changes from
to
Tests for the new functionality include:
Known limitations and future work:
1. Only INNER JOIN is supported.
2. The new optimizer rule runs after AddExchanges. Moving the rule before AddExchanges requires making the PredicatePushDown rule aware of spatial joins, so that it does not unroll a spatial join into a cross join with a filter on top.
3. The new optimizer rule and the new operators are defined in presto-main, although, logically, they belong to the presto-geospatial plugin. In the future, when the plugin interface evolves to support custom rules and operators, the new rule and operators can be moved.
4. The logic for computing the memory size of a geometry object requires access to package-private classes from the ESRI library. To gain access, presto-geospatial-toolkit includes a class in the com.esri.core.geometry package. This split-package situation will block migration to Java 9 modules and needs to be addressed at some point (see "Add Geometry::estimateMemorySize() API", Esri/geometry-api-java#156).
5. The R-Tree implementation in the JTS library is not GC-friendly, as it creates a tree of Java objects. This implementation needs to be replaced.
TODO: Use dictionary style blocks for probe columns (see LookupJoinPageBuilder)
Addressing items 4 and 5 will require changes to external libraries (ESRI and JTS).