airbyte_cdk.sources.streams.concurrent.helpers

 1# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
 2
 3from typing import List, Optional, Union
 4
 5from airbyte_cdk.sources.streams import Stream
 6
 7
 8def get_primary_key_from_stream(
 9    stream_primary_key: Optional[Union[str, List[str], List[List[str]]]],
10) -> List[str]:
11    if stream_primary_key is None:
12        return []
13    elif isinstance(stream_primary_key, str):
14        return [stream_primary_key]
15    elif isinstance(stream_primary_key, list):
16        are_all_elements_str = all(isinstance(k, str) for k in stream_primary_key)
17        are_all_elements_list_of_size_one = all(
18            isinstance(k, list) and len(k) == 1 for k in stream_primary_key
19        )
20
21        if are_all_elements_str:
22            return stream_primary_key  # type: ignore # We verified all items in the list are strings
23        elif are_all_elements_list_of_size_one:
24            return list(map(lambda x: x[0], stream_primary_key))
25        else:
26            raise ValueError(f"Nested primary keys are not supported. Found {stream_primary_key}")
27    else:
28        raise ValueError(f"Invalid type for primary key: {stream_primary_key}")
29
30
def get_cursor_field_from_stream(stream: Stream) -> Optional[str]:
    """Return the cursor field of ``stream`` as a single value.

    Args:
        stream: object exposing ``cursor_field`` (and ``name``, which is only
            read when building the error message).

    Returns:
        ``stream.cursor_field`` unchanged when it is not a list; ``None`` when
        it is an empty list; its only item when it is a one-element list.

    Raises:
        ValueError: if ``cursor_field`` is a list with more than one item.
    """
    cursor_field = stream.cursor_field
    if not isinstance(cursor_field, list):
        return cursor_field

    if len(cursor_field) > 1:
        raise ValueError(
            f"Nested cursor fields are not supported. Got {stream.cursor_field} for {stream.name}"
        )

    # Zero or one item remains: empty means "no cursor", otherwise unwrap it.
    return cursor_field[0] if cursor_field else None
def get_primary_key_from_stream( stream_primary_key: Union[str, List[str], List[List[str]], NoneType]) -> List[str]:
 9def get_primary_key_from_stream(
10    stream_primary_key: Optional[Union[str, List[str], List[List[str]]]],
11) -> List[str]:
12    if stream_primary_key is None:
13        return []
14    elif isinstance(stream_primary_key, str):
15        return [stream_primary_key]
16    elif isinstance(stream_primary_key, list):
17        are_all_elements_str = all(isinstance(k, str) for k in stream_primary_key)
18        are_all_elements_list_of_size_one = all(
19            isinstance(k, list) and len(k) == 1 for k in stream_primary_key
20        )
21
22        if are_all_elements_str:
23            return stream_primary_key  # type: ignore # We verified all items in the list are strings
24        elif are_all_elements_list_of_size_one:
25            return list(map(lambda x: x[0], stream_primary_key))
26        else:
27            raise ValueError(f"Nested primary keys are not supported. Found {stream_primary_key}")
28    else:
29        raise ValueError(f"Invalid type for primary key: {stream_primary_key}")
def get_cursor_field_from_stream(stream: airbyte_cdk.Stream) -> Optional[str]:
def get_cursor_field_from_stream(stream: Stream) -> Optional[str]:
    """Reduce ``stream.cursor_field`` to one value, or ``None``.

    A non-list ``cursor_field`` is returned as it is. A list is accepted only
    with zero items (result ``None``) or one item (result: that item).

    Raises:
        ValueError: if ``stream.cursor_field`` is a list holding more than
            one item; the message includes ``stream.name``.
    """
    if not isinstance(stream.cursor_field, list):
        return stream.cursor_field

    field_count = len(stream.cursor_field)
    if field_count == 0:
        return None
    if field_count == 1:
        return stream.cursor_field[0]

    # Anything longer is a multi-segment path, which is rejected here.
    raise ValueError(
        f"Nested cursor fields are not supported. Got {stream.cursor_field} for {stream.name}"
    )