@radekaadek

Did you read the Contributor Guide?

Is this PR related to a ticket?

What changes were proposed in this PR?

Add a Sedona Flink module for people who use the Flink SQL Gateway.

How was this patch tested?

Did this PR include necessary documentation updates?

@radekaadek
Author

radekaadek commented Oct 25, 2025

I have tested the module manually and it loads in Flink but the functions are not usable.

I can call LOAD MODULE sedona, and SHOW FUNCTIONS then lists all of the functions, but when I try to actually call one of them, for example:

SELECT ST_Degrees(2.0)

I get

org.apache.flink.table.api.ValidationException: SQL validation failed. SQL validation failed. From line 1, column 26 to line 1, column 56: No match found for function signature ST_Degrees(<NUMERIC>)

When I explicitly cast the number to a double or an integer, the same issue appears.
Do you have any ideas on what might be going on @Imbruced?

EDIT: I fixed the issue. It was caused by incorrect casing, so I made everything lowercase.
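
For anyone who hits the same "No match found for function signature" error, the casing fix can be pictured with a small dependency-free sketch. It assumes (this is an assumption, not something confirmed in this thread) that the function name is lowercased before the module is asked for it, so a lookup table keyed by `ST_Degrees` would never match. `FunctionRegistrySketch` and its methods are hypothetical stand-ins and not Flink's `Module` interface.

```java
import java.util.LinkedHashMap;
import java.util.Locale;
import java.util.Map;
import java.util.Optional;
import java.util.Set;

// Simplified stand-in for a function module; this is NOT Flink's Module interface.
final class FunctionRegistrySketch {
  // Keys are stored lowercased so a lookup succeeds whatever casing the caller uses.
  private final Map<String, String> functions = new LinkedHashMap<>();

  void register(String name, String implementation) {
    functions.put(name.toLowerCase(Locale.ROOT), implementation);
  }

  // Returns the normalized (lowercase) names, in registration order.
  Set<String> listFunctions() {
    return functions.keySet();
  }

  // Normalizes the requested name the same way before looking it up.
  Optional<String> getFunctionDefinition(String name) {
    return Optional.ofNullable(functions.get(name.toLowerCase(Locale.ROOT)));
  }
}

final class FunctionRegistryDemo {
  public static void main(String[] args) {
    FunctionRegistrySketch registry = new FunctionRegistrySketch();
    registry.register("ST_Degrees", "degrees-impl");
    // Both spellings resolve because registration and lookup normalize the name.
    System.out.println(registry.getFunctionDefinition("st_degrees").isPresent());
    System.out.println(registry.getFunctionDefinition("ST_DEGREES").isPresent());
  }
}
```

Normalizing on both the write path and the read path keeps the listing and the lookup consistent, which is the property that was missing when the list showed the functions but the call could not resolve them.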

@radekaadek radekaadek force-pushed the add-flink-module branch 2 times, most recently from dd77708 to 7c8bee4 Compare October 25, 2025 18:42
@radekaadek
Author

radekaadek commented Oct 25, 2025

When I call SELECT ST_Point(2,2), it produces ugly jobs where it casts the 2s from NUMERIC to DOUBLE:
SELECT ST_Point(CAST(2 AS DOUBLE), CAST(2 AS DOUBLE))

Other than that, I'll work on the types tomorrow.

@radekaadek
Author

I have been looking through the code and it seems to me that either there was a mistake or I'm missing something because all of the functions in flink/src/main/java/org/apache/sedona/flink/expressions/Functions.java have this:
@DataTypeHint(value = "RAW", bridgedTo = org.locationtech.jts.geom.Geometry.class)

For example:

  public static class ST_Centroid extends ScalarFunction {
    @DataTypeHint(value = "RAW", bridgedTo = org.locationtech.jts.geom.Geometry.class)
    public Geometry eval(
        @DataTypeHint(value = "RAW", bridgedTo = org.locationtech.jts.geom.Geometry.class)
            Object o) {
      Geometry geom = (Geometry) o;
      return org.apache.sedona.common.Functions.getCentroid(geom);
    }
  }

The documentation of this DataTypeHint states that defining hints this way leads to using Flink's default serializer:

 * <p>{@code @DataTypeHint(value = "RAW", bridgedTo = MyCustomClass.class)} defines a RAW data type
 * with Flink's default serializer for class {@code MyCustomClass}.

Making the functions use custom serializers requires passing a rawSerializer:

 * <p>{@code @DataTypeHint(value = "RAW", rawSerializer = MyCustomSerializer.class)} defines a RAW
 * data type with a custom serializer class.

What do you think about this @Imbruced?

@Imbruced
Member

@radekaadek, thanks for the research 🙇 Are you able to verify if it applies correctly with your changes?
cc: @jiayuasu (iirc, you have been working on the Flink java)

@jiayuasu
Member

@radekaadek I think the only reason I did that is that there was otherwise no way to register those functions. If there is a clear path now, please feel free to use the customized serializer. Please use Sedona's own geom serializer if this is the case: https://github.com/apache/sedona/tree/master/common/src/main/java/org/apache/sedona/common/geometrySerde

Our serializer has way better performance than the default Java / Kryo serializer.

@jiayuasu
Member

Well, looks like the current code already uses our serializer

@radekaadek
Author

I guess the reason it works now is that Flink automatically infers the serializer that was passed in when that type was created.
I suppose I would need to change all of the functions to use the custom serializer in order to get it to work with the module.

Can you tell me how you were able to verify that the code actually uses custom serializers @jiayuasu ?
I tried putting print statements inside the geometry serialization class you mentioned, but they never triggered when I was testing my code.

I don't know if I'll be able to serialize the index, as I don't see it called in a function anywhere, but I'll try to instantiate its type and serializer when the module is loaded.

The types in the module resolve to this RAW type where a custom serializer can be specified and the types themselves should already be handled correctly by Flink.

I'll write some tests and documentation for the module once I have some time on my hands.

@radekaadek
Author

I have added some tests for the module and made the functions use the custom serializer.
I still don't know how to verify that the functions in the module are actually using the custom serializer, so I would appreciate it if someone could help me with that.
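
One way this kind of check could be done without print statements is a counting decorator around the serializer. The sketch below is dependency-free and only illustrates the idea: `SerdeSketch`, `CountingSerde` and `Utf8Serde` are hypothetical names, and the two-method interface is a stand-in, not Flink's serializer API. It also assumes the job runs in the same JVM as the test (as with a local environment), since static counters are not shared across processes.

```java
import java.nio.charset.StandardCharsets;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical minimal serializer contract; a stand-in for the real serializer type.
interface SerdeSketch<T> {
  byte[] serialize(T value);

  T deserialize(byte[] bytes);
}

// Decorator that counts every call and delegates the actual work.
final class CountingSerde<T> implements SerdeSketch<T> {
  // Static so the counts survive copies of the serializer inside one JVM.
  static final AtomicLong SERIALIZE_CALLS = new AtomicLong();
  static final AtomicLong DESERIALIZE_CALLS = new AtomicLong();

  private final SerdeSketch<T> delegate;

  CountingSerde(SerdeSketch<T> delegate) {
    this.delegate = delegate;
  }

  @Override
  public byte[] serialize(T value) {
    SERIALIZE_CALLS.incrementAndGet();
    return delegate.serialize(value);
  }

  @Override
  public T deserialize(byte[] bytes) {
    DESERIALIZE_CALLS.incrementAndGet();
    return delegate.deserialize(bytes);
  }
}

// Trivial delegate used only to exercise the decorator.
final class Utf8Serde implements SerdeSketch<String> {
  @Override
  public byte[] serialize(String value) {
    return value.getBytes(StandardCharsets.UTF_8);
  }

  @Override
  public String deserialize(byte[] bytes) {
    return new String(bytes, StandardCharsets.UTF_8);
  }
}

final class CountingSerdeDemo {
  public static void main(String[] args) {
    SerdeSketch<String> serde = new CountingSerde<>(new Utf8Serde());
    String roundTripped = serde.deserialize(serde.serialize("POINT (2 2)"));
    System.out.println(roundTripped);
    System.out.println("serialize calls: " + CountingSerde.SERIALIZE_CALLS.get());
    System.out.println("deserialize calls: " + CountingSerde.DESERIALIZE_CALLS.get());
  }
}
```

A unit test could reset the counters, run a query that moves a geometry between operators, and assert that both counters are greater than zero. Whether a given query actually forces serialization depends on how the job is planned, so a zero count would need to be interpreted with care.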

I also noticed that all tests are initialized with the initialize method:

  static void initialize(boolean enableWebUI) {
    env =
        enableWebUI
            ? StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration())
            : StreamExecutionEnvironment.getExecutionEnvironment();
    EnvironmentSettings settings = EnvironmentSettings.newInstance().inStreamingMode().build();
    tableEnv = SedonaContext.create(env, StreamTableEnvironment.create(env, settings));
  }

It occurred to me that the module could reuse most, if not all, of the test cases if this method were modified to something like this:

  static void initialize(boolean enableWebUI) {
    env =
        enableWebUI
            ? StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration())
            : StreamExecutionEnvironment.getExecutionEnvironment();
    EnvironmentSettings settings = EnvironmentSettings.newInstance().inStreamingMode().build();
    tableEnv = StreamTableEnvironment.create(env, settings);
    tableEnv.executeSql("LOAD MODULE sedona");
  }

This would require some refactoring, but let me know what you think.

@radekaadek radekaadek marked this pull request as ready for review November 1, 2025 13:42
@radekaadek radekaadek requested a review from jiayuasu as a code owner November 1, 2025 13:42
@jiayuasu
Member

jiayuasu commented Nov 2, 2025

@radekaadek regarding logs, did you try to use log4j in Flink to print logs?

@radekaadek
Author

@jiayuasu I solved the issue by making the serializer write some files on my machine, which it did, but I don't know how you would like to see this tested in unit tests.
I also found that I had missed the Geometry[] type and have added it too.

I have now noticed an error in the CI that looks like this:

testTableToDS(org.apache.sedona.flink.AdapterTest)  Time elapsed: 0.889 sec  <<< ERROR!
org.apache.flink.table.api.ValidationException: Data type 'RAW('java.lang.Object', '...')' does not support an output conversion to class 'org.locationtech.jts.geom.Geometry'.

and I don't know where it's coming from.

After implementing the Geometry[] serializer I also noticed more errors:

testDump(org.apache.sedona.flink.FunctionTest)  Time elapsed: 0.006 sec  <<< ERROR!
org.apache.flink.table.api.ValidationException: Could not extract a valid type inference for function class 'org.apache.sedona.flink.expressions.Functions$ST_Dump'. Please check for implementation mistakes and/or provide a corresponding hint.

The rest of the tests that are currently failing all seem to be connected to the Geometry[] type:

Tests in error:
  AdapterTest.testTableToDS:51 » Validation Data type 'RAW('java.lang.Object', '...
  FunctionTest.testCollectWithArray:277->TestBase.first:464 » Runtime Failed to ...
  FunctionTest.testDump:378 » Validation Could not extract a valid type inferenc...
  FunctionTest.testDumpPoints:387 » Validation Could not extract a valid type in...
  FunctionTest.testH3ToGeom:2088 » Validation SQL validation failed. An error oc...
  FunctionTest.testLineSegments:1632 » Validation Could not extract a valid type...
  FunctionTest.testMakeLine:1668->TestBase.first:464 » Runtime Failed to fetch n...
  FunctionTest.testMakePolygonWithHoles:1722->TestBase.first:464 » Runtime Faile...
  FunctionTest.testS2ToGeom:2009 » Validation SQL validation failed. An error oc...
  FunctionTest.testSubdivide:1936 » Validation Could not extract a valid type in...
  FunctionTest.testUnionArrayVariant:532->TestBase.first:464 » Runtime Failed to...

The Geometry[] class is never added in ~/prg/sedona/flink/src/main/java/org/apache/sedona/flink/SedonaContext.java, and it's a different type from GeometryCollection, which is probably why I'm seeing all of these errors. Was it ever verified that Geometry[] is also properly registered and serialized?
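
The point about Geometry[] being its own type can be shown with plain Java reflection. This is a minimal sketch with stand-in classes named after the JTS ones to keep it dependency-free; the `registry()` map is hypothetical and does not represent how Sedona or Flink actually register types.

```java
import java.util.HashMap;
import java.util.Map;

final class ArrayTypeRegistryDemo {
  // Stand-ins for the JTS classes, so the sketch needs no dependencies.
  static class Geometry {}

  static class GeometryCollection extends Geometry {}

  // Hypothetical registry keyed by class, with only the element type registered.
  static Map<Class<?>, String> registry() {
    Map<Class<?>, String> serializers = new HashMap<>();
    serializers.put(Geometry.class, "geometry-serializer");
    return serializers;
  }

  public static void main(String[] args) {
    Map<Class<?>, String> serializers = registry();
    // The element type is registered.
    System.out.println(serializers.containsKey(Geometry.class));
    // The array type is a separate Class object and is not covered by that entry.
    System.out.println(serializers.containsKey(Geometry[].class));
    // The array only refers back to the element type through its component type.
    System.out.println(Geometry[].class.getComponentType() == Geometry.class);
    // A subclass is assignable to Geometry, but an array of Geometry is not.
    System.out.println(Geometry.class.isAssignableFrom(GeometryCollection.class));
    System.out.println(Geometry.class.isAssignableFrom(Geometry[].class));
  }
}
```

Under this model, a registration for the element type does not implicitly cover the array type, so the array would need its own entry or a lookup that falls back to the component type.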

@jiayuasu
Member

jiayuasu commented Nov 4, 2025

Sorry, I am traveling this week, so my response might be slow.

Development

Successfully merging this pull request may close these issues.

Registering Sedona types and SQL functions on a standalone Flink instance