CommonTransforms is a Python class that uses PySpark libraries to apply common transformations to a Spark dataframe.
Use the %run magic command to include this notebook. The %run magic command works equally well in Synapse Spark pools and Databricks.
%run "/<notebook path in workspace>/CommonTransforms"
Then instantiate the class by passing it the dataframe:
df = spark.read.csv(path)
ct = CommonTransforms(df)
CommonTransforms supports the following functions:
Removes leading and trailing spaces from all string columns in the dataframe
-
Parameters: None
-
Usage:
df = ct.trim()
Replaces null values in the dataframe with a default value. The default is applied to all columns, or to a subset of columns passed as a list. The default value can be numeric, string, date, timestamp, boolean, or a dictionary object. When a dictionary is passed, custom default values can be applied to specified columns. The default is applied only to columns of the same data type: for example, if the default is a string, only null string columns are replaced and numeric columns are left untouched.
-
Parameters:
- value - int, long, float, string, bool, date, timestamp or dict. Value to replace null values with. If the value is a dict, then subset is ignored and value must be a mapping from column name (string) to replacement value. The replacement value must be an int, long, float, boolean, or string.
- subset - optional list of column names to consider. Columns specified in subset that do not have a matching data type are ignored. For example, if value is a string and subset contains a non-string column, then the non-string column is simply ignored.
-
Usage:
df = ct.replaceNull(0)
df = ct.replaceNull("NA")
df = ct.replaceNull("1900-01-01T00:00:00",["start_datetime"])
df = ct.replaceNull("9999-12-31T23:59:59",["end_datetime"])
df = ct.replaceNull({"passenger_count":1,"store_and_fwd_flag":"N","tip_amount":0})
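The type-matching rule above can be sketched in plain Python. This is a hypothetical illustration, not the class's code; the actual implementation most likely delegates to PySpark's `DataFrame.fillna`, which behaves the same way.

```python
# Illustrative sketch of type-matched null replacement; "schema" here is a
# plain dict mapping column name -> Python type, standing in for the
# dataframe's schema. Not the class's actual implementation.
def replace_null(rows, schema, value, subset=None):
    cols = subset if subset is not None else list(schema)
    # Only columns whose declared type matches the default's type are filled
    target = [c for c in cols if schema[c] is type(value)]
    return [
        {c: (value if v is None and c in target else v) for c, v in row.items()}
        for row in rows
    ]

rows = [{"fare": None, "flag": None}]
schema = {"fare": float, "flag": str}
filled = replace_null(rows, schema, 0.0)  # fills only the float column
```

Passing a string default instead would fill only `flag` and leave `fare` null, mirroring the behaviour described above.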
Deletes duplicate records from the dataframe, with the option to consider only a subset of key columns.
-
Parameters:
- subset - optional list of column names to consider.
-
Usage:
df = ct.deDuplicate()
df = ct.deDuplicate(["col1","col2"])
Converts all or a subset of timestamp columns from UTC to the local timezone.
-
Parameters:
- localTimeZone - your target timezone, specified as an IANA Region/City name (e.g. Australia/Sydney).
- subset - optional list of column names to consider.
-
Usage:
df = ct.utc_to_local("Australia/Sydney")
df = ct.utc_to_local("Australia/Sydney",["pickup_datetime","dropoff_datetime"])
Converts all or a subset of timestamp columns from the local timezone to UTC.
-
Parameters:
- localTimeZone - your source timezone, specified as an IANA Region/City name (e.g. Australia/Sydney).
- subset - optional list of column names to consider.
-
Usage:
df = ct.local_to_utc("Australia/Sydney")
df = ct.local_to_utc("Australia/Sydney",["recorded_datetime"])
Converts all or a subset of timestamp columns in the dataframe from one timezone to another.
-
Parameters:
- fromTimezone - source timezone, specified as an IANA Region/City name (e.g. Australia/Sydney)
- toTimezone - target timezone, specified as an IANA Region/City name (e.g. America/New_York)
-
Usage:
df = ct.changeTimezone("Australia/Sydney","America/New_York")
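The conversion semantics can be illustrated in plain Python with `zoneinfo`. The `change_timezone` helper below is a hypothetical stand-in for the class's method, which presumably chains Spark's `to_utc_timestamp` and `from_utc_timestamp`:

```python
from datetime import datetime
from zoneinfo import ZoneInfo  # Python 3.9+

def change_timezone(naive_ts, from_tz, to_tz):
    # Interpret the naive timestamp in from_tz, re-express it in to_tz,
    # and return a naive timestamp again (illustrative only).
    aware = naive_ts.replace(tzinfo=ZoneInfo(from_tz))
    return aware.astimezone(ZoneInfo(to_tz)).replace(tzinfo=None)

# 2020-10-01 10:00 in Sydney (AEST, UTC+10 on that date) is
# 2020-09-30 20:00 in New York (EDT, UTC-4)
change_timezone(datetime(2020, 10, 1, 10, 0),
                "Australia/Sydney", "America/New_York")
```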
Drops system or other non-business columns from the dataframe.
-
Parameters:
- columns - list of columns to be dropped
-
Usage:
df = ct.dropSysColumns(["col1","col2"])
Creates a checksum column using all columns of the dataframe.
-
Parameters:
- colName - name of the new checksum column
-
Usage:
df = ct.addChecksumCol("checksum")
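A checksum column of this kind is typically built by concatenating all column values and hashing the result (in PySpark, often `md5` over `concat_ws`). The plain-Python sketch below shows the idea for a single row; the separator and null handling are assumptions, not necessarily the class's actual choices:

```python
import hashlib

def row_checksum(values, sep="|"):
    # Join all column values (nulls become empty strings) and hash the
    # result, so a change in any column changes the checksum. Illustrative
    # sketch only; separator and null handling are assumptions.
    joined = sep.join("" if v is None else str(v) for v in values)
    return hashlib.md5(joined.encode("utf-8")).hexdigest()
```

Identical rows produce identical checksums, which makes the column useful for change detection between loads.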
Converts a 5-digit or 7-digit Julian date to a calendar date
-
Parameters:
- subset - a mandatory list of columns that contain a Julian date value
-
Usage:
df = df.withColumn("sys_date1",lit(20275)) # date in Julian format
df = ct.julian_to_calendar(["sys_date1"])
# Output=2020-10-01
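For a single value, the conversion reads the day of year off the last three digits and the year off the leading digits. The helper below is illustrative, not the class's code, and assumes 5-digit dates fall in the 2000s:

```python
from datetime import date, timedelta

def julian_to_calendar(jd):
    # 5-digit YYDDD (century assumed to be 2000) or 7-digit YYYYDDD;
    # illustrative sketch, not the class's implementation.
    s = str(jd)
    year = 2000 + int(s[:2]) if len(s) == 5 else int(s[:4])
    day_of_year = int(s[-3:])
    return date(year, 1, 1) + timedelta(days=day_of_year - 1)

julian_to_calendar(20275)  # -> date(2020, 10, 1)
```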
Converts a calendar date to a 5-digit Julian date.
-
Parameters:
- subset - a mandatory list of columns that contain a date
-
Usage:
df = df.withColumn("sys_date2",lit("2020-10-01").cast("date")) # date in Gregorian format
df = ct.calendar_to_julian(["sys_date2"])
# Output=20275
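The reverse direction concatenates the year's last two digits with the zero-padded day of year. Again, an illustrative helper rather than the class's actual code:

```python
from datetime import date

def calendar_to_julian(d):
    # YYDDD: two-digit year followed by zero-padded day of year;
    # illustrative sketch, not the class's implementation.
    return int(f"{d.year % 100:02d}{d.timetuple().tm_yday:03d}")

calendar_to_julian(date(2020, 10, 1))  # -> 20275
```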
Adds a set of literal-value columns, passed as a dictionary parameter, to the dataframe, e.g. adding audit columns to a dataframe.
-
Parameters:
- colDict - a mandatory dict object mapping column names to literal values
-
Usage:
audit={"audit_key":66363,
"pipeline_id":"56f633",
"start_datetime": "2020-10-01T10:00:00",
"end_datetime": "2020-10-01T10:02:05"}
df = ct.addLitCols(audit)