[SPARK-52223][CONNECT] Add SDP Spark Connect Protos #50942

Open
wants to merge 5 commits into base: master
Conversation

@aakash-db commented May 19, 2025

What changes were proposed in this pull request?

Adds the Spark Connect API for Spark Declarative Pipelines: https://issues.apache.org/jira/browse/SPARK-51727.

This adds the following protos:

  1. CreateDataflowGraph creates a new graph in the registry.
  2. DefineDataset and DefineFlow register elements with the created graph. Datasets are the nodes of the dataflow graph (either tables or views); flows are the edges that connect them.
  3. StartRun starts a run, which is a single execution of a graph.
  4. StopRun stops an existing run, while DropPipeline stops any current runs and drops the pipeline.

It also adds the new PipelineCommand message to ExecutePlanRequest and a corresponding PipelineCommand.Response to ExecutePlanResponse.
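
For orientation, the overall shape is roughly the sketch below. This is illustrative only, derived from the command names listed above; the field numbers and exact message layout in the PR may differ.

message PipelineCommand {
  // Exactly one pipeline operation per command.
  oneof command_type {
    CreateDataflowGraph create_dataflow_graph = 1;
    DefineDataset define_dataset = 2;
    DefineFlow define_flow = 3;
    StartRun start_run = 4;
    StopRun stop_run = 5;
    DropPipeline drop_pipeline = 6;
  }
}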

Why are the changes needed?

This is the base API for Spark Declarative Pipelines; the implementation will come in future PRs.

Does this PR introduce any user-facing change?

Yes. It creates a new proto API within Spark Connect.

How was this patch tested?

N/A

Was this patch authored or co-authored using generative AI tooling?

No.

@aakash-db aakash-db changed the title [WIP] Add SDP Spark Connect Protos [SPARK-52223] Add SDP Spark Connect Protos May 19, 2025
@sryza (Contributor) left a comment

Awesome. A few comments – mostly cosmetic.

// An unresolved relation that defines the dataset's flow.
spark.connect.Relation plan = 4;

// Default SQL configurations set when running this flow.
Contributor

Nitpick: is the word "Default" relevant here? There's nothing more specific, right?

Contributor

How is this related to the session in which the flow is defined? Is this an additional way to set configurations? I assume this takes precedence over what the session has configured?

Author

Yeah, no need to say "default" - there is no more specific mechanism for setting confs.

> How is this related to the session in which the flow is defined? Is this an additional way to set configurations? I assume this takes precedence over what the session has configured?

For now, inheriting session configuration is not supported. Users have to set confs directly in the table / flow decorators for them to be applied to the pipeline.


message DefineSqlGraphElements {
optional string dataflow_graph_id = 1;
optional string sql_file_name = 2;
Contributor

Something that occurred to me recently is that there could be SQL files with the same name in different subdirs. Should this be sql_file_path?

Author

I think this is a file path in the implementation, actually. Let me confirm.

Contributor

Where is this path pointing to?

Author

Changed to file_path. We'll rename this in the implementation too.

> Where is this path pointing to?

@hvanhovell this path is the local path to the SQL file. It's mostly used for disambiguation in our observability.
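
After the rename, the message would look roughly like the sketch below. This is illustrative; the exact field name in the PR is whatever the rename above settled on.

message DefineSqlGraphElements {
  // ID of the dataflow graph this SQL file contributes elements to.
  optional string dataflow_graph_id = 1;

  // Local path to the SQL file, used mainly for disambiguation in observability.
  optional string sql_file_path = 2;
}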

}

enum DatasetType {
DATASET_UNSPECIFIED = 0;
Contributor

Do we ever need to use DATASET_UNSPECIFIED?

@aakash-db (Author) May 19, 2025

No, anything that sets this should throw an error. This is just to ensure that unset != MATERIALIZED_VIEW.

Contributor

Is it ever allowed to be unset?

Contributor

Protobuf defaults to 0 for enums. It is better to have a recognizable "unspecified" value at 0 than a value that has actual meaning.
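
To illustrate the convention: in proto3, an unset enum field reads back as whatever value is assigned 0, so reserving 0 for an explicit "unspecified" makes unset detectable. A sketch follows; only DATASET_UNSPECIFIED and MATERIALIZED_VIEW appear in the discussion above, and the remaining value name is a placeholder.

enum DatasetType {
  // Safe zero value; the server should reject commands that leave this unset.
  DATASET_UNSPECIFIED = 0;
  // Concrete dataset kinds; TABLE here is a placeholder name.
  MATERIALIZED_VIEW = 1;
  TABLE = 2;
}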

@HyukjinKwon HyukjinKwon changed the title [SPARK-52223] Add SDP Spark Connect Protos [SPARK-52223][CONNECT] Add SDP Spark Connect Protos May 20, 2025
map<string, string> sql_conf = 5;

// If true, this flow will only be run once per execution.
bool once = 6;
Contributor

Care to elaborate? Is this a synonym for "this is batch"?

Author

This corresponds to Trigger.Once in Spark - the flow runs once per update. This is similar to batch in triggered updates, but not in continuous ones (which we will add eventually).


// A response containing events emitted during the run of a pipeline.
message PipelineEventsResult {
repeated PipelineEvent events = 1;
Contributor

Batching events should not be needed. gRPC server side streaming can return multiple 'events' at the same time, provided it can fit them in a single window (~30k).

Author

That's fair. But I think the repeated field adds more flexibility in general: we can group events logically, rather than batching them just to avoid network latency.

repeated PipelineEvent events = 1;
}

message PipelineEvent {
Contributor

Is this also supposed to include errors? If so, it'd be nice to understand what has failed... In that case, adding the flow/dataset name would be nice.

Author

Yeah, I can see the value in adding the dataset and flow name. But two things:

  1. On the other hand, we wanted to keep PipelineEvent as a generic event bus rather than a structured logging format.
  2. It's possible an error happens that isn't scoped to a dataset/flow, making this field unpredictably empty.

But at the very least, the dataset/flow name will be in the error message.
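
Given that framing, a minimal PipelineEvent could be as simple as the sketch below; the field names and types here are illustrative, not the PR's actual definition.

import "google/protobuf/timestamp.proto";

message PipelineEvent {
  // Time at which the event was emitted.
  google.protobuf.Timestamp timestamp = 1;

  // Free-form event text; errors scoped to a dataset/flow carry its name here.
  optional string message = 2;
}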

@aakash-db aakash-db requested review from sryza and hvanhovell May 20, 2025 20:04