[SPARK-52223][CONNECT] Add SDP Spark Connect Protos #50942
base: master
Conversation
Awesome. A few comments – mostly cosmetic.
sql/connect/common/src/main/protobuf/spark/connect/pipelines.proto
// An unresolved relation that defines the dataset's flow.
spark.connect.Relation plan = 4;

// Default SQL configurations set when running this flow.
Nitpick: is the word "Default" relevant here? There's nothing more specific, right?
How is this related to the session in which the flow is defined? Is this an additional way to set configurations? I assume this takes precedence over what the session has configured?
Yeah, no need to say default - there is no more specific mechanism to set confs.

> How is this related to the session in which the flow is defined? Is this an additional way to set configurations? I assume this takes precedence over what the session has configured?

For now, this is not supported. Users have to set confs directly in the table / flow decorators for them to be applied to the pipeline.
message DefineSqlGraphElements {
  optional string dataflow_graph_id = 1;
  optional string sql_file_name = 2;
Something that occurred to me recently is that there could be SQL files with the same name in different subdirs. Should this be sql_file_path?
I think this is a filepath in implementation, actually. Let me confirm.
Where is this path pointing to?
Changed to file_path. We'll rename this in the implementation too.

> Where is this path pointing to?

@hvanhovell this path is the local path to the SQL file. It's mostly used for disambiguation in our observability.
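Based on that resolution, the message could end up looking like the sketch below; the rename to file_path comes from the thread, while the comments and surrounding layout are illustrative assumptions mirroring the snippet quoted earlier:

```proto
message DefineSqlGraphElements {
  // The dataflow graph to attach the SQL-defined elements to.
  optional string dataflow_graph_id = 1;
  // Renamed from sql_file_name: the local path to the SQL file,
  // used mostly for disambiguation in observability output.
  optional string file_path = 2;
}
```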
}

enum DatasetType {
  DATASET_UNSPECIFIED = 0;
Do we ever need to use DATASET_UNSPECIFIED?
No, anything that sets this should throw an error. This is just to ensure unset != MATERIALIZED_VIEW.
Is it ever allowed to be unset?
Protobuf defaults to 0 for enums. It is better to have a recognizable "unspecified" value at 0 than a value that has actual meaning.
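That proto3 default behavior is why the pattern looks like the sketch below; only DATASET_UNSPECIFIED and MATERIALIZED_VIEW appear in the thread, and the remaining value is an illustrative assumption:

```proto
enum DatasetType {
  // proto3 reads an unset enum field as 0, so value 0 is reserved for a
  // recognizable "unspecified" state; servers should reject it with an error.
  DATASET_UNSPECIFIED = 0;
  // Values with actual meaning start at 1, so an unset field can never be
  // mistaken for, e.g., MATERIALIZED_VIEW.
  MATERIALIZED_VIEW = 1;
  TABLE = 2;
}
```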
map<string, string> sql_conf = 5;

// If true, this flow will only be run once per execution.
bool once = 6;
Care to elaborate? Is this a synonym for "this is batch"?
This corresponds to Trigger.Once in Spark - the flow runs once per update. This is similar to batch in triggered updates, but not in continuous ones (which we will add eventually).
// A response containing events emitted during the run of a pipeline.
message PipelineEventsResult {
  repeated PipelineEvent events = 1;
Batching events should not be needed. gRPC server side streaming can return multiple 'events' at the same time, provided it can fit them in a single window (~30k).
That's fair. But I think the repeated field adds more flexibility in general. We can group events logically, rather than just to avoid network latency.
  repeated PipelineEvent events = 1;
}

message PipelineEvent {
Is this also supposed to include errors? If so, it'd be nice to understand what has failed... In that case, adding a flow/dataset name would be nice.
Yeah, I can see the value in adding dataset and flow name. But two things:
- OTOH, we wanted to keep PipelineEvents as a generic event bus rather than a structured logging format.
- It's possible an error happens that isn't scoped to a dataset/flow, making this field unpredictably empty.
But at the very least, the dataset/flow name will be in the error message.
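Given that framing, a deliberately generic event message might look like the sketch below; both fields are assumptions for illustration, not the PR's actual definition:

```proto
import "google/protobuf/timestamp.proto";

// A generic event-bus entry rather than a structured logging format; any
// dataset/flow involved in a failure is named in the free-form message text.
message PipelineEvent {
  optional google.protobuf.Timestamp timestamp = 1;
  optional string message = 2;
}
```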
What changes were proposed in this pull request?
Adds the Spark Connect API for Spark Declarative Pipelines: https://issues.apache.org/jira/browse/SPARK-51727.
This adds the following protos:
- CreateDataflowGraph creates a new graph in the registry.
- DefineDataset and DefineFlow register elements to the created graph. Datasets are the nodes of the dataflow graph and are either tables or views; flows are the edges connecting them.
- StartRun starts a run, which is a single execution of a graph.
- StopRun stops an existing run, while DropPipeline stops any current runs and drops the pipeline.
It also adds the new PipelineCommand object to the ExecutePlanRequest and the PipelineCommand.Response to the ExecutePlanResponse object.
Why are the changes needed?
Base API of Spark Declarative Pipelines. Implementation coming in future PRs.
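The command set described in this PR could be sketched as a oneof on PipelineCommand; the message names come from the PR description, while the field numbers and oneof layout are assumptions:

```proto
message PipelineCommand {
  oneof command_type {
    CreateDataflowGraph create_dataflow_graph = 1;  // create a new graph in the registry
    DefineDataset define_dataset = 2;               // register a table or view (a graph node)
    DefineFlow define_flow = 3;                     // register an edge connecting datasets
    StartRun start_run = 4;                         // begin a single execution of a graph
    StopRun stop_run = 5;                           // stop an existing run
    DropPipeline drop_pipeline = 6;                 // stop current runs and drop the pipeline
  }
}
```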
Does this PR introduce any user-facing change?
Yes - creates new proto API within Spark Connect.
How was this patch tested?
N/A
Was this patch authored or co-authored using generative AI tooling?
No.