43 changes: 20 additions & 23 deletions .github/workflows/example.yml
@@ -39,6 +39,7 @@ concurrency:

jobs:
build:
name: 'Spark ${{ matrix.spark }}, Hadoop ${{ matrix.hadoop }}, Sedona ${{ matrix.sedona }}'
runs-on: ubuntu-22.04
strategy:
fail-fast: false
@@ -56,23 +57,6 @@ jobs:
spark-compat: '3.4'
sedona: 1.8.0
hadoop: 3.3.4
env:
JAVA_TOOL_OPTIONS: >-
-XX:+IgnoreUnrecognizedVMOptions
--add-opens=java.base/java.lang=ALL-UNNAMED
--add-opens=java.base/java.lang.invoke=ALL-UNNAMED
--add-opens=java.base/java.lang.reflect=ALL-UNNAMED
--add-opens=java.base/java.io=ALL-UNNAMED
--add-opens=java.base/java.net=ALL-UNNAMED
--add-opens=java.base/java.nio=ALL-UNNAMED
--add-opens=java.base/java.util=ALL-UNNAMED
--add-opens=java.base/java.util.concurrent=ALL-UNNAMED
--add-opens=java.base/java.util.concurrent.atomic=ALL-UNNAMED
--add-opens=java.base/sun.nio.ch=ALL-UNNAMED
--add-opens=java.base/sun.nio.cs=ALL-UNNAMED
--add-opens=java.base/sun.security.action=ALL-UNNAMED
--add-opens=java.base/sun.util.calendar=ALL-UNNAMED
-Djdk.reflect.useDirectMethodHandle=false
steps:
- uses: actions/checkout@v4
- uses: actions/setup-java@v4
@@ -100,7 +84,8 @@ jobs:
path: ~/.m2
key: ${{ runner.os }}-m2-${{ hashFiles('**/pom.xml') }}
restore-keys: ${{ runner.os }}-m2
- env:
- name: Test Scala Spark SQL Example
env:
SPARK_VERSION: ${{ matrix.spark }}
SPARK_LOCAL_IP: 127.0.0.1
SPARK_COMPAT_VERSION: ${{ matrix.spark-compat }}
@@ -109,16 +94,28 @@
run: |
cd examples/spark-sql
mvn versions:set -DnewVersion=${SEDONA_VERSION} -DgenerateBackupPoms=false
mvn clean install \
mvn clean test \
-Dspark.version=${SPARK_VERSION} \
-Dspark.compat.version=${SPARK_COMPAT_VERSION} \
-Dsedona.version=${SEDONA_VERSION} \
-Dhadoop.version=${HADOOP_VERSION}
java -jar target/sedona-spark-example-${SEDONA_VERSION}.jar
- env:
- name: Test Java Spark SQL Example
env:
SPARK_VERSION: ${{ matrix.spark }}
SPARK_LOCAL_IP: 127.0.0.1
SPARK_COMPAT_VERSION: ${{ matrix.spark-compat }}
SEDONA_VERSION: ${{ matrix.sedona }}
HADOOP_VERSION: ${{ matrix.hadoop }}
run: |
cd examples/java-spark-sql
mvn versions:set -DnewVersion=${SEDONA_VERSION} -DgenerateBackupPoms=false
mvn clean test \
-Dspark.version=${SPARK_VERSION} \
-Dspark.compat.version=${SPARK_COMPAT_VERSION}
- name: Test Flink SQL Example
env:
SEDONA_VERSION: ${{ matrix.sedona }}
run: |
cd examples/flink-sql
mvn versions:set -DnewVersion=${SEDONA_VERSION} -DgenerateBackupPoms=false
mvn clean install
java -jar target/sedona-flink-example-${SEDONA_VERSION}.jar
mvn clean test
39 changes: 38 additions & 1 deletion examples/flink-sql/pom.xml
@@ -31,7 +31,7 @@
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<geotools.scope>compile</geotools.scope>
<flink.version>1.19.0</flink.version>
<flink.scope>compile</flink.scope>
<flink.scope>provided</flink.scope>
<scala.compat.version>2.12</scala.compat.version>
<geotools.version>33.1</geotools.version>
<log4j.version>2.17.2</log4j.version>
@@ -247,6 +247,20 @@
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<version>2.22.2</version>
<configuration>
<argLine>
--add-opens=java.base/sun.nio.ch=ALL-UNNAMED
--add-opens=java.base/java.nio=ALL-UNNAMED
--add-opens=java.base/java.lang=ALL-UNNAMED
--add-opens=java.base/java.lang.invoke=ALL-UNNAMED
--add-opens=java.base/java.util=ALL-UNNAMED
</argLine>
</configuration>
</plugin>
<plugin>
<groupId>org.jacoco</groupId>
<artifactId>jacoco-maven-plugin</artifactId>
@@ -266,6 +280,29 @@
</execution>
</executions>
</plugin>
<plugin>
<groupId>com.diffplug.spotless</groupId>
<artifactId>spotless-maven-plugin</artifactId>
<version>2.35.0</version>
<configuration>
<java>
<googleJavaFormat>
<version>1.15.0</version>
</googleJavaFormat>
<licenseHeader>
<file>../../tools/maven/license-header.txt</file>
</licenseHeader>
</java>
</configuration>
<executions>
<execution>
<goals>
<goal>check</goal>
</goals>
<phase>compile</phase>
</execution>
</executions>
</plugin>
</plugins>
<resources>
<resource>
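The new surefire `argLine` mirrors the `JAVA_TOOL_OPTIONS` block removed from the workflow above: on recent JDKs (16+), Flink's memory utilities reflect into `java.base` packages that are closed to the unnamed module by default, so the flags now travel with the Maven test JVM instead of every JVM in the job. A minimal sketch of the failure these `--add-opens` flags prevent (a hypothetical standalone demo, not part of this PR):

```java
import java.lang.reflect.Field;

public class AddOpensDemo {
  public static void main(String[] args) throws Exception {
    // Flink reflects into java.nio internals much like this. On JDK 16+
    // this throws InaccessibleObjectException unless the test JVM is
    // started with --add-opens=java.base/java.nio=ALL-UNNAMED, which is
    // exactly what the surefire argLine above supplies.
    Field address = java.nio.Buffer.class.getDeclaredField("address");
    address.setAccessible(true);
    System.out.println("java.nio is opened to the unnamed module");
  }
}
```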
112 changes: 63 additions & 49 deletions examples/flink-sql/src/main/java/FlinkExample.java
@@ -16,70 +16,84 @@
* specific language governing permissions and limitations
* under the License.
*/
import static org.apache.flink.table.api.Expressions.$;
import static org.apache.flink.table.api.Expressions.call;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

import org.apache.sedona.flink.SedonaFlinkRegistrator;
import org.apache.sedona.flink.expressions.Constructors;

import static org.apache.flink.table.api.Expressions.$;
import static org.apache.flink.table.api.Expressions.call;

public class FlinkExample
{
static String[] pointColNames = {"geom_point", "name_point", "event_time", "proc_time"};

static String[] polygonColNames = {"geom_polygon", "name_polygon", "event_time", "proc_time"};
public class FlinkExample {
static String[] pointColNames = {"geom_point", "name_point", "event_time", "proc_time"};

public static void main(String[] args) {
int testDataSize = 10;
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
EnvironmentSettings settings = EnvironmentSettings.newInstance().inStreamingMode().build();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);
SedonaFlinkRegistrator.registerType(env);
SedonaFlinkRegistrator.registerFunc(tableEnv);
static String[] polygonColNames = {"geom_polygon", "name_polygon", "event_time", "proc_time"};

// Create a fake WKT string table source
Table pointWktTable = Utils.createTextTable(env, tableEnv, Utils.createPointWKT(testDataSize), pointColNames);
public static void main(String[] args) {
testS2SpatialJoin(10);
}

// Create a geometry column
Table pointTable = pointWktTable.select(
call("ST_GeomFromWKT", $(pointColNames[0])).as(pointColNames[0]),
$(pointColNames[1]));
public static void testS2SpatialJoin(int testDataSize) {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
EnvironmentSettings settings = EnvironmentSettings.newInstance().inStreamingMode().build();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);
SedonaFlinkRegistrator.registerType(env);
SedonaFlinkRegistrator.registerFunc(tableEnv);

// Create S2CellID
pointTable = pointTable.select($(pointColNames[0]), $(pointColNames[1]),
call("ST_S2CellIDs", $(pointColNames[0]), 6).as("s2id_array"));
// Explode s2id array
tableEnv.createTemporaryView("pointTable", pointTable);
pointTable = tableEnv.sqlQuery("SELECT geom_point, name_point, s2id_point FROM pointTable CROSS JOIN UNNEST(pointTable.s2id_array) AS tmpTbl1(s2id_point)");
// Create a fake WKT string table source
Table pointWktTable =
Utils.createTextTable(env, tableEnv, Utils.createPointWKT(testDataSize), pointColNames);

// Create a geometry column
Table pointTable =
pointWktTable.select(
call("ST_GeomFromWKT", $(pointColNames[0])).as(pointColNames[0]), $(pointColNames[1]));

// Create a fake WKT string table source
Table polygonWktTable = Utils.createTextTable(env, tableEnv, Utils.createPolygonWKT(testDataSize), polygonColNames);
// Create a geometry column
Table polygonTable = polygonWktTable.select(call(Constructors.ST_GeomFromWKT.class.getSimpleName(),
$(polygonColNames[0])).as(polygonColNames[0]),
$(polygonColNames[1]));
// Create S2CellID
polygonTable = polygonTable.select($(polygonColNames[0]), $(polygonColNames[1]),
call("ST_S2CellIDs", $(polygonColNames[0]), 6).as("s2id_array"));
// Explode s2id array
tableEnv.createTemporaryView("polygonTable", polygonTable);
polygonTable = tableEnv.sqlQuery("SELECT geom_polygon, name_polygon, s2id_polygon FROM polygonTable CROSS JOIN UNNEST(polygonTable.s2id_array) AS tmpTbl2(s2id_polygon)");
// Create S2CellID
pointTable =
pointTable.select(
$(pointColNames[0]),
$(pointColNames[1]),
call("ST_S2CellIDs", $(pointColNames[0]), 6).as("s2id_array"));
// Explode s2id array
tableEnv.createTemporaryView("pointTable", pointTable);
pointTable =
tableEnv.sqlQuery(
"SELECT geom_point, name_point, s2id_point FROM pointTable CROSS JOIN UNNEST(pointTable.s2id_array) AS tmpTbl1(s2id_point)");

// TODO: TableImpl.print() throws an EOFException due to https://issues.apache.org/jira/browse/FLINK-35406
// Use polygonTable.execute().print() when FLINK-35406 is fixed.
polygonTable.execute().collect().forEachRemaining(row -> System.out.println(row));
// Create a fake WKT string table source
Table polygonWktTable =
Utils.createTextTable(env, tableEnv, Utils.createPolygonWKT(testDataSize), polygonColNames);
// Create a geometry column
Table polygonTable =
polygonWktTable.select(
call(Constructors.ST_GeomFromWKT.class.getSimpleName(), $(polygonColNames[0]))
.as(polygonColNames[0]),
$(polygonColNames[1]));
// Create S2CellID
polygonTable =
polygonTable.select(
$(polygonColNames[0]),
$(polygonColNames[1]),
call("ST_S2CellIDs", $(polygonColNames[0]), 6).as("s2id_array"));
// Explode s2id array
tableEnv.createTemporaryView("polygonTable", polygonTable);
polygonTable =
tableEnv.sqlQuery(
"SELECT geom_polygon, name_polygon, s2id_polygon FROM polygonTable CROSS JOIN UNNEST(polygonTable.s2id_array) AS tmpTbl2(s2id_polygon)");

// Join two tables by their S2 ids
Table joinResult = pointTable.join(polygonTable).where($("s2id_point").isEqual($("s2id_polygon")));
// Optional: remove false positives
joinResult = joinResult.where(call("ST_Contains", $("geom_polygon"), $("geom_point")));
joinResult.execute().collect().forEachRemaining(row -> System.out.println(row));
}
// TODO: TableImpl.print() throws an EOFException due to
// https://issues.apache.org/jira/browse/FLINK-35406
// Use polygonTable.execute().print() when FLINK-35406 is fixed.
polygonTable.execute().collect().forEachRemaining(row -> System.out.println(row));

// Join two tables by their S2 ids
Table joinResult =
pointTable.join(polygonTable).where($("s2id_point").isEqual($("s2id_polygon")));
// Optional: remove false positives
joinResult = joinResult.where(call("ST_Contains", $("geom_polygon"), $("geom_point")));
joinResult.execute().collect().forEachRemaining(row -> System.out.println(row));
}
}
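With `main` now delegating to `testS2SpatialJoin(int)`, the workflow's switch from `java -jar` to `mvn clean test` implies a surefire-discovered unit test driving the example. The actual test class is not shown in this diff; a minimal sketch of what it could look like, assuming JUnit 4 on the test classpath and a hypothetical `FlinkExampleTest` name:

```java
import org.junit.Test;

public class FlinkExampleTest {
  @Test
  public void testS2SpatialJoin() {
    // Runs the S2-based point-in-polygon join end to end on a small
    // generated dataset; the build fails if the pipeline throws.
    FlinkExample.testS2SpatialJoin(10);
  }
}
```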