12 changes: 5 additions & 7 deletions .github/workflows/java.yml
@@ -65,6 +65,9 @@ jobs:
- spark: 4.0.0
scala: 2.13.8
jdk: '17'
- spark: 3.5.4
scala: 2.12.18
jdk: '17'
- spark: 3.5.0
scala: 2.13.8
jdk: '11'
@@ -103,13 +106,8 @@ jobs:
run: |
SPARK_COMPAT_VERSION=${SPARK_VERSION:0:3}

if [ "${SPARK_VERSION}" == "3.5.0" ]; then
pip install pyspark==3.5.0 pandas shapely apache-sedona pyarrow
export SPARK_HOME=$(python -c "import pyspark; print(pyspark.__path__[0])")
fi

if [ "${SPARK_VERSION}" == "4.0.0" ]; then
pip install pyspark==4.0.0 pandas shapely apache-sedona pyarrow
if [[ "${SPARK_VERSION}" == "3.5"* ]] || [[ "${SPARK_VERSION}" == "4."* ]]; then
pip install pyspark==$SPARK_VERSION pandas shapely apache-sedona pyarrow
export SPARK_HOME=$(python -c "import pyspark; print(pyspark.__path__[0])")
fi

52 changes: 52 additions & 0 deletions docs/api/sql/Function.md
@@ -17,6 +17,58 @@
under the License.
-->

## ExpandAddress

Introduction: Returns an array of expanded forms of the input address string. This is backed by the [libpostal](https://github.com/openvenues/libpostal) library's address expansion functionality.

!!!Note
Jpostal requires at least 2 GB of free disk space to store the data files used for address parsing and expansion. The data files are downloaded automatically when the function is called for the first time.

!!!Note
The version of jpostal installed with this package only supports Linux and macOS. If you are using Windows, you will need to install libjpostal and libpostal manually and ensure that they are available in your `java.library.path`.

Format: `ExpandAddress (address: String)`

Since: `v1.8.0`

SQL Example

```sql
SELECT ExpandAddress("100 W 1st St, Los Angeles, CA 90012");
```

Output:

```
[100 w 1st saint, 100 w 1st street, 100 west 1st saint, 100 west 1st street, 100 w 1 saint, 100 w 1 street, 100 west 1 saint, 100 west 1 street]
```

## ParseAddress

Introduction: Returns an array of the components (e.g. street, postal code) of the input address string. This is backed by the [libpostal](https://github.com/openvenues/libpostal) library's address parsing functionality.

!!!Note
Jpostal requires at least 2 GB of free disk space to store the data files used for address parsing and expansion. The data files are downloaded automatically when the library is initialized.

!!!Note
The version of jpostal installed with this package only supports Linux and macOS. If you are using Windows, you will need to install libjpostal and libpostal manually and ensure that they are available in your `java.library.path`.

Format: `ParseAddress (address: String)`

Since: `v1.8.0`

SQL Example

```sql
SELECT ParseAddress("100 W 1st St, Los Angeles, CA 90012");
```

Output:

```
[{house_number, 100}, {road, w 1st st}, {city, los angeles}, {state, ca}, {postcode, 90012}]
```

## GeometryType

Introduction: Returns the type of the geometry as a string. Eg: 'LINESTRING', 'POLYGON', 'MULTIPOINT', etc. This function also indicates if the geometry is measured, by returning a string of the form 'POINTM'.
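As a side note for readers trying the documentation examples above, here is a minimal PySpark sketch of how the same SQL calls could be issued once Sedona is registered. The `SedonaContext` entry point comes from the existing `sedona.spark` package; the session setup is an assumption, and the first call is expected to trigger the libpostal data download described in the notes.

```python
from sedona.spark import SedonaContext

# Build a Sedona-enabled Spark session (assumes the Sedona jars are on the classpath).
config = SedonaContext.builder().getOrCreate()
sedona = SedonaContext.create(config)

# The first invocation downloads the libpostal data files (~2 GB), as noted in the docs.
sedona.sql(
    "SELECT ExpandAddress('100 W 1st St, Los Angeles, CA 90012') AS expanded"
).show(truncate=False)

sedona.sql(
    "SELECT ParseAddress('100 W 1st St, Los Angeles, CA 90012') AS parsed"
).show(truncate=False)
```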
20 changes: 20 additions & 0 deletions python/sedona/spark/sql/st_functions.py
@@ -32,6 +32,26 @@
_call_st_function = partial(call_sedona_function, "st_functions")


@validate_argument_types
def ExpandAddress(address: ColumnOrName): # noqa: N802
"""Normalize an address string into its canonical forms.

:param address: The address string or column to normalize.
:return: An array of normalized address strings.
"""
return _call_st_function("ExpandAddress", address)


@validate_argument_types
def ParseAddress(address: ColumnOrName): # noqa: N802
"""Parse an address string into its components (label/value pairs).

:param address: The address string or column to parse.
:return: An array of maps with label/value pairs for address components.
"""
return _call_st_function("ParseAddress", address)


@validate_argument_types
def GeometryType(geometry: ColumnOrName):
"""Return the type of the geometry as a string.
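The wrappers above follow the same `call_sedona_function` pattern as the other `st_functions` entries. Below is a brief usage sketch; the import path mirrors the module being patched, and the DataFrame contents are illustrative assumptions.

```python
from pyspark.sql import functions as f
from sedona.spark import SedonaContext
from sedona.spark.sql.st_functions import ExpandAddress, ParseAddress

# Assumes the Sedona jars are on the classpath of the Spark session.
sedona = SedonaContext.create(SedonaContext.builder().getOrCreate())
df = sedona.createDataFrame(
    [("781 Franklin Ave Crown Heights Brooklyn NY 11216 USA",)], ["address"]
)

result = (
    df.withColumn("expanded", ExpandAddress("address"))      # accepts a column name
      .withColumn("parsed", ParseAddress(f.col("address")))  # or a Column object
)
result.show(truncate=False)
```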
37 changes: 37 additions & 0 deletions python/tests/sql/test_dataframe_api.py
@@ -1783,3 +1783,40 @@ def test_lof(self):
df.withColumn(
"localOutlierFactor", ST_LocalOutlierFactor("geometry", 2, False)
).collect()

def test_expand_address_df_api(self):
input_df = (
self.spark.range(1)
.selectExpr(
"'781 Franklin Ave Crown Heights Brooklyn NY 11216 USA' as address"
)
.cache()
) # cache to avoid Constant Folding Optimization

# Actually running downloads the model and is very expensive, so we just check the plan
# Checking the plan should allow us to verify that the function is correctly registered
assert input_df.select(
ExpandAddress("address").alias("normalized")
).sameSemantics(
input_df.select(f.expr("ExpandAddress(address)").alias("normalized"))
)

input_df.unpersist()

def test_parse_address_df_api(self):
input_df = (
self.spark.range(1)
.selectExpr(
"'781 Franklin Ave Crown Heights Brooklyn NY 11216 USA' as address"
)
.cache()
) # cache to avoid Constant Folding Optimization

# Actually running downloads the model and is very expensive, so we just check the plan
# Checking the plan should allow us to verify that the function is correctly registered
assert input_df.select(
ParseAddress(f.col("address")).alias("parsed")
).sameSemantics(
input_df.select(f.expr("ParseAddress(address)").alias("parsed"))
)
input_df.unpersist()
11 changes: 11 additions & 0 deletions spark/common/pom.xml
@@ -242,6 +242,17 @@
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>com.wherobots</groupId>
<artifactId>jpostal</artifactId>
<version>1.2.2</version>
<exclusions>
<exclusion>
<groupId>org.apache.commons</groupId>
<artifactId>commons-compress</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>minio</artifactId>
@@ -20,11 +20,14 @@

import java.io.Serializable;
import java.lang.reflect.Field;
import java.nio.file.Paths;
import org.apache.sedona.core.enums.GridType;
import org.apache.sedona.core.enums.IndexType;
import org.apache.sedona.core.enums.JoinBuildSide;
import org.apache.sedona.core.enums.JoinSpartitionDominantSide;
import org.apache.sedona.core.enums.SpatialJoinOptimizationMode;
import org.apache.spark.SparkConf;
import org.apache.spark.SparkEnv;
import org.apache.spark.sql.RuntimeConfig;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.util.Utils;
@@ -62,61 +65,129 @@ public class SedonaConf implements Serializable {
// Parameters for geostats
private Boolean DBSCANIncludeOutliers = true;

// Parameters for libpostal integration
private String libPostalDataDir;
private Boolean libPostalUseSenzing = false;

public static SedonaConf fromActiveSession() {
return new SedonaConf(SparkSession.active().conf());
}

public static SedonaConf fromSparkEnv() {
return new SedonaConf(SparkEnv.get().conf());
}

public SedonaConf(SparkConf sparkConf) {
this(
new ConfGetter() {
@Override
public String get(String key, String defaultValue) {
return sparkConf.get(key, defaultValue);
}

@Override
public String get(String key) {
return sparkConf.get(key, null);
}

public boolean contains(String key) {
return sparkConf.contains(key);
}
});
}

public SedonaConf(RuntimeConfig runtimeConfig) {
this.useIndex = Boolean.parseBoolean(getConfigValue(runtimeConfig, "global.index", "true"));
this(
new ConfGetter() {
@Override
public String get(String key, String defaultValue) {
return runtimeConfig.get(key, defaultValue);
}

@Override
public String get(String key) {
return runtimeConfig.get(key);
}

public boolean contains(String key) {
return runtimeConfig.contains(key);
}
});
}

private interface ConfGetter {
String get(String key, String defaultValue);

String get(String key);

boolean contains(String key);
}

private SedonaConf(ConfGetter confGetter) {
this.useIndex = Boolean.parseBoolean(getConfigValue(confGetter, "global.index", "true"));
this.indexType =
IndexType.getIndexType(getConfigValue(runtimeConfig, "global.indextype", "rtree"));
IndexType.getIndexType(getConfigValue(confGetter, "global.indextype", "rtree"));
this.joinApproximateTotalCount =
Long.parseLong(getConfigValue(runtimeConfig, "join.approxcount", "-1"));
String[] boundaryString = getConfigValue(runtimeConfig, "join.boundary", "0,0,0,0").split(",");
Long.parseLong(getConfigValue(confGetter, "join.approxcount", "-1"));
String[] boundaryString = getConfigValue(confGetter, "join.boundary", "0,0,0,0").split(",");
this.datasetBoundary =
new Envelope(
Double.parseDouble(boundaryString[0]),
Double.parseDouble(boundaryString[1]),
Double.parseDouble(boundaryString[2]),
Double.parseDouble(boundaryString[3]));
this.joinGridType =
GridType.getGridType(getConfigValue(runtimeConfig, "join.gridtype", "kdbtree"));
GridType.getGridType(getConfigValue(confGetter, "join.gridtype", "kdbtree"));
this.joinBuildSide =
JoinBuildSide.getBuildSide(getConfigValue(runtimeConfig, "join.indexbuildside", "left"));
JoinBuildSide.getBuildSide(getConfigValue(confGetter, "join.indexbuildside", "left"));
this.joinSpartitionDominantSide =
JoinSpartitionDominantSide.getJoinSpartitionDominantSide(
getConfigValue(runtimeConfig, "join.spatitionside", "left"));
getConfigValue(confGetter, "join.spatitionside", "left"));
this.fallbackPartitionNum =
Integer.parseInt(getConfigValue(runtimeConfig, "join.numpartition", "-1"));
Integer.parseInt(getConfigValue(confGetter, "join.numpartition", "-1"));
this.autoBroadcastJoinThreshold =
bytesFromString(
getConfigValue(
runtimeConfig,
confGetter,
"join.autoBroadcastJoinThreshold",
runtimeConfig.get("spark.sql.autoBroadcastJoinThreshold")));
confGetter.get("spark.sql.autoBroadcastJoinThreshold")));
this.spatialJoinOptimizationMode =
SpatialJoinOptimizationMode.getSpatialJoinOptimizationMode(
getConfigValue(runtimeConfig, "join.optimizationmode", "nonequi"));
getConfigValue(confGetter, "join.optimizationmode", "nonequi"));

// Parameters for knn joins
this.includeTieBreakersInKNNJoins =
Boolean.parseBoolean(getConfigValue(runtimeConfig, "join.knn.includeTieBreakers", "false"));
Boolean.parseBoolean(getConfigValue(confGetter, "join.knn.includeTieBreakers", "false"));

// Parameters for geostats
this.DBSCANIncludeOutliers =
Boolean.parseBoolean(runtimeConfig.get("spark.sedona.dbscan.includeOutliers", "true"));
Boolean.parseBoolean(confGetter.get("spark.sedona.dbscan.includeOutliers", "true"));

// Parameters for libpostal integration
String libPostalDataDir =
confGetter.get(
"spark.sedona.libpostal.dataDir",
Paths.get(System.getProperty("java.io.tmpdir"))
.resolve(Paths.get("libpostal"))
.toString());
if (!libPostalDataDir.isEmpty() && !libPostalDataDir.endsWith("/")) {
libPostalDataDir = libPostalDataDir + "/";
}
this.libPostalDataDir = libPostalDataDir;

this.libPostalUseSenzing =
Boolean.parseBoolean(confGetter.get("spark.sedona.libpostal.useSenzing", "true"));
}

// Helper method to prioritize `sedona.*` over `spark.sedona.*`
private String getConfigValue(
RuntimeConfig runtimeConfig, String keySuffix, String defaultValue) {
private String getConfigValue(ConfGetter confGetter, String keySuffix, String defaultValue) {
String sedonaKey = "sedona." + keySuffix;
String sparkSedonaKey = "spark.sedona." + keySuffix;

if (runtimeConfig.contains(sedonaKey)) {
return runtimeConfig.get(sedonaKey, defaultValue);
if (confGetter.contains(sedonaKey)) {
return confGetter.get(sedonaKey, defaultValue);
} else {
return runtimeConfig.get(sparkSedonaKey, defaultValue);
return confGetter.get(sparkSedonaKey, defaultValue);
}
}

@@ -179,6 +250,9 @@ public String toString() {
}

static long bytesFromString(String str) {
if (str == null || str.isEmpty()) {
return 0;
}
if (str.startsWith("-")) {
return -1 * Utils.byteStringAsBytes(str.substring(1));
} else {
@@ -193,4 +267,12 @@ public SpatialJoinOptimizationMode getSpatialJoinOptimizationMode() {
public Boolean getDBSCANIncludeOutliers() {
return DBSCANIncludeOutliers;
}

public String getLibPostalDataDir() {
return libPostalDataDir;
}

public Boolean getLibPostalUseSenzing() {
return libPostalUseSenzing;
}
}
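For completeness, a hedged sketch of how the new libpostal settings read by `SedonaConf` could be supplied from PySpark. The keys `spark.sedona.libpostal.dataDir` and `spark.sedona.libpostal.useSenzing` come directly from the code above (per `getConfigValue`, the shorter `sedona.*` prefix also works and takes precedence); the directory path is only an example.

```python
from sedona.spark import SedonaContext

# Point libpostal at a persistent data directory instead of the default
# <java.io.tmpdir>/libpostal, and keep the Senzing data files enabled
# ("true" is the default read by SedonaConf above).
config = (
    SedonaContext.builder()
    .config("spark.sedona.libpostal.dataDir", "/opt/libpostal-data/")  # example path
    .config("spark.sedona.libpostal.useSenzing", "true")
    .getOrCreate()
)
sedona = SedonaContext.create(config)
```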
@@ -29,6 +29,8 @@ object Catalog extends AbstractCatalog {

override val expressions: Seq[FunctionDescription] = Seq(
// Expression for vectors
function[ExpandAddress](),
function[ParseAddress](),
function[GeometryType](),
function[ST_LabelPoint](),
function[ST_PointFromText](),