Skip to content

Commit

Permalink
Merge branch 'master' into arsenlosenko/source-facebook-marketing-for…
Browse files Browse the repository at this point in the history
…mat-date
  • Loading branch information
arsenlosenko authored Apr 27, 2023
2 parents c9b5e94 + 5ec81cc commit f4f6a74
Show file tree
Hide file tree
Showing 161 changed files with 3,500 additions and 1,020 deletions.
2 changes: 1 addition & 1 deletion .bumpversion.cfg
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
[bumpversion]
current_version = 0.44.1
current_version = 0.44.2
commit = False
tag = False
parse = (?P<major>\d+)\.(?P<minor>\d+)\.(?P<patch>\d+)(\-[a-z]+)?
Expand Down
6 changes: 3 additions & 3 deletions .github/workflows/publish_connectors.yml
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,9 @@ jobs:
DOCKER_HUB_PASSWORD: ${{ secrets.DOCKER_HUB_PASSWORD }}
DOCKER_HUB_USERNAME: ${{ secrets.DOCKER_HUB_USERNAME }}
GCP_GSM_CREDENTIALS: ${{ secrets.GCP_GSM_CREDENTIALS }}
GCS_CREDENTIALS: ${{ secrets.METADATA_SERVICE_DEV_GCS_CREDENTIALS }}
METADATA_SERVICE_ACCOUNT_KEY: ${{ secrets.METADATA_SERVICE_DEV_GCS_CREDENTIALS }}
METADATA_SERVICE_BUCKET_NAME: dev-airbyte-cloud-connector-metadata-service
GCS_CREDENTIALS: ${{ secrets.METADATA_SERVICE_PROD_GCS_CREDENTIALS }}
METADATA_SERVICE_ACCOUNT_KEY: ${{ secrets.METADATA_SERVICE_PROD_GCS_CREDENTIALS }}
METADATA_SERVICE_BUCKET_NAME: prod-airbyte-cloud-connector-metadata-service
SPEC_CACHE_BUCKET_NAME: io-airbyte-cloud-spec-cache
SPEC_CACHE_SERVICE_ACCOUNT_KEY: ${{ secrets.SPEC_CACHE_SERVICE_ACCOUNT_KEY_PUBLISH }}
TEST_REPORTS_BUCKET_NAME: "airbyte-connector-build-status"
Expand Down
2 changes: 1 addition & 1 deletion airbyte-cdk/python/.bumpversion.cfg
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
[bumpversion]
current_version = 0.35.4
current_version = 0.36.1
commit = False

[bumpversion:file:setup.py]
Expand Down
6 changes: 6 additions & 0 deletions airbyte-cdk/python/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,11 @@
# Changelog

## 0.36.1
low-code: fix add field transformation when running from the connector builder

## 0.36.0
Emit stream status messages

## 0.35.4
low-code: remove now_local() macro because it's too unpredictable

Expand Down
4 changes: 2 additions & 2 deletions airbyte-cdk/python/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ RUN apk --no-cache upgrade \
&& apk --no-cache add tzdata build-base

# install airbyte-cdk
RUN pip install --prefix=/install airbyte-cdk==0.35.4
RUN pip install --prefix=/install airbyte-cdk==0.36.1

# build a clean environment
FROM base
Expand All @@ -32,5 +32,5 @@ ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py"
ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]

# needs to be the same as CDK
LABEL io.airbyte.version=0.35.4
LABEL io.airbyte.version=0.36.1
LABEL io.airbyte.name=airbyte/source-declarative-manifest
2 changes: 1 addition & 1 deletion airbyte-cdk/python/airbyte_cdk/connector_builder/README.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# Connector Builder Backend

This is the backend for requests from the [Connector Builder](https://docs.airbyte.com/connector-development/config-based/connector-builder-ui/).
This is the backend for requests from the [Connector Builder](https://docs.airbyte.com/connector-development/connector-builder-ui/overview/).

## Local development

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,9 @@ def create_source(config: Mapping[str, Any], limits: TestReadLimits) -> Manifest
component_factory=ModelToComponentFactory(
emit_connector_builder_messages=True,
limit_pages_fetched_per_slice=limits.max_pages_per_slice,
limit_slices_fetched=limits.max_slices)
limit_slices_fetched=limits.max_slices,
disable_retries=True
)
)


Expand Down
13 changes: 13 additions & 0 deletions airbyte-cdk/python/airbyte_cdk/sources/abstract_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
AirbyteLogMessage,
AirbyteMessage,
AirbyteStateMessage,
AirbyteStreamStatus,
ConfiguredAirbyteCatalog,
ConfiguredAirbyteStream,
Level,
Expand All @@ -28,6 +29,7 @@
from airbyte_cdk.sources.utils.record_helper import stream_data_to_airbyte_message
from airbyte_cdk.sources.utils.schema_helpers import InternalConfig, split_config
from airbyte_cdk.utils.event_timing import create_timer
from airbyte_cdk.utils.stream_status_utils import as_airbyte_message as stream_status_as_airbyte_message
from airbyte_cdk.utils.traced_exception import AirbyteTracedException


Expand Down Expand Up @@ -113,17 +115,24 @@ def read(
continue
try:
timer.start_event(f"Syncing stream {configured_stream.stream.name}")
logger.info(f"Marking stream {configured_stream.stream.name} as STARTED")
yield stream_status_as_airbyte_message(configured_stream, AirbyteStreamStatus.STARTED)
yield from self._read_stream(
logger=logger,
stream_instance=stream_instance,
configured_stream=configured_stream,
state_manager=state_manager,
internal_config=internal_config,
)
logger.info(f"Marking stream {configured_stream.stream.name} as STOPPED")
yield stream_status_as_airbyte_message(configured_stream, AirbyteStreamStatus.COMPLETE)
except AirbyteTracedException as e:
yield stream_status_as_airbyte_message(configured_stream, AirbyteStreamStatus.INCOMPLETE)
raise e
except Exception as e:
logger.exception(f"Encountered an exception while reading stream {configured_stream.stream.name}")
logger.info(f"Marking stream {configured_stream.stream.name} as STOPPED")
yield stream_status_as_airbyte_message(configured_stream, AirbyteStreamStatus.INCOMPLETE)
display_message = stream_instance.get_error_display_message(e)
if display_message:
raise AirbyteTracedException.from_exception(e, message=display_message) from e
Expand Down Expand Up @@ -185,6 +194,10 @@ def _read_stream(
for record in record_iterator:
if record.type == MessageType.RECORD:
record_counter += 1
if record_counter == 1:
logger.info(f"Marking stream {stream_name} as RUNNING")
# If we just read the first record of the stream, emit the transition to the RUNNING state
yield stream_status_as_airbyte_message(configured_stream, AirbyteStreamStatus.RUNNING)
yield record

logger.info(f"Read {record_counter} records from {stream_name} stream")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,14 @@
from dataclasses import InitVar, dataclass, field
from typing import Any, Iterable, List, Mapping, MutableMapping, Optional, Union

from airbyte_cdk.models import AirbyteLogMessage, AirbyteMessage, AirbyteTraceMessage, SyncMode
from airbyte_cdk.models import AirbyteMessage, SyncMode
from airbyte_cdk.sources.declarative.interpolation import InterpolatedString
from airbyte_cdk.sources.declarative.retrievers.retriever import Retriever
from airbyte_cdk.sources.declarative.schema import DefaultSchemaLoader
from airbyte_cdk.sources.declarative.schema.schema_loader import SchemaLoader
from airbyte_cdk.sources.declarative.transformations import RecordTransformation
from airbyte_cdk.sources.declarative.types import Config, StreamSlice
from airbyte_cdk.sources.streams.core import Stream
from airbyte_cdk.sources.streams.core import Stream, StreamData


@dataclass
Expand Down Expand Up @@ -102,17 +102,27 @@ def read_records(

def _apply_transformations(
self,
message_or_record_data: Union[AirbyteMessage, AirbyteLogMessage, AirbyteTraceMessage, Mapping[str, Any]],
message_or_record_data: StreamData,
config: Config,
stream_slice: StreamSlice,
):
# If the input is an AirbyteRecord, transform the record's data
# If the input is another type of Airbyte Message, return it as is
# If the input is an AirbyteMessage with a record, transform the record's data
# If the input is another type of AirbyteMessage, return it as is
# If the input is a dict, transform it
if isinstance(message_or_record_data, AirbyteLogMessage) or isinstance(message_or_record_data, AirbyteTraceMessage):
return message_or_record_data
if isinstance(message_or_record_data, AirbyteMessage):
if message_or_record_data.record:
record = message_or_record_data.record.data
else:
return message_or_record_data
elif isinstance(message_or_record_data, dict):
record = message_or_record_data
else:
# Raise an error because this is unexpected and indicative of a typing problem in the CDK
raise ValueError(
f"Unexpected record type. Expected {StreamData}. Got {type(message_or_record_data)}. This is probably due to a bug in the CDK."
)
for transformation in self.transformations:
transformation.transform(message_or_record_data, config=config, stream_state=self.state, stream_slice=stream_slice)
transformation.transform(record, config=config, stream_state=self.state, stream_slice=stream_slice)

return message_or_record_data

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,12 +112,17 @@

class ModelToComponentFactory:
def __init__(
self, limit_pages_fetched_per_slice: int = None, limit_slices_fetched: int = None, emit_connector_builder_messages: bool = False
self,
limit_pages_fetched_per_slice: int = None,
limit_slices_fetched: int = None,
emit_connector_builder_messages: bool = False,
disable_retries=False,
):
self._init_mappings()
self._limit_pages_fetched_per_slice = limit_pages_fetched_per_slice
self._limit_slices_fetched = limit_slices_fetched
self._emit_connector_builder_messages = emit_connector_builder_messages
self._disable_retries = disable_retries

def _init_mappings(self):
self.PYDANTIC_MODEL_TO_CONSTRUCTOR: [Type[BaseModel], Callable] = {
Expand Down Expand Up @@ -779,6 +784,7 @@ def create_simple_retriever(
config=config,
maximum_number_of_slices=self._limit_slices_fetched,
parameters=model.parameters,
disable_retries=self._disable_retries,
)
return SimpleRetriever(
name=name,
Expand All @@ -789,6 +795,7 @@ def create_simple_retriever(
stream_slicer=stream_slicer or SinglePartitionRouter(parameters={}),
config=config,
parameters=model.parameters,
disable_retries=self._disable_retries,
)

@staticmethod
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@ class SimpleRetriever(Retriever, HttpStream):
parameters (Mapping[str, Any]): Additional runtime parameters to be used for string interpolation
"""

_DEFAULT_MAX_RETRY = 5

requester: Requester
record_selector: HttpSelector
config: Config
Expand All @@ -61,6 +63,7 @@ class SimpleRetriever(Retriever, HttpStream):
paginator: Optional[Paginator] = None
stream_slicer: Optional[StreamSlicer] = SinglePartitionRouter(parameters={})
emit_connector_builder_messages: bool = False
disable_retries: bool = False

def __post_init__(self, parameters: Mapping[str, Any]):
self.paginator = self.paginator or NoPagination(parameters=parameters)
Expand Down Expand Up @@ -95,6 +98,14 @@ def raise_on_http_errors(self) -> bool:
# never raise on http_errors because this overrides the error handler logic...
return False

@property
def max_retries(self) -> Union[int, None]:
    """
    Number of retries this retriever allows.

    Returns 0 when ``disable_retries`` is set. Otherwise returns the requester's
    error handler ``max_retries`` attribute when the handler exposes one, and
    falls back to ``_DEFAULT_MAX_RETRY`` when it does not.
    """
    if self.disable_retries:
        return 0
    # Not every error handler defines max_retries, so fall back to the class default.
    handler = self.requester.error_handler
    return getattr(handler, "max_retries", self._DEFAULT_MAX_RETRY)

def should_retry(self, response: requests.Response) -> bool:
"""
Specifies conditions for backoff based on the response from the server.
Expand Down
8 changes: 3 additions & 5 deletions airbyte-cdk/python/airbyte_cdk/sources/streams/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
from typing import Any, Iterable, List, Mapping, MutableMapping, Optional, Tuple, Union

import airbyte_cdk.sources.utils.casing as casing
from airbyte_cdk.models import AirbyteLogMessage, AirbyteStream, AirbyteTraceMessage, SyncMode
from airbyte_cdk.models import AirbyteMessage, AirbyteStream, SyncMode

# list of all possible HTTP methods which can be used for sending of request bodies
from airbyte_cdk.sources.utils.schema_helpers import ResourceSchemaLoader
Expand All @@ -24,10 +24,8 @@

# A stream's read method can return one of the following types:
# Mapping[str, Any]: The content of an AirbyteRecordMessage
# AirbyteRecordMessage: An AirbyteRecordMessage
# AirbyteLogMessage: A log message
# AirbyteTraceMessage: A trace message
StreamData = Union[Mapping[str, Any], AirbyteLogMessage, AirbyteTraceMessage]
# AirbyteMessage: An AirbyteMessage. Could be of any type
StreamData = Union[Mapping[str, Any], AirbyteMessage]


def package_name_from_class(cls: object) -> str:
Expand Down
36 changes: 36 additions & 0 deletions airbyte-cdk/python/airbyte_cdk/utils/stream_status_utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#


from datetime import datetime

from airbyte_cdk.models import (
AirbyteMessage,
AirbyteStreamStatus,
AirbyteStreamStatusTraceMessage,
AirbyteTraceMessage,
ConfiguredAirbyteStream,
StreamDescriptor,
TraceType,
)
from airbyte_cdk.models import Type as MessageType


def as_airbyte_message(stream: ConfiguredAirbyteStream, current_status: AirbyteStreamStatus) -> AirbyteMessage:
    """
    Wrap a stream status in a TRACE-type AirbyteMessage.

    :param stream: configured stream whose name and namespace populate the stream descriptor
    :param current_status: status to report for that stream
    :return: an AirbyteMessage of type TRACE carrying a STREAM_STATUS trace message
    """
    # Current time as an epoch timestamp, scaled from seconds to milliseconds (kept as a float).
    emitted_at_ms = datetime.now().timestamp() * 1000.0

    descriptor = StreamDescriptor(name=stream.stream.name, namespace=stream.stream.namespace)
    status_payload = AirbyteStreamStatusTraceMessage(stream_descriptor=descriptor, status=current_status)
    trace = AirbyteTraceMessage(
        type=TraceType.STREAM_STATUS,
        emitted_at=emitted_at_ms,
        stream_status=status_payload,
    )
    return AirbyteMessage(type=MessageType.TRACE, trace=trace)
4 changes: 2 additions & 2 deletions airbyte-cdk/python/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
name="airbyte-cdk",
# The version of the airbyte-cdk package is used at runtime to validate manifests. That validation must be
# updated if our semver format changes such as using release candidate versions.
version="0.35.4",
version="0.36.1",
description="A framework for writing Airbyte Connectors.",
long_description=README,
long_description_content_type="text/markdown",
Expand Down Expand Up @@ -46,7 +46,7 @@
packages=find_packages(exclude=("unit_tests",)),
package_data={"airbyte_cdk": ["py.typed", "sources/declarative/declarative_component_schema.yaml"]},
install_requires=[
"airbyte-protocol-models==1.0.0",
"airbyte-protocol-models==0.3.6",
"backoff",
"dpath~=2.0.1",
"isodate~=0.6.1",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -566,6 +566,7 @@ def test_create_source():
assert isinstance(source, ManifestDeclarativeSource)
assert source._constructor._limit_pages_fetched_per_slice == limits.max_pages_per_slice
assert source._constructor._limit_slices_fetched == limits.max_slices
assert source.streams(config={})[0].retriever.max_retries == 0


def request_log_message(request: dict) -> AirbyteMessage:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1360,3 +1360,29 @@ def test_simple_retriever_emit_log_messages():
)

assert isinstance(retriever, SimpleRetrieverTestReadDecorator)


def test_ignore_retry():
    """A factory created with disable_retries=True must build a retriever whose max_retries is 0."""
    record_selector = {
        "type": "RecordSelector",
        "extractor": {
            "type": "DpathExtractor",
            "field_path": [],
        },
    }
    http_requester = {"type": "HttpRequester", "name": "list", "url_base": "orange.com", "path": "/v1/api"}
    retriever_definition = {
        "type": "SimpleRetriever",
        "record_selector": record_selector,
        "requester": http_requester,
    }

    factory = ModelToComponentFactory(disable_retries=True)
    built_retriever = factory.create_component(
        model_type=SimpleRetrieverModel,
        component_definition=retriever_definition,
        config={},
        name="Test",
        primary_key="id",
        stream_slicer=None,
    )

    assert built_retriever.max_retries == 0
Loading

0 comments on commit f4f6a74

Please sign in to comment.