Add RFC to add thrift support for task status/updateRequest/info #38

Open · wants to merge 1 commit into main

Conversation


@shangm2 shangm2 commented May 15, 2025

Add RFC to add thrift support for task status/updateRequest/info

@prestodb-ci prestodb-ci added the from:Meta PRs from Meta label May 15, 2025
@shangm2 shangm2 changed the title Add RFC to add thrift support for task status/update/info Add RFC to add thrift support for task status/updateRequest/info May 15, 2025
@shangm2 shangm2 force-pushed the thrift branch 2 times, most recently from f523605 to c4d50c3 Compare May 15, 2025 22:49
#### For Polymorphic Types e.g. ConnectorSplit
    // Similarly, we register the correct method for a given type using the existing handle resolver
    protected AbstractTypedThriftModule(
Contributor:

JSON works by implicitly embedding an ID field that indicates to Jackson which subclass to use. It can do that because JSON is schema-less.

Thrift is schema-oriented. Where do we put the marker in the schema that tells our application code which subclass to use? Does the connector author have to manually add a field? If so, that just seems wrong--this should go outside of the connector class that must be serialized, since it's common to all connector datastructures.

If my understanding is correct, I would add this as a con below. Unlike JSON, it is not "automatic", and it would entail polluting connector classes with IDs.

Author:

Option 1 simply mirrors what Jackson does by adding type information, which could be a string or a byte, at the beginning of the Thrift payload. This is basically the same as what Option 2 does, but in a code architecture similar to our existing code base: for example, we would have an AbstractTypedThriftModule.java, similar to AbstractTypedJacksonModule.java, where all the serde is performed, and we would also use the handle resolver to know at runtime which connector is being used. And, no, the connector author does not need to add a field. The code in AbstractTypedThriftModule will add it automatically, since it knows the concrete class of the connector.
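To make Option 1 concrete, here is a stdlib-only sketch (hypothetical `TypedEnvelope` class, no Drift or Thrift dependency) of prepending a type tag to an opaque payload, analogous to Jackson's `@type` field:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.Map;

// Hypothetical envelope: a type tag written ahead of the opaque payload,
// mirroring what AbstractTypedJacksonModule does with the JSON "@type" field.
public class TypedEnvelope {
    // Serialize: write the concrete type's id, then the payload bytes.
    // The framework adds the tag automatically; the connector author does not.
    public static byte[] wrap(String typeId, byte[] payload) throws IOException {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        DataOutputStream data = new DataOutputStream(out);
        data.writeUTF(typeId);
        data.writeInt(payload.length);
        data.write(payload);
        return out.toByteArray();
    }

    // Deserialize: read the type id first, then hand the payload to the right codec.
    public static Map.Entry<String, byte[]> unwrap(byte[] bytes) throws IOException {
        DataInputStream data = new DataInputStream(new ByteArrayInputStream(bytes));
        String typeId = data.readUTF();
        byte[] payload = new byte[data.readInt()];
        data.readFully(payload);
        return Map.entry(typeId, payload);
    }

    public static void main(String[] args) throws IOException {
        byte[] wrapped = wrap("hive", "split-data".getBytes("UTF-8"));
        Map.Entry<String, byte[]> result = unwrap(wrapped);
        System.out.println(result.getKey() + ":" + new String(result.getValue(), "UTF-8"));
    }
}
```

In the real design the payload bytes would come from a Drift/Thrift codec; the envelope only illustrates where the type marker lives relative to the data.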

Contributor:

Sorry, I didn't follow. Let's take an example, ConnectorSplit:

          "split": {
            "connectorId": "hive",
            "connectorSplit": {
              "@type": "hive",
              "nodeSelectionStrategy": "NO_PREFERENCE",
        ...

connectorSplit corresponds to ConnectorSplit, an interface; we configure Jackson to retrieve the @type field to help determine which class to use to deserialize the payload.

If we pretend the JSON schema is Thrift, are you saying there would be a similar @type field embedded in the Thrift payload? If so, would you change Drift to look for that? If not, would you use the outer connectorId in the outer object?

Author (@shangm2, May 19, 2025):

Yes and no.
For the yes part, say, we have a hive split, which has registered its own serde. Within the custom codec, we will do:

    TMemoryBuffer transport = new TMemoryBuffer(1024);
    TProtocol writer = new TBinaryProtocol(transport);

    // write the connector id/type
    writer.writeStructBegin(new TStruct("Split"));
    writer.writeFieldBegin(new TField("connectorId", TType.STRING, (short) 2));
    writer.writeString("hive");
    writer.writeFieldEnd();

    // write the real data
    serializerRegistry.getCodec(HiveSplit.class).write(aHiveSplitObject, writer);
    writer.writeStructEnd();

For the no part, we will use Drift for the annotations and also use it to serde all primitive types. But for polymorphic types like splits, we will use the custom codec, where we control the implementation. Let me know if this makes sense to you.

Contributor:

I presume this is a simplified example of what the runtime would do when it encounters the ConnectorSplit data structure (it seems there is a Thrift wrapper that holds the ID, and we use that ID to deserialize a binary payload). If so, could we replace

    serializerRegistry.getCodec(HiveSplit.class).write(aHiveSplitObject, writer);

with something like this

    writer.writeBinary(ByteBuffer.wrap(serializerRegistry.getCodec(HiveSplit.class).serialize(aHiveSplitObject)));

... where the getCodec method is just a generic custom serializer that lives in the SPI? Serializable is already agnostic to Drift, so I'm guessing this is just an editorial mistake.

Author:

Yes to "getCodec method is just a generic custom serializer that lives in the SPI". I should have called it "getSerializer", which retrieves the serializer based on the connector id.

@Override
public Connector create(...) {
// Register serializers
serializerRegistry.registerSerializer("hive", new HiveSplitThriftSerializer());
Contributor:

In HiveConnectorFactory we already know the connector ID (it's in getName), so we shouldn't need to specify it again as done here, and we should make it hard to miss implementing a serializer (and, additionally, make it obvious when a new one is required upon SPI upgrade). Could the SPI supply an interface of all serializable data structures, which must all be implemented in order for serialization to function properly, and which maps automatically to the already well-known connector ID in this class?

Author:

Yes. That is the goal. Anything that we would like to support custom serde for should implement this "Serializable" interface (I just changed the name to a more general one). I was focusing too much on the split and ended up calling it ConnectorSplitSerializer. But in reality, yes, we can extend this to other data by adding this interface to the SPI and asking others to implement it.

Regarding the connector id in HiveConnectorFactory, yes, we should use getName. I emphasized "hive" to show that we will be able to register a different serde for each connector.
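As a sketch of that per-connector registration (hypothetical `SerializerRegistry` and `SplitSerializer` names; the real code would plug into Drift/Thrift codecs rather than plain byte arrays):

```java
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical per-connector serializer registry: each connector factory
// registers a serde under its own name (getName()), and the codec later
// looks it up by the connector id carried in the Split.
public class SerializerRegistry {
    // Pluggable serde contract: any binary format works, as long as
    // serialize and deserialize agree with each other.
    public interface SplitSerializer {
        byte[] serialize(Object split);
        Object deserialize(byte[] data);
    }

    private final Map<String, SplitSerializer> serializers = new ConcurrentHashMap<>();

    public void registerSerializer(String connectorId, SplitSerializer serializer) {
        if (serializers.putIfAbsent(connectorId, serializer) != null) {
            throw new IllegalStateException("Serializer already registered for " + connectorId);
        }
    }

    public SplitSerializer getSerializer(String connectorId) {
        SplitSerializer serializer = serializers.get(connectorId);
        if (serializer == null) {
            throw new NoSuchElementException("No serializer registered for connector " + connectorId);
        }
        return serializer;
    }

    public static void main(String[] args) {
        SerializerRegistry registry = new SerializerRegistry();
        // Toy serde for demonstration: round-trips a String through bytes.
        registry.registerSerializer("hive", new SplitSerializer() {
            public byte[] serialize(Object split) { return split.toString().getBytes(); }
            public Object deserialize(byte[] data) { return new String(data); }
        });
        Object roundTrip = registry.getSerializer("hive").deserialize(
                registry.getSerializer("hive").serialize("a-hive-split"));
        System.out.println(roundTrip);
    }
}
```

Keying the map by connector id is what lets two connectors (say Hive and Iceberg) coexist with different serdes.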



// Use a custom codec for ConnectorSplit
public class ConnectorSplitCodec implements ThriftCodec<ConnectorSplit>
Contributor:

Could we make the Codec serialization-agnostic? Any particular need to tie this to Thrift?

Author:

Correct me if I am wrong @tdcmeehan. I believe even with option 2, where we can have any implementation for the serde as long as it can convert between an object and a byte array, we still need some system to indicate the class hierarchy (the relationship among these classes) and to link a class to its serde method. Here, we use Drift annotations to do so, and we need those custom codecs to be subclasses of ThriftCodec so that we can inject them into the Thrift serde system and let the whole flow work. But within the serde itself, where the byte array is generated, it does not really need to be a Thrift binary; it could be any format, as long as the serialization and deserialization methods agree with each other.

Contributor:

I misread the intent of this class and realize now this is a core class, not a plugin class.

But I have a different question now. Suppose we have two different connectors registered, Hive and Iceberg. How does the ConnectorSplitCodec know which serializer to pick in the ConnectorSplitSerializerRegistry? It must know it needs the Hive one, but how does it know that using just information found in ConnectorSplit?

Author (@shangm2, May 19, 2025):

My apologies for the name. Too many repeated keywords. It should be SplitCodec here, not ConnectorSplitCodec. The Split class has the connector id in it, and within its custom codec, a.k.a. SplitCodec, it can use the connector id to retrieve the serde from the registry.

But still, in the "Serializable" interface, we can enforce a "getType" method to ask all split types to report their own type.
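A minimal sketch of that getType idea, with hypothetical class names: each concrete split reports its own type, so the codec can pick the serde without any out-of-band information.

```java
import java.util.List;

// Hypothetical SPI interface: every polymorphic data structure reports its
// own connector type, so SplitCodec can dispatch to the right serde using
// only information found on the object itself.
public class TypedSerdeExample {
    interface ThriftSerializable {
        String getType(); // e.g. "hive" or "iceberg"
    }

    static class HiveSplit implements ThriftSerializable {
        public String getType() { return "hive"; }
    }

    static class IcebergSplit implements ThriftSerializable {
        public String getType() { return "iceberg"; }
    }

    public static void main(String[] args) {
        List<ThriftSerializable> splits = List.of(new HiveSplit(), new IcebergSplit());
        for (ThriftSerializable split : splits) {
            // The codec would use getType() as the registry key for the serde.
            System.out.println(split.getType());
        }
    }
}
```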

Contributor (@tdcmeehan, May 19, 2025):

Currently, it seems we serialize the whole Split as JSON in SplitCodec. Wouldn't this need to be changed to serialize just the ConnectorSplit? Could you please update the example to include some pseudocode on what exactly would happen? Is it more or less this?

public class SplitCodec implements ThriftCodec<Split>
readData:
  read connector id
  look up transaction handle from deserializers and deserialize binary payload
  look up connector split from deserializers and deserialize binary payload 
  read lifespan
  read split context

...etc...

Author:

Yeah. We are wrapping all polymorphic types as a JSON field within a Thrift payload so that we can migrate all primitive types to Thrift first. This gives us the capability to move forward in a more manageable way.

Just updated the RFC.


// Within the byte array from serialization, we will find
* String connectId; // hive or other connector
* private final String type; // Optional, can be json, or thrift
Contributor:

Doesn't the connector already know which one it is? Why must the type of serialization be added to the payload? Why can't a properly configured connector just implicitly use the same serialization method throughout?

Author:

Good point. I was thinking of supporting multiple formats even for a single connector, but later realized that might be over-engineering. Let me clean this up. Thanks!

@shangm2 shangm2 force-pushed the thrift branch 3 times, most recently from 51ef4df to abfc002 Compare May 19, 2025 16:39
Comment on lines 244 to 278
// Within the byte array from serialization, we will find
* String connectId; // hive or other connector
* private final byte[] data;
Contributor:

Is it fair to say this would be a Thrift struct in itself? If not, how would you transparently read and write the connectorId?

Author:

For now, yes, it will be a Thrift struct, because the company wants us to use Thrift. But since we control the implementation of the serde within the custom codec, it does not need to be Thrift; it could be any binary protocol, as long as it can convert between the object and a byte array.




@Override
public Connector create(...) {
// Register serializers
serializerRegistry.registerSerializer(getName(), new HiveSplitThriftSerializer());
Contributor:

If we know upfront which serializers are required, why does the connector need access to serializerRegistry--why can't the core runtime do this using all known provided serializers from the connector?

Author:

The idea is that the connector itself should know everything about its own split, and it is the connector's job to register the serde for its split. From the core runtime's point of view, it only needs to know that any split implements the "Serializable" interface.

Contributor:

What I mean is, couldn't we make it easier to write a connector by having the runtime look up the serdes from a well known place (perhaps a method on ConnectorMetadata), instead of us having to inject serializerRegistry into the connector and relying on the connector to register each and every serde?

Author:

Good idea. I found that there is a ConnectorTypeSerdeManager.java which I can use as a registry. I see that ConnectorManager adds connector-specific serdes into it, and the task resource retrieves from it. I guess I can let the split codec retrieve the serde method for a given connector split as well. Thanks for the suggestion.

@shangm2 shangm2 force-pushed the thrift branch 4 times, most recently from 04891b3 to 724242f Compare May 20, 2025 15:50
Contributor (@tdcmeehan):
Is there any known preference within Meta or elsewhere for option 1?


shangm2 commented May 22, 2025

Is there any known preference within Meta or elsewhere for option 1?

Not really. We prefer Option 2.

@shangm2 shangm2 force-pushed the thrift branch 2 times, most recently from 2fb38be to 5d3e86c Compare May 22, 2025 18:43
Contributor (@tdcmeehan):

Do we plan to migrate the plan fragment as well, or keep that as JSON?

2. Maintain backward compatibility with existing JSON serialization
3. Use the Drift IDL generator to produce the IDL file and use it to generate C++ classes for native workers
* The final IDL for all Thrift structs needed for taskStatus, taskInfo, and taskUpdateRequest can be automatically generated by building the "presto-thrift-spec" module in the Presto repo. This module is also built automatically when building Presto.
* For the C++ worker, there is an extra step to generate some utility code by using presto-native-execution/presto_cpp/main/thrift/Makefile and running a `make` command
Contributor:

This is linked to the main make command, or is it a separate command required post-build?

Author:

This is a separate make command that invokes that Makefile. This step is already necessary today whenever a protocol change (between the Java coordinator and the C++ workers) is needed.

Contributor (@tdcmeehan):

IMO we can just remove option 1 altogether from this RFC.


shangm2 commented May 27, 2025

Do we plan to migrate the plan fragment as well, or keep that as JSON?

We plan to keep it as JSON for now, since it will only be serialized once.


shangm2 commented May 27, 2025

IMO I think we can just remove option 1 altogether from this RFC.

I moved Option 1 down to the "other approach considered" section.



### Preferred Option: Pluggable Serde for Polymorphic Types


@shangm2: Would be great to see an example of what the corresponding native code would look like. Can you add a code sample for that?
