Skip to content

Commit

Permalink
Fix writing to destination when multiple chunks exist for a document (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
bindipankhudi authored Jun 4, 2024
1 parent 2c80a38 commit 3bebfc5
Show file tree
Hide file tree
Showing 5 changed files with 117 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -333,27 +333,27 @@ def process_record_message(
if not self.sql_config.cortex_embedding_model:
new_data[EMBEDDING_COLUMN] = embeddings[i]

self.file_writer.process_record_message(
record_msg=AirbyteRecordMessage(
namespace=record_msg.namespace,
stream=record_msg.stream,
data=new_data,
emitted_at=record_msg.emitted_at,
),
stream_schema={
"type": "object",
"properties": {
DOCUMENT_ID_COLUMN: {"type": "string"},
CHUNK_ID_COLUMN: {"type": "string"},
METADATA_COLUMN: {"type": "object"},
DOCUMENT_CONTENT_COLUMN: {"type": "string"},
EMBEDDING_COLUMN: {
"type": "array",
"items": {"type": "float"},
self.file_writer.process_record_message(
record_msg=AirbyteRecordMessage(
namespace=record_msg.namespace,
stream=record_msg.stream,
data=new_data,
emitted_at=record_msg.emitted_at,
),
stream_schema={
"type": "object",
"properties": {
DOCUMENT_ID_COLUMN: {"type": "string"},
CHUNK_ID_COLUMN: {"type": "string"},
METADATA_COLUMN: {"type": "object"},
DOCUMENT_CONTENT_COLUMN: {"type": "string"},
EMBEDDING_COLUMN: {
"type": "array",
"items": {"type": "float"},
},
},
},
},
)
)

def _get_table_by_name(
self,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -273,6 +273,100 @@ def test_record_write_fidelity(self):
"DOCUMENT_CONTENT": '"str_col: Dogs are number 0"',
}

def test_write_with_chunk_size_5(self):
    """Check row counts across overwrite, append and append_dedup syncs with chunk_size 5.

    Five "Dogs are number <i>" records are written in overwrite mode and the
    table is expected to hold 15 rows. Appending one more record is expected
    to raise the count to 18, and a following append_dedup sync of a record
    with int value 4 is expected to leave the count at 18.
    """
    stream_name = "mystream"
    self._delete_table(stream_name)
    self.config["processing"]["chunk_size"] = 5
    overwrite_catalog = self._get_configured_catalog(DestinationSyncMode.overwrite)
    state_message = self._state({"state": "1"})
    dog_records = []
    for idx in range(5):
        dog_records.append(
            self._record(
                stream="mystream",
                str_value=f"Dogs are number {idx}",
                int_value=idx,
            )
        )

    destination = DestinationSnowflakeCortex()

    def run_sync(sync_catalog, messages):
        # Drain the generator returned by write() so the sync runs to completion.
        return list(
            destination.write(
                config=self.config,
                configured_catalog=sync_catalog,
                input_messages=messages,
            )
        )

    # Initial sync: overwrite mode.
    run_sync(overwrite_catalog, [*dog_records, state_message])
    assert self._get_record_count(stream_name) == 15

    # Second sync: append mode adds the rows of one more document.
    run_sync(
        self._get_configured_catalog(DestinationSyncMode.append),
        [self._record("mystream", "Cats are nice", 6), state_message],
    )
    assert self._get_record_count(stream_name) == 18

    # Third sync: append_dedup mode; the total row count is expected to be unchanged.
    run_sync(
        self._get_configured_catalog(DestinationSyncMode.append_dedup),
        [
            self._record("mystream", "Cats are nice too", 4),
            state_message,
        ],
    )
    assert self._get_record_count(stream_name) == 18

def test_write_fidelity_with_chunk_size_5(self):
    """Check the content of every row written for one record when chunk_size is 5.

    A single "Dogs are number 0" record is synced in overwrite mode. The table
    is expected to hold three rows that share the same DOCUMENT_ID and carry
    the contents '"str_col:"', '"Dogs are"' and '"number 0"' respectively.
    """
    self._delete_table("mystream")
    self.config["processing"]["chunk_size"] = 5
    catalog = self._get_configured_catalog(DestinationSyncMode.overwrite)
    first_state_message = self._state({"state": "1"})
    records = [
        self._record(
            stream="mystream",
            str_value="Dogs are number 0",
            int_value=0,
        )
    ]

    # initial sync with replace
    destination = DestinationSnowflakeCortex()
    list(destination.write(self.config, catalog, [*records, first_state_message]))
    assert self._get_record_count("mystream") == 3

    # Fetch the rows once so all three assertions look at the same result set
    # instead of issuing a separate query per row.
    # NOTE(review): the positional checks below assume _get_all_records returns
    # rows in chunk order - confirm against the helper's query.
    written_records = self._get_all_records("mystream")
    first_written_record = written_records[0]
    second_written_record = written_records[1]
    third_written_record = written_records[2]

    assert list(first_written_record.keys()) == [
        "DOCUMENT_ID",
        "CHUNK_ID",
        "METADATA",
        "DOCUMENT_CONTENT",
        "EMBEDDING",
    ]
    # EMBEDDING and CHUNK_ID must be present and truthy; their values are not compared.
    assert first_written_record.pop("EMBEDDING")
    assert first_written_record.pop("CHUNK_ID")
    # METADATA is removed without inspection so the remaining columns can be
    # compared exactly.
    first_written_record.pop("METADATA")

    assert first_written_record == {
        "DOCUMENT_ID": "Stream_mystream_Key_0",
        "DOCUMENT_CONTENT": '"str_col:"',
    }
    assert second_written_record["DOCUMENT_ID"] == "Stream_mystream_Key_0"
    assert second_written_record["DOCUMENT_CONTENT"] == '"Dogs are"'
    assert third_written_record["DOCUMENT_ID"] == "Stream_mystream_Key_0"
    assert third_written_record["DOCUMENT_CONTENT"] == '"number 0"'



"""
The following tests are not code specific, but are useful to confirm that the Cortex functions are available and behaving as expected
"""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ data:
connectorSubtype: vectorstore
connectorType: destination
definitionId: d9e5418d-f0f4-4d19-a8b1-5630543638e2
dockerImageTag: 0.2.1
dockerImageTag: 0.2.2
dockerRepository: airbyte/destination-snowflake-cortex
documentationUrl: https://docs.airbyte.com/integrations/destinations/snowflake-cortex
githubIssueLabel: destination-snowflake-cortex
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ build-backend = "poetry.core.masonry.api"

[tool.poetry]
name = "airbyte-destination-snowflake-cortex"
version = "0.2.1"
version = "0.2.2"
description = "Airbyte destination implementation for Snowflake cortex."
authors = ["Airbyte <contact@airbyte.io>"]
license = "MIT"
Expand Down
3 changes: 2 additions & 1 deletion docs/integrations/destinations/snowflake-cortex.md
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,8 @@ To get started, sign up for [Snowflake](https://www.snowflake.com/en/). Ensure y

| Version | Date | Pull Request | Subject |
|:--------| :--------- |:--------------------------------------------------------------|:-----------------------------------------------------------------------------------------------------------------------------------------------------|
| 0.2.1 | 2024-06-03 | [#](https://github.com/airbytehq/airbyte/pull/) | Add handling for unexpected/undefined state codes.
| 0.2.2 | 2024-06-04 | [#39092](https://github.com/airbytehq/airbyte/pull/39092) | Fix writing when multiple chunks exist for a document.
| 0.2.1 | 2024-06-03 | [#38830](https://github.com/airbytehq/airbyte/pull/38830) | Add handling for unexpected/undefined state codes.
| 0.2.0 | 2024-05-30 | [#38337](https://github.com/airbytehq/airbyte/pull/38337) | Fix `merge` behavior when multiple chunks exist for a document. Includes additional refactoring and improvements.
| 0.1.2 | 2024-05-17 | [#38327](https://github.com/airbytehq/airbyte/pull/38327) | Fix chunking related issue.
| 0.1.1 | 2024-05-15 | [#38206](https://github.com/airbytehq/airbyte/pull/38206) | Bug fixes.
Expand Down

0 comments on commit 3bebfc5

Please sign in to comment.