Skip to content

Commit

Permalink
[SPARK-50715][PYTHON][CONNECT] SparkSession.Builder sets the config…
Browse files Browse the repository at this point in the history
…s in batch

### What changes were proposed in this pull request?
`SparkSession.Builder` sets the configs in batch

### Why are the changes needed?
I notice that there are practice workflows with 500+ configs, existing implementation always set the configs one by one, and thus cause 500+ `Config` RPCs.

### Does this PR introduce _any_ user-facing change?
No, internal change

### How was this patch tested?
Existing CI

### Was this patch authored or co-authored using generative AI tooling?
No

Closes #49346 from zhengruifeng/py_connect_set_all.

Authored-by: Ruifeng Zheng <ruifengz@apache.org>
Signed-off-by: Ruifeng Zheng <ruifengz@apache.org>
  • Loading branch information
zhengruifeng committed Jan 3, 2025
1 parent fa0c995 commit a3a8d1c
Show file tree
Hide file tree
Showing 6 changed files with 142 additions and 108 deletions.
14 changes: 14 additions & 0 deletions python/pyspark/sql/connect/conf.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,20 @@ def set(self, key: str, value: Union[str, int, bool]) -> None:

set.__doc__ = PySparkRuntimeConfig.set.__doc__

def _set_all(self, configs: Dict[str, Union[str, int, bool]], silent: bool) -> None:
conf_list = []
for key, value in configs.items():
if isinstance(value, bool):
value = "true" if value else "false"
elif isinstance(value, int):
value = str(value)
conf_list.append(proto.KeyValue(key=key, value=value))
op_set = proto.ConfigRequest.Set(pairs=conf_list, silent=silent)
operation = proto.ConfigRequest.Operation(set=op_set)
result = self._client.config(operation)
for warn in result.warnings:
warnings.warn(warn)

def get(
self, key: str, default: Union[Optional[str], _NoValueType] = _NoValue
) -> Optional[str]:
Expand Down
162 changes: 81 additions & 81 deletions python/pyspark/sql/connect/proto/base_pb2.py

Large diffs are not rendered by default.

17 changes: 16 additions & 1 deletion python/pyspark/sql/connect/proto/base_pb2.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -1921,17 +1921,32 @@ class ConfigRequest(google.protobuf.message.Message):
DESCRIPTOR: google.protobuf.descriptor.Descriptor

PAIRS_FIELD_NUMBER: builtins.int
SILENT_FIELD_NUMBER: builtins.int
@property
def pairs(
self,
) -> google.protobuf.internal.containers.RepeatedCompositeFieldContainer[global___KeyValue]:
"""(Required) The config key-value pairs to set."""
silent: builtins.bool
"""(Optional) Whether to ignore failures."""
def __init__(
self,
*,
pairs: collections.abc.Iterable[global___KeyValue] | None = ...,
silent: builtins.bool | None = ...,
) -> None: ...
def ClearField(self, field_name: typing_extensions.Literal["pairs", b"pairs"]) -> None: ...
def HasField(
self, field_name: typing_extensions.Literal["_silent", b"_silent", "silent", b"silent"]
) -> builtins.bool: ...
def ClearField(
self,
field_name: typing_extensions.Literal[
"_silent", b"_silent", "pairs", b"pairs", "silent", b"silent"
],
) -> None: ...
def WhichOneof(
self, oneof_group: typing_extensions.Literal["_silent", b"_silent"]
) -> typing_extensions.Literal["silent"] | None: ...

class Get(google.protobuf.message.Message):
DESCRIPTOR: google.protobuf.descriptor.Descriptor
Expand Down
40 changes: 16 additions & 24 deletions python/pyspark/sql/connect/session.py
Original file line number Diff line number Diff line change
Expand Up @@ -200,34 +200,26 @@ def _apply_options(self, session: "SparkSession") -> None:
for i in range(int(os.environ.get("PYSPARK_REMOTE_INIT_CONF_LEN", "0"))):
init_opts = json.loads(os.environ[f"PYSPARK_REMOTE_INIT_CONF_{i}"])

# The options are applied after session creation,
# so options ["spark.remote", "spark.master"] always take no effect.
invalid_opts = ["spark.remote", "spark.master"]

with self._lock:
opts = {}

# Only attempts to set Spark SQL configurations.
# If the configurations are static, it might throw an exception so
# simply ignore it for now.
for k, v in init_opts.items():
# the options are applied after session creation,
# so following options always take no effect
if k not in [
"spark.remote",
"spark.master",
] and k.startswith("spark.sql."):
# Only attempts to set Spark SQL configurations.
# If the configurations are static, it might throw an exception so
# simply ignore it for now.
try:
session.conf.set(k, v)
except Exception as e:
logger.warn(f"Failed to set configuration {k} due to {e}")
if k not in invalid_opts and k.startswith("spark.sql."):
opts[k] = v

with self._lock:
for k, v in self._options.items():
# the options are applied after session creation,
# so following options always take no effect
if k not in [
"spark.remote",
"spark.master",
]:
try:
session.conf.set(k, v)
except Exception as e:
logger.warn(f"Failed to set configuration {k} due to {e}")
if k not in invalid_opts:
opts[k] = v

if len(opts) > 0:
session.conf._set_all(configs=opts, silent=True)

def create(self) -> "SparkSession":
has_channel_builder = self._channel_builder is not None
Expand Down
3 changes: 3 additions & 0 deletions sql/connect/common/src/main/protobuf/spark/connect/base.proto
Original file line number Diff line number Diff line change
Expand Up @@ -525,6 +525,9 @@ message ConfigRequest {
message Set {
// (Required) The config key-value pairs to set.
repeated KeyValue pairs = 1;

// (Optional) Whether to ignore failures.
optional bool silent = 2;
}

message Get {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,11 +73,21 @@ class SparkConnectConfigHandler(responseObserver: StreamObserver[proto.ConfigRes
private def handleSet(
operation: proto.ConfigRequest.Set,
conf: RuntimeConfig): proto.ConfigResponse.Builder = {
val silent = operation.hasSilent && operation.getSilent
val builder = proto.ConfigResponse.newBuilder()
operation.getPairsList.asScala.iterator.foreach { pair =>
val (key, value) = SparkConnectConfigHandler.toKeyValue(pair)
conf.set(key, value.orNull)
getWarning(key).foreach(builder.addWarnings)
try {
conf.set(key, value.orNull)
getWarning(key).foreach(builder.addWarnings)
} catch {
case e: Throwable =>
if (silent) {
builder.addWarnings(s"Failed to set $key to $value due to ${e.getMessage}")
} else {
throw e
}
}
}
builder
}
Expand Down

0 comments on commit a3a8d1c

Please sign in to comment.