Adds support to read and write to Databricks delta tables #630

Merged Oct 5, 2023 (35 commits)

Changes from 1 commit

Commits:
56108cc  Added databricks dependency (pthalasta, Jan 17, 2023)
c3598e7  Added jdk dependency installation (pthalasta, Jan 17, 2023)
9f989d7  Added delta lake source stage (pthalasta, Jan 24, 2023)
dbafd69  Added output stage to write to delta lake table (pthalasta, Jan 24, 2023)
323a077  Merge branch 'nv-morpheus:branch-23.01' into branch-23.01 (pthalasta, Jan 24, 2023)
04d75d9  Merge branch 'branch-23.01' into branch-23.01 (pthalasta, Jan 25, 2023)
e5b7930  Updated source reader with formatting changes (pthalasta, Jan 26, 2023)
54930f2  Updated sink writer with formatting changes (pthalasta, Jan 26, 2023)
ac47465  Merge branch 'branch-23.01' into branch-23.01 (pthalasta, Jan 26, 2023)
0fad59f  updated source stage with pagination and moved config creation to uti… (pthalasta, Sep 1, 2023)
3aebcbb  Renamed classes of source and sink to match databrick-deltalake naming (pthalasta, Sep 1, 2023)
c0a2670  Changed Dockerfile and requirements as per latest changes in 23.11 br… (pthalasta, Sep 1, 2023)
03c9758  Updated branch changes to reflect 23.11 (pthalasta, Sep 1, 2023)
cdc5403  Removed databricks-connect from runtime and example yaml requirements… (pthalasta, Sep 5, 2023)
cc3bdc7  Removed databricks-connect from runtime and example yaml requirements… (pthalasta, Sep 5, 2023)
7833498  Removed databricks-connect from runtime and example yaml requirements… (pthalasta, Sep 5, 2023)
82677cf  Upgraded databricks connect to 13.3 and modified code as per new version (pthalasta, Sep 27, 2023)
bee1a55  Upgraded databricks connect to 13.3 and modified code as per new version (pthalasta, Sep 27, 2023)
88f958a  Upgraded databricks connect to 13.3 and modified code as per new version (pthalasta, Sep 28, 2023)
86183e9  Added test case for databricks delta lake source stage (pthalasta, Oct 2, 2023)
39d7f60  Added test case for databricks delta lake source stage (pthalasta, Oct 2, 2023)
f6018bf  Added test case for databricks delta lake source stage (pthalasta, Oct 2, 2023)
25aec2f  Merge branch 'branch-23.11' of https://github.com/nv-morpheus/Morpheu… (pthalasta, Oct 2, 2023)
d30e924  Merge branch 'branch-23.11' into branch-23.01 (pthalasta, Oct 2, 2023)
377456b  Added copyright info for test file (pthalasta, Oct 2, 2023)
cdb75c6  Merge branch 'branch-23.01' of https://github.com/pthalasta/Morpheus … (pthalasta, Oct 2, 2023)
ad6c6f6  Bug fixes in test stage (pthalasta, Oct 2, 2023)
58889ec  Bug fixes in test stage (pthalasta, Oct 2, 2023)
9e70e79  Added sink test case and additional cleanup of code (pthalasta, Oct 3, 2023)
cf7029e  Bug fixes in sink test code (pthalasta, Oct 3, 2023)
279c695  formatting changes (pthalasta, Oct 3, 2023)
40a2ca5  ci suggested fixes to test/code (Oct 3, 2023)
4aa07b3  ci suggested fixes to test/code (Oct 4, 2023)
e738677  doc generation fixes (Oct 4, 2023)
acb796c  added PreallocatorMixin imports in source (Oct 4, 2023)
Updated sink writer with formatting changes
pthalasta committed Jan 26, 2023
commit 54930f2fb26cdc01ddbcd145f86a6c9c02490274
morpheus/stages/output/write_to_deltalake_stage.py (42 additions, 37 deletions)
@@ -14,32 +14,26 @@
 import logging
 import os
-from mrc.core import operators as ops
+import typing
 
 import mrc
 import cudf
-import typing
+from mrc.core import operators as ops
+from pyspark.sql import SparkSession
+from pyspark.sql.types import BooleanType
+from pyspark.sql.types import DoubleType
+from pyspark.sql.types import FloatType
+from pyspark.sql.types import IntegerType
+from pyspark.sql.types import LongType
+from pyspark.sql.types import StringType
+from pyspark.sql.types import StructField
+from pyspark.sql.types import StructType
+from pyspark.sql.types import TimestampType
 
 from morpheus.cli.register_stage import register_stage
 from morpheus.config import Config
+from morpheus.messages import MessageMeta
 from morpheus.pipeline.single_port_stage import SinglePortStage
 from morpheus.pipeline.stream_pair import StreamPair
-from morpheus.messages import MessageMeta
 from morpheus.io import serializers
-
-
-from pyspark.sql import SparkSession
-from pyspark.sql.types import (
-    BooleanType,
-    DoubleType,
-    FloatType,
-    IntegerType,
-    LongType,
-    StringType,
-    StructField,
-    StructType,
-    TimestampType,
-)
 
 
 logger = logging.getLogger(__name__)
@@ -78,15 +72,18 @@ def __init__(self,
 
         super().__init__(config)
         self.delta_path = delta_path
-        self._configure_databricks_connect(databricks_host, databricks_token, databricks_cluster_id, databricks_port,databricks_org_id)
+        self._configure_databricks_connect(databricks_host,
+                                           databricks_token,
+                                           databricks_cluster_id,
+                                           databricks_port,
+                                           databricks_org_id)
         self.spark = SparkSession.builder.config("spark.databricks.delta.optimizeWrite.enabled", "true")\
-            .config("spark.databricks.delta.autoCompact.enabled", "true")\
-            .getOrCreate()
+                .config("spark.databricks.delta.autoCompact.enabled", "true")\
+                .getOrCreate()
 
         # Enable Arrow-based columnar data transfers
         self.spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")
-
 
     @property
     def name(self) -> str:
         return "to-delta"
@@ -106,16 +103,19 @@ def supports_cpp_node(self) -> bool:
 
     def _build_single(self, builder: mrc.Builder, input_stream: StreamPair) -> StreamPair:
         stream = input_stream[0]
+
         def node_fn(obs: mrc.Observable, sub: mrc.Subscriber):
+
             def write_to_deltalake(x: MessageMeta):
                 # convert cudf to spark dataframe
                 df = x.df.to_pandas()
                 schema = self._extract_schema_from_pandas_dataframe(df)
-                spark_df = self.spark.createDataFrame(df,schema=schema)
+                spark_df = self.spark.createDataFrame(df, schema=schema)
                 spark_df.write.format('delta')\
-                    .option("mergeSchema", "true").mode("append")\
-                    .save(self.delta_path)
+                        .option("mergeSchema", "true").mode("append")\
+                        .save(self.delta_path)
                 return x
 
             obs.pipe(ops.map(write_to_deltalake)).subscribe(sub)
 
         to_delta = builder.make_node_full(self.unique_name, node_fn)
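
A note on the helper used above: only the tail of _extract_schema_from_pandas_dataframe is visible in this diff (next hunk), but together with the pyspark.sql.types imports it implies a pandas-dtype-to-Spark-type mapping. A minimal sketch of such a mapping follows; the dtype table and the StringType fallback are assumptions for illustration, not code from this PR:

# Hypothetical sketch only; the dtype mapping below is assumed, not taken from this PR.
import pandas as pd
from pyspark.sql.types import BooleanType
from pyspark.sql.types import DoubleType
from pyspark.sql.types import FloatType
from pyspark.sql.types import IntegerType
from pyspark.sql.types import LongType
from pyspark.sql.types import StringType
from pyspark.sql.types import StructField
from pyspark.sql.types import StructType
from pyspark.sql.types import TimestampType


def extract_schema_from_pandas_dataframe(df: pd.DataFrame) -> StructType:
    type_map = {
        "bool": BooleanType(),
        "int32": IntegerType(),
        "int64": LongType(),
        "float32": FloatType(),
        "float64": DoubleType(),
        "datetime64[ns]": TimestampType(),
    }
    spark_schema = []
    for name, dtype in df.dtypes.items():
        # Unmapped dtypes (e.g. object) fall back to StringType.
        spark_schema.append(StructField(name, type_map.get(str(dtype), StringType()), True))
    return StructType(spark_schema)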
@@ -152,28 +152,32 @@ def _extract_schema_from_pandas_dataframe(self, df):
             spark_schema.append(spark_dtype)
         return StructType(spark_schema)
 
-
-    def _configure_databricks_connect(self,databricks_host, databricks_token, databricks_cluster_id, databricks_port, databricks_org_id):
-        if(os.environ.get('DATABRICKS_HOST',None)==None and databricks_host==None):
+    def _configure_databricks_connect(self,
+                                      databricks_host,
+                                      databricks_token,
+                                      databricks_cluster_id,
+                                      databricks_port,
+                                      databricks_org_id):
+        if (os.environ.get('DATABRICKS_HOST', None) is None and databricks_host is None):
             raise Exception("Parameter for databricks host not provided")
-        if(os.environ.get('DATABRICKS_TOKEN',None)==None and databricks_token==None):
+        if (os.environ.get('DATABRICKS_TOKEN', None) is None and databricks_token is None):
             raise Exception("Parameter for databricks token not provided")
-        if(os.environ.get('DATABRICKS_CLUSTER_ID',None)==None and databricks_cluster_id==None):
+        if (os.environ.get('DATABRICKS_CLUSTER_ID', None) is None and databricks_cluster_id is None):
             raise Exception("Parameter for databricks cluster not provided")
         host = None
         cluster = None
         token = None
         config_file = "/root/.databricks-connect"
         should_add = False
-        if(os.environ.get('DATABRICKS_HOST',None)!=None):
+        if (os.environ.get('DATABRICKS_HOST', None) is not None):
             host = os.environ.get('DATABRICKS_HOST')
         else:
             host = databricks_host
-        if(os.environ.get('DATABRICKS_TOKEN',None)!=None):
+        if (os.environ.get('DATABRICKS_TOKEN', None) is not None):
             token = os.environ.get('DATABRICKS_TOKEN')
         else:
             token = databricks_token
-        if(os.environ.get('DATABRICKS_CLUSTER_ID',None)!=None):
+        if (os.environ.get('DATABRICKS_CLUSTER_ID', None) is not None):
             cluster = os.environ.get('DATABRICKS_CLUSTER_ID')
         else:
             cluster = databricks_cluster_id
@@ -184,7 +188,8 @@ def _configure_databricks_connect(self,databricks_host, databricks_token, databr
             "org_id": "@org_id",
             "port": "@port"
         }"""
-        config = config.replace("@host",host).replace("@token",token).replace("@cluster_id",cluster).replace("@org_id",databricks_org_id).replace("@port",databricks_port)
+        config = config.replace("@host", host).replace("@token", token).replace("@cluster_id", cluster).replace(
+            "@org_id", databricks_org_id).replace("@port", databricks_port)
 
         # check if the config file for databricks connect already exists
         config_exist = os.path.exists(config_file)
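
For reference, after the placeholder substitution above, the rendered /root/.databricks-connect file is plain JSON along these lines (every value below is a hypothetical placeholder):

{
    "host": "https://dbc-XXXXXXXX.cloud.databricks.com",
    "token": "dapiXXXXXXXXXXXXXXXX",
    "cluster_id": "0123-456789-abcdefg1",
    "org_id": "0",
    "port": "15001"
}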
@@ -199,6 +204,6 @@ def _configure_databricks_connect(self,databricks_host, databricks_token, databr
             else:
                 should_add = True
         if should_add:
-            with open(config_file,"w+") as f:
+            with open(config_file, "w+") as f:
                 f.write(config)
         logger.info("Databricks-connect successfully configured!")
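
For reviewers, a minimal sketch of wiring this sink into a Morpheus pipeline. The WriteToDeltaLakeStage class name is inferred from the file name, FileSourceStage is just a convenient source, and all credential values are placeholders; treat the snippet as illustrative, not as code from this PR:

# Hypothetical usage sketch; the sink class name and all argument values are assumptions.
from morpheus.config import Config
from morpheus.pipeline import LinearPipeline
from morpheus.stages.input.file_source_stage import FileSourceStage
from morpheus.stages.output.write_to_deltalake_stage import WriteToDeltaLakeStage

config = Config()

pipeline = LinearPipeline(config)
pipeline.set_source(FileSourceStage(config, filename="input.jsonlines"))
# Credentials may instead come from the DATABRICKS_HOST / DATABRICKS_TOKEN /
# DATABRICKS_CLUSTER_ID environment variables handled in _configure_databricks_connect.
pipeline.add_stage(
    WriteToDeltaLakeStage(config,
                          delta_path="dbfs:/tmp/morpheus_delta",
                          databricks_host="https://dbc-XXXXXXXX.cloud.databricks.com",
                          databricks_token="dapiXXXXXXXXXXXXXXXX",
                          databricks_cluster_id="0123-456789-abcdefg1",
                          databricks_port="15001",
                          databricks_org_id="0"))
pipeline.run()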