Skip to content

Commit

Permalink
Merge branch 'master' into arsenlosenko/source-facebook-marketing-for…
Browse files Browse the repository at this point in the history
…mat-date
  • Loading branch information
arsenlosenko authored Apr 7, 2023
2 parents ed4366c + 221676c commit 41ae131
Show file tree
Hide file tree
Showing 581 changed files with 31,085 additions and 1,892 deletions.
2 changes: 1 addition & 1 deletion .bumpversion.cfg
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
[bumpversion]
current_version = 0.43.0
current_version = 0.43.1
commit = False
tag = False
parse = (?P<major>\d+)\.(?P<minor>\d+)\.(?P<patch>\d+)(\-[a-z]+)?
Expand Down
2 changes: 1 addition & 1 deletion .env
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@


### SHARED ###
VERSION=0.43.0
VERSION=0.43.1

# When using the airbyte-db via default docker image
CONFIG_ROOT=/data
Expand Down
2 changes: 1 addition & 1 deletion .github/actions/run-dagger-pipeline/action.yml
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ runs:
python-version: "3.10"
- name: Install ci-connector-ops package
shell: bash
run: pip install -e ./tools/ci_connector_ops\[pipelines]\
run: pip install --quiet -e ./tools/ci_connector_ops\[pipelines]\
- name: Run airbyte-ci
shell: bash
run: |
Expand Down
6 changes: 3 additions & 3 deletions .github/workflows/deploy-oss-catalog.yml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ on:
branches:
- master
paths:
- airbyte-config/init/src/main/resources/seed/**
- airbyte-config-oss/init-oss/src/main/resources/seed/**

workflow_dispatch:

Expand All @@ -29,12 +29,12 @@ jobs:
distribution: "zulu"
java-version: "17"
- name: Generate catalog
run: SUB_BUILD=ALL_CONNECTORS ./gradlew :airbyte-config:specs:generateOssConnectorCatalog
run: SUB_BUILD=ALL_CONNECTORS ./gradlew :airbyte-config-oss:specs-oss:generateOssConnectorCatalog
- name: Upload catalog to GCS
shell: bash
run: |
gcs_bucket_name="prod-airbyte-cloud-connector-metadata-service"
catalog_path="airbyte-config/init/src/main/resources/seed/oss_catalog.json"
catalog_path="airbyte-config-oss/init-oss/src/main/resources/seed/oss_catalog.json"
gsutil -h "Cache-Control:public, max-age=10" cp "$catalog_path" "gs://$gcs_bucket_name/oss_catalog.json"
- name: Trigger Cloud catalog generation
uses: peter-evans/repository-dispatch@v2
Expand Down
4 changes: 2 additions & 2 deletions .github/workflows/gradle.yml
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ jobs:
- 'tools/**'
- '*.gradle'
- 'deps.toml'
- 'airbyte-config/**'
- 'airbyte-config-oss/**'
cdk:
- 'airbyte-cdk/**'
cli:
Expand Down Expand Up @@ -327,7 +327,7 @@ jobs:
- name: Process Resources
uses: Wandalen/wretry.action@master
with:
command: SUB_BUILD=CONNECTORS_BASE ./gradlew :airbyte-config:init:processResources --scan
command: SUB_BUILD=CONNECTORS_BASE ./gradlew :airbyte-config-oss:init-oss:processResources --scan
attempt_limit: 3
attempt_delay: 5000 # in ms

Expand Down
23 changes: 23 additions & 0 deletions .github/workflows/metadata_orchestrator_tests_dagger.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
# Runs the "metadata test orchestrator" subcommand through the repository's
# run-dagger-pipeline action. Triggered manually or by pull requests that touch
# the metadata service sources.
name: Metadata Orchestrator CI

on:
  workflow_dispatch:
  pull_request:
    paths:
      - "airbyte-ci/connectors/metadata_service/**"
jobs:
  metadata_service_ci:
    name: Metadata Orchestrator CI
    timeout-minutes: 240 # 4 hours
    runs-on: medium-runner
    steps:
      - name: Checkout Airbyte
        uses: actions/checkout@v2
      - name: Run Metadata unit test pipeline
        id: metadata-orchestrator-unit-test-pipeline
        uses: ./.github/actions/run-dagger-pipeline
        with:
          subcommand: "metadata test orchestrator"
          # NOTE(review): context is fixed to "pull_request" even for manual
          # workflow_dispatch runs - confirm this is intended.
          context: "pull_request"
        env:
          CI_GITHUB_ACCESS_TOKEN: ${{ secrets.GH_PAT_MAINTENANCE_OCTAVIA }}
2 changes: 1 addition & 1 deletion .github/workflows/metadata_service_tests_dagger.yml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ jobs:
- name: Checkout Airbyte
uses: actions/checkout@v2
- name: Run Metadata unit test pipeline
id: pipelinerun
id: metadata-lib-unit-test-pipeline
uses: ./.github/actions/run-dagger-pipeline
with:
subcommand: "metadata test lib"
Expand Down
24 changes: 24 additions & 0 deletions .github/workflows/metadata_validate_manifest_dagger.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
# Runs the "metadata validate" subcommand through the repository's
# run-dagger-pipeline action. Triggered manually or by pull requests that
# change a connector's metadata.yml.
name: Metadata Manifest Validation

on:
  workflow_dispatch:
  pull_request:
    # TODO (ben): Change to any connector change once the metadata service is in use
    paths:
      - "airbyte-integrations/connectors/**/metadata.yml"
jobs:
  metadata_service_ci:
    name: Validate Metadata Manifest for Connectors
    timeout-minutes: 10 # 10 minutes
    runs-on: medium-runner
    steps:
      - name: Checkout Airbyte
        uses: actions/checkout@v2
      # Step name/id describe the validation pipeline this step actually runs
      # (they previously reused the lib unit-test workflow's identifiers).
      - name: Run Metadata manifest validation pipeline
        id: metadata-validate-manifest-pipeline
        uses: ./.github/actions/run-dagger-pipeline
        with:
          subcommand: "metadata validate"
          # NOTE(review): context is fixed to "pull_request" even for manual
          # workflow_dispatch runs - confirm this is intended.
          context: "pull_request"
        env:
          CI_GITHUB_ACCESS_TOKEN: ${{ secrets.GH_PAT_MAINTENANCE_OCTAVIA }}
6 changes: 3 additions & 3 deletions .github/workflows/publish-command.yml
Original file line number Diff line number Diff line change
Expand Up @@ -334,7 +334,7 @@ jobs:
if: github.event.inputs.auto-bump-version == 'true' && success()
run: |
connector="airbyte/${{ env.IMAGE_NAME }}"
definitionpath=./airbyte-config/init/src/main/resources/seed/
definitionpath=./airbyte-config-oss/init-oss/src/main/resources/seed/
sourcecheck=$(yq e ".. | select(has(\"dockerRepository\")) | select(.dockerRepository == \"$connector\")" "$definitionpath"source_definitions.yaml)
destcheck=$(yq e ".. | select(has(\"dockerRepository\")) | select(.dockerRepository == \"$connector\")" "$definitionpath"destination_definitions.yaml)
if [[ (-z "$sourcecheck" && -z "$destcheck") ]]
Expand All @@ -344,7 +344,7 @@ jobs:
if: github.event.inputs.auto-bump-version == 'true' && success()
run: |
connector="airbyte/${{ env.IMAGE_NAME }}"
definitionpath=./airbyte-config/init/src/main/resources/seed/
definitionpath=./airbyte-config-oss/init-oss/src/main/resources/seed/
sourcename=$(yq e ".[] | select(has(\"dockerRepository\")) | select(.dockerRepository == \"$connector\") | .name" "$definitionpath"source_definitions.yaml)
destname=$(yq e ".[] | select(has(\"dockerRepository\")) | select(.dockerRepository == \"$connector\") | .name" "$definitionpath"destination_definitions.yaml)
if [ -z "$sourcename" ]
Expand All @@ -355,7 +355,7 @@ jobs:
if: github.event.inputs.auto-bump-version == 'true' && success()
uses: Wandalen/wretry.action@master
with:
command: ./gradlew :airbyte-config:init:processResources
command: ./gradlew :airbyte-config-oss:init-oss:processResources
attempt_limit: 3
attempt_delay: 5000 # in ms
- name: git config
Expand Down
2 changes: 1 addition & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ docs/SUMMARY.md
**/specs_secrets_mask.yaml

# Files generated for uploading to GCS
airbyte-config/**/resources/seed/oss_catalog.json
airbyte-config-oss/**/resources/seed/oss_catalog.json

# Output Files generated by scripts
lowcode_connector_names.txt
Expand Down
2 changes: 1 addition & 1 deletion airbyte-cdk/python/.bumpversion.cfg
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
[bumpversion]
current_version = 0.33.2
current_version = 0.34.0
commit = False

[bumpversion:file:setup.py]
Expand Down
3 changes: 3 additions & 0 deletions airbyte-cdk/python/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
# Changelog

## 0.34.0
Low-Code CDK: Enable use of SingleUseRefreshTokenAuthenticator

## 0.33.2
low-code: fix duplicate stream slicer update

Expand Down
4 changes: 2 additions & 2 deletions airbyte-cdk/python/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ RUN apk --no-cache upgrade \
&& apk --no-cache add tzdata build-base

# install airbyte-cdk
RUN pip install --prefix=/install airbyte-cdk==0.33.2
RUN pip install --prefix=/install airbyte-cdk==0.34.0

# build a clean environment
FROM base
Expand All @@ -32,5 +32,5 @@ ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py"
ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]

# needs to be the same as CDK
LABEL io.airbyte.version=0.33.2
LABEL io.airbyte.version=0.34.0
LABEL io.airbyte.name=airbyte/source-declarative-manifest
2 changes: 1 addition & 1 deletion airbyte-cdk/python/airbyte_cdk/connector_builder/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ Note:
{
"config": <normal config>,
"__injected_declarative_manifest": {...},
"__command": <"resolve_manifest" | "list_streams" | "stream_read">
"__command": <"resolve_manifest" | "list_streams" | "test_read">
}
```
*See [ConnectionSpecification](https://docs.airbyte.com/understanding-airbyte/airbyte-protocol/#actor-specification) for details on the `"config"` key if needed.
Expand Down
5 changes: 4 additions & 1 deletion airbyte-cdk/python/airbyte_cdk/connector_builder/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,15 @@
from airbyte_cdk.entrypoint import AirbyteEntrypoint
from airbyte_cdk.models import ConfiguredAirbyteCatalog
from airbyte_cdk.sources.declarative.manifest_declarative_source import ManifestDeclarativeSource
from airbyte_cdk.sources.declarative.parsers.model_to_component_factory import ModelToComponentFactory
from airbyte_cdk.utils.traced_exception import AirbyteTracedException


def create_source(config: Mapping[str, Any]) -> ManifestDeclarativeSource:
    """Build the declarative source used by the connector builder.

    The manifest is read from the ``"__injected_declarative_manifest"`` key of
    ``config`` (``None`` is passed through if the key is absent) and handed to
    ``ManifestDeclarativeSource`` together with a ``ModelToComponentFactory``
    created with ``emit_connector_builder_messages=True``.

    :param config: connector configuration carrying the injected manifest
    :return: the source built from the injected manifest
    """
    manifest = config.get("__injected_declarative_manifest")
    # Single construction path: the earlier positional call
    # ``ManifestDeclarativeSource(manifest, True)`` returned first and left this
    # factory-based construction unreachable.
    return ManifestDeclarativeSource(
        source_config=manifest, component_factory=ModelToComponentFactory(emit_connector_builder_messages=True)
    )


def get_config_and_catalog_from_args(args: List[str]) -> Tuple[str, Mapping[str, Any], Optional[ConfiguredAirbyteCatalog]]:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,10 +111,13 @@


class ModelToComponentFactory:
def __init__(
    self, limit_pages_fetched_per_slice: int = None, limit_slices_fetched: int = None, emit_connector_builder_messages: bool = False
):
    """Store the factory options and initialise the model-to-constructor mappings.

    :param limit_pages_fetched_per_slice: stored as ``_limit_pages_fetched_per_slice``; defaults to ``None``
    :param limit_slices_fetched: stored as ``_limit_slices_fetched``; defaults to ``None``
    :param emit_connector_builder_messages: stored as ``_emit_connector_builder_messages``;
        defaults to ``False`` so existing callers keep their behaviour
    """
    # Only one signature is kept: the previous two-parameter ``def __init__``
    # header was left in front of this one without a body.
    self._init_mappings()
    self._limit_pages_fetched_per_slice = limit_pages_fetched_per_slice
    self._limit_slices_fetched = limit_slices_fetched
    self._emit_connector_builder_messages = emit_connector_builder_messages

def _init_mappings(self):
self.PYDANTIC_MODEL_TO_CONSTRUCTOR: [Type[BaseModel], Callable] = {
Expand Down Expand Up @@ -759,7 +762,7 @@ def create_simple_retriever(
else NoPagination(parameters={})
)

if self._limit_slices_fetched:
if self._limit_slices_fetched or self._emit_connector_builder_messages:
return SimpleRetrieverTestReadDecorator(
name=name,
paginator=paginator,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
#

import json
import logging
from dataclasses import InitVar, dataclass, field
from itertools import islice
from json import JSONDecodeError
Expand Down Expand Up @@ -61,6 +60,7 @@ class SimpleRetriever(Retriever, HttpStream):
_primary_key: str = field(init=False, repr=False, default="")
paginator: Optional[Paginator] = None
stream_slicer: Optional[StreamSlicer] = SinglePartitionRouter(parameters={})
emit_connector_builder_messages: bool = False

def __post_init__(self, parameters: Mapping[str, Any]):
self.paginator = self.paginator or NoPagination(parameters=parameters)
Expand Down Expand Up @@ -368,7 +368,7 @@ def read_records(
stream_slice = stream_slice or {} # None-check
self.paginator.reset()
records_generator = self._read_pages(
self._parse_records_and_emit_request_and_responses,
self.parse_records,
stream_slice,
stream_state,
)
Expand Down Expand Up @@ -408,13 +408,13 @@ def state(self, value: StreamState):
"""State setter, accept state serialized by state getter."""
self.stream_slicer.update_cursor(value)

def _parse_records_and_emit_request_and_responses(self, request, response, stream_state, stream_slice) -> Iterable[StreamData]:
# Only emit requests and responses when running in debug mode
if self.logger.isEnabledFor(logging.DEBUG):
yield _prepared_request_to_airbyte_message(request)
yield _response_to_airbyte_message(response)
# Not great to need to call _read_pages which is a private method
# A better approach would be to extract the HTTP client from the HttpStream and call it directly from the HttpRequester
def parse_records(
    self,
    request: requests.PreparedRequest,
    response: requests.Response,
    stream_state: Mapping[str, Any],
    stream_slice: Mapping[str, Any],
) -> Iterable[StreamData]:
    """Yield the records parsed out of ``response``.

    Parsing is delegated to ``self.parse_response``; ``request`` is accepted as
    part of the signature but is not used by this implementation.

    :param request: the prepared request that produced ``response`` (unused here)
    :param response: the HTTP response to parse
    :param stream_state: forwarded to ``parse_response`` as ``stream_state``
    :param stream_slice: forwarded to ``parse_response`` as ``stream_slice``
    :return: an iterable over whatever ``parse_response`` produces
    """
    parsed_records = self.parse_response(response, stream_slice=stream_slice, stream_state=stream_state)
    yield from parsed_records


Expand All @@ -439,6 +439,17 @@ def stream_slices(
) -> Iterable[Optional[Mapping[str, Any]]]:
return islice(super().stream_slices(sync_mode=sync_mode, stream_state=stream_state), self.maximum_number_of_slices)

def parse_records(
    self,
    request: requests.PreparedRequest,
    response: requests.Response,
    stream_state: Mapping[str, Any],
    stream_slice: Mapping[str, Any],
) -> Iterable[StreamData]:
    """Yield the request and response as messages, then the parsed records.

    The output order is: the message built from ``request``, the message built
    from ``response``, and finally everything ``self.parse_response`` yields.

    :param request: the prepared request, converted with ``_prepared_request_to_airbyte_message``
    :param response: the HTTP response, converted with ``_response_to_airbyte_message`` and then parsed
    :param stream_state: forwarded to ``parse_response`` as ``stream_state``
    :param stream_slice: forwarded to ``parse_response`` as ``stream_slice``
    :return: an iterable of the two messages followed by the parsed records
    """
    # Each message is built only when the consumer asks for it, so the
    # conversions stay lazy and in request -> response order.
    yield _prepared_request_to_airbyte_message(request)
    yield _response_to_airbyte_message(response)

    parsed_records = self.parse_response(response, stream_slice=stream_slice, stream_state=stream_state)
    yield from parsed_records


def _prepared_request_to_airbyte_message(request: requests.PreparedRequest) -> AirbyteMessage:
# FIXME: this should return some sort of trace message
Expand Down
2 changes: 1 addition & 1 deletion airbyte-cdk/python/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
name="airbyte-cdk",
# The version of the airbyte-cdk package is used at runtime to validate manifests. That validation must be
# updated if our semver format changes such as using release candidate versions.
version="0.33.2",
version="0.34.0",
description="A framework for writing Airbyte Connectors.",
long_description=README,
long_description_content_type="text/markdown",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
from airbyte_cdk.sources.declarative.models import ListPartitionRouter as ListPartitionRouterModel
from airbyte_cdk.sources.declarative.models import OAuthAuthenticator as OAuthAuthenticatorModel
from airbyte_cdk.sources.declarative.models import RecordSelector as RecordSelectorModel
from airbyte_cdk.sources.declarative.models import SimpleRetriever as SimpleRetrieverModel
from airbyte_cdk.sources.declarative.models import Spec as SpecModel
from airbyte_cdk.sources.declarative.models import SubstreamPartitionRouter as SubstreamPartitionRouterModel
from airbyte_cdk.sources.declarative.parsers.manifest_component_transformer import ManifestComponentTransformer
Expand All @@ -46,7 +47,7 @@
from airbyte_cdk.sources.declarative.requesters.request_options import InterpolatedRequestOptionsProvider
from airbyte_cdk.sources.declarative.requesters.request_path import RequestPath
from airbyte_cdk.sources.declarative.requesters.requester import HttpMethod
from airbyte_cdk.sources.declarative.retrievers import SimpleRetriever
from airbyte_cdk.sources.declarative.retrievers import SimpleRetriever, SimpleRetrieverTestReadDecorator
from airbyte_cdk.sources.declarative.schema import JsonFileSchemaLoader
from airbyte_cdk.sources.declarative.spec import Spec
from airbyte_cdk.sources.declarative.stream_slicers import CartesianProductStreamSlicer
Expand Down Expand Up @@ -1296,3 +1297,29 @@ def test_merge_incremental_and_partition_router(incremental, partition_router, e
if expected_slicer_count > 1:
assert isinstance(stream.retriever.stream_slicer, CartesianProductStreamSlicer)
assert len(stream.retriever.stream_slicer.stream_slicers) == expected_slicer_count


def test_simple_retriever_emit_log_messages():
    """A factory created with ``emit_connector_builder_messages=True`` must build
    a ``SimpleRetrieverTestReadDecorator`` from a ``SimpleRetriever`` definition."""
    record_selector_definition = {
        "type": "RecordSelector",
        "extractor": {
            "type": "DpathExtractor",
            "field_path": [],
        },
    }
    requester_definition = {"type": "HttpRequester", "name": "list", "url_base": "orange.com", "path": "/v1/api"}
    retriever_definition = {
        "type": "SimpleRetriever",
        "record_selector": record_selector_definition,
        "requester": requester_definition,
    }

    factory = ModelToComponentFactory(emit_connector_builder_messages=True)
    built_retriever = factory.create_component(
        model_type=SimpleRetrieverModel,
        component_definition=retriever_definition,
        config={},
        name="Test",
        primary_key="id",
        stream_slicer=None,
    )

    assert isinstance(built_retriever, SimpleRetrieverTestReadDecorator)
Loading

0 comments on commit 41ae131

Please sign in to comment.