[SPARK-36642][SQL] Add df.withMetadata pyspark API
This PR adds the PySpark API `df.withMetadata(columnName, metadata)`. The Scala API was added in apache#33853.

### What changes were proposed in this pull request?

To make it easy to use and modify semantic annotations, we want a shorter API for updating the metadata of a DataFrame column. Currently, updating metadata without changing the column name requires `df.withColumn("col1", col("col1").alias("col1", metadata=metadata))`, which is too verbose. We want a syntactic-sugar API `df.withMetadata("col1", metadata=metadata)` that achieves the same functionality, as sketched below.
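
For illustration, a minimal sketch of the two spellings (the column name `col1`, the DataFrame `df`, and the annotation dict are all hypothetical):

```python
from pyspark.sql.functions import col

metadata = {"semantic_type": "categorical"}  # made-up annotation

# Today: update metadata without renaming the column, via a self-alias (verbose).
df2 = df.withColumn("col1", col("col1").alias("col1", metadata=metadata))

# Proposed sugar: the same update in one call.
df2 = df.withMetadata("col1", metadata)
```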

### Why are the changes needed?

A bit of background on how often this update happens: we are working on inferring semantic data types for use in AutoML, and we store the semantic annotations in column metadata. In many cases we will suggest that the user update the metadata, either to correct a wrong inference or to add an annotation where the inference is weak.

### Does this PR introduce _any_ user-facing change?

Yes.
A syntactic-sugar API `df.withMetadata("col1", metadata=metadata)` that achieves the same functionality as `df.withColumn("col1", col("col1").alias("col1", metadata=metadata))`; a quick check of the equivalence is sketched below.
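
A minimal sketch of the equivalence (assumes an active `SparkSession` named `spark`; the column and metadata values are made up):

```python
from pyspark.sql.functions import col

df = spark.createDataFrame([(2, "Alice"), (5, "Bob")], ["age", "name"])

via_alias = df.withColumn("age", col("age").alias("age", metadata={"foo": "bar"}))
via_sugar = df.withMetadata("age", {"foo": "bar"})

# Both forms attach the same metadata to the 'age' field of the schema.
assert via_alias.schema["age"].metadata == {"foo": "bar"}
assert via_sugar.schema["age"].metadata == {"foo": "bar"}
```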

### How was this patch tested?

doctest.

Closes apache#34021 from liangz1/withMetadataPython.

Authored-by: Liang Zhang <liang.zhang@databricks.com>
Signed-off-by: Weichen Xu <weichen.xu@databricks.com>
liangz1 authored and WeichenXu123 committed Sep 24, 2021
1 parent 0b65daa commit ef7441b
Showing 1 changed file with 27 additions and 0 deletions: python/pyspark/sql/dataframe.py
@@ -15,6 +15,7 @@
 # limitations under the License.
 #
 
+import json
 import sys
 import random
 import warnings
@@ -23,6 +24,7 @@
 from html import escape as html_escape
 
 from pyspark import copy_func, since, _NoValue
+from pyspark.context import SparkContext
 from pyspark.rdd import RDD, _load_from_socket, _local_iterator_from_socket
 from pyspark.serializers import BatchedSerializer, PickleSerializer, \
     UTF8Deserializer
@@ -2536,6 +2538,31 @@ def withColumnRenamed(self, existing, new):
         """
         return DataFrame(self._jdf.withColumnRenamed(existing, new), self.sql_ctx)
 
+    def withMetadata(self, columnName, metadata):
+        """Returns a new :class:`DataFrame` by updating an existing column with metadata.
+
+        .. versionadded:: 3.3.0
+
+        Parameters
+        ----------
+        columnName : str
+            string, name of the existing column to update the metadata.
+        metadata : dict
+            dict, new metadata to be assigned to df.schema[columnName].metadata
+
+        Examples
+        --------
+        >>> df_meta = df.withMetadata('age', {'foo': 'bar'})
+        >>> df_meta.schema['age'].metadata
+        {'foo': 'bar'}
+        """
+        if not isinstance(metadata, dict):
+            raise TypeError("metadata should be a dict")
+        sc = SparkContext._active_spark_context
+        jmeta = sc._jvm.org.apache.spark.sql.types.Metadata.fromJson(
+            json.dumps(metadata))
+        return DataFrame(self._jdf.withMetadata(columnName, jmeta), self.sql_ctx)
+
     def drop(self, *cols):
         """Returns a new :class:`DataFrame` that drops the specified column.
         This is a no-op if schema doesn't contain the given column name(s).
