airbyte_cdk.sources.streams.concurrent.helpers
1# Copyright (c) 2023 Airbyte, Inc., all rights reserved. 2 3from typing import List, Optional, Union 4 5from airbyte_cdk.sources.streams import Stream 6 7 8def get_primary_key_from_stream( 9 stream_primary_key: Optional[Union[str, List[str], List[List[str]]]], 10) -> List[str]: 11 if stream_primary_key is None: 12 return [] 13 elif isinstance(stream_primary_key, str): 14 return [stream_primary_key] 15 elif isinstance(stream_primary_key, list): 16 are_all_elements_str = all(isinstance(k, str) for k in stream_primary_key) 17 are_all_elements_list_of_size_one = all( 18 isinstance(k, list) and len(k) == 1 for k in stream_primary_key 19 ) 20 21 if are_all_elements_str: 22 return stream_primary_key # type: ignore # We verified all items in the list are strings 23 elif are_all_elements_list_of_size_one: 24 return list(map(lambda x: x[0], stream_primary_key)) 25 else: 26 raise ValueError(f"Nested primary keys are not supported. Found {stream_primary_key}") 27 else: 28 raise ValueError(f"Invalid type for primary key: {stream_primary_key}") 29 30 31def get_cursor_field_from_stream(stream: Stream) -> Optional[str]: 32 if isinstance(stream.cursor_field, list): 33 if len(stream.cursor_field) > 1: 34 raise ValueError( 35 f"Nested cursor fields are not supported. Got {stream.cursor_field} for {stream.name}" 36 ) 37 elif len(stream.cursor_field) == 0: 38 return None 39 else: 40 return stream.cursor_field[0] 41 else: 42 return stream.cursor_field
def
get_primary_key_from_stream( stream_primary_key: Union[str, List[str], List[List[str]], NoneType]) -> List[str]:
9def get_primary_key_from_stream( 10 stream_primary_key: Optional[Union[str, List[str], List[List[str]]]], 11) -> List[str]: 12 if stream_primary_key is None: 13 return [] 14 elif isinstance(stream_primary_key, str): 15 return [stream_primary_key] 16 elif isinstance(stream_primary_key, list): 17 are_all_elements_str = all(isinstance(k, str) for k in stream_primary_key) 18 are_all_elements_list_of_size_one = all( 19 isinstance(k, list) and len(k) == 1 for k in stream_primary_key 20 ) 21 22 if are_all_elements_str: 23 return stream_primary_key # type: ignore # We verified all items in the list are strings 24 elif are_all_elements_list_of_size_one: 25 return list(map(lambda x: x[0], stream_primary_key)) 26 else: 27 raise ValueError(f"Nested primary keys are not supported. Found {stream_primary_key}") 28 else: 29 raise ValueError(f"Invalid type for primary key: {stream_primary_key}")
32def get_cursor_field_from_stream(stream: Stream) -> Optional[str]: 33 if isinstance(stream.cursor_field, list): 34 if len(stream.cursor_field) > 1: 35 raise ValueError( 36 f"Nested cursor fields are not supported. Got {stream.cursor_field} for {stream.name}" 37 ) 38 elif len(stream.cursor_field) == 0: 39 return None 40 else: 41 return stream.cursor_field[0] 42 else: 43 return stream.cursor_field