|
16 | 16 | * specific language governing permissions and limitations |
17 | 17 | * under the License. |
18 | 18 | */ |
| 19 | +import static org.apache.flink.table.api.Expressions.$; |
| 20 | +import static org.apache.flink.table.api.Expressions.call; |
19 | 21 |
|
20 | 22 | import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; |
21 | 23 | import org.apache.flink.table.api.EnvironmentSettings; |
22 | 24 | import org.apache.flink.table.api.Table; |
23 | 25 | import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; |
24 | | - |
25 | 26 | import org.apache.sedona.flink.SedonaFlinkRegistrator; |
26 | 27 | import org.apache.sedona.flink.expressions.Constructors; |
27 | 28 |
|
28 | | -import static org.apache.flink.table.api.Expressions.$; |
29 | | -import static org.apache.flink.table.api.Expressions.call; |
30 | | - |
31 | | -public class FlinkExample |
32 | | -{ |
33 | | - static String[] pointColNames = {"geom_point", "name_point", "event_time", "proc_time"}; |
34 | | - |
35 | | - static String[] polygonColNames = {"geom_polygon", "name_polygon", "event_time", "proc_time"}; |
| 29 | +public class FlinkExample { |
| 30 | + static String[] pointColNames = {"geom_point", "name_point", "event_time", "proc_time"}; |
36 | 31 |
|
37 | | - public static void main(String[] args) { |
38 | | - int testDataSize = 10; |
39 | | - StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); |
40 | | - EnvironmentSettings settings = EnvironmentSettings.newInstance().inStreamingMode().build(); |
41 | | - StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings); |
42 | | - SedonaFlinkRegistrator.registerType(env); |
43 | | - SedonaFlinkRegistrator.registerFunc(tableEnv); |
| 32 | + static String[] polygonColNames = {"geom_polygon", "name_polygon", "event_time", "proc_time"}; |
44 | 33 |
|
45 | | - // Create a fake WKT string table source |
46 | | - Table pointWktTable = Utils.createTextTable(env, tableEnv, Utils.createPointWKT(testDataSize), pointColNames); |
| 34 | + public static void main(String[] args) { |
| 35 | + testS2SpatialJoin(10); |
| 36 | + } |
47 | 37 |
|
48 | | - // Create a geometry column |
49 | | - Table pointTable = pointWktTable.select( |
50 | | - call("ST_GeomFromWKT", $(pointColNames[0])).as(pointColNames[0]), |
51 | | - $(pointColNames[1])); |
| 38 | + public static void testS2SpatialJoin(int testDataSize) { |
| 39 | + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); |
| 40 | + EnvironmentSettings settings = EnvironmentSettings.newInstance().inStreamingMode().build(); |
| 41 | + StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings); |
| 42 | + SedonaFlinkRegistrator.registerType(env); |
| 43 | + SedonaFlinkRegistrator.registerFunc(tableEnv); |
52 | 44 |
|
53 | | - // Create S2CellID |
54 | | - pointTable = pointTable.select($(pointColNames[0]), $(pointColNames[1]), |
55 | | - call("ST_S2CellIDs", $(pointColNames[0]), 6).as("s2id_array")); |
56 | | - // Explode s2id array |
57 | | - tableEnv.createTemporaryView("pointTable", pointTable); |
58 | | - pointTable = tableEnv.sqlQuery("SELECT geom_point, name_point, s2id_point FROM pointTable CROSS JOIN UNNEST(pointTable.s2id_array) AS tmpTbl1(s2id_point)"); |
| 45 | + // Create a fake WKT string table source |
| 46 | + Table pointWktTable = |
| 47 | + Utils.createTextTable(env, tableEnv, Utils.createPointWKT(testDataSize), pointColNames); |
59 | 48 |
|
| 49 | + // Create a geometry column |
| 50 | + Table pointTable = |
| 51 | + pointWktTable.select( |
| 52 | + call("ST_GeomFromWKT", $(pointColNames[0])).as(pointColNames[0]), $(pointColNames[1])); |
60 | 53 |
|
61 | | - // Create a fake WKT string table source |
62 | | - Table polygonWktTable = Utils.createTextTable(env, tableEnv, Utils.createPolygonWKT(testDataSize), polygonColNames); |
63 | | - // Create a geometry column |
64 | | - Table polygonTable = polygonWktTable.select(call(Constructors.ST_GeomFromWKT.class.getSimpleName(), |
65 | | - $(polygonColNames[0])).as(polygonColNames[0]), |
66 | | - $(polygonColNames[1])); |
67 | | - // Create S2CellID |
68 | | - polygonTable = polygonTable.select($(polygonColNames[0]), $(polygonColNames[1]), |
69 | | - call("ST_S2CellIDs", $(polygonColNames[0]), 6).as("s2id_array")); |
70 | | - // Explode s2id array |
71 | | - tableEnv.createTemporaryView("polygonTable", polygonTable); |
72 | | - polygonTable = tableEnv.sqlQuery("SELECT geom_polygon, name_polygon, s2id_polygon FROM polygonTable CROSS JOIN UNNEST(polygonTable.s2id_array) AS tmpTbl2(s2id_polygon)"); |
| 54 | + // Create S2CellID |
| 55 | + pointTable = |
| 56 | + pointTable.select( |
| 57 | + $(pointColNames[0]), |
| 58 | + $(pointColNames[1]), |
| 59 | + call("ST_S2CellIDs", $(pointColNames[0]), 6).as("s2id_array")); |
| 60 | + // Explode s2id array |
| 61 | + tableEnv.createTemporaryView("pointTable", pointTable); |
| 62 | + pointTable = |
| 63 | + tableEnv.sqlQuery( |
| 64 | + "SELECT geom_point, name_point, s2id_point FROM pointTable CROSS JOIN UNNEST(pointTable.s2id_array) AS tmpTbl1(s2id_point)"); |
73 | 65 |
|
74 | | - // TODO: TableImpl.print() occurs EOF Exception due to https://issues.apache.org/jira/browse/FLINK-35406 |
75 | | - // Use polygonTable.execute().print() when FLINK-35406 is fixed. |
76 | | - polygonTable.execute().collect().forEachRemaining(row -> System.out.println(row)); |
| 66 | + // Create a fake WKT string table source |
| 67 | + Table polygonWktTable = |
| 68 | + Utils.createTextTable(env, tableEnv, Utils.createPolygonWKT(testDataSize), polygonColNames); |
| 69 | + // Create a geometry column |
| 70 | + Table polygonTable = |
| 71 | + polygonWktTable.select( |
| 72 | + call(Constructors.ST_GeomFromWKT.class.getSimpleName(), $(polygonColNames[0])) |
| 73 | + .as(polygonColNames[0]), |
| 74 | + $(polygonColNames[1])); |
| 75 | + // Create S2CellID |
| 76 | + polygonTable = |
| 77 | + polygonTable.select( |
| 78 | + $(polygonColNames[0]), |
| 79 | + $(polygonColNames[1]), |
| 80 | + call("ST_S2CellIDs", $(polygonColNames[0]), 6).as("s2id_array")); |
| 81 | + // Explode s2id array |
| 82 | + tableEnv.createTemporaryView("polygonTable", polygonTable); |
| 83 | + polygonTable = |
| 84 | + tableEnv.sqlQuery( |
| 85 | + "SELECT geom_polygon, name_polygon, s2id_polygon FROM polygonTable CROSS JOIN UNNEST(polygonTable.s2id_array) AS tmpTbl2(s2id_polygon)"); |
77 | 86 |
|
78 | | - // Join two tables by their S2 ids |
79 | | - Table joinResult = pointTable.join(polygonTable).where($("s2id_point").isEqual($("s2id_polygon"))); |
80 | | - // Optional: remove false positives |
81 | | - joinResult = joinResult.where(call("ST_Contains", $("geom_polygon"), $("geom_point"))); |
82 | | - joinResult.execute().collect().forEachRemaining(row -> System.out.println(row)); |
83 | | - } |
| 87 | + // TODO: TableImpl.print() occurs EOF Exception due to |
| 88 | + // https://issues.apache.org/jira/browse/FLINK-35406 |
| 89 | + // Use polygonTable.execute().print() when FLINK-35406 is fixed. |
| 90 | + polygonTable.execute().collect().forEachRemaining(row -> System.out.println(row)); |
84 | 91 |
|
| 92 | + // Join two tables by their S2 ids |
| 93 | + Table joinResult = |
| 94 | + pointTable.join(polygonTable).where($("s2id_point").isEqual($("s2id_polygon"))); |
| 95 | + // Optional: remove false positives |
| 96 | + joinResult = joinResult.where(call("ST_Contains", $("geom_polygon"), $("geom_point"))); |
| 97 | + joinResult.execute().collect().forEachRemaining(row -> System.out.println(row)); |
| 98 | + } |
85 | 99 | } |
0 commit comments