Upgraded databricks connect to 13.3 and modified code as per new version
pthalasta committed Sep 27, 2023
1 parent 7833498 commit 82677cf
Showing 3 changed files with 27 additions and 76 deletions.
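
Note on the change: the commit drops the repository's configure_databricks_connect helper (and its port/org-id parameters) in favor of the Spark Connect-based session that databricks-connect 13.3 exposes. The following is a minimal, standalone sketch of the new connection pattern, assuming databricks-connect>=13.3 is installed; the host, token and cluster id values are placeholders, not values from the repository.

# Sketch of the session setup this commit switches to (databricks-connect >= 13.3).
# Host, token and cluster_id are placeholders.
from databricks.connect import DatabricksSession

spark = DatabricksSession.builder.remote(
    host="https://<workspace>.cloud.databricks.com",
    token="<personal-access-token>",
    cluster_id="<cluster-id>",
).getOrCreate()

# The returned object behaves like a pyspark.sql.SparkSession, so existing
# spark.sql(...) and spark.createDataFrame(...) call sites keep working.
print(spark.sql("SELECT 1 AS ok").collect())
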
22 changes: 12 additions & 10 deletions morpheus/stages/input/databricks_deltalake_source_stage.py
@@ -30,7 +30,8 @@

from pyspark.sql import SparkSession
from pyspark.sql import functions as sf
from morpheus.stages.utils.databricks_utils import configure_databricks_connect
from pyspark.sql.window import Window
from databricks.connect import DatabricksSession

logger = logging.getLogger(__name__)

@@ -72,12 +73,11 @@ def __init__(self,

super().__init__(config)
self.spark_query = spark_query
configure_databricks_connect(databricks_host,
databricks_token,
databricks_cluster_id,
databricks_port,
databricks_org_id)
self.spark = SparkSession.builder.getOrCreate()
self.spark = DatabricksSession.builder.remote(
host = databricks_host,
token = databricks_token,
cluster_id = databricks_cluster_id
).getOrCreate()
self.items_per_page = items_per_page
self.offset = 0

@@ -96,12 +96,14 @@ def source_generator(self):
try:
spark_df = self.spark.sql(self.spark_query)
spark_df = spark_df.withColumn('_id', sf.monotonically_increasing_id())
w = Window.partitionBy(sf.lit(1)).orderBy("_id")
spark_df = spark_df.select("*").withColumn("_id", sf.row_number().over(w))
count = spark_df.count()
while self.offset <= count:
df = spark_df.where(sf.col('_id').between(self.offset, self.offset + self.items_per_page))
logger.debug(f"Reading next iteration data between index: {str(self.offset)} and {str(self.offset + self.items_per_page + 1)}")
self.offset = self.offset + self.items_per_page + 1
print(f"Reading next iteration data between index: {str(self.offset)} and {str(self.offset + self.items_per_page + 1)}")
self.offset += self.items_per_page + 1
yield MessageMeta(df=cudf.from_pandas(df.toPandas().drop(["_id"],axis=1)))
except Exception as e:
logger.exception("Error occurred reading data from feature store and converting to Dataframe: {}".format(e))
raise
raise
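
For reference, a condensed sketch of the paging scheme used by the updated source_generator: the frame is re-numbered with a dense 1..N _id via row_number() over a single-partition window, so that between() ranges select contiguous pages. This is a simplification of the stage, not the stage itself — spark is assumed to be a session created as above, and the query and page size are placeholders; the real stage additionally wraps each page in cudf/MessageMeta.

# Standalone sketch of the batched read; query and items_per_page are placeholders.
from pyspark.sql import functions as sf
from pyspark.sql.window import Window

def paged_batches(spark, query, items_per_page=10000):
    spark_df = spark.sql(query)
    spark_df = spark_df.withColumn("_id", sf.monotonically_increasing_id())
    # Re-number with a dense 1..N sequence so between() ranges map to contiguous pages;
    # monotonically_increasing_id() alone leaves gaps between partitions.
    w = Window.partitionBy(sf.lit(1)).orderBy("_id")
    spark_df = spark_df.withColumn("_id", sf.row_number().over(w))
    count = spark_df.count()
    offset = 0
    while offset <= count:
        page = spark_df.where(sf.col("_id").between(offset, offset + items_per_page))
        yield page.toPandas().drop(["_id"], axis=1)
        offset += items_per_page + 1
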
37 changes: 15 additions & 22 deletions morpheus/stages/output/write_to_databricks_deltalake_stage.py
@@ -22,7 +22,6 @@
import pandas as pd
import cudf
import typing
import dask

from pyspark.sql import SparkSession
from pyspark.sql.types import BooleanType
@@ -42,7 +41,7 @@
from morpheus.messages import MessageMeta
from morpheus.pipeline.single_port_stage import SinglePortStage
from morpheus.pipeline.stream_pair import StreamPair
from morpheus.stages.utils.databricks_utils import configure_databricks_connect
from databricks.connect import DatabricksSession

logger = logging.getLogger(__name__)

@@ -64,7 +63,9 @@ class DataBricksDeltaLakeSinkStage(SinglePortStage):
Access token for Databricks cluster.
databricks_cluster_id : str, default None
Databricks cluster to be used to query the data as per SQL provided.
databricks_port : str, defailt "15001"
delta_table_write_mode: str, default "append"
Delta table write mode for storing data.
databricks_port : str, default "15001"
Databricks port that Databricks Connect connects to. Defaults to 15001
databricks_org_id : str, default "0"
Azure-only, see ?o=orgId in URL. Defaults to 0 for other platform
@@ -76,23 +77,17 @@ def __init__(self,
databricks_host: str = None,
databricks_token: str = None,
databricks_cluster_id: str = None,
delta_table_write_mode: str = "append",
databricks_port: str = "15001",
databricks_org_id: str = "0"):

super().__init__(config)
self.delta_path = delta_path
configure_databricks_connect(databricks_host,
databricks_token,
databricks_cluster_id,
databricks_port,
databricks_org_id)

# Enable Arrow-based columnar data transfers
self.spark = SparkSession.builder \
.config("spark.databricks.delta.optimizeWrite.enabled", "true") \
.config("spark.databricks.delta.autoCompact.enabled", "true") \
.config("spark.sql.execution.arrow.pyspark.enabled", "true") \
.getOrCreate()
self.spark = DatabricksSession.builder.remote(
host = databricks_host,
token = databricks_token,
cluster_id = databricks_cluster_id
).getOrCreate()

@property
def name(self) -> str:
@@ -114,20 +109,18 @@ def supports_cpp_node(self) -> bool:
def _build_single(self, builder: mrc.Builder, input_stream: StreamPair) -> StreamPair:
stream = input_stream[0]

def node_fn(obs: mrc.Observable, sub: mrc.Subscriber):


def write_to_deltalake(meta: MessageMeta):
# convert cudf to spark dataframe
def write_to_deltalake(meta: MessageMeta):
# convert cudf to spark dataframe
df = meta.copy_dataframe()
if isinstance(df, cudf.DataFrame):
df = df.to_pandas()
print(df)
schema = self._extract_schema_from_pandas_dataframe(df)
spark_df = self.spark.createDataFrame(df, schema=schema)
spark_df.write \
.format('delta') \
.option("mergeSchema", "true") \
.mode("append") \
.mode(self.delta_table_write_mode) \
.save(self.delta_path)
return meta
node = builder.make_node(self.unique_name, ops.map(write_to_deltalake))
@@ -137,7 +130,7 @@ def write_to_deltalake(meta: MessageMeta):
return node, input_stream[1]

@staticmethod
def _extract_schema_from_pandas_dataframe(self, df: pd.DataFrame): -> StructType
def _extract_schema_from_pandas_dataframe(df: pd.DataFrame) -> StructType:
"""
Extract approximate schemas from pandas dataframe
"""
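
A condensed sketch of the sink's write path after this change: the cuDF frame is moved to pandas, an approximate Spark schema is derived from the pandas dtypes, and the result is written to the Delta path with schema merging and the configurable write mode. The dtype-to-Spark-type mapping below is an assumption for illustration; the repository's actual _extract_schema_from_pandas_dataframe helper may differ.

# Illustrative write path; the dtype mapping is assumed, not copied from the repo.
import pandas as pd
from pyspark.sql.types import (BooleanType, DoubleType, LongType, StringType,
                               StructField, StructType, TimestampType)

def extract_schema_from_pandas_dataframe(df: pd.DataFrame) -> StructType:
    """Build an approximate Spark schema from pandas dtypes."""
    type_map = {"bool": BooleanType(), "int64": LongType(),
                "float64": DoubleType(), "datetime64[ns]": TimestampType()}
    fields = [StructField(col, type_map.get(str(dtype), StringType()), True)
              for col, dtype in df.dtypes.items()]
    return StructType(fields)

def write_to_deltalake(spark, df: pd.DataFrame, delta_path: str, mode: str = "append"):
    spark_df = spark.createDataFrame(df, schema=extract_schema_from_pandas_dataframe(df))
    (spark_df.write
        .format("delta")
        .option("mergeSchema", "true")   # allow new columns to be added to the table
        .mode(mode)                      # "append" by default; configurable via delta_table_write_mode
        .save(delta_path))
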
44 changes: 0 additions & 44 deletions morpheus/utils/databricks_utils.py

This file was deleted.
