Skip to content

Commit

Permalink
Merge branch 'master' into btkcodedev/builder-contribute/source-shortcut
Browse files Browse the repository at this point in the history
  • Loading branch information
btkcodedev authored Sep 10, 2024
2 parents 06479a1 + 254f34a commit 19e8a13
Show file tree
Hide file tree
Showing 46 changed files with 746 additions and 357 deletions.
6 changes: 6 additions & 0 deletions airbyte-cdk/python/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,11 @@
# Changelog

## 5.4.0
Add migration of global stream_state to per_partition format

## 5.3.0
Connector builder: add flag to disable cache

## 5.2.1
Fix error in incremental sync docs

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ def create_source(config: Mapping[str, Any], limits: TestReadLimits) -> Manifest
limit_pages_fetched_per_slice=limits.max_pages_per_slice,
limit_slices_fetched=limits.max_slices,
disable_retries=True,
disable_cache=True,
),
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,10 @@
from collections import OrderedDict
from typing import Any, Callable, Iterable, Mapping, Optional, Union

from airbyte_cdk.models import FailureType
from airbyte_cdk.sources.declarative.incremental.declarative_cursor import DeclarativeCursor
from airbyte_cdk.sources.declarative.partition_routers.partition_router import PartitionRouter
from airbyte_cdk.sources.streams.checkpoint.per_partition_key_serializer import PerPartitionKeySerializer
from airbyte_cdk.sources.types import Record, StreamSlice, StreamState
from airbyte_cdk.utils import AirbyteTracedException


class CursorFactory:
Expand Down Expand Up @@ -48,6 +46,7 @@ class PerPartitionCursor(DeclarativeCursor):
_NO_CURSOR_STATE: Mapping[str, Any] = {}
_KEY = 0
_VALUE = 1
_state_to_migrate_from: Mapping[str, Any] = {}

def __init__(self, cursor_factory: CursorFactory, partition_router: PartitionRouter):
self._cursor_factory = cursor_factory
Expand All @@ -65,7 +64,8 @@ def stream_slices(self) -> Iterable[StreamSlice]:

cursor = self._cursor_per_partition.get(self._to_partition_key(partition.partition))
if not cursor:
cursor = self._create_cursor(self._NO_CURSOR_STATE)
partition_state = self._state_to_migrate_from if self._state_to_migrate_from else self._NO_CURSOR_STATE
cursor = self._create_cursor(partition_state)
self._cursor_per_partition[self._to_partition_key(partition.partition)] = cursor

for cursor_slice in cursor.stream_slices():
Expand Down Expand Up @@ -113,15 +113,13 @@ def set_initial_state(self, stream_state: StreamState) -> None:
return

if "states" not in stream_state:
raise AirbyteTracedException(
internal_message=f"Could not sync parse the following state: {stream_state}",
message="The state for is format invalid. Validate that the migration steps included a reset and that it was performed "
"properly. Otherwise, please contact Airbyte support.",
failure_type=FailureType.config_error,
)
# We assume that `stream_state` is in a global format that can be applied to all partitions.
# Example: {"global_state_format_key": "global_state_format_value"}
self._state_to_migrate_from = stream_state

for state in stream_state["states"]:
self._cursor_per_partition[self._to_partition_key(state["partition"])] = self._create_cursor(state["cursor"])
else:
for state in stream_state["states"]:
self._cursor_per_partition[self._to_partition_key(state["partition"])] = self._create_cursor(state["cursor"])

# Set parent state for partition routers based on parent streams
self._partition_router.set_initial_state(stream_state)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -159,13 +159,15 @@ def __init__(
limit_slices_fetched: Optional[int] = None,
emit_connector_builder_messages: bool = False,
disable_retries: bool = False,
disable_cache: bool = False,
message_repository: Optional[MessageRepository] = None,
):
self._init_mappings()
self._limit_pages_fetched_per_slice = limit_pages_fetched_per_slice
self._limit_slices_fetched = limit_slices_fetched
self._emit_connector_builder_messages = emit_connector_builder_messages
self._disable_retries = disable_retries
self._disable_cache = disable_cache
self._message_repository = message_repository or InMemoryMessageRepository( # type: ignore
self._evaluate_log_level(emit_connector_builder_messages)
)
Expand Down Expand Up @@ -825,6 +827,8 @@ def create_http_requester(self, model: HttpRequesterModel, decoder: Decoder, con
assert model.use_cache is not None # for mypy
assert model.http_method is not None # for mypy

use_cache = model.use_cache and not self._disable_cache

return HttpRequester(
name=name,
url_base=model.url_base,
Expand All @@ -837,7 +841,7 @@ def create_http_requester(self, model: HttpRequesterModel, decoder: Decoder, con
disable_retries=self._disable_retries,
parameters=model.parameters or {},
message_repository=self._message_repository,
use_cache=model.use_cache,
use_cache=use_cache,
decoder=decoder,
stream_response=decoder.is_stream_response() if decoder else False,
)
Expand Down Expand Up @@ -1199,6 +1203,7 @@ def _create_message_repository_substream_wrapper(self, model: ParentStreamConfig
limit_slices_fetched=self._limit_slices_fetched,
emit_connector_builder_messages=self._emit_connector_builder_messages,
disable_retries=self._disable_retries,
disable_cache=self._disable_cache,
message_repository=LogAppenderMessageRepositoryDecorator(
{"airbyte_cdk": {"stream": {"is_substream": True}}, "http": {"is_auxiliary": True}},
self._message_repository,
Expand Down
2 changes: 1 addition & 1 deletion airbyte-cdk/python/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ build-backend = "poetry.core.masonry.api"

[tool.poetry]
name = "airbyte-cdk"
version = "5.2.1"
version = "5.4.0"
description = "A framework for writing Airbyte Connectors."
authors = ["Airbyte <contact@airbyte.io>"]
license = "MIT"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -709,6 +709,7 @@ def test_create_source():
assert isinstance(source, ManifestDeclarativeSource)
assert source._constructor._limit_pages_fetched_per_slice == limits.max_pages_per_slice
assert source._constructor._limit_slices_fetched == limits.max_slices
assert source._constructor._disable_cache


def request_log_message(request: dict) -> AirbyteMessage:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,10 @@
from unittest.mock import Mock

import pytest
from airbyte_cdk.models import FailureType
from airbyte_cdk.sources.declarative.incremental.declarative_cursor import DeclarativeCursor
from airbyte_cdk.sources.declarative.incremental.per_partition_cursor import PerPartitionCursor, PerPartitionKeySerializer, StreamSlice
from airbyte_cdk.sources.declarative.partition_routers.partition_router import PartitionRouter
from airbyte_cdk.sources.types import Record
from airbyte_cdk.utils import AirbyteTracedException

PARTITION = {
"partition_key string": "partition value",
Expand Down Expand Up @@ -519,10 +517,37 @@ def test_get_stream_state_includes_parent_state(mocked_cursor_factory, mocked_pa
assert stream_state == expected_state


def test_given_invalid_state_when_set_initial_state_then_raise_config_error(mocked_cursor_factory, mocked_partition_router) -> None:
cursor = PerPartitionCursor(mocked_cursor_factory, mocked_partition_router)

with pytest.raises(AirbyteTracedException) as exception:
cursor.set_initial_state({"invalid_state": 1})
def test_per_partition_state_when_set_initial_global_state(mocked_cursor_factory, mocked_partition_router) -> None:
first_partition = {"first_partition_key": "first_partition_value"}
second_partition = {"second_partition_key": "second_partition_value"}
global_state = {"global_state_format_key": "global_state_format_value"}

assert exception.value.failure_type == FailureType.config_error
mocked_partition_router.stream_slices.return_value = [
StreamSlice(partition=first_partition, cursor_slice={}),
StreamSlice(partition=second_partition, cursor_slice={}),
]
mocked_cursor_factory.create.side_effect = [
MockedCursorBuilder().with_stream_state(global_state).build(),
MockedCursorBuilder().with_stream_state(global_state).build(),
]
cursor = PerPartitionCursor(mocked_cursor_factory, mocked_partition_router)
global_state = {"global_state_format_key": "global_state_format_value"}
cursor.set_initial_state(global_state)
assert cursor._state_to_migrate_from == global_state
list(cursor.stream_slices())
assert cursor._cursor_per_partition['{"first_partition_key":"first_partition_value"}'].set_initial_state.call_count == 1
assert cursor._cursor_per_partition['{"first_partition_key":"first_partition_value"}'].set_initial_state.call_args[0] == (
{"global_state_format_key": "global_state_format_value"},
)
assert cursor._cursor_per_partition['{"second_partition_key":"second_partition_value"}'].set_initial_state.call_count == 1
assert cursor._cursor_per_partition['{"second_partition_key":"second_partition_value"}'].set_initial_state.call_args[0] == (
{"global_state_format_key": "global_state_format_value"},
)
expected_state = [
{"cursor": {"global_state_format_key": "global_state_format_value"}, "partition": {"first_partition_key": "first_partition_value"}},
{
"cursor": {"global_state_format_key": "global_state_format_value"},
"partition": {"second_partition_key": "second_partition_value"},
},
]
assert cursor.get_stream_state()["states"] == expected_state
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@ def output_dataframe(result_df: pd.DataFrame) -> Output[pd.DataFrame]:
Returns a Dagster Output object with a dataframe as the result and a markdown preview.
"""

# Truncate to 100 rows to avoid dagster throwing a "too large" error
MAX_PREVIEW_ROWS = 100
# Truncate to 10 rows to avoid dagster throwing a "too large" error
MAX_PREVIEW_ROWS = 10
is_truncated = len(result_df) > MAX_PREVIEW_ROWS
preview_result_df = result_df.head(MAX_PREVIEW_ROWS)

Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[tool.poetry]
name = "orchestrator"
version = "0.5.3"
version = "0.5.4"
description = ""
authors = ["Ben Church <ben@airbyte.io>"]
readme = "README.md"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ data:
connectorSubtype: database
connectorType: destination
definitionId: 072d5540-f236-4294-ba7c-ade8fd918496
dockerImageTag: 3.2.3
dockerImageTag: 3.2.4
dockerRepository: airbyte/destination-databricks
githubIssueLabel: destination-databricks
icon: databricks.svg
Expand Down
Loading

0 comments on commit 19e8a13

Please sign in to comment.