-
Notifications
You must be signed in to change notification settings - Fork 478
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
[Fabric E2E Sample] Added updates on notebook and pipeline (#959)
* Updated notebooks, pipelines, and standardized folder structure; improved logging and linting. * Enhanced dim table insertion, streamlined tests, and optimized DDO module compatibility. --------- Co-authored-by: Sean Ma <34230764+maye-msft@users.noreply.github.com> Co-authored-by: yuna-s <38641545+yuna-s@users.noreply.github.com> Co-authored-by: Anuj Parashar <promisinganuj@gmail.com> Co-authored-by: yunasugimoto <yunasugimoto@microsoft.com>
- Loading branch information
1 parent
79eb8f0
commit 7fbe407
Showing
19 changed files
with
225,207 additions
and
72 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
95 changes: 95 additions & 0 deletions
95
e2e_samples/fabric_dataops_sample/config/fabric_environment/ddo_transform_standardize.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,95 @@ | ||
# -*- coding: utf-8 -*- | ||
|
||
"""Main module.""" | ||
|
||
|
||
from typing import Tuple | ||
|
||
from pyspark.sql import DataFrame | ||
from pyspark.sql.functions import col, lit, to_timestamp | ||
from pyspark.sql.types import ArrayType, DoubleType, StringType, StructField, StructType, TimestampType # noqa: E501 | ||
|
||
|
||
def get_schema(schema_name: str) -> StructType:
    """Return the Spark schema registered under *schema_name*.

    Args:
        schema_name: One of ``"in_parkingbay_schema"`` or
            ``"in_sensordata_schema"``.

    Returns:
        The ``StructType`` describing the raw input layout for that feed.

    Raises:
        ValueError: If *schema_name* is not a known schema name.
    """
    if schema_name == "in_parkingbay_schema":
        schema = StructType(
            [
                StructField(
                    "the_geom",
                    StructType(
                        [
                            StructField("coordinates", ArrayType(ArrayType(ArrayType(ArrayType(DoubleType()))))),
                            StructField("type", StringType()),
                        ]
                    ),
                ),
                StructField("marker_id", StringType()),
                StructField("meter_id", StringType()),
                # bay_id is the join key downstream; declared non-nullable.
                StructField("bay_id", StringType(), False),
                StructField("last_edit", StringType()),
                StructField("rd_seg_id", StringType()),
                StructField("rd_seg_dsc", StringType()),
            ]
        )
    elif schema_name == "in_sensordata_schema":
        schema = StructType(
            [
                StructField("bay_id", StringType(), False),
                StructField("st_marker_id", StringType()),
                StructField("status", StringType()),
                StructField(
                    "location",
                    StructType(
                        [StructField("coordinates", ArrayType(DoubleType())), StructField("type", StringType())]
                    ),
                ),
                StructField("lat", StringType()),
                StructField("lon", StringType()),
            ]
        )
    else:
        # Fail fast with a clear error instead of the original behavior,
        # which raised UnboundLocalError on an unrecognized schema name.
        raise ValueError(f"Unknown schema name: {schema_name!r}")
    return schema
|
||
|
||
def standardize_parking_bay(
    parkingbay_sdf: DataFrame, load_id: StringType, loaded_on: TimestampType
) -> Tuple[DataFrame, DataFrame]:
    """Standardize raw parking-bay records and split them by validity.

    Deduplicates on ``bay_id``, parses ``last_edit`` into a timestamp,
    casts key columns, and stamps every row with the load metadata
    (``load_id``, ``loaded_on``).

    Returns:
        A ``(good_records, bad_records)`` pair: rows with a non-null
        ``bay_id`` versus rows where ``bay_id`` is null.
    """
    deduped = parkingbay_sdf.drop_duplicates(["bay_id"])
    # last_edit arrives as a compact string, e.g. yyyyMMddHHmmss.
    with_ts = deduped.withColumn("last_edit", to_timestamp("last_edit", "yyyyMMddHHmmss"))
    standardized = with_ts.select(
        col("bay_id").cast("int").alias("bay_id"),
        "last_edit",
        "marker_id",
        "meter_id",
        "rd_seg_dsc",
        col("rd_seg_id").cast("int").alias("rd_seg_id"),
        "the_geom",
        lit(load_id).alias("load_id"),
        lit(loaded_on.isoformat()).cast("timestamp").alias("loaded_on"),
    ).cache()  # cached: the validation split reads it twice
    # Data Validation: route rows lacking a bay_id to the bad side.
    good_records = standardized.filter(col("bay_id").isNotNull())
    bad_records = standardized.filter(col("bay_id").isNull())
    return good_records, bad_records
|
||
|
||
def standardize_sensordata(
    sensordata_sdf: DataFrame, load_id: StringType, loaded_on: TimestampType
) -> Tuple[DataFrame, DataFrame]:
    """Standardize raw sensor records and split them by validity.

    Casts ``bay_id``/``lat``/``lon`` to numeric types and stamps every
    row with the load metadata (``load_id``, ``loaded_on``).

    Returns:
        A ``(good_records, bad_records)`` pair: rows with a non-null
        ``bay_id`` versus rows where ``bay_id`` is null.
    """
    standardized = sensordata_sdf.select(
        col("bay_id").cast("int").alias("bay_id"),
        "st_marker_id",
        col("lat").cast("float").alias("lat"),
        col("lon").cast("float").alias("lon"),
        "location",
        "status",
        lit(load_id).alias("load_id"),
        lit(loaded_on.isoformat()).cast("timestamp").alias("loaded_on"),
    ).cache()  # cached: the validation split reads it twice
    # Data Validation: route rows lacking a bay_id to the bad side.
    good_records = standardized.filter(col("bay_id").isNotNull())
    bad_records = standardized.filter(col("bay_id").isNull())
    return good_records, bad_records
Oops, something went wrong.