Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,149 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pinot.core.geospatial.transform.function;

import com.google.common.base.Preconditions;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import org.apache.pinot.core.operator.blocks.ProjectionBlock;
import org.apache.pinot.core.operator.transform.function.BaseTransformFunction;
import org.apache.pinot.core.operator.transform.function.LiteralTransformFunction;
import org.apache.pinot.core.operator.transform.function.TransformFunction;
import org.apache.pinot.core.plan.DocIdSetPlanNode;
import org.apache.pinot.segment.local.utils.GeometrySerializer;
import org.apache.pinot.segment.spi.datasource.DataSource;
import org.apache.pinot.spi.data.FieldSpec;
import org.apache.pinot.spi.utils.BytesUtils;
import org.locationtech.jts.geom.Geometry;


/**
* Base Binary geo transform functions that can take either one of the arguments as literal.
*/
public abstract class BaseBinaryGeoTransformFunction extends BaseTransformFunction {
private TransformFunction _firstArgument;
private TransformFunction _secondArgument;
private Geometry _firstLiteral;
private Geometry _secondLiteral;
private int[] _intResults;
private double[] _doubleResults;

@Override
public void init(List<TransformFunction> arguments, Map<String, DataSource> dataSourceMap) {
Preconditions
.checkArgument(arguments.size() == 2, "2 arguments are required for transform function: %s", getName());
TransformFunction transformFunction = arguments.get(0);
Preconditions.checkArgument(transformFunction.getResultMetadata().isSingleValue(),
"First argument must be single-valued for transform function: %s", getName());
Preconditions.checkArgument(transformFunction.getResultMetadata().getDataType() == FieldSpec.DataType.BYTES
|| transformFunction instanceof LiteralTransformFunction,
"The first argument must be of type BYTES , but was %s",
transformFunction.getResultMetadata().getDataType()
);
if (transformFunction instanceof LiteralTransformFunction) {
_firstLiteral = GeometrySerializer.deserialize(
BytesUtils.toBytes(((LiteralTransformFunction) transformFunction).getLiteral()));
} else {
_firstArgument = transformFunction;
}
transformFunction = arguments.get(1);
Preconditions.checkArgument(transformFunction.getResultMetadata().isSingleValue(),
"Second argument must be single-valued for transform function: %s", getName());
Preconditions.checkArgument(transformFunction.getResultMetadata().getDataType() == FieldSpec.DataType.BYTES
|| transformFunction instanceof LiteralTransformFunction,
"The second argument must be of type BYTES , but was %s",
transformFunction.getResultMetadata().getDataType()
);
if (transformFunction instanceof LiteralTransformFunction) {
_secondLiteral = GeometrySerializer.deserialize(
BytesUtils.toBytes(((LiteralTransformFunction) transformFunction).getLiteral()));
} else {
_secondArgument = transformFunction;
}
}

protected int[] transformGeometryToIntValuesSV(ProjectionBlock projectionBlock) {
if (_intResults == null) {
_intResults = new int[DocIdSetPlanNode.MAX_DOC_PER_CALL];
}
byte[][] firstValues;
byte[][] secondValues;
if (_firstArgument == null && _secondArgument == null) {
_intResults = new int[Math.min(projectionBlock.getNumDocs(), DocIdSetPlanNode.MAX_DOC_PER_CALL)];
Arrays.fill(_intResults, transformGeometryToInt(_firstLiteral, _secondLiteral));
} else if (_firstArgument == null) {
secondValues = _secondArgument.transformToBytesValuesSV(projectionBlock);
for (int i = 0; i < projectionBlock.getNumDocs(); i++) {
_intResults[i] = transformGeometryToInt(_firstLiteral, GeometrySerializer.deserialize(secondValues[i]));
}
} else if (_secondArgument == null) {
firstValues = _firstArgument.transformToBytesValuesSV(projectionBlock);
for (int i = 0; i < projectionBlock.getNumDocs(); i++) {
_intResults[i] = transformGeometryToInt(GeometrySerializer.deserialize(firstValues[i]), _secondLiteral);
}
} else {
firstValues = _firstArgument.transformToBytesValuesSV(projectionBlock);
secondValues = _secondArgument.transformToBytesValuesSV(projectionBlock);
for (int i = 0; i < projectionBlock.getNumDocs(); i++) {
_intResults[i] = transformGeometryToInt(GeometrySerializer.deserialize(firstValues[i]),
GeometrySerializer.deserialize(secondValues[i]));
}
}
return _intResults;
}

protected double[] transformGeometryToDoubleValuesSV(ProjectionBlock projectionBlock) {
if (_doubleResults == null) {
_doubleResults = new double[DocIdSetPlanNode.MAX_DOC_PER_CALL];
}
byte[][] firstValues;
byte[][] secondValues;
if (_firstArgument == null && _secondArgument == null) {
_doubleResults = new double[Math.min(projectionBlock.getNumDocs(), DocIdSetPlanNode.MAX_DOC_PER_CALL)];
Arrays.fill(_doubleResults, transformGeometryToDouble(_firstLiteral, _secondLiteral));
} else if (_firstArgument == null) {
secondValues = _secondArgument.transformToBytesValuesSV(projectionBlock);
for (int i = 0; i < projectionBlock.getNumDocs(); i++) {
_doubleResults[i] = transformGeometryToDouble(_firstLiteral, GeometrySerializer.deserialize(secondValues[i]));
}
} else if (_secondArgument == null) {
firstValues = _firstArgument.transformToBytesValuesSV(projectionBlock);
for (int i = 0; i < projectionBlock.getNumDocs(); i++) {
_doubleResults[i] = transformGeometryToDouble(GeometrySerializer.deserialize(firstValues[i]), _secondLiteral);
}
} else {
firstValues = _firstArgument.transformToBytesValuesSV(projectionBlock);
secondValues = _secondArgument.transformToBytesValuesSV(projectionBlock);
for (int i = 0; i < projectionBlock.getNumDocs(); i++) {
_doubleResults[i] = transformGeometryToDouble(GeometrySerializer.deserialize(firstValues[i]),
GeometrySerializer.deserialize(secondValues[i]));
}
}
return _doubleResults;
}

public int transformGeometryToInt(Geometry firstGeometry, Geometry secondGeometry) {
throw new UnsupportedOperationException();
}

public double transformGeometryToDouble(Geometry firstGeometry, Geometry secondGeometry) {
throw new UnsupportedOperationException();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,19 +18,9 @@
*/
package org.apache.pinot.core.geospatial.transform.function;

import com.google.common.base.Preconditions;
import java.util.List;
import java.util.Map;
import org.apache.pinot.core.operator.blocks.ProjectionBlock;
import org.apache.pinot.core.operator.transform.TransformResultMetadata;
import org.apache.pinot.core.operator.transform.function.BaseTransformFunction;
import org.apache.pinot.core.operator.transform.function.LiteralTransformFunction;
import org.apache.pinot.core.operator.transform.function.TransformFunction;
import org.apache.pinot.core.plan.DocIdSetPlanNode;
import org.apache.pinot.segment.local.utils.GeometrySerializer;
import org.apache.pinot.segment.local.utils.GeometryUtils;
import org.apache.pinot.segment.spi.datasource.DataSource;
import org.apache.pinot.spi.data.FieldSpec;
import org.locationtech.jts.geom.Geometry;


Expand All @@ -39,61 +29,29 @@
* second geometry lie in the exterior of the first geometry, and at least one point of the interior of the first
* geometry lies in the interior of the second geometry.
*/
public class StContainsFunction extends BaseTransformFunction {
public class StContainsFunction extends BaseBinaryGeoTransformFunction {
public static final String FUNCTION_NAME = "ST_Contains";
private TransformFunction _firstArgument;
private TransformFunction _secondArgument;
private int[] _results;

@Override
public String getName() {
return FUNCTION_NAME;
}

@Override
public void init(List<TransformFunction> arguments, Map<String, DataSource> dataSourceMap) {
Preconditions
.checkArgument(arguments.size() == 2, "2 arguments are required for transform function: %s", getName());
TransformFunction transformFunction = arguments.get(0);
Preconditions.checkArgument(transformFunction.getResultMetadata().isSingleValue(),
"First argument must be single-valued for transform function: %s", getName());
Preconditions.checkArgument(transformFunction.getResultMetadata().getDataType() == FieldSpec.DataType.BYTES
|| transformFunction instanceof LiteralTransformFunction,
"The first argument must be of type BYTES , but was %s",
transformFunction.getResultMetadata().getDataType()
);
_firstArgument = transformFunction;
transformFunction = arguments.get(1);
Preconditions.checkArgument(transformFunction.getResultMetadata().isSingleValue(),
"Second argument must be single-valued for transform function: %s", getName());
Preconditions.checkArgument(transformFunction.getResultMetadata().getDataType() == FieldSpec.DataType.BYTES
|| transformFunction instanceof LiteralTransformFunction,
"The second argument must be of type BYTES , but was %s",
transformFunction.getResultMetadata().getDataType()
);
_secondArgument = transformFunction;
}

@Override
public TransformResultMetadata getResultMetadata() {
return INT_SV_NO_DICTIONARY_METADATA;
}

@Override
public int[] transformToIntValuesSV(ProjectionBlock projectionBlock) {
if (_results == null) {
_results = new int[DocIdSetPlanNode.MAX_DOC_PER_CALL];
}
byte[][] firstValues = _firstArgument.transformToBytesValuesSV(projectionBlock);
byte[][] secondValues = _secondArgument.transformToBytesValuesSV(projectionBlock);
for (int i = 0; i < projectionBlock.getNumDocs(); i++) {
Geometry firstGeometry = GeometrySerializer.deserialize(firstValues[i]);
Geometry secondGeometry = GeometrySerializer.deserialize(secondValues[i]);
if (GeometryUtils.isGeography(firstGeometry) || GeometryUtils.isGeography(secondGeometry)) {
throw new RuntimeException(String.format("%s is available for Geometry objects only", FUNCTION_NAME));
}
_results[i] = firstGeometry.contains(secondGeometry) ? 1 : 0;
return transformGeometryToIntValuesSV(projectionBlock);
}

@Override
public int transformGeometryToInt(Geometry firstGeometry, Geometry secondGeometry) {
if (GeometryUtils.isGeography(firstGeometry) || GeometryUtils.isGeography(secondGeometry)) {
throw new RuntimeException(String.format("%s is available for Geometry objects only", FUNCTION_NAME));
}
return _results;
return firstGeometry.contains(secondGeometry) ? 1 : 0;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,18 +19,9 @@
package org.apache.pinot.core.geospatial.transform.function;

import com.google.common.base.Preconditions;
import java.util.List;
import java.util.Map;
import org.apache.pinot.core.operator.blocks.ProjectionBlock;
import org.apache.pinot.core.operator.transform.TransformResultMetadata;
import org.apache.pinot.core.operator.transform.function.BaseTransformFunction;
import org.apache.pinot.core.operator.transform.function.LiteralTransformFunction;
import org.apache.pinot.core.operator.transform.function.TransformFunction;
import org.apache.pinot.core.plan.DocIdSetPlanNode;
import org.apache.pinot.segment.local.utils.GeometrySerializer;
import org.apache.pinot.segment.local.utils.GeometryUtils;
import org.apache.pinot.segment.spi.datasource.DataSource;
import org.apache.pinot.spi.data.FieldSpec;
import org.locationtech.jts.geom.Geometry;
import org.locationtech.jts.geom.Point;

Expand All @@ -40,71 +31,38 @@
* cartesian minimum distance (based on spatial ref) between two geometries in projected units. For geography, returns
* the great-circle distance in meters between two SphericalGeography points. Note that g1, g2 shall have the same type.
*/
public class StDistanceFunction extends BaseTransformFunction {
public class StDistanceFunction extends BaseBinaryGeoTransformFunction {
private static final float MIN_LATITUDE = -90;
private static final float MAX_LATITUDE = 90;
private static final float MIN_LONGITUDE = -180;
private static final float MAX_LONGITUDE = 180;
public static final String FUNCTION_NAME = "ST_Distance";
private TransformFunction _firstArgument;
private TransformFunction _secondArgument;
private double[] _results;

@Override
public String getName() {
return FUNCTION_NAME;
}

@Override
public void init(List<TransformFunction> arguments, Map<String, DataSource> dataSourceMap) {
Preconditions
.checkArgument(arguments.size() == 2, "2 arguments are required for transform function: %s", getName());
TransformFunction transformFunction = arguments.get(0);
Preconditions.checkArgument(transformFunction.getResultMetadata().isSingleValue(),
"First argument must be single-valued for transform function: %s", getName());
Preconditions.checkArgument(transformFunction.getResultMetadata().getDataType() == FieldSpec.DataType.BYTES
|| transformFunction instanceof LiteralTransformFunction,
"The first argument must be of type BYTES , but was %s",
transformFunction.getResultMetadata().getDataType()
);
_firstArgument = transformFunction;
transformFunction = arguments.get(1);
Preconditions.checkArgument(transformFunction.getResultMetadata().isSingleValue(),
"Second argument must be single-valued for transform function: %s", getName());
Preconditions.checkArgument(transformFunction.getResultMetadata().getDataType() == FieldSpec.DataType.BYTES
|| transformFunction instanceof LiteralTransformFunction,
"The second argument must be of type BYTES , but was %s",
transformFunction.getResultMetadata().getDataType()
);
_secondArgument = transformFunction;
}

@Override
public TransformResultMetadata getResultMetadata() {
return DOUBLE_SV_NO_DICTIONARY_METADATA;
}

@Override
public double[] transformToDoubleValuesSV(ProjectionBlock projectionBlock) {
if (_results == null) {
_results = new double[DocIdSetPlanNode.MAX_DOC_PER_CALL];
return transformGeometryToDoubleValuesSV(projectionBlock);
}

@Override
public double transformGeometryToDouble(Geometry firstGeometry, Geometry secondGeometry) {
if (GeometryUtils.isGeography(firstGeometry) != GeometryUtils.isGeography(secondGeometry)) {
throw new RuntimeException("The first and second arguments shall either all be geometry or all geography");
}
byte[][] firstValues = _firstArgument.transformToBytesValuesSV(projectionBlock);
byte[][] secondValues = _secondArgument.transformToBytesValuesSV(projectionBlock);
for (int i = 0; i < projectionBlock.getNumDocs(); i++) {
Geometry firstGeometry = GeometrySerializer.deserialize(firstValues[i]);
Geometry secondGeometry = GeometrySerializer.deserialize(secondValues[i]);
if (GeometryUtils.isGeography(firstGeometry) != GeometryUtils.isGeography(secondGeometry)) {
throw new RuntimeException("The first and second arguments shall either all be geometry or all geography");
}
if (GeometryUtils.isGeography(firstGeometry)) {
_results[i] = sphericalDistance(firstGeometry, secondGeometry);
} else {
_results[i] =
firstGeometry.isEmpty() || secondGeometry.isEmpty() ? Double.NaN : firstGeometry.distance(secondGeometry);
}
if (GeometryUtils.isGeography(firstGeometry)) {
return sphericalDistance(firstGeometry, secondGeometry);
} else {
return firstGeometry.isEmpty() || secondGeometry.isEmpty() ? Double.NaN : firstGeometry.distance(secondGeometry);
}
return _results;
}

private static void checkLatitude(double latitude) {
Expand Down
Loading