airbyte_cdk.destinations.vector_db_based.utils
1# 2# Copyright (c) 2023 Airbyte, Inc., all rights reserved. 3# 4 5import itertools 6import traceback 7from typing import Any, Iterable, Iterator, Tuple, Union 8 9from airbyte_cdk.models import AirbyteRecordMessage, AirbyteStream 10 11 12def format_exception(exception: Exception) -> str: 13 return ( 14 str(exception) 15 + "\n" 16 + "".join(traceback.TracebackException.from_exception(exception).format()) 17 ) 18 19 20def create_chunks(iterable: Iterable[Any], batch_size: int) -> Iterator[Tuple[Any, ...]]: 21 """A helper function to break an iterable into chunks of size batch_size.""" 22 it = iter(iterable) 23 chunk = tuple(itertools.islice(it, batch_size)) 24 while chunk: 25 yield chunk 26 chunk = tuple(itertools.islice(it, batch_size)) 27 28 29def create_stream_identifier(stream: Union[AirbyteStream, AirbyteRecordMessage]) -> str: 30 if isinstance(stream, AirbyteStream): 31 return str(stream.name if stream.namespace is None else f"{stream.namespace}_{stream.name}") 32 else: 33 return str( 34 stream.stream if stream.namespace is None else f"{stream.namespace}_{stream.stream}" 35 )
def
format_exception(exception: Exception) -> str:
def
create_chunks(iterable: Iterable[Any], batch_size: int) -> Iterator[Tuple[Any, ...]]:
21def create_chunks(iterable: Iterable[Any], batch_size: int) -> Iterator[Tuple[Any, ...]]: 22 """A helper function to break an iterable into chunks of size batch_size.""" 23 it = iter(iterable) 24 chunk = tuple(itertools.islice(it, batch_size)) 25 while chunk: 26 yield chunk 27 chunk = tuple(itertools.islice(it, batch_size))
A helper function to break an iterable into chunks of size batch_size.
def
create_stream_identifier( stream: Union[airbyte_protocol_dataclasses.models.airbyte_protocol.AirbyteStream, airbyte_protocol_dataclasses.models.airbyte_protocol.AirbyteRecordMessage]) -> str:
30def create_stream_identifier(stream: Union[AirbyteStream, AirbyteRecordMessage]) -> str: 31 if isinstance(stream, AirbyteStream): 32 return str(stream.name if stream.namespace is None else f"{stream.namespace}_{stream.name}") 33 else: 34 return str( 35 stream.stream if stream.namespace is None else f"{stream.namespace}_{stream.stream}" 36 )