airbyte_cdk.utils.event_timing

 1#
 2# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
 3#
 4
 5import datetime
 6import logging
 7import time
 8from contextlib import contextmanager
 9from dataclasses import dataclass, field
10from typing import Any, Generator, Literal, Optional
11
12logger = logging.getLogger("airbyte")
13
14
class EventTimer:
    """Simple nanosecond resolution event timer for debugging, initially intended to be used to record streams
    execution time for a source.

    Event nesting follows a LIFO pattern, so ``finish_event`` applies to the most recently started event that
    is still open. Events are stored by name, so starting another event with a name already in use replaces
    the earlier entry in ``events`` (and therefore in ``report``).
    """

    def __init__(self, name: str) -> None:
        """
        :param name: label for this timer, used as the report heading and in warnings
        """
        # Open events; index 0 is the top of the stack (the most recently started one).
        self.stack: list[Any] = []
        # Latest event recorded under each name; this is what report() lists.
        self.events: dict[str, Any] = {}
        # Number of start_event calls made so far.
        self.count = 0
        self.name = name

    def start_event(self, name: str) -> None:
        """
        Start a new event and push it to the stack.

        :param name: key under which the event is stored; an existing entry with the same name is replaced
        """
        started = Event(name=name)
        self.events[name] = started
        self.count += 1
        # Index 0 is the top of the stack, so the newest event goes first.
        self.stack.insert(0, started)

    def finish_event(self) -> None:
        """
        Finish the current event and pop it from the stack.

        Logs a warning and does nothing else when no event is open.
        """
        if not self.stack:
            logger.warning(f"{self.name} finish_event called without start_event")
            return
        self.stack.pop(0).finish()

    def report(self, order_by: Literal["name", "duration"] = "name") -> str:
        """
        Build a text report: a heading with the timer name followed by one line per recorded event.

        :param order_by: 'name' or 'duration'
        :return: the heading line followed by the string form of each event, in the requested order
        :raises ValueError: if order_by is neither 'name' nor 'duration'
        """
        # Reject unknown sort keys explicitly instead of failing later on an unbound local.
        if order_by not in ("name", "duration"):
            raise ValueError(f"order_by must be 'name' or 'duration', got {order_by!r}")
        events = sorted(self.events.values(), key=lambda event: getattr(event, order_by))
        lines = "\n".join(str(event) for event in events)
        return f"{self.name} runtimes:\n{lines}"
57
58
@dataclass
class Event:
    """A single named timing record.

    ``start`` defaults to a ``time.perf_counter_ns()`` reading taken at construction, and ``end`` stays
    ``None`` until ``finish`` stores a second reading.
    """

    name: str
    start: float = field(default_factory=time.perf_counter_ns)
    end: Optional[float] = field(default=None)

    @property
    def duration(self) -> float:
        """Returns the elapsed time in seconds or positive infinity if event was never finished"""
        # Compare against None rather than truthiness: an end reading of 0 is still a finished event.
        if self.end is not None:
            return (self.end - self.start) / 1e9
        return float("+inf")

    def __str__(self) -> str:
        """Return ``"<name> <H:MM:SS[.ffffff]>"``, or ``"<name> (unfinished)"`` while the event is open."""
        if self.end is None:
            # timedelta cannot represent an infinite duration (it raises OverflowError),
            # so an open event gets a textual marker instead.
            return f"{self.name} (unfinished)"
        return f"{self.name} {datetime.timedelta(seconds=self.duration)}"

    def finish(self) -> None:
        """Record the current ``time.perf_counter_ns()`` reading as the end of the event."""
        self.end = time.perf_counter_ns()
77
78
@contextmanager
def create_timer(name: str) -> Generator[EventTimer, Any, None]:
    """
    Context manager that builds an EventTimer called ``name`` and hands it to the ``with`` body.

    Nothing is finished or reported on exit; the caller drives the timer.
    """
    yield EventTimer(name)
logger = <Logger airbyte (INFO)>
class EventTimer:
class EventTimer:
    """Simple nanosecond resolution event timer for debugging, initially intended to be used to record streams
    execution time for a source.

    Event nesting follows a LIFO pattern, so ``finish_event`` applies to the most recently started event that
    is still open. Events are stored by name, so starting another event with a name already in use replaces
    the earlier entry in ``events`` (and therefore in ``report``).
    """

    def __init__(self, name: str) -> None:
        """
        :param name: label for this timer, used as the report heading and in warnings
        """
        # Open events; index 0 is the top of the stack (the most recently started one).
        self.stack: list[Any] = []
        # Latest event recorded under each name; this is what report() lists.
        self.events: dict[str, Any] = {}
        # Number of start_event calls made so far.
        self.count = 0
        self.name = name

    def start_event(self, name: str) -> None:
        """
        Start a new event and push it to the stack.

        :param name: key under which the event is stored; an existing entry with the same name is replaced
        """
        started = Event(name=name)
        self.events[name] = started
        self.count += 1
        # Index 0 is the top of the stack, so the newest event goes first.
        self.stack.insert(0, started)

    def finish_event(self) -> None:
        """
        Finish the current event and pop it from the stack.

        Logs a warning and does nothing else when no event is open.
        """
        if not self.stack:
            logger.warning(f"{self.name} finish_event called without start_event")
            return
        self.stack.pop(0).finish()

    def report(self, order_by: Literal["name", "duration"] = "name") -> str:
        """
        Build a text report: a heading with the timer name followed by one line per recorded event.

        :param order_by: 'name' or 'duration'
        :return: the heading line followed by the string form of each event, in the requested order
        :raises ValueError: if order_by is neither 'name' nor 'duration'
        """
        # Reject unknown sort keys explicitly instead of failing later on an unbound local.
        if order_by not in ("name", "duration"):
            raise ValueError(f"order_by must be 'name' or 'duration', got {order_by!r}")
        events = sorted(self.events.values(), key=lambda event: getattr(event, order_by))
        lines = "\n".join(str(event) for event in events)
        return f"{self.name} runtimes:\n{lines}"

Simple nanosecond-resolution event timer for debugging, initially intended to record stream execution times for a source. Event nesting follows a LIFO pattern, so finish applies to the most recently started event.

EventTimer(name: str)
22    def __init__(self, name: str) -> None:
23        self.name = name
24        self.events: dict[str, Any] = {}
25        self.count = 0
26        self.stack: list[Any] = []
name
events: dict[str, typing.Any]
count
stack: list[typing.Any]
def start_event(self, name: str) -> None:
28    def start_event(self, name: str) -> None:
29        """
30        Start a new event and push it to the stack.
31        """
32        self.events[name] = Event(name=name)
33        self.count += 1
34        self.stack.insert(0, self.events[name])

Start a new event and push it to the stack.

def finish_event(self) -> None:
36    def finish_event(self) -> None:
37        """
38        Finish the current event and pop it from the stack.
39        """
40
41        if self.stack:
42            event = self.stack.pop(0)
43            event.finish()
44        else:
45            logger.warning(f"{self.name} finish_event called without start_event")

Finish the current event and pop it from the stack.

def report(self, order_by: Literal['name', 'duration'] = 'name') -> str:
47    def report(self, order_by: Literal["name", "duration"] = "name") -> str:
48        """
49        :param order_by: 'name' or 'duration'
50        """
51        if order_by == "name":
52            events = sorted(self.events.values(), key=lambda event: event.name)
53        elif order_by == "duration":
54            events = sorted(self.events.values(), key=lambda event: event.duration)
55        text = f"{self.name} runtimes:\n"
56        text += "\n".join(str(event) for event in events)
57        return text
Parameters
  • order_by: 'name' or 'duration'
@dataclass
class Event:
@dataclass
class Event:
    """A single named timing record.

    ``start`` defaults to a ``time.perf_counter_ns()`` reading taken at construction, and ``end`` stays
    ``None`` until ``finish`` stores a second reading.
    """

    name: str
    start: float = field(default_factory=time.perf_counter_ns)
    end: Optional[float] = field(default=None)

    @property
    def duration(self) -> float:
        """Returns the elapsed time in seconds or positive infinity if event was never finished"""
        # Compare against None rather than truthiness: an end reading of 0 is still a finished event.
        if self.end is not None:
            return (self.end - self.start) / 1e9
        return float("+inf")

    def __str__(self) -> str:
        """Return ``"<name> <H:MM:SS[.ffffff]>"``, or ``"<name> (unfinished)"`` while the event is open."""
        if self.end is None:
            # timedelta cannot represent an infinite duration (it raises OverflowError),
            # so an open event gets a textual marker instead.
            return f"{self.name} (unfinished)"
        return f"{self.name} {datetime.timedelta(seconds=self.duration)}"

    def finish(self) -> None:
        """Record the current ``time.perf_counter_ns()`` reading as the end of the event."""
        self.end = time.perf_counter_ns()
Event(name: str, start: float = <factory>, end: Optional[float] = None)
name: str
start: float
end: Optional[float] = None
duration: float
66    @property
67    def duration(self) -> float:
68        """Returns the elapsed time in seconds or positive infinity if event was never finished"""
69        if self.end:
70            return (self.end - self.start) / 1e9
71        return float("+inf")

Returns the elapsed time in seconds, or positive infinity if the event was never finished.

def finish(self) -> None:
76    def finish(self) -> None:
77        self.end = time.perf_counter_ns()
@contextmanager
def create_timer( name: str) -> Generator[EventTimer, Any, NoneType]:
@contextmanager
def create_timer(name: str) -> Generator[EventTimer, Any, None]:
    """
    Context manager that builds an EventTimer called ``name`` and hands it to the ``with`` body.

    Nothing is finished or reported on exit; the caller drives the timer.
    """
    yield EventTimer(name)

Creates a new EventTimer as a context manager to improve code readability.