Source-Faker: bump CDK, poetry lock #48473

Draft · wants to merge 2 commits into master
945 changes: 703 additions & 242 deletions airbyte-integrations/connectors/source-faker/poetry.lock

Large diffs are not rendered by default.

4 changes: 2 additions & 2 deletions airbyte-integrations/connectors/source-faker/pyproject.toml
@@ -16,8 +16,8 @@ repository = "https://github.com/airbytehq/airbyte"
include = "source_faker"

[tool.poetry.dependencies]
python = "^3.9,<4.0"
airbyte-cdk = "^2.0"
python = "^3.10,<4.0"
airbyte-cdk = "^6.5.3"
mimesis = "==6.1.1"

[tool.poetry.scripts]
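The lock file above was regenerated for these constraints with `poetry lock`. As a quick sanity check that an installed environment actually resolved the bumped CDK, a standard-library-only sketch (not part of this diff):

    from importlib.metadata import version

    # Distribution name taken from the dependency table above.
    cdk_version = version("airbyte-cdk")
    assert cdk_version.startswith("6."), f"expected a 6.x CDK, got {cdk_version}"
    print(f"airbyte-cdk {cdk_version}")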

This file was deleted: airbyte-integrations/connectors/source-faker/source_faker/airbyte_message_with_cached_json.py (superseded by source_faker/models.py below).

airbyte-integrations/connectors/source-faker/source_faker/models.py (new file)
@@ -0,0 +1,56 @@
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.

from __future__ import annotations

from types import MethodType

import orjson

from airbyte_cdk.models import (
AirbyteMessage,
AirbyteRecordMessage,
ConfiguredAirbyteCatalog,
ConfiguredAirbyteStream,
Type,
)
from airbyte_cdk.models.airbyte_protocol_serializers import (
AirbyteMessageSerializer,
ConfiguredAirbyteStreamSerializer,
)


def msg_to_dict(self: AirbyteMessage) -> dict:
return AirbyteMessageSerializer.dump(self)


def msg_to_json(self: AirbyteMessage) -> str:
return orjson.dumps(self.to_dict()).decode("utf-8")


# Plain functions assigned onto the class become ordinary instance methods, so
# `self` binds to each message at call time; the serializer's bound `load`
# already works as a factory.
ConfiguredAirbyteStream.from_dict = ConfiguredAirbyteStreamSerializer.load
AirbyteMessage.to_dict = msg_to_dict
AirbyteMessage.to_json = msg_to_json


class AirbyteMessageWithCachedJSON(AirbyteMessage):
"""
A monkeypatched subclass of AirbyteMessage that pre-renders its JSON representation on initialization.
This lets the JSON be computed in the worker process that builds the object rather than in the main process.

Note: We can't use @cache here because the LRU cache is not serializable when passed to child workers.
"""

def __init__(self, **kwargs) -> None:
super().__init__(**kwargs)
# Render the JSON once, up front, via the module-level helper; the
# instance's to_json() below just returns the cached string.
self._json = msg_to_json(self)
self.json = self.to_json  # legacy .json() callers also get the cached value

def to_json(self) -> str:
# Return the cached value (optimized for repeated calls on the same message).
return self._json


__all__ = [
"AirbyteMessage",
"AirbyteRecordMessage",
"AirbyteMessageWithCachedJSON",
"ConfiguredAirbyteCatalog",
"ConfiguredAirbyteStream",
"Type",
]
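A minimal usage sketch for the class above (not part of this diff), assuming the `source_faker.models` import path used elsewhere in this PR; the point is that the JSON is rendered once at construction and merely returned on later calls:

    from source_faker.models import AirbyteMessageWithCachedJSON, AirbyteRecordMessage, Type

    # emitted_at is a millisecond timestamp, as produced by utils.now_millis().
    record = AirbyteRecordMessage(stream="users", data={"id": 1}, emitted_at=1700000000000)
    msg = AirbyteMessageWithCachedJSON(type=Type.RECORD, record=record)

    first = msg.to_json()   # rendered during __init__
    second = msg.to_json()  # returns the cached string; no re-serialization
    assert first is second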
airbyte-integrations/connectors/source-faker/source_faker/purchase_generator.py
@@ -4,21 +4,20 @@

import datetime
from multiprocessing import current_process
from typing import Dict, List

from airbyte_cdk.models import AirbyteRecordMessage, Type
from mimesis import Datetime, Numeric

from .airbyte_message_with_cached_json import AirbyteMessageWithCachedJSON
from .utils import format_airbyte_time, now_millis
from source_faker.models import AirbyteMessageWithCachedJSON
from source_faker.models import AirbyteRecordMessage, Type
from source_faker.utils import format_airbyte_time, now_millis


class PurchaseGenerator:
def __init__(self, stream_name: str, seed: int) -> None:
self.stream_name = stream_name
self.seed = seed

def prepare(self):
def prepare(self) -> None:
"""
Note: the instances of the mimesis generators need to be global.
Yes, they *should* be able to be instance variables on this class, which should only be instantiated once per worker, but that's not quite the case:
@@ -37,7 +36,9 @@ def prepare(self):
numeric = Numeric(seed=seed_with_offset)

def random_date_in_range(
self, start_date: datetime.datetime, end_date: datetime.datetime = datetime.datetime.now()
self,
start_date: datetime.datetime,
end_date: datetime.datetime = datetime.datetime.now(),
) -> datetime.datetime:
time_between_dates = end_date - start_date
days_between_dates = time_between_dates.days
@@ -47,13 +48,12 @@ def random_date_in_range(
random_date = start_date + datetime.timedelta(days=random_number_of_days)
return random_date

def generate(self, user_id: int) -> List[Dict]:
def generate(self, user_id: int) -> list[dict]:
"""
Because we are doing this work in parallel processes, we need a deterministic way to know what a purchase's ID should be, given only the input user_id.
tl;dr: every 10 user_ids produce 10 purchases; a user ID ending in 5 has no purchases, a user ID ending in 7 has 2, and everyone else has 1.
"""

purchases: List[Dict] = []
purchases: list[dict] = []
last_user_id_digit = int(repr(user_id)[-1])
purchase_count = 1
id_offset = 0
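The deterministic rule from the docstring above, restated as a standalone sketch (a hypothetical helper, not the connector's code): the last digit of the user ID alone decides the purchase count, so parallel workers need no coordination:

    def purchase_count_for(user_id: int) -> int:
        # IDs ending in 5 get no purchases, IDs ending in 7 get two,
        # everyone else gets exactly one.
        last_digit = user_id % 10
        if last_digit == 5:
            return 0
        if last_digit == 7:
            return 2
        return 1

    # Every block of 10 consecutive user IDs yields 10 purchases: 8*1 + 0 + 2.
    assert sum(purchase_count_for(uid) for uid in range(10, 20)) == 10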
airbyte-integrations/connectors/source-faker/source_faker/source.py
@@ -3,7 +3,7 @@
#

import logging
from typing import Any, List, Mapping, Tuple
from typing import Any, Mapping

from airbyte_cdk.sources import AbstractSource
from airbyte_cdk.sources.streams import Stream
@@ -14,13 +14,17 @@


class SourceFaker(AbstractSource):
def check_connection(self, logger: logging.Logger, config: Mapping[str, Any]) -> Tuple[bool, Any]:
def check_connection(
self,
logger: logging.Logger,
config: Mapping[str, Any],
) -> tuple[bool, Any]:
if type(config["count"]) == int or type(config["count"]) == float:
return True, None
else:
return False, "Count option is missing"

def streams(self, config: Mapping[str, Any]) -> List[Stream]:
def streams(self, config: Mapping[str, Any]) -> list[Stream]:
count: int = config["count"] if "count" in config else DEFAULT_COUNT
seed: int = config["seed"] if "seed" in config else None
records_per_slice: int = config["records_per_slice"] if "records_per_slice" in config else 100
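Based solely on the logic above, check_connection passes any config whose count is an int or float and rejects everything else with the same error string. A hedged sketch (assumes SourceFaker needs no constructor arguments and the module path shown in this diff's layout):

    import logging

    from source_faker.source import SourceFaker  # module path assumed from this diff

    source = SourceFaker()
    logger = logging.getLogger("airbyte")

    ok, error = source.check_connection(logger, {"count": 100})
    assert ok and error is None

    ok, error = source.check_connection(logger, {"count": "100"})  # wrong type
    assert not ok and error == "Count option is missing"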
airbyte-integrations/connectors/source-faker/source_faker/streams.py
@@ -4,8 +4,9 @@

import datetime
import os
from collections.abc import Iterable, Mapping
from multiprocessing import Pool
from typing import Any, Dict, Iterable, List, Mapping, Optional
from typing import Any, Optional

from airbyte_cdk.sources.streams import IncrementalMixin, Stream

@@ -18,7 +19,15 @@ class Products(Stream, IncrementalMixin):
primary_key = "id"
cursor_field = "updated_at"

def __init__(self, count: int, seed: int, parallelism: int, records_per_slice: int, always_updated: bool, **kwargs):
def __init__(
self,
count: int,
seed: int,
parallelism: int,
records_per_slice: int,
always_updated: bool,
**kwargs,
) -> None:
super().__init__(**kwargs)
self.count = count
self.seed = seed
@@ -37,10 +46,10 @@ def state(self) -> Mapping[str, Any]:
return {}

@state.setter
def state(self, value: Mapping[str, Any]):
def state(self, value: Mapping[str, Any]) -> None:
self._state = value

def load_products(self) -> List[Dict]:
def load_products(self) -> list[dict]:
dirname = os.path.dirname(os.path.realpath(__file__))
return read_json(os.path.join(dirname, "record_data", "products.json"))

@@ -68,7 +77,15 @@ class Users(Stream, IncrementalMixin):
primary_key = "id"
cursor_field = "updated_at"

def __init__(self, count: int, seed: int, parallelism: int, records_per_slice: int, always_updated: bool, **kwargs):
def __init__(
self,
count: int,
seed: int,
parallelism: int,
records_per_slice: int,
always_updated: bool,
**kwargs,
) -> None:
super().__init__(**kwargs)
self.count = count
self.seed = seed
@@ -89,7 +106,7 @@ def state(self) -> Mapping[str, Any]:
return {}

@state.setter
def state(self, value: Mapping[str, Any]):
def state(self, value: Mapping[str, Any]) -> None:
self._state = value

def read_records(self, **kwargs) -> Iterable[Mapping[str, Any]]:
@@ -128,7 +145,15 @@ class Purchases(Stream, IncrementalMixin):
primary_key = "id"
cursor_field = "updated_at"

def __init__(self, count: int, seed: int, parallelism: int, records_per_slice: int, always_updated: bool, **kwargs):
def __init__(
self,
count: int,
seed: int,
parallelism: int,
records_per_slice: int,
always_updated: bool,
**kwargs,
) -> None:
super().__init__(**kwargs)
self.count = count
self.seed = seed
@@ -149,7 +174,7 @@ def state(self) -> Mapping[str, Any]:
return {}

@state.setter
def state(self, value: Mapping[str, Any]):
def state(self, value: Mapping[str, Any]) -> None:
self._state = value

def read_records(self, **kwargs) -> Iterable[Mapping[str, Any]]:
@@ -165,13 +190,20 @@ def read_records(self, **kwargs) -> Iterable[Mapping[str, Any]]:

# a fuzzy guess, some users have purchases, some don't
median_record_byte_size = 230
yield generate_estimate(self.name, (self.count) * 1.3, median_record_byte_size)
yield generate_estimate(
self.name,
(self.count) * 1.3,
median_record_byte_size,
)

loop_offset = 0
with Pool(initializer=self.generator.prepare, processes=self.parallelism) as pool:
while loop_offset < self.count:
records_remaining_this_loop = min(self.records_per_slice, (self.count - loop_offset))
carts = pool.map(self.generator.generate, range(loop_offset, loop_offset + records_remaining_this_loop))
carts = pool.map(
self.generator.generate,
range(loop_offset, loop_offset + records_remaining_this_loop),
)
for purchases in carts:
loop_offset += 1
for purchase in purchases:
Expand All @@ -180,6 +212,14 @@ def read_records(self, **kwargs) -> Iterable[Mapping[str, Any]]:
if records_remaining_this_loop == 0:
break

self.state = {"seed": self.seed, "updated_at": updated_at, "loop_offset": loop_offset}
self.state = {
"seed": self.seed,
"updated_at": updated_at,
"loop_offset": loop_offset,
}

self.state = {"seed": self.seed, "updated_at": updated_at, "loop_offset": loop_offset}
self.state = {
"seed": self.seed,
"updated_at": updated_at,
"loop_offset": loop_offset,
}
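The Pool(initializer=...) pattern used by these streams, reduced to a self-contained sketch: the initializer runs once per worker process to seed per-worker state, after which pool.map fans the per-record work out. `_identity` is an internal multiprocessing attribute, used here only to mirror the per-worker seed offset the generators rely on:

    from multiprocessing import Pool, current_process

    _worker_seed = None

    def prepare(seed: int) -> None:
        # Runs once in each worker; offset the seed by the worker's identity
        # so parallel workers don't emit identical "random" data.
        global _worker_seed
        _worker_seed = seed + current_process()._identity[0]

    def generate(record_id: int) -> tuple[int, int]:
        return record_id, _worker_seed

    if __name__ == "__main__":
        with Pool(initializer=prepare, initargs=(42,), processes=2) as pool:
            print(pool.map(generate, range(4)))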
airbyte-integrations/connectors/source-faker/source_faker/user_generator.py
@@ -5,20 +5,20 @@
import datetime
from multiprocessing import current_process

from airbyte_cdk.models import AirbyteRecordMessage, Type
from mimesis import Address, Datetime, Person
from mimesis.locales import Locale

from .airbyte_message_with_cached_json import AirbyteMessageWithCachedJSON
from .utils import format_airbyte_time, now_millis
from airbyte_cdk.models import AirbyteRecordMessage, Type
from source_faker.models import AirbyteMessageWithCachedJSON
from source_faker.utils import format_airbyte_time, now_millis


class UserGenerator:
def __init__(self, stream_name: str, seed: int) -> None:
self.stream_name = stream_name
self.seed = seed

def prepare(self):
def prepare(self) -> None:
"""
Note: the instances of the mimesis generators need to be global.
Yes, they *should* be able to be instance variables on this class, which should only be instantiated once per worker, but that's not quite the case:
@@ -38,7 +38,7 @@ def prepare(self):
address = Address(locale=Locale.EN, seed=seed_with_offset)
dt = Datetime(seed=seed_with_offset)

def generate(self, user_id: int):
def generate(self, user_id: int) -> AirbyteMessageWithCachedJSON:
# faker doesn't always produce unique email addresses, so to enforce uniqueness, we will append the user_id to the prefix
email_parts = person.email().split("@")
email = f"{email_parts[0]}+{user_id + 1}@{email_parts[1]}"
@@ -74,5 +74,12 @@ def generate(self, user_id: int):
while not profile["created_at"]:
profile["created_at"] = format_airbyte_time(dt.datetime())

record = AirbyteRecordMessage(stream=self.stream_name, data=profile, emitted_at=now_millis())
return AirbyteMessageWithCachedJSON(type=Type.RECORD, record=record)
record = AirbyteRecordMessage(
stream=self.stream_name,
data=profile,
emitted_at=now_millis(),
)
return AirbyteMessageWithCachedJSON(
type=Type.RECORD,
record=record,
)
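The email-uniqueness trick above, in isolation: mimesis can repeat addresses across users, so the 1-based user ID is appended as a plus-tag to the local part, keeping the address syntactically valid while guaranteeing uniqueness. A sketch with a hypothetical duplicate:

    def unique_email(email: str, user_id: int) -> str:
        # Append the 1-based user id as a plus-tag to the local part.
        local, domain = email.split("@")
        return f"{local}+{user_id + 1}@{domain}"

    assert unique_email("jane.doe@example.com", 0) == "jane.doe+1@example.com"
    assert unique_email("jane.doe@example.com", 1) == "jane.doe+2@example.com"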