[Concurrent Low-Code] Allow low-code streams using AsyncRetriever to be run within the Concurrent CDK #168

Closed
@brianjlai

Description

Context

After allowing certain types of low-code streams to be processed within the concurrent framework, we ran into an issue where streams that use the AsyncRetriever component would fail with errors during processing. One such example is:

{
  "type": "TRACE",
  "trace": {
    "type": "ERROR",
    "emitted_at": 1733873724427,
    "error": {
      "message": "Invalid state within AsyncJobRetriever. Please contact Airbyte Support",
      "internal_message": "AsyncPartitionRepository is expected to be accessed only after `stream_slices`",
      "stack_trace": "Traceback (most recent call last):\n  File \"/Users/brian.lai/dev/airbyte-python-cdk/airbyte_cdk/sources/streams/concurrent/partition_reader.py\", line 40, in process_partition\n    for record in partition.read():\n  File \"/Users/brian.lai/dev/airbyte-python-cdk/airbyte_cdk/sources/declarative/stream_slicers/declarative_partition_generator.py\", line 59, in read\n    for stream_data in self._retriever.read_records(self._json_schema, self._stream_slice):\n  File \"/Users/brian.lai/dev/airbyte-python-cdk/airbyte_cdk/sources/declarative/retrievers/async_retriever.py\", line 119, in read_records\n    records: Iterable[Mapping[str, Any]] = self._job_orchestrator.fetch_records(partition)\n  File \"/Users/brian.lai/dev/airbyte-python-cdk/airbyte_cdk/sources/declarative/retrievers/async_retriever.py\", line 60, in _job_orchestrator\n    raise AirbyteTracedException(\nairbyte_cdk.utils.traced_exception.AirbyteTracedException: AsyncPartitionRepository is expected to be accessed only after `stream_slices`\n",
      "failure_type": "system_error",
      "stream_descriptor": {
        "name": "contacts"
      }
    }
  }
}

As a temporary fix, we changed concurrent_declarative_source.py to run only streams that use the SimpleRetriever concurrently.
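A minimal sketch of what such a gate could look like, assuming the decision is made per stream from the retriever type. The classes and the `split_streams` helper are hypothetical stand-ins for illustration, not the code in concurrent_declarative_source.py:

```python
from dataclasses import dataclass
from typing import Iterable, List, Tuple


class SketchSimpleRetriever:
    """Hypothetical stand-in for SimpleRetriever."""


class SketchAsyncRetriever:
    """Hypothetical stand-in for AsyncRetriever."""


@dataclass
class SketchStream:
    """Hypothetical stand-in for a declarative stream."""
    name: str
    retriever: object


def split_streams(
    streams: Iterable[SketchStream],
) -> Tuple[List[SketchStream], List[SketchStream]]:
    """Send SimpleRetriever streams to the concurrent group, everything else to the synchronous group."""
    concurrent: List[SketchStream] = []
    synchronous: List[SketchStream] = []
    for stream in streams:
        if isinstance(stream.retriever, SketchSimpleRetriever):
            concurrent.append(stream)
        else:
            synchronous.append(stream)
    return concurrent, synchronous


streams = [
    SketchStream("contacts", SketchAsyncRetriever()),
    SketchStream("users", SketchSimpleRetriever()),
]
concurrent_streams, synchronous_streams = split_streams(streams)
```

With this gate, the async "contacts" stream stays on the synchronous path until both issues below are resolved.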

Slack thread with more context:
https://airbytehq-team.slack.com/archives/C063B9A434H/p1733264441227669?thread_ts=1733257895.550789&cid=C063B9A434H

Problem / Solution

The root of the issue is twofold:

Issue 1:

Within concurrent_declarative_source.py, when we instantiate our StreamSlicerPartitionGenerator, we pass in declarative_stream.retriever.stream_slicer. This does not work for an AsyncRetriever, because there the stream_slicer is the retriever's underlying partition router, which is only used to supply partitions when creating async jobs. In our current implementation, the AsyncRetriever generates the slices itself within stream_slices() instead of delegating to the stream_slicer, unlike our existing SimpleRetriever.

For example in simple_retriever.py:

def stream_slices(self) -> Iterable[Optional[StreamSlice]]:
    return self.stream_slicer.stream_slices()

In async_retriever.py:

def stream_slices(self) -> Iterable[Optional[StreamSlice]]:
    slices = self.stream_slicer.stream_slices()
    self.__job_orchestrator = self._job_orchestrator_factory(slices)

    for completed_partition in self._job_orchestrator.create_and_get_completed_partitions():
        yield StreamSlice(
            partition=dict(completed_partition.stream_slice.partition)
            | {"partition": completed_partition},
            cursor_slice=completed_partition.stream_slice.cursor_slice,
        )

What we should do is:

  • Create a new StreamSlicer low-code component called AsyncJobStreamSlicer that adheres to the StreamSlicer interface
  • Its stream_slices() implementation should be the current AsyncRetriever.stream_slices() implementation shown above, and the component should be instantiated with a parent stream slicer
  • AsyncRetriever should have a stream slicer defined as a field and use it when stream_slices() is called
  • Within the model_to_component_factory, we instantiate the new async stream_slicer
  • There should be no impact to the low-code interface
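The proposal above could be sketched roughly as follows. Only the name AsyncJobStreamSlicer and the body of stream_slices() come from this issue. Every other class here (`StreamSlice`, `CompletedPartition`, `ListStreamSlicer`, `FakeJobOrchestrator`, `SketchRetriever`) is a simplified, hypothetical stand-in so the example runs without the CDK:

```python
from dataclasses import dataclass
from typing import Any, Callable, Iterable, List, Mapping, Optional


@dataclass(frozen=True)
class StreamSlice:
    """Hypothetical stand-in for the CDK's StreamSlice."""
    partition: Mapping[str, Any]
    cursor_slice: Mapping[str, Any]


@dataclass(frozen=True)
class CompletedPartition:
    """Hypothetical stand-in for a partition whose async job has finished."""
    stream_slice: StreamSlice


class ListStreamSlicer:
    """Hypothetical parent slicer, playing the role of the partition router."""

    def __init__(self, slices: List[StreamSlice]) -> None:
        self._slices = slices

    def stream_slices(self) -> Iterable[StreamSlice]:
        return iter(self._slices)


class FakeJobOrchestrator:
    """Hypothetical orchestrator that treats every job as completed immediately."""

    def __init__(self, slices: Iterable[StreamSlice]) -> None:
        self._slices = list(slices)

    def create_and_get_completed_partitions(self) -> Iterable[CompletedPartition]:
        for stream_slice in self._slices:
            yield CompletedPartition(stream_slice)


class AsyncJobStreamSlicer:
    """Owns job creation, so the retriever only has to delegate to it."""

    def __init__(
        self,
        parent_slicer: ListStreamSlicer,
        job_orchestrator_factory: Callable[[Iterable[StreamSlice]], FakeJobOrchestrator],
    ) -> None:
        self._parent_slicer = parent_slicer
        self._job_orchestrator_factory = job_orchestrator_factory
        self._job_orchestrator: Optional[FakeJobOrchestrator] = None

    def stream_slices(self) -> Iterable[StreamSlice]:
        # Same logic as the current retriever implementation, moved into the slicer.
        slices = self._parent_slicer.stream_slices()
        self._job_orchestrator = self._job_orchestrator_factory(slices)
        for completed in self._job_orchestrator.create_and_get_completed_partitions():
            yield StreamSlice(
                partition=dict(completed.stream_slice.partition) | {"partition": completed},
                cursor_slice=completed.stream_slice.cursor_slice,
            )


class SketchRetriever:
    """Hypothetical retriever that delegates slicing, mirroring simple_retriever.py."""

    def __init__(self, stream_slicer: AsyncJobStreamSlicer) -> None:
        self.stream_slicer = stream_slicer

    def stream_slices(self) -> Iterable[StreamSlice]:
        return self.stream_slicer.stream_slices()


parent = ListStreamSlicer([StreamSlice({"id": 1}, {"start": "2024-01-01"})])
retriever = SketchRetriever(AsyncJobStreamSlicer(parent, FakeJobOrchestrator))
slices = list(retriever.stream_slices())
```

Because the retriever now exposes a stream_slicer that produces the job-backed slices, passing retriever.stream_slicer to a partition generator would yield the same slices as calling retriever.stream_slices().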

Issue 2:

We instantiate a new AsyncRetriever every time we create a new partition, because the factory method we supply to the StreamSlicerPartitionGenerator creates new instances of the declarative stream and its retriever. As a result, the partition invokes AsyncRetriever.read_records() on a new instance of the async retriever that has not been initialized properly: AsyncRetriever.stream_slices() is not stateless, and it is responsible for setting up the state that read_records() later relies on. Also, because we instantiate individual retrievers, they do not share an AsyncJobOrchestrator or AsyncJobRepository, which is needed to properly manage the internal state of the retriever.

This is the cause of the error shown under Context above: the stack trace shows partition.read() calling AsyncRetriever.read_records(), which fails in _job_orchestrator with "AsyncPartitionRepository is expected to be accessed only after `stream_slices`".

What we actually want to do is reuse the same AsyncRetriever for every partition. This would let us use a properly initialized AsyncRetriever on which stream_slices() has already been called. From within concurrent_declarative_source.py, we can instead pass the original AsyncRetriever instance, which has the proper state as well as the shared orchestrator and job repository.
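The difference between the two approaches can be illustrated with a toy model. `SketchAsyncRetriever` and `read_all` are hypothetical stand-ins, and the orchestrator is reduced to a dict. The only behavior taken from this issue is that read_records() fails unless stream_slices() was called on the same instance:

```python
from typing import Callable, Dict, List, Optional


class SketchAsyncRetriever:
    """Hypothetical stand-in: read_records() only works after stream_slices()."""

    def __init__(self) -> None:
        self._job_orchestrator: Optional[Dict[str, List[str]]] = None

    def stream_slices(self) -> List[str]:
        # Stateful: sets up the orchestrator that read_records() depends on.
        self._job_orchestrator = {"a": ["a-1", "a-2"], "b": ["b-1"]}
        return list(self._job_orchestrator)

    def read_records(self, stream_slice: str) -> List[str]:
        if self._job_orchestrator is None:
            raise RuntimeError("expected to be accessed only after `stream_slices`")
        return self._job_orchestrator[stream_slice]


def read_all(
    slicing_retriever: SketchAsyncRetriever,
    retriever_factory: Callable[[], SketchAsyncRetriever],
) -> List[str]:
    """Generate slices once, then read each slice with whatever the factory returns."""
    records: List[str] = []
    for stream_slice in slicing_retriever.stream_slices():
        records.extend(retriever_factory().read_records(stream_slice))
    return records


original = SketchAsyncRetriever()

# Current behavior: the factory builds a fresh, uninitialized retriever per partition.
failure = None
try:
    read_all(original, SketchAsyncRetriever)
except RuntimeError as error:
    failure = str(error)

# Proposed behavior: every partition reuses the instance that generated the slices.
shared_records = read_all(original, lambda: original)
```

In the first call each partition gets a retriever whose orchestrator was never set up, so the read fails. In the second, the shared instance already holds the state created by stream_slices().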

However, there is one major problem with this approach: the AsyncRetriever is potentially not thread safe. The biggest concern is that the DefaultPaginator relies on internal state: its _token field is overwritten each time we read a page. If multiple partitions use the same AsyncRetriever, we can lose records between partitions. If we make this thread safe, we should be able to share the same retriever.
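To make the failure mode concrete, here is a deterministic toy model. It does not use real threads or the real DefaultPaginator. Two generators are advanced round-robin to imitate two partitions taking turns, and `SharedTokenPaginator`, `PAGES`, and the reader functions are all hypothetical:

```python
from typing import Dict, Iterator, List

# Hypothetical paged data for two partitions.
PAGES: Dict[str, List[List[str]]] = {
    "p1": [["a1", "a2"], ["a3"]],
    "p2": [["b1"], ["b2"], ["b3"]],
}


class SharedTokenPaginator:
    """Hypothetical paginator that keeps the page token on the instance."""

    def __init__(self) -> None:
        self.token = 0


def read_with_shared_token(paginator: SharedTokenPaginator, partition: str) -> Iterator[List[str]]:
    pages = PAGES[partition]
    paginator.token = 0
    while paginator.token < len(pages):
        yield pages[paginator.token]  # another reader may run at this point
        paginator.token += 1  # advances whatever value the other reader left behind


def read_with_local_token(partition: str) -> Iterator[List[str]]:
    pages = PAGES[partition]
    token = 0  # state is private to this read, so readers cannot interfere
    while token < len(pages):
        yield pages[token]
        token += 1


def interleave(readers: Dict[str, Iterator[List[str]]]) -> Dict[str, List[str]]:
    """Advance each reader one page at a time, round-robin, until all are exhausted."""
    results: Dict[str, List[str]] = {name: [] for name in readers}
    active = dict(readers)
    while active:
        for name in list(active):
            try:
                results[name].extend(next(active[name]))
            except StopIteration:
                del active[name]
    return results


paginator = SharedTokenPaginator()
shared = interleave({p: read_with_shared_token(paginator, p) for p in PAGES})
isolated = interleave({p: read_with_local_token(p) for p in PAGES})
```

In this model the shared token causes partition p2 to skip its second page, while keeping the token local to each read returns every record. Per-read state is one possible direction for a fix; a lock around pagination would be another, at the cost of serializing reads.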

Note:

There are potentially other places where we are not thread safe, and we could do additional analysis to find them. However, rather than drag the work out trying to fix everything, we are going to make a calculated bet that thread-safety problems in the async retriever have a relatively low blast radius. We may at some point have to revisit making all of our low-code components thread safe.

Acceptance Criteria

  • There should be no breaking changes to the low-code interface
