airbyte_cdk.utils.event_timing
1# 2# Copyright (c) 2023 Airbyte, Inc., all rights reserved. 3# 4 5import datetime 6import logging 7import time 8from contextlib import contextmanager 9from dataclasses import dataclass, field 10from typing import Any, Generator, Literal, Optional 11 12logger = logging.getLogger("airbyte") 13 14 15class EventTimer: 16 """Simple nanosecond resolution event timer for debugging, initially intended to be used to record streams execution 17 time for a source. 18 Event nesting follows a LIFO pattern, so finish will apply to the last started event. 19 """ 20 21 def __init__(self, name: str) -> None: 22 self.name = name 23 self.events: dict[str, Any] = {} 24 self.count = 0 25 self.stack: list[Any] = [] 26 27 def start_event(self, name: str) -> None: 28 """ 29 Start a new event and push it to the stack. 30 """ 31 self.events[name] = Event(name=name) 32 self.count += 1 33 self.stack.insert(0, self.events[name]) 34 35 def finish_event(self) -> None: 36 """ 37 Finish the current event and pop it from the stack. 38 """ 39 40 if self.stack: 41 event = self.stack.pop(0) 42 event.finish() 43 else: 44 logger.warning(f"{self.name} finish_event called without start_event") 45 46 def report(self, order_by: Literal["name", "duration"] = "name") -> str: 47 """ 48 :param order_by: 'name' or 'duration' 49 """ 50 if order_by == "name": 51 events = sorted(self.events.values(), key=lambda event: event.name) 52 elif order_by == "duration": 53 events = sorted(self.events.values(), key=lambda event: event.duration) 54 text = f"{self.name} runtimes:\n" 55 text += "\n".join(str(event) for event in events) 56 return text 57 58 59@dataclass 60class Event: 61 name: str 62 start: float = field(default_factory=time.perf_counter_ns) 63 end: Optional[float] = field(default=None) 64 65 @property 66 def duration(self) -> float: 67 """Returns the elapsed time in seconds or positive infinity if event was never finished""" 68 if self.end: 69 return (self.end - self.start) / 1e9 70 return float("+inf") 71 72 def __str__(self) -> str: 73 return f"{self.name} {datetime.timedelta(seconds=self.duration)}" 74 75 def finish(self) -> None: 76 self.end = time.perf_counter_ns() 77 78 79@contextmanager 80def create_timer(name: str) -> Generator[EventTimer, Any, None]: 81 """ 82 Creates a new EventTimer as a context manager to improve code readability. 83 """ 84 a_timer = EventTimer(name) 85 yield a_timer
logger =
<Logger airbyte (INFO)>
class
EventTimer:
16class EventTimer: 17 """Simple nanosecond resolution event timer for debugging, initially intended to be used to record streams execution 18 time for a source. 19 Event nesting follows a LIFO pattern, so finish will apply to the last started event. 20 """ 21 22 def __init__(self, name: str) -> None: 23 self.name = name 24 self.events: dict[str, Any] = {} 25 self.count = 0 26 self.stack: list[Any] = [] 27 28 def start_event(self, name: str) -> None: 29 """ 30 Start a new event and push it to the stack. 31 """ 32 self.events[name] = Event(name=name) 33 self.count += 1 34 self.stack.insert(0, self.events[name]) 35 36 def finish_event(self) -> None: 37 """ 38 Finish the current event and pop it from the stack. 39 """ 40 41 if self.stack: 42 event = self.stack.pop(0) 43 event.finish() 44 else: 45 logger.warning(f"{self.name} finish_event called without start_event") 46 47 def report(self, order_by: Literal["name", "duration"] = "name") -> str: 48 """ 49 :param order_by: 'name' or 'duration' 50 """ 51 if order_by == "name": 52 events = sorted(self.events.values(), key=lambda event: event.name) 53 elif order_by == "duration": 54 events = sorted(self.events.values(), key=lambda event: event.duration) 55 text = f"{self.name} runtimes:\n" 56 text += "\n".join(str(event) for event in events) 57 return text
Simple nanosecond resolution event timer for debugging, initially intended to be used to record streams execution time for a source. Event nesting follows a LIFO pattern, so finish will apply to the last started event.
def
start_event(self, name: str) -> None:
28 def start_event(self, name: str) -> None: 29 """ 30 Start a new event and push it to the stack. 31 """ 32 self.events[name] = Event(name=name) 33 self.count += 1 34 self.stack.insert(0, self.events[name])
Start a new event and push it to the stack.
def
finish_event(self) -> None:
36 def finish_event(self) -> None: 37 """ 38 Finish the current event and pop it from the stack. 39 """ 40 41 if self.stack: 42 event = self.stack.pop(0) 43 event.finish() 44 else: 45 logger.warning(f"{self.name} finish_event called without start_event")
Finish the current event and pop it from the stack.
def
report(self, order_by: Literal['name', 'duration'] = 'name') -> str:
47 def report(self, order_by: Literal["name", "duration"] = "name") -> str: 48 """ 49 :param order_by: 'name' or 'duration' 50 """ 51 if order_by == "name": 52 events = sorted(self.events.values(), key=lambda event: event.name) 53 elif order_by == "duration": 54 events = sorted(self.events.values(), key=lambda event: event.duration) 55 text = f"{self.name} runtimes:\n" 56 text += "\n".join(str(event) for event in events) 57 return text
Parameters
- order_by: 'name' or 'duration'
@dataclass
class
Event:
60@dataclass 61class Event: 62 name: str 63 start: float = field(default_factory=time.perf_counter_ns) 64 end: Optional[float] = field(default=None) 65 66 @property 67 def duration(self) -> float: 68 """Returns the elapsed time in seconds or positive infinity if event was never finished""" 69 if self.end: 70 return (self.end - self.start) / 1e9 71 return float("+inf") 72 73 def __str__(self) -> str: 74 return f"{self.name} {datetime.timedelta(seconds=self.duration)}" 75 76 def finish(self) -> None: 77 self.end = time.perf_counter_ns()
duration: float
66 @property 67 def duration(self) -> float: 68 """Returns the elapsed time in seconds or positive infinity if event was never finished""" 69 if self.end: 70 return (self.end - self.start) / 1e9 71 return float("+inf")
Returns the elapsed time in seconds or positive infinity if event was never finished
80@contextmanager 81def create_timer(name: str) -> Generator[EventTimer, Any, None]: 82 """ 83 Creates a new EventTimer as a context manager to improve code readability. 84 """ 85 a_timer = EventTimer(name) 86 yield a_timer
Creates a new EventTimer as a context manager to improve code readability.