Skip to content

Commit

Permalink
Output run time per stream (airbytehq#8173)
Browse files Browse the repository at this point in the history
* Output run time per stream

* Code review changes: Remove multithreaded support.
Port code to cdk

* Formatting

* remove extraneous try block
  • Loading branch information
eliziario authored Nov 30, 2021
1 parent ff0c09a commit 65d471a
Show file tree
Hide file tree
Showing 6 changed files with 321 additions and 27 deletions.
41 changes: 23 additions & 18 deletions airbyte-cdk/python/airbyte_cdk/sources/abstract_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
from airbyte_cdk.sources.streams.http.http import HttpStream
from airbyte_cdk.sources.utils.schema_helpers import InternalConfig, split_config
from airbyte_cdk.sources.utils.transform import TypeTransformer
from airbyte_cdk.utils.event_timing import create_timer


class AbstractSource(Source, ABC):
Expand Down Expand Up @@ -90,24 +91,28 @@ def read(
# get the streams once in case the connector needs to make any queries to generate them
stream_instances = {s.name: s for s in self.streams(config)}
self._stream_to_instance_map = stream_instances
for configured_stream in catalog.streams:
stream_instance = stream_instances.get(configured_stream.stream.name)
if not stream_instance:
raise KeyError(
f"The requested stream {configured_stream.stream.name} was not found in the source. Available streams: {stream_instances.keys()}"
)

try:
yield from self._read_stream(
logger=logger,
stream_instance=stream_instance,
configured_stream=configured_stream,
connector_state=connector_state,
internal_config=internal_config,
)
except Exception as e:
logger.exception(f"Encountered an exception while reading stream {self.name}")
raise e
with create_timer(self.name) as timer:
for configured_stream in catalog.streams:
stream_instance = stream_instances.get(configured_stream.stream.name)
if not stream_instance:
raise KeyError(
f"The requested stream {configured_stream.stream.name} was not found in the source. Available streams: {stream_instances.keys()}"
)

try:
yield from self._read_stream(
logger=logger,
stream_instance=stream_instance,
configured_stream=configured_stream,
connector_state=connector_state,
internal_config=internal_config,
)
except Exception as e:
logger.exception(f"Encountered an exception while reading stream {self.name}")
raise e
finally:
logger.info(f"Finished syncing {self.name}")
logger.info(timer.report())

logger.info(f"Finished syncing {self.name}")

Expand Down
85 changes: 85 additions & 0 deletions airbyte-cdk/python/airbyte_cdk/utils/event_timing.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
#
# Copyright (c) 2021 Airbyte, Inc., all rights reserved.
#

import datetime
import time
from contextlib import contextmanager
from dataclasses import dataclass, field

from airbyte_cdk.logger import AirbyteLogger

logger = AirbyteLogger()


class EventTimer:
"""Simple nanosecond resolution event timer for debugging, initially intended to be used to record streams execution
time for a source.
Event nesting follows a LIFO pattern, so finish will apply to the last started event.
"""

def __init__(self, name):
self.name = name
self.events = {}
self.count = 0
self.stack = []

def start_event(self, name):
"""
Start a new event and push it to the stack.
"""
self.events[name] = Event(name=name)
self.count += 1
self.stack.insert(0, self.events[name])

def finish_event(self):
"""
Finish the current event and pop it from the stack.
"""

if self.stack:
event = self.stack.pop(0)
event.finish()
else:
logger.warn(f"{self.name} finish_event called without start_event")

def report(self, order_by="name"):
"""
:param order_by: 'name' or 'duration'
"""
if order_by == "name":
events = sorted(self.events.values(), key=lambda event: event.name)
elif order_by == "duration":
events = sorted(self.events.values(), key=lambda event: event.duration)
text = f"{self.name} runtimes:\n"
text += "\n".join(str(event) for event in events)
return text


@dataclass
class Event:
name: str
start: float = field(default_factory=time.perf_counter_ns)
end: float = field(default=None)

@property
def duration(self) -> float:
"""Returns the elapsed time in seconds or positive infinity if event was never finished"""
if self.end:
return (self.end - self.start) / 1e9
return float("+inf")

def __str__(self):
return f"{self.name} {datetime.timedelta(seconds=self.duration)}"

def finish(self):
self.end = time.perf_counter_ns()


@contextmanager
def create_timer(name):
"""
Creates a new EventTimer as a context manager to improve code readability.
"""
a_timer = EventTimer(name)
yield a_timer
55 changes: 55 additions & 0 deletions airbyte-cdk/python/unit_tests/test_counter.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
#
# Copyright (c) 2021 Airbyte, Inc., all rights reserved.
#


from unittest import mock

from airbyte_cdk.utils.event_timing import create_timer


def test_counter_init():
with create_timer("Counter") as timer:
assert timer.name == "Counter"


def test_counter_start_event():
with create_timer("Counter") as timer:
with mock.patch("airbyte_cdk.utils.event_timing.EventTimer.start_event") as mock_start_event:
timer.start_event("test_event")
mock_start_event.assert_called_with("test_event")


def test_counter_finish_event():
with create_timer("Counter") as timer:
with mock.patch("airbyte_cdk.utils.event_timing.EventTimer.finish_event") as mock_finish_event:
timer.finish_event("test_event")
mock_finish_event.assert_called_with("test_event")


def test_timer_multiple_events():
with create_timer("Counter") as timer:
for i in range(10):
timer.start_event("test_event")
timer.finish_event()
assert timer.count == 10


def test_report_is_ordered_by_name_by_default():
names = ["j", "b", "g", "d", "e", "f", "c", "h", "i", "a"]

with create_timer("Source Counter") as timer:
for name in names:
timer.start_event(name)
timer.finish_event()
report = timer.report().split("\n")[1:] # ignore the first line
report_names = [line.split(" ")[0] for line in report]
assert report_names == sorted(names)


def test_double_finish_is_safely_ignored():
with create_timer("Source Counter") as timer:
timer.start_event("test_event")
timer.finish_event()
timer.finish_event()
assert timer.count == 1
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
)
from airbyte_protocol import Type as MessageType
from base_python.cdk.streams.core import Stream
from base_python.cdk.utils.event_timing import create_timer
from base_python.integration import Source
from base_python.logger import AirbyteLogger

Expand Down Expand Up @@ -78,15 +79,24 @@ def read(
# TODO assert all streams exist in the connector
# get the streams once in case the connector needs to make any queries to generate them
stream_instances = {s.name: s for s in self.streams(config)}
for configured_stream in catalog.streams:
try:
stream_instance = stream_instances[configured_stream.stream.name]
yield from self._read_stream(
logger=logger, stream_instance=stream_instance, configured_stream=configured_stream, connector_state=connector_state
)
except Exception as e:
logger.exception(f"Encountered an exception while reading stream {self.name}")
raise e
with create_timer(self.name) as timer:
for configured_stream in catalog.streams:
try:
stream_instance = stream_instances[configured_stream.stream.name]
timer.start_event(configured_stream.stream.name)
yield from self._read_stream(
logger=logger,
stream_instance=stream_instance,
configured_stream=configured_stream,
connector_state=connector_state,
)
timer.end_event()
except Exception as e:
logger.exception(f"Encountered an exception while reading stream {self.name}")
raise e
finally:
logger.info(f"Finished syncing {self.name}")
logger.info(timer.report())

logger.info(f"Finished syncing {self.name}")

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
#
# Copyright (c) 2021 Airbyte, Inc., all rights reserved.
#

import datetime
import time
from contextlib import contextmanager
from dataclasses import dataclass, field

from base_python.logger import AirbyteLogger

logger = AirbyteLogger()


class EventTimer:
"""Simple nanosecond resolution event timer for debugging, initially intended to be used to record streams execution
time for a source.
Event nesting follows a LIFO pattern, so finish will apply to the last started event.
"""

def __init__(self, name):
self.name = name
self.events = {}
self.count = 0
self.stack = []

def start_event(self, name):
"""
Start a new event and push it to the stack.
"""
self.events[name] = Event(name=name)
self.count += 1
self.stack.insert(0, self.events[name])

def finish_event(self):
"""
Finish the current event and pop it from the stack.
"""

if self.stack:
event = self.stack.pop(0)
event.finish()
else:
logger.warn(f"{self.name} finish_event called without start_event")

def report(self, order_by="name"):
"""
:param order_by: 'name' or 'duration'
"""
if order_by == "name":
events = sorted(self.events.values(), key=lambda event: event.name)
elif order_by == "duration":
events = sorted(self.events.values(), key=lambda event: event.duration)
text = f"{self.name} runtimes:\n"
text += "\n".join(str(event) for event in events)
return text


@dataclass
class Event:
name: str
start: float = field(default_factory=time.perf_counter_ns)
end: float = field(default=None)

@property
def duration(self) -> float:
"""Returns the elapsed time in seconds or positive infinity if event was never finished"""
if self.end:
return (self.end - self.start) / 1e9
return float("+inf")

def __str__(self):
return f"{self.name} {datetime.timedelta(seconds=self.duration)}"

def finish(self):
self.end = time.perf_counter_ns()


@contextmanager
def create_timer(name):
"""
Creates a new EventTimer as a context manager to improve code readability.
"""
a_timer = EventTimer(name)
yield a_timer
54 changes: 54 additions & 0 deletions airbyte-integrations/bases/base-python/unit_tests/test_counter.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
#
# Copyright (c) 2021 Airbyte, Inc., all rights reserved.
#

from unittest import mock

from base_python.cdk.utils.event_timing import create_timer


def test_counter_init():
with create_timer("Counter") as timer:
assert timer.name == "Counter"


def test_counter_start_event():
with create_timer("Counter") as timer:
with mock.patch("base_python.cdk.utils.event_timing.EventTimer.start_event") as mock_start_event:
timer.start_event("test_event")
mock_start_event.assert_called_with("test_event")


def test_counter_finish_event():
with create_timer("Counter") as timer:
with mock.patch("base_python.cdk.utils.event_timing.EventTimer.finish_event") as mock_finish_event:
timer.finish_event("test_event")
mock_finish_event.assert_called_with("test_event")


def test_timer_multiple_events():
with create_timer("Counter") as timer:
for i in range(10):
timer.start_event("test_event")
timer.finish_event()
assert timer.count == 10


def test_report_is_ordered_by_name_by_default():
names = ["j", "b", "g", "d", "e", "f", "c", "h", "i", "a"]

with create_timer("Source Counter") as timer:
for name in names:
timer.start_event(name)
timer.finish_event()
report = timer.report().split("\n")[1:] # ignore the first line
report_names = [line.split(" ")[0] for line in report]
assert report_names == sorted(names)


def test_double_finish_is_safely_ignored():
with create_timer("Source Counter") as timer:
timer.start_event("test_event")
timer.finish_event()
timer.finish_event()
assert timer.count == 1

0 comments on commit 65d471a

Please sign in to comment.