airbyte_cdk.destinations.vector_db_based.utils

 1#
 2# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
 3#
 4
 5import itertools
 6import traceback
 7from typing import Any, Iterable, Iterator, Tuple, Union
 8
 9from airbyte_cdk.models import AirbyteRecordMessage, AirbyteStream
10
11
def format_exception(exception: Exception) -> str:
    """Render an exception as its message followed by its formatted traceback.

    The first line is ``str(exception)``. Everything after the first newline is
    the text produced by ``traceback.TracebackException.from_exception(...).format()``,
    joined verbatim (it already ends with its own trailing newline).
    """
    message = str(exception)
    details = "".join(traceback.TracebackException.from_exception(exception).format())
    return f"{message}\n{details}"
18
19
def create_chunks(iterable: Iterable[Any], batch_size: int) -> Iterator[Tuple[Any, ...]]:
    """A helper function to break an iterable into chunks of size batch_size.

    Items are consumed lazily and yielded in their original order as tuples of
    exactly ``batch_size`` elements. The final tuple holds whatever remains and may
    be shorter, but it is never empty. An empty iterable yields nothing.

    :param iterable: source of items; it is iterated only once.
    :param batch_size: maximum number of items per chunk; must be at least 1.
    :raises ValueError: if ``batch_size`` is less than 1. Because this is a
        generator, the error surfaces when iteration starts.
    """
    if batch_size < 1:
        # islice(it, 0) returns an empty tuple immediately, which would end the loop
        # below and silently discard every item. Negative sizes already made islice
        # raise ValueError, so reject all sizes below 1 the same way.
        raise ValueError(f"batch_size must be at least 1, got {batch_size}")
    it = iter(iterable)
    chunk = tuple(itertools.islice(it, batch_size))
    # An empty tuple means the underlying iterator is exhausted.
    while chunk:
        yield chunk
        chunk = tuple(itertools.islice(it, batch_size))
27
28
def create_stream_identifier(stream: Union[AirbyteStream, AirbyteRecordMessage]) -> str:
    """Return a string key naming the stream that ``stream`` describes or belongs to.

    An ``AirbyteStream`` carries its name in ``.name``; any other input (an
    ``AirbyteRecordMessage``) carries it in ``.stream``. When ``namespace`` is not
    ``None``, the key is ``"<namespace>_<name>"``; otherwise it is just the name.
    """
    stream_name = stream.name if isinstance(stream, AirbyteStream) else stream.stream
    if stream.namespace is not None:
        return f"{stream.namespace}_{stream_name}"
    return str(stream_name)
def format_exception(exception: Exception) -> str:
    """Render an exception as its message followed by its formatted traceback.

    The first line is ``str(exception)``. Everything after the first newline is
    the text produced by ``traceback.TracebackException.from_exception(...).format()``,
    joined verbatim (it already ends with its own trailing newline).
    """
    message = str(exception)
    details = "".join(traceback.TracebackException.from_exception(exception).format())
    return f"{message}\n{details}"
def create_chunks(iterable: Iterable[Any], batch_size: int) -> Iterator[Tuple[Any, ...]]:
    """A helper function to break an iterable into chunks of size batch_size.

    Items are consumed lazily and yielded in their original order as tuples of
    exactly ``batch_size`` elements. The final tuple holds whatever remains and may
    be shorter, but it is never empty. An empty iterable yields nothing.

    :param iterable: source of items; it is iterated only once.
    :param batch_size: maximum number of items per chunk; must be at least 1.
    :raises ValueError: if ``batch_size`` is less than 1. Because this is a
        generator, the error surfaces when iteration starts.
    """
    if batch_size < 1:
        # islice(it, 0) returns an empty tuple immediately, which would end the loop
        # below and silently discard every item. Negative sizes already made islice
        # raise ValueError, so reject all sizes below 1 the same way.
        raise ValueError(f"batch_size must be at least 1, got {batch_size}")
    it = iter(iterable)
    chunk = tuple(itertools.islice(it, batch_size))
    # An empty tuple means the underlying iterator is exhausted.
    while chunk:
        yield chunk
        chunk = tuple(itertools.islice(it, batch_size))

A helper function to break an iterable into consecutive tuples of at most batch_size items; the last chunk may be shorter.

def create_stream_identifier(stream: Union[AirbyteStream, AirbyteRecordMessage]) -> str:
    """Return a string key naming the stream that ``stream`` describes or belongs to.

    An ``AirbyteStream`` carries its name in ``.name``; any other input (an
    ``AirbyteRecordMessage``) carries it in ``.stream``. When ``namespace`` is not
    ``None``, the key is ``"<namespace>_<name>"``; otherwise it is just the name.
    """
    stream_name = stream.name if isinstance(stream, AirbyteStream) else stream.stream
    if stream.namespace is not None:
        return f"{stream.namespace}_{stream_name}"
    return str(stream_name)