Skip to content

Commit

Permalink
Bug fixes in sink test code
Browse files Browse the repository at this point in the history
  • Loading branch information
pthalasta committed Oct 3, 2023
1 parent 9e70e79 commit cf7029e
Showing 1 changed file with 6 additions and 2 deletions.
8 changes: 6 additions & 2 deletions tests/test_write_to_databricks_deltalake_stage.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,11 @@
import cudf
import pytest
from unittest.mock import patch
from unittest import mock
from morpheus.stages.output.write_to_databricks_deltalake_stage import DataBricksDeltaLakeSinkStage
from _utils.dataset_manager import DatasetManager
from morpheus.pipeline.linear_pipeline import LinearPipeline
from morpheus.config import Config
from morpheus.stages.input.in_memory_source_stage import InMemorySourceStage
from morpheus.stages.postprocess.serialize_stage import SerializeStage
from morpheus.stages.preprocess.deserialize_stage import DeserializeStage
Expand All @@ -32,12 +34,13 @@ def test_databricks_deltalake_source_stage_pipe(config: Config, dataset: Dataset
"""

df_input_a = dataset['filter_probs.csv']
with patch('utils.write_to_databricks_deltalake_stage.DatabricksSession',) as mock_db_session:
with patch('utils.write_to_databricks_deltalake_stage.DatabricksSession') as mock_db_session:
databricks_deltalake_sink_stage = DataBricksDeltaLakeSinkStage(config,
delta_path="", delta_table_write_mode="append",
databricks_host="", databricks_token="",
databricks_cluster_id="")

mock_spark_df = mock.Mock()
databricks_deltalake_sink_stage.spark.createDataFrame.return_value = mock_spark_df
# df_input_a = cudf.DataFrame({"name": ["five", "four", "three", "two", "one"], "value": [5, 4, 3, 2, 1]})
pipeline = LinearPipeline(config)
pipeline.set_source(InMemorySourceStage(config, [df_input_a]))
Expand All @@ -46,3 +49,4 @@ def test_databricks_deltalake_source_stage_pipe(config: Config, dataset: Dataset
sink = pipeline.add_stage(databricks_deltalake_sink_stage)
pipeline.run()
databricks_deltalake_sink_stage.spark.createDataFrame.assert_called_once()
mock_spark_df.write.format.assert_called_once()

0 comments on commit cf7029e

Please sign in to comment.