Skip to content

Commit b210f42

Browse files
xupefeiHyukjinKwon
andcommitted
[SPARK-49087][SQL][CONNECT] Distinguish UnresolvedFunction calling internal functions
### What changes were proposed in this pull request? This PR introduces a new Protobuf field `is_internal` to `UnresolvedFunction` message. This field is used to carry the info on whether an `UnresolvedFunction` is calling an internal function (using the `Column#internalFn` API), so that the Connect server could act appropriately: - If `true`, then (inside Catalyst) mark the function as internal. - If `false`, then mark the function as external (public or user-defined). - (Current behaviour) If not set, then look it up in the internal registry: mark as "internal" if found, "external" otherwise. TODO: [SPARK-50658](https://issues.apache.org/jira/browse/SPARK-50658): apply the same change to the Spark Connect Python client. ### Why are the changes needed? Tidy up the code and make our first-party Connect client adhere to the best practices. ### Does this PR introduce _any_ user-facing change? Nope. ### How was this patch tested? New test is added. ### Was this patch authored or co-authored using generative AI tooling? Copilot helped generate protobuf comments and repeated test codes. Closes apache#49274 from xupefei/internal-fn. Lead-authored-by: Paddy Xu <xupaddy@gmail.com> Co-authored-by: Hyukjin Kwon <gurwls223@gmail.com> Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
1 parent 2acc969 commit b210f42

File tree

1,175 files changed

+1522
-756
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

1,175 files changed

+1522
-756
lines changed

connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/internal/columnNodeSupport.scala

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -73,13 +73,19 @@ object ColumnNodeToProtoConverter extends (ColumnNode => proto.Expression) {
7373
.setColName(regex)
7474
planId.foreach(b.setPlanId)
7575

76-
case UnresolvedFunction(functionName, arguments, isDistinct, isUserDefinedFunction, _, _) =>
77-
// TODO(SPARK-49087) use internal namespace.
76+
case UnresolvedFunction(
77+
functionName,
78+
arguments,
79+
isDistinct,
80+
isUserDefinedFunction,
81+
isInternal,
82+
_) =>
7883
builder.getUnresolvedFunctionBuilder
7984
.setFunctionName(functionName)
8085
.setIsUserDefinedFunction(isUserDefinedFunction)
8186
.setIsDistinct(isDistinct)
8287
.addAllArguments(arguments.map(apply(_, e)).asJava)
88+
.setIsInternal(isInternal)
8389

8490
case Alias(child, name, metadata, _) =>
8591
val b = builder.getAliasBuilder.setExpr(apply(child, e))
@@ -156,6 +162,7 @@ object ColumnNodeToProtoConverter extends (ColumnNode => proto.Expression) {
156162
case CaseWhenOtherwise(branches, otherwise, _) =>
157163
val b = builder.getUnresolvedFunctionBuilder
158164
.setFunctionName("when")
165+
.setIsInternal(false)
159166
branches.foreach { case (condition, value) =>
160167
b.addArguments(apply(condition, e))
161168
b.addArguments(apply(value, e))

connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/internal/ColumnNodeToProtoConverterSuite.scala

Lines changed: 18 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -128,19 +128,22 @@ class ColumnNodeToProtoConverterSuite extends ConnectFunSuite {
128128
.setFunctionName("+")
129129
.setIsDistinct(false)
130130
.addArguments(attribute("a"))
131-
.addArguments(expr(_.getLiteralBuilder.setInteger(1)))))
131+
.addArguments(expr(_.getLiteralBuilder.setInteger(1)))
132+
.setIsInternal(false)))
132133
testConversion(
133134
UnresolvedFunction(
134135
"db1.myAgg",
135136
Seq(UnresolvedAttribute("a")),
136137
isDistinct = true,
137-
isUserDefinedFunction = true),
138+
isUserDefinedFunction = true,
139+
isInternal = true),
138140
expr(
139141
_.getUnresolvedFunctionBuilder
140142
.setFunctionName("db1.myAgg")
141143
.setIsDistinct(true)
142144
.setIsUserDefinedFunction(true)
143-
.addArguments(attribute("a"))))
145+
.addArguments(attribute("a"))
146+
.setIsInternal(true)))
144147
}
145148

146149
test("alias") {
@@ -247,10 +250,12 @@ class ColumnNodeToProtoConverterSuite extends ConnectFunSuite {
247250
expr(
248251
_.getWindowBuilder
249252
.setWindowFunction(
250-
expr(_.getUnresolvedFunctionBuilder
251-
.setFunctionName("sum")
252-
.setIsDistinct(false)
253-
.addArguments(attribute("a"))))
253+
expr(
254+
_.getUnresolvedFunctionBuilder
255+
.setFunctionName("sum")
256+
.setIsDistinct(false)
257+
.addArguments(attribute("a"))
258+
.setIsInternal(false)))
254259
.addPartitionSpec(attribute("b"))
255260
.addPartitionSpec(attribute("c"))
256261
.addOrderSpec(proto.Expression.SortOrder
@@ -276,7 +281,8 @@ class ColumnNodeToProtoConverterSuite extends ConnectFunSuite {
276281
_.getUnresolvedFunctionBuilder
277282
.setFunctionName("sum")
278283
.setIsDistinct(false)
279-
.addArguments(attribute("a"))))
284+
.addArguments(attribute("a"))
285+
.setIsInternal(false)))
280286
.addPartitionSpec(attribute("b"))
281287
.addPartitionSpec(attribute("c"))))
282288
testWindowFrame(
@@ -310,7 +316,8 @@ class ColumnNodeToProtoConverterSuite extends ConnectFunSuite {
310316
_.getUnresolvedFunctionBuilder
311317
.setFunctionName("+")
312318
.addArguments(expr(_.setUnresolvedNamedLambdaVariable(catX)))
313-
.addArguments(attribute("y"))))
319+
.addArguments(attribute("y"))
320+
.setIsInternal(false)))
314321
.addArguments(catX)))
315322
}
316323

@@ -330,7 +337,8 @@ class ColumnNodeToProtoConverterSuite extends ConnectFunSuite {
330337
.setFunctionName("when")
331338
.addArguments(attribute("c1"))
332339
.addArguments(expr(_.getLiteralBuilder.setString("r1")))
333-
.addArguments(expr(_.getLiteralBuilder.setString("fallback")))))
340+
.addArguments(expr(_.getLiteralBuilder.setString("fallback")))
341+
.setIsInternal(false)))
334342
}
335343

336344
test("extract field") {

python/pyspark/sql/connect/proto/expressions_pb2.py

Lines changed: 47 additions & 47 deletions
Large diffs are not rendered by default.

python/pyspark/sql/connect/proto/expressions_pb2.pyi

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -847,6 +847,7 @@ class Expression(google.protobuf.message.Message):
847847
ARGUMENTS_FIELD_NUMBER: builtins.int
848848
IS_DISTINCT_FIELD_NUMBER: builtins.int
849849
IS_USER_DEFINED_FUNCTION_FIELD_NUMBER: builtins.int
850+
IS_INTERNAL_FIELD_NUMBER: builtins.int
850851
function_name: builtins.str
851852
"""(Required) name (or unparsed name for user defined function) for the unresolved function."""
852853
@property
@@ -864,27 +865,46 @@ class Expression(google.protobuf.message.Message):
864865
When it is not a user defined function, Connect will use the function name directly.
865866
When it is a user defined function, Connect will parse the function name first.
866867
"""
868+
is_internal: builtins.bool
869+
"""(Optional) Indicate if this function is defined in the internal function registry.
870+
If not set, the server will try to look up the function in the internal function registry
871+
and decide appropriately.
872+
"""
867873
def __init__(
868874
self,
869875
*,
870876
function_name: builtins.str = ...,
871877
arguments: collections.abc.Iterable[global___Expression] | None = ...,
872878
is_distinct: builtins.bool = ...,
873879
is_user_defined_function: builtins.bool = ...,
880+
is_internal: builtins.bool | None = ...,
874881
) -> None: ...
882+
def HasField(
883+
self,
884+
field_name: typing_extensions.Literal[
885+
"_is_internal", b"_is_internal", "is_internal", b"is_internal"
886+
],
887+
) -> builtins.bool: ...
875888
def ClearField(
876889
self,
877890
field_name: typing_extensions.Literal[
891+
"_is_internal",
892+
b"_is_internal",
878893
"arguments",
879894
b"arguments",
880895
"function_name",
881896
b"function_name",
882897
"is_distinct",
883898
b"is_distinct",
899+
"is_internal",
900+
b"is_internal",
884901
"is_user_defined_function",
885902
b"is_user_defined_function",
886903
],
887904
) -> None: ...
905+
def WhichOneof(
906+
self, oneof_group: typing_extensions.Literal["_is_internal", b"_is_internal"]
907+
) -> typing_extensions.Literal["is_internal"] | None: ...
888908

889909
class ExpressionString(google.protobuf.message.Message):
890910
"""Expression as string."""

sql/connect/common/src/main/protobuf/spark/connect/expressions.proto

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -261,6 +261,11 @@ message Expression {
261261
// When it is not a user defined function, Connect will use the function name directly.
262262
// When it is a user defined function, Connect will parse the function name first.
263263
bool is_user_defined_function = 4;
264+
265+
// (Optional) Indicate if this function is defined in the internal function registry.
266+
// If not set, the server will try to look up the function in the internal function registry
267+
// and decide appropriately.
268+
optional bool is_internal = 5;
264269
}
265270

266271
// Expression as string.

sql/connect/common/src/test/resources/query-tests/queries/column_add.json

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,8 @@
2222
"unresolvedAttribute": {
2323
"unparsedIdentifier": "b"
2424
}
25-
}]
25+
}],
26+
"isInternal": false
2627
}
2728
}]
2829
}
Binary file not shown.

sql/connect/common/src/test/resources/query-tests/queries/column_and.json

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,8 @@
2525
"literal": {
2626
"integer": 10
2727
}
28-
}]
28+
}],
29+
"isInternal": false
2930
}
3031
}, {
3132
"unresolvedFunction": {
@@ -38,9 +39,11 @@
3839
"literal": {
3940
"double": 0.5
4041
}
41-
}]
42+
}],
43+
"isInternal": false
4244
}
43-
}]
45+
}],
46+
"isInternal": false
4447
}
4548
}]
4649
}
Binary file not shown.

sql/connect/common/src/test/resources/query-tests/queries/column_between.json

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,8 @@
2525
"literal": {
2626
"integer": 10
2727
}
28-
}]
28+
}],
29+
"isInternal": false
2930
}
3031
}, {
3132
"unresolvedFunction": {
@@ -38,9 +39,11 @@
3839
"literal": {
3940
"integer": 20
4041
}
41-
}]
42+
}],
43+
"isInternal": false
4244
}
43-
}]
45+
}],
46+
"isInternal": false
4447
}
4548
}]
4649
}

0 commit comments

Comments
 (0)