Merge branch 'master' of github.com:airbytehq/airbyte into bmoric/extract-source-definition-specification-api
benmoriceau committed Nov 8, 2022
2 parents 1cf081b + 1cdf1ba commit 7bb500b
Showing 674 changed files with 20,471 additions and 5,955 deletions.
3 changes: 3 additions & 0 deletions .github/teams.yml
@@ -0,0 +1,3 @@
team/growth:
- "@letiescanciano"
- "@arnaudjnn"
9 changes: 9 additions & 0 deletions .github/workflows/label-pr-by-team.yml
@@ -0,0 +1,9 @@
name: "Add labels to github PRs based on team"
on: pull_request
jobs:
team-labeler:
runs-on: ubuntu-latest
steps:
- uses: JulienKode/team-labeler-action@v0.1.1
with:
repo-token: "${{ secrets.GITHUB_TOKEN }}"
66 changes: 66 additions & 0 deletions airbyte-api/src/main/openapi/config.yaml
@@ -168,6 +168,29 @@ paths:
$ref: "#/components/responses/NotFoundResponse"
"422":
$ref: "#/components/responses/InvalidInputResponse"
/v1/workspaces/get_by_connection_id:
post:
tags:
- workspace
summary: Find workspace by connection id
operationId: getWorkspaceByConnectionId
requestBody:
content:
application/json:
schema:
$ref: "#/components/schemas/ConnectionIdRequestBody"
required: true
responses:
"200":
description: Successful operation
content:
application/json:
schema:
$ref: "#/components/schemas/WorkspaceRead"
"404":
$ref: "#/components/responses/NotFoundResponse"
"422":
$ref: "#/components/responses/InvalidInputResponse"
/v1/workspaces/update:
post:
tags:
@@ -2266,6 +2289,26 @@ paths:
application/json:
schema:
$ref: "#/components/schemas/InternalOperationResult"
/v1/attempt/save_stats:
post:
tags:
- attempt
- internal
summary: For worker to set sync stats of a running attempt.
operationId: saveStats
requestBody:
content:
application/json:
schema:
$ref: "#/components/schemas/SaveStatsRequestBody"
required: true
responses:
"200":
description: Successful Operation
content:
application/json:
schema:
$ref: "#/components/schemas/InternalOperationResult"

components:
securitySchemes:
@@ -4049,6 +4092,12 @@ components:
recordsCommitted:
type: integer
format: int64
estimatedRecords:
type: integer
format: int64
estimatedBytes:
type: integer
format: int64
AttemptStreamStats:
type: object
required:
@@ -4892,6 +4941,23 @@ components:
processingTaskQueue:
type: string
default: ""
SaveStatsRequestBody:
type: object
required:
- jobId
- attemptNumber
- stats
properties:
jobId:
$ref: "#/components/schemas/JobId"
attemptNumber:
$ref: "#/components/schemas/AttemptNumber"
stats:
$ref: "#/components/schemas/AttemptStats"
streamStats:
type: array
items:
$ref: "#/components/schemas/AttemptStreamStats"
InternalOperationResult:
type: object
required:
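The two endpoints added above follow the existing Configuration API conventions: a POST with a JSON body that returns either the referenced schema or a standard error response. A minimal sketch of exercising both, assuming a local Airbyte server on the internal API port and purely illustrative IDs and counts:

```python
# Sketch only: the base URL, connection ID, job ID, and stats values are
# illustrative assumptions, not part of this commit.
import requests

BASE = "http://localhost:8001/api/v1"

# Find the workspace that owns a given connection.
resp = requests.post(
    f"{BASE}/workspaces/get_by_connection_id",
    json={"connectionId": "a1b2c3d4-1234-1234-1234-000000000000"},
)
resp.raise_for_status()
workspace = resp.json()  # a WorkspaceRead object

# Report sync stats for a running attempt, including the new estimate fields.
resp = requests.post(
    f"{BASE}/attempt/save_stats",
    json={
        "jobId": 123,
        "attemptNumber": 0,
        "stats": {
            "recordsCommitted": 1000,
            "estimatedRecords": 5000,
            "estimatedBytes": 1048576,
        },
        "streamStats": [],
    },
)
resp.raise_for_status()  # body is an InternalOperationResult
```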
2 changes: 1 addition & 1 deletion airbyte-base-java-image/Dockerfile
@@ -6,7 +6,7 @@ WORKDIR /app

RUN yum install -y tar

# Add the DataDaog Java APM agent
# Add the Datadog Java APM agent
ADD https://dtdg.co/latest-java-tracer dd-java-agent.jar

# Add the OpenTelemetry Java APM agent
@@ -148,10 +148,10 @@ public BootloaderApp(final Configs configs,
public void load() throws Exception {
LOGGER.info("Initializing databases...");
DatabaseCheckFactory.createConfigsDatabaseInitializer(configsDslContext,
configs.getConfigsDatabaseInitializationTimeoutMs(), MoreResources.readResource(DatabaseConstants.CONFIGS_SCHEMA_PATH)).initialize();
configs.getConfigsDatabaseInitializationTimeoutMs(), MoreResources.readResource(DatabaseConstants.CONFIGS_INITIAL_SCHEMA_PATH)).initialize();

DatabaseCheckFactory.createJobsDatabaseInitializer(jobsDslContext,
configs.getJobsDatabaseInitializationTimeoutMs(), MoreResources.readResource(DatabaseConstants.JOBS_SCHEMA_PATH)).initialize();
configs.getJobsDatabaseInitializationTimeoutMs(), MoreResources.readResource(DatabaseConstants.JOBS_INITIAL_SCHEMA_PATH)).initialize();
LOGGER.info("Databases initialized.");

LOGGER.info("Setting up config database and default workspace...");
@@ -141,7 +141,7 @@ void testBootloaderAppBlankDb() throws Exception {
val configsMigrator = new ConfigsDatabaseMigrator(configDatabase, configsFlyway);
// this line should change with every new migration
// to show that you meant to make a new migration to the prod database
assertEquals("0.40.18.001", configsMigrator.getLatestMigration().getVersion().getVersion());
assertEquals("0.40.18.002", configsMigrator.getLatestMigration().getVersion().getVersion());

val jobsPersistence = new DefaultJobPersistence(jobDatabase);
assertEquals(VERSION_0330_ALPHA, jobsPersistence.getVersion().get());
13 changes: 13 additions & 0 deletions airbyte-cdk/python/CHANGELOG.md
@@ -1,5 +1,18 @@
# Changelog

## 0.7.0
Low-code: Allow connector specifications to be defined in the manifest

## 0.6.0
Low-code: Add support for monthly and yearly incremental updates for `DatetimeStreamSlicer`

## 0.5.4
Low-code: Get response.json in a safe way

## 0.5.3
Low-code: Replace EmptySchemaLoader with DefaultSchemaLoader to retain backwards compatibility
Low-code: Evaluate backoff strategies at runtime

## 0.5.2
Low-code: Allow for read even when schemas are not defined for a connector yet

27 changes: 27 additions & 0 deletions airbyte-cdk/python/airbyte_cdk/models/airbyte_protocol.py
@@ -81,6 +81,7 @@ class Config:

class TraceType(Enum):
ERROR = "ERROR"
ESTIMATE = "ESTIMATE"


class FailureType(Enum):
@@ -98,6 +99,28 @@ class Config:
failure_type: Optional[FailureType] = Field(None, description="The type of error")


class EstimateType(Enum):
STREAM = "STREAM"
SYNC = "SYNC"


class AirbyteEstimateTraceMessage(BaseModel):
class Config:
extra = Extra.allow

name: str = Field(..., description="The name of the stream")
type: EstimateType = Field(..., description="The type of estimate", title="estimate type")
namespace: Optional[str] = Field(None, description="The namespace of the stream")
row_estimate: Optional[int] = Field(
None,
description="The estimated number of rows to be emitted by this sync for this stream",
)
byte_estimate: Optional[int] = Field(
None,
description="The estimated number of bytes to be emitted by this sync for this stream",
)


class OrchestratorType(Enum):
CONNECTOR_CONFIG = "CONNECTOR_CONFIG"

@@ -213,6 +236,10 @@ class Config:
type: TraceType = Field(..., description="the type of trace message", title="trace type")
emitted_at: float = Field(..., description="the time in ms that the message was emitted")
error: Optional[AirbyteErrorTraceMessage] = Field(None, description="error trace message: the error object")
estimate: Optional[AirbyteEstimateTraceMessage] = Field(
None,
description="Estimate trace message: a guess at how much data will be produced in this sync",
)


class AirbyteControlMessage(BaseModel):
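The new ESTIMATE trace type pairs with AirbyteEstimateTraceMessage above. A minimal sketch of how a source might emit one, built from the Pydantic models defined in this file (the stream name and counts are illustrative):

```python
import time

from airbyte_cdk.models import (
    AirbyteEstimateTraceMessage,
    AirbyteMessage,
    AirbyteTraceMessage,
    EstimateType,
    TraceType,
    Type as MessageType,
)

# Announce that roughly 5,000 rows (~10 MiB) are expected for the "users" stream.
estimate = AirbyteEstimateTraceMessage(
    name="users",
    type=EstimateType.STREAM,
    row_estimate=5000,
    byte_estimate=10 * 1024 * 1024,
)
message = AirbyteMessage(
    type=MessageType.TRACE,
    trace=AirbyteTraceMessage(
        type=TraceType.ESTIMATE,
        emitted_at=time.time() * 1000,  # the protocol expects milliseconds
        estimate=estimate,
    ),
)
print(message.json(exclude_unset=True))
```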
80 changes: 33 additions & 47 deletions airbyte-cdk/python/airbyte_cdk/sources/abstract_source.py
@@ -4,15 +4,12 @@

import logging
from abc import ABC, abstractmethod
from datetime import datetime
from functools import lru_cache
from typing import Any, Dict, Iterator, List, Mapping, MutableMapping, Optional, Tuple, Union

from airbyte_cdk.models import (
AirbyteCatalog,
AirbyteConnectionStatus,
AirbyteMessage,
AirbyteRecordMessage,
AirbyteStateMessage,
ConfiguredAirbyteCatalog,
ConfiguredAirbyteStream,
@@ -24,8 +21,8 @@
from airbyte_cdk.sources.source import Source
from airbyte_cdk.sources.streams import Stream
from airbyte_cdk.sources.streams.http.http import HttpStream
from airbyte_cdk.sources.utils.record_helper import stream_data_to_airbyte_message
from airbyte_cdk.sources.utils.schema_helpers import InternalConfig, split_config
from airbyte_cdk.sources.utils.transform import TypeTransformer
from airbyte_cdk.utils.event_timing import create_timer
from airbyte_cdk.utils.traced_exception import AirbyteTracedException

@@ -241,20 +238,27 @@ def _read_incremental(
stream_state=stream_state,
cursor_field=configured_stream.cursor_field or None,
)
for record_counter, record_data in enumerate(records, start=1):
yield self._as_airbyte_record(stream_name, record_data)
stream_state = stream_instance.get_updated_state(stream_state, record_data)
checkpoint_interval = stream_instance.state_checkpoint_interval
if checkpoint_interval and record_counter % checkpoint_interval == 0:
yield self._checkpoint_state(stream_instance, stream_state, state_manager)

total_records_counter += 1
# This functionality should ideally live outside of this method
# but since state is managed inside this method, we keep track
# of it here.
if self._limit_reached(internal_config, total_records_counter):
# Break from slice loop to save state and exit from _read_incremental function.
break
record_counter = 0
for message_counter, record_data_or_message in enumerate(records, start=1):
message = stream_data_to_airbyte_message(
stream_name, record_data_or_message, stream_instance.transformer, stream_instance.get_json_schema()
)
yield message
if message.type == MessageType.RECORD:
record = message.record
stream_state = stream_instance.get_updated_state(stream_state, record.data)
checkpoint_interval = stream_instance.state_checkpoint_interval
record_counter += 1
if checkpoint_interval and record_counter % checkpoint_interval == 0:
yield self._checkpoint_state(stream_instance, stream_state, state_manager)

total_records_counter += 1
# This functionality should ideally live outside of this method
# but since state is managed inside this method, we keep track
# of it here.
if self._limit_reached(internal_config, total_records_counter):
# Break from slice loop to save state and exit from _read_incremental function.
break

yield self._checkpoint_state(stream_instance, stream_state, state_manager)
if self._limit_reached(internal_config, total_records_counter):
@@ -277,50 +281,32 @@ def _read_full_refresh(
total_records_counter = 0
for _slice in slices:
logger.debug("Processing stream slice", extra={"slice": _slice})
records = stream_instance.read_records(
record_data_or_messages = stream_instance.read_records(
stream_slice=_slice,
sync_mode=SyncMode.full_refresh,
cursor_field=configured_stream.cursor_field,
)
for record in records:
yield self._as_airbyte_record(configured_stream.stream.name, record)
total_records_counter += 1
if self._limit_reached(internal_config, total_records_counter):
return
for record_data_or_message in record_data_or_messages:
message = stream_data_to_airbyte_message(
stream_instance.name, record_data_or_message, stream_instance.transformer, stream_instance.get_json_schema()
)
yield message
if message.type == MessageType.RECORD:
total_records_counter += 1
if self._limit_reached(internal_config, total_records_counter):
return

def _checkpoint_state(self, stream: Stream, stream_state, state_manager: ConnectorStateManager):
# First attempt to retrieve the current state using the stream's state property. We receive an AttributeError if the state
# property is not implemented by the stream instance and as a fallback, use the stream_state retrieved from the stream
# instance's deprecated get_updated_state() method.
try:
state_manager.update_state_for_stream(stream.name, stream.namespace, stream.state)

except AttributeError:
state_manager.update_state_for_stream(stream.name, stream.namespace, stream_state)
return state_manager.create_state_message(stream.name, stream.namespace, send_per_stream_state=self.per_stream_state_enabled)

@lru_cache(maxsize=None)
def _get_stream_transformer_and_schema(self, stream_name: str) -> Tuple[TypeTransformer, Mapping[str, Any]]:
"""
Lookup stream's transform object and jsonschema based on stream name.
This function would be called a lot so using caching to save on costly
get_json_schema operation.
:param stream_name name of stream from catalog.
:return tuple with stream transformer object and discover json schema.
"""
stream_instance = self._stream_to_instance_map[stream_name]
return stream_instance.transformer, stream_instance.get_json_schema()

def _as_airbyte_record(self, stream_name: str, data: Mapping[str, Any]):
now_millis = int(datetime.now().timestamp() * 1000)
transformer, schema = self._get_stream_transformer_and_schema(stream_name)
# Transform object fields according to config. Most likely you will
# need it to normalize values against json schema. By default no action
# taken unless configured. See
# docs/connector-development/cdk-python/schemas.md for details.
transformer.transform(data, schema) # type: ignore
message = AirbyteRecordMessage(stream=stream_name, data=data, emitted_at=now_millis)
return AirbyteMessage(type=MessageType.RECORD, record=message)

@staticmethod
def _apply_log_level_to_stream_logger(logger: logging.Logger, stream_instance: Stream):
"""
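The refactor above retires the private _as_airbyte_record helper in favor of the shared stream_data_to_airbyte_message, so read_records may now yield either raw record data or fully formed AirbyteMessages (for example logs or traces), and only RECORD messages advance checkpoints and record limits. A sketch of the dispatch the shared helper is assumed to perform (the real implementation lives in airbyte_cdk.sources.utils.record_helper):

```python
# Sketch only: a hypothetical re-implementation for illustration; the CDK's
# actual helper is stream_data_to_airbyte_message in record_helper.py.
from datetime import datetime
from typing import Any, Mapping, Union

from airbyte_cdk.models import AirbyteMessage, AirbyteRecordMessage, Type as MessageType


def to_airbyte_message(
    stream_name: str,
    data_or_message: Union[AirbyteMessage, Mapping[str, Any]],
    transformer,
    schema: Mapping[str, Any],
) -> AirbyteMessage:
    # Messages emitted by the stream pass through untouched.
    if isinstance(data_or_message, AirbyteMessage):
        return data_or_message
    # Raw record data is normalized against the JSON schema and wrapped in a
    # RECORD message, as _as_airbyte_record used to do.
    transformer.transform(data_or_message, schema)
    return AirbyteMessage(
        type=MessageType.RECORD,
        record=AirbyteRecordMessage(
            stream=stream_name,
            data=data_or_message,
            emitted_at=int(datetime.now().timestamp() * 1000),
        ),
    )
```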
@@ -7,7 +7,7 @@

from airbyte_cdk.models import SyncMode
from airbyte_cdk.sources.declarative.retrievers.retriever import Retriever
from airbyte_cdk.sources.declarative.schema import EmptySchemaLoader
from airbyte_cdk.sources.declarative.schema import DefaultSchemaLoader
from airbyte_cdk.sources.declarative.schema.schema_loader import SchemaLoader
from airbyte_cdk.sources.declarative.transformations import RecordTransformation
from airbyte_cdk.sources.declarative.types import Config, StreamSlice
@@ -48,7 +48,7 @@ class DeclarativeStream(Stream, JsonSchemaMixin):
def __post_init__(self, options: Mapping[str, Any]):
self.stream_cursor_field = self.stream_cursor_field or []
self.transformations = self.transformations or []
self._schema_loader = self.schema_loader if self.schema_loader else EmptySchemaLoader(config=self.config, options=options)
self._schema_loader = self.schema_loader if self.schema_loader else DefaultSchemaLoader(config=self.config, options=options)

@property
def primary_key(self) -> Optional[Union[str, List[str], List[List[str]]]]:
@@ -19,4 +19,7 @@ class JsonDecoder(Decoder, JsonSchemaMixin):
options: InitVar[Mapping[str, Any]]

def decode(self, response: requests.Response) -> Union[Mapping[str, Any], List]:
return response.json() or {}
try:
return response.json()
except requests.exceptions.JSONDecodeError:
return {}
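The effect of the change, sketched below: an empty or non-JSON response body now decodes to an empty mapping instead of raising. The constructor keyword follows the options field above, and the module path is assumed; requests has raised requests.exceptions.JSONDecodeError for such bodies since version 2.27.

```python
# Sketch only: building a bare Response through its private _content
# attribute is a test convenience, not part of this commit.
import requests

from airbyte_cdk.sources.declarative.decoders.json_decoder import JsonDecoder

decoder = JsonDecoder(options={})

response = requests.Response()
response._content = b"not json"  # response.json() would raise JSONDecodeError here

assert decoder.decode(response) == {}
```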
@@ -32,8 +32,8 @@
from airbyte_cdk.sources.declarative.requesters.paginators.strategies.offset_increment import OffsetIncrement
from airbyte_cdk.sources.declarative.requesters.paginators.strategies.page_increment import PageIncrement
from airbyte_cdk.sources.declarative.retrievers.simple_retriever import SimpleRetriever
from airbyte_cdk.sources.declarative.schema import EmptySchemaLoader
from airbyte_cdk.sources.declarative.schema.json_file_schema_loader import JsonFileSchemaLoader
from airbyte_cdk.sources.declarative.spec import Spec
from airbyte_cdk.sources.declarative.stream_slicers.cartesian_product_stream_slicer import CartesianProductStreamSlicer
from airbyte_cdk.sources.declarative.stream_slicers.datetime_stream_slicer import DatetimeStreamSlicer
from airbyte_cdk.sources.declarative.stream_slicers.list_stream_slicer import ListStreamSlicer
@@ -59,7 +59,6 @@
"DefaultErrorHandler": DefaultErrorHandler,
"DefaultPaginator": DefaultPaginator,
"DpathExtractor": DpathExtractor,
"EmptySchemaLoader": EmptySchemaLoader,
"ExponentialBackoffStrategy": ExponentialBackoffStrategy,
"HttpRequester": HttpRequester,
"InterpolatedBoolean": InterpolatedBoolean,
@@ -77,6 +76,7 @@
"RemoveFields": RemoveFields,
"SimpleRetriever": SimpleRetriever,
"SingleSlice": SingleSlice,
"Spec": Spec,
"SubstreamSlicer": SubstreamSlicer,
"WaitUntilTimeFromHeader": WaitUntilTimeFromHeaderBackoffStrategy,
"WaitTimeFromHeader": WaitTimeFromHeaderBackoffStrategy,
(Diff truncated: the remaining changed files are not shown.)
