Add RFC to add thrift support for task status/updateRequest/info #38
Conversation
#### For Polymorphic Types e.g. ConnectorSplit

```java
// Similarly, we register the correct method for a given type using the existing handle resolver
protected AbstractTypedThriftModule(
```
---
JSON works by implicitly embedding an ID field that indicates to Jackson which subclass to use. It can do that because JSON is schema-less.

Thrift is schema-oriented. Where do we put the marker in the schema that tells our application code which subclass to use? Does the connector author have to manually add a field? If so, that just seems wrong: this should go outside of the connector class that must be serialized, since it's common to all connector data structures.

If my understanding is correct, I would add this as a con below. Unlike JSON, it is not "automatic", and it would entail polluting connector classes with IDs.
---
Option 1 is simply to mirror what Jackson does by adding type information, which could be a string or a byte, at the beginning of the Thrift payload. This is basically the same as what Option 2 does, but in a code architecture similar to our existing code base. For example, we would have an AbstractTypedThriftModule.java, similar to AbstractTypedJacksonModule.java, where all the serde is performed, and we would also use the handle resolver to know at runtime which connector is being used. And no, the connector author does not need to add a field. The code in AbstractTypedThriftModule will add it automatically, since it knows what the concrete class of the connector is.
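For illustration, here is a minimal sketch of that Option 1 idea using only plain JDK streams. The class and method names below are assumptions for this sketch, not the actual AbstractTypedThriftModule API: a type marker is written ahead of the concrete payload, so the reader can consult it before choosing a deserializer.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// Hedged sketch of Option 1: prepend a type marker to the serialized bytes,
// mirroring the ID field that AbstractTypedJacksonModule embeds for Jackson.
// Plain DataOutputStream stands in for the real Thrift protocol writer.
public class TypedPayloadSketch
{
    // Write "<type marker><length><concrete payload>"
    public static byte[] writeTyped(String type, byte[] data)
    {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(bytes);
            out.writeUTF(type);        // the embedded type marker
            out.writeInt(data.length); // length-prefixed concrete payload
            out.write(data);
            return bytes.toByteArray();
        }
        catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Read the marker first; a real implementation would then use the handle
    // resolver to pick the concrete deserializer for that type.
    public static String readType(byte[] payload)
    {
        try {
            return new DataInputStream(new ByteArrayInputStream(payload)).readUTF();
        }
        catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```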
---
Sorry, I didn't follow. Let's take an example, `ConnectorSplit`:

```json
"split": {
  "connectorId": "hive",
  "connectorSplit": {
    "@type": "hive",
    "nodeSelectionStrategy": "NO_PREFERENCE",
    ...
```

`connectorSplit` corresponds to `ConnectorSplit`, an interface; we configure Jackson to retrieve the `@type` field to help determine which class to use to deserialize the payload.

If we pretend the JSON schema is Thrift, are you saying there would be a similar `@type` field embedded in the Thrift payload? If so, would you change Drift to look for that? If not, would you use the `connectorId` in the outer object?
---
Yes and no.

For the yes part, say we have a Hive split, which has registered its own serde. Within the custom codec, we will do:

```java
TMemoryBuffer transport = new TMemoryBuffer(1024);
TProtocolWriter writer = new TBinaryProtocol(transport);
// write the connector id/type
writer.writeStructBegin(new TStruct("Split"));
writer.writeFieldBegin(new TField("connectorId", TType.STRING, (short) 2));
writer.writeString("hive");
writer.writeFieldEnd();
// write the real data
serializerRegistry.getCodec(HiveSplit.class).write(aHiveSplitObject, writer);
writer.writeStructEnd();
```

For the no part, we will use Drift for the annotations and also use it to serde all primitive types. But for polymorphic types like splits, we will use the custom codec where we can control the implementation. Let me know if this makes sense to you.
---
I presume this is a simplified example of what the runtime would do when it encounters the ConnectorSplit data structure (it seems there is a Thrift wrapper that holds the ID, and we use that ID to deserialize a binary payload). If so, could we replace

```java
serializerRegistry.getCodec(HiveSplit.class).write(aHiveSplitObject, writer);
```

with something like this

```java
writer.writeBinary(ByteBuffer.wrap(serializerRegistry.getCodec(HiveSplit.class).serialize(aHiveSplitObject)));
```

... where the `getCodec` method is just a generic custom serializer that lives in the SPI? `Serializable` is already agnostic to Drift, so I'm guessing this is just an editorial mistake.
---
Yes to "getCodec method is just a generic custom serializer that lives in the SPI". I should have called it "getSerializer", which retrieves the serializer based on the connector ID.
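As a sketch of that getSerializer idea (the registry and interface names here are hypothetical, not the actual SPI), the registry could simply be a map keyed by connector ID:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch of a serializer registry keyed by connector ID.
public class SerializerRegistrySketch
{
    // Assumed serializer contract: object in, opaque bytes out
    public interface SplitSerializer
    {
        byte[] serialize(Object split);
    }

    private final Map<String, SplitSerializer> serializers = new ConcurrentHashMap<>();

    public void registerSerializer(String connectorId, SplitSerializer serializer)
    {
        serializers.put(connectorId, serializer);
    }

    // Retrieve the serializer based on the connector ID, as described above
    public SplitSerializer getSerializer(String connectorId)
    {
        SplitSerializer serializer = serializers.get(connectorId);
        if (serializer == null) {
            throw new IllegalArgumentException("No serializer registered for connector " + connectorId);
        }
        return serializer;
    }
}
```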
```java
@Override
public Connector create(...) {
    // Register serializers
    serializerRegistry.registerSerializer("hive", new HiveSplitThriftSerializer());
```
---
In `HiveConnectorFactory` we already know the connector ID (it's in `getName`), so we shouldn't need to specify it again as done here, and we should make it hard to miss implementing a serializer (and, additionally, make it obvious when a new one is required upon SPI upgrade). Could the SPI supply an interface of all serializable data structures, which must all be implemented in order for serialization to function properly, and which maps automatically to the already well-known connector ID in this class?
---
Yes. That is the goal of it. Anything that we would like to support custom serde for should implement this "Serializable" interface (I just changed the name to a more general one). I was focusing too much on the split and ended up calling it ConnectorSplitSerializer. But in reality, yes, we can extend this to other data by adding this interface to the SPI and asking others to implement it.

Regarding the connector ID in HiveConnectorFactory, yes, we should use getName. I emphasized "hive" to show that we will be able to register a different serde for each connector.
```java
// Use a custom codec for ConnectorSplit
public class ConnectorSplitCodec implements ThriftCodec<ConnectorSplit>
```
---
Could we make the `Codec` serialization-agnostic? Any particular need to tie this to Thrift?
---
Correct me if I am wrong @tdcmeehan. I believe even with option 2, where we can have any implementation for the serde as long as it can convert between an object and a byte array, we still need some kind of system to indicate the class hierarchy and the relationships among these classes, and we need this system to link a class to its serde method. Here, we use Drift annotations to do so, and we need those custom codecs to be subclasses of ThriftCodec so that we can inject them into the Thrift serde system and let the whole flow work. But within the serde itself, where the byte array is generated, it does not really need to be a Thrift binary; it could be any format, as long as the serialization and deserialization methods agree with each other.
---
I misread the intent of this class and realize now this is a core class, not a plugin class.

But I have a different question now. Suppose we have two different connectors registered, Hive and Iceberg. How does the `ConnectorSplitCodec` know which serializer to pick in the `ConnectorSplitSerializerRegistry`? It must know it needs the Hive one, but how does it know that using just information found in `ConnectorSplit`?
---
My apologies for the name. Too many repeated keywords. It should be SplitCodec here, not ConnectorSplitCodec. The Split class has the connector ID in it, and within its custom codec, a.k.a. SplitCodec, it can use the connector ID to retrieve the serde from the registry.

But still, in the "Serializable" interface, we can enforce a "getType" method to ask all types of splits to report their own types.
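A minimal sketch of that interface idea (the names ThriftSerializable and ExampleHiveSplit are assumptions drawn from this discussion, not the final SPI):

```java
// Hypothetical sketch of the SPI-level "Serializable" interface discussed
// above: every polymorphic type reports its own type/connector ID, so the
// codec can look up the matching serde in the registry.
public class SerializableSpiSketch
{
    public interface ThriftSerializable
    {
        // e.g. "hive", used as the registry key for the serde
        String getType();
    }

    // What a connector-side implementation might look like
    public static class ExampleHiveSplit
            implements ThriftSerializable
    {
        @Override
        public String getType()
        {
            return "hive";
        }
    }
}
```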
---
Currently, it seems we serialize the whole `Split` as JSON in `SplitCodec`. Wouldn't this need to be changed to serialize just the `ConnectorSplit`? Could you please update the example to include some pseudocode on what exactly would happen? Is it more or less this?

```
public class SplitCodec implements ThriftCodec<Split>

readData:
    read connector id
    look up transaction handle from deserializers and deserialize binary payload
    look up connector split from deserializers and deserialize binary payload
    read lifespan
    read split context
    ...etc...
```
---
Yeah. We are wrapping all polymorphic types as a JSON field within a Thrift payload so that we can migrate all primitive types to Thrift first. This gives us the capability to move forward in a more manageable way.

Just updated the RFC.
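The dispatch described in the pseudocode above could be sketched like this. It is a simplification with assumed names: a Map stands in for the registry, and the deserializer interface is hypothetical, not the real codec API.

```java
import java.util.Map;

// Hedged sketch of the SplitCodec dispatch: read the connector ID first,
// then hand the embedded bytes (JSON for polymorphic parts, for now) to
// that connector's deserializer.
public class SplitCodecSketch
{
    public interface SplitDeserializer
    {
        Object deserialize(byte[] payload);
    }

    private final Map<String, SplitDeserializer> deserializers;

    public SplitCodecSketch(Map<String, SplitDeserializer> deserializers)
    {
        this.deserializers = deserializers;
    }

    // Simplified readData: the connector ID selects the deserializer for the payload
    public Object read(String connectorId, byte[] payload)
    {
        SplitDeserializer deserializer = deserializers.get(connectorId);
        if (deserializer == null) {
            throw new IllegalArgumentException("Unknown connector " + connectorId);
        }
        return deserializer.deserialize(payload);
    }
}
```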
```java
// Within the byte array from serialization, we will find
String connectId;           // hive or other connector
private final String type;  // Optional, can be json, or thrift
```
---
Doesn't the connector already know which one it is? Why must the type of serialization be added to the payload? Why can't a properly configured connector just implicitly use the same serialization method everywhere?
---
Good point. I was thinking of supporting multiple formats even for a single connector, but later realized that might be over-engineering. Let me clean this up. Thanks!
```java
// Within the byte array from serialization, we will find
String connectId;          // hive or other connector
private final byte[] data;
```
---
Is it fair to say this would be a Thrift struct in itself? If not, how would you transparently read and write the `connectorId`?
---
For now, yes, it will be Thrift, because the company wants us to use Thrift. But since we will have control over the implementation of the serde within the custom codec, it does not need to be Thrift; it could be any binary protocol, as long as it can convert between the object and a byte array.
```java
@Override
public Connector create(...) {
    // Register serializers
    serializerRegistry.registerSerializer(getName(), new HiveSplitThriftSerializer());
```
---
If we know upfront which serializers are required, why does the connector need access to `serializerRegistry`? Why can't the core runtime do this using all known provided serializers from the connector?
---
The idea is that the connector itself should know everything about its own split, and it is the connector's job to register the serde for its split. From the core runtime's point of view, it only needs to know that any split will implement the "Serializable" interface.
---
What I mean is, couldn't we make it easier to write a connector by having the runtime look up the serdes from a well-known place (perhaps a method on `ConnectorMetadata`), instead of us having to inject `serializerRegistry` into the connector and relying on the connector to register each and every serde?
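One way that suggestion could look (every name below is hypothetical, sketched for illustration only): the connector exposes its serdes from a well-known method, and the runtime collects them instead of handing the connector a registry to populate.

```java
import java.util.Map;

// Hedged sketch: serde discovery driven by the runtime rather than
// registration driven by the connector.
public class SerdeDiscoverySketch
{
    public interface SplitSerde
    {
        byte[] serialize(Object split);

        Object deserialize(byte[] bytes);
    }

    // Assumed connector-side hook, analogous to a method on ConnectorMetadata
    public interface ConnectorSerdeProvider
    {
        Map<String, SplitSerde> getSerdes();
    }

    // Runtime side: gather every serde the connector declares up front, so a
    // missing serde is detectable at load time rather than at query time
    public static Map<String, SplitSerde> collect(ConnectorSerdeProvider provider)
    {
        return Map.copyOf(provider.getSerdes());
    }
}
```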
---
Good idea. I found that there is a ConnectorTypeSerdeManager.java which I can use as a registry. I see ConnectorManager adding connector-specific serdes into it and the task resource retrieving them from it. I guess I can let the split codec retrieve the serde method for a given connector split as well. Thanks for the suggestion.
---
Is there any known preference within Meta or elsewhere for option 1?

Not really. We prefer Option 2.
---
Do we plan to migrate the plan fragment as well, or keep that as JSON?
2. Maintain backward compatibility with existing JSON serialization
3. Use the Drift IDL generator to produce the IDL file and use it to generate C++ classes for native workers
   * The final IDL for all Thrift structs needed for taskStatus, taskInfo, and taskUpdateRequest can be automatically generated by building the "presto-thrift-spec" module in the Presto repo. This module is also built automatically while building Presto.
   * For the C++ worker, there will be an extra step to generate some utility code by using presto-native-execution/presto_cpp/main/thrift/Makefile and running a `make` command
---
Is this linked to the main `make` command, or is it a separate command required post-build?
---
This is a separate `make` command to invoke that Makefile. This is necessary even now if a protocol change (between the Java coordinator and the C++ worker) is needed.
IMO I think we can just remove option 1 altogether from this RFC.

We plan to keep it as JSON for now since it will only go through serde once.

I moved Option 1 down to the "other approach considered" section.
### Preferred Option: Pluggable Serde for Polymorphic Types |
---
@shangm2: It would be great to see an example of what the corresponding native code would look like. Can you add a code sample for that?