airbyte_cdk.sources.streams.concurrent.default_stream

#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#

from logging import Logger
from typing import Any, Callable, Iterable, List, Mapping, Optional, Union

from airbyte_cdk.models import AirbyteStream, SyncMode
from airbyte_cdk.sources.streams.concurrent.abstract_stream import AbstractStream
from airbyte_cdk.sources.streams.concurrent.availability_strategy import StreamAvailability
from airbyte_cdk.sources.streams.concurrent.cursor import Cursor
from airbyte_cdk.sources.streams.concurrent.partitions.partition import Partition
from airbyte_cdk.sources.streams.concurrent.partitions.partition_generator import PartitionGenerator
from airbyte_cdk.utils.traced_exception import AirbyteTracedException


class DefaultStream(AbstractStream):
    def __init__(
        self,
        partition_generator: PartitionGenerator,
        name: str,
        json_schema: Union[Mapping[str, Any], Callable[[], Mapping[str, Any]]],
        primary_key: List[str],
        cursor_field: Optional[str],
        logger: Logger,
        cursor: Cursor,
        namespace: Optional[str] = None,
        supports_file_transfer: bool = False,
    ) -> None:
        self._stream_partition_generator = partition_generator
        self._name = name
        self._json_schema = json_schema
        self._primary_key = primary_key
        self._cursor_field = cursor_field
        self._logger = logger
        self._cursor = cursor
        self._namespace = namespace
        self._supports_file_transfer = supports_file_transfer

    def generate_partitions(self) -> Iterable[Partition]:
        yield from self._stream_partition_generator.generate()

    @property
    def name(self) -> str:
        return self._name

    @property
    def namespace(self) -> Optional[str]:
        return self._namespace

    @property
    def cursor_field(self) -> Optional[str]:
        return self._cursor_field

    def get_json_schema(self) -> Mapping[str, Any]:
        return self._json_schema() if callable(self._json_schema) else self._json_schema

    def as_airbyte_stream(self) -> AirbyteStream:
        stream = AirbyteStream(
            name=self.name,
            json_schema=dict(self.get_json_schema()),
            supported_sync_modes=[SyncMode.full_refresh],
            is_resumable=False,
            is_file_based=self._supports_file_transfer,
        )

        if self._namespace:
            stream.namespace = self._namespace

        if self._cursor_field:
            stream.source_defined_cursor = True
            stream.is_resumable = True
            stream.supported_sync_modes.append(SyncMode.incremental)
            stream.default_cursor_field = [self._cursor_field]

        keys = self._primary_key
        if keys and len(keys) > 0:
            stream.source_defined_primary_key = [[key] for key in keys]

        return stream

    def log_stream_sync_configuration(self) -> None:
        self._logger.debug(
            f"Syncing stream instance: {self.name}",
            extra={
                "primary_key": self._primary_key,
                "cursor_field": self.cursor_field,
            },
        )

    @property
    def cursor(self) -> Cursor:
        return self._cursor

    def check_availability(self) -> StreamAvailability:
        """
        Check stream availability by attempting to read the first record of the stream.
        """
        try:
            partition = next(iter(self.generate_partitions()))
        except StopIteration:
            # NOTE: Carried over from the legacy implementation; its continued relevance is uncertain.
            # stream_slices may have no `next()` item (note: this is different from stream_slices returning [None]!).
            # This can happen when a substream's `stream_slices` method does a `for record in parent_records: yield <something>`
            # without accounting for the case in which the parent stream is empty.
            return StreamAvailability.unavailable(
                f"Cannot attempt to connect to stream {self.name} - no stream slices were found"
            )
        except AirbyteTracedException as error:
            return StreamAvailability.unavailable(
                error.message or error.internal_message or "<no error message>"
            )

        try:
            next(iter(partition.read()))
            return StreamAvailability.available()
        except StopIteration:
            self._logger.info(f"Successfully connected to stream {self.name}, but got 0 records.")
            return StreamAvailability.available()
        except AirbyteTracedException as error:
            return StreamAvailability.unavailable(
                error.message or error.internal_message or "<no error message>"
            )

AbstractStream is an experimental interface for streams developed as part of the Concurrent CDK. This interface is not yet stable and may change in the future. Use at your own risk.

Why create a new interface instead of adding concurrency capabilities to the existing Stream? We have learned a lot since the initial design of the Stream interface, and we wanted to take the opportunity to improve it.

At a high level, the changes we are targeting are:

  • Removing superfluous or leaky parameters from the methods' interfaces
  • Using composition instead of inheritance to add new capabilities

To allow us to iterate quickly while preserving backward compatibility, we are creating a new interface with a facade object that bridges the old and the new interfaces. Source connectors that wish to leverage concurrency need to implement this new interface. An example will be available shortly; in the meantime, a minimal sketch follows the list of restrictions below.

The following restrictions currently apply to sources that implement this interface. Not all of these restrictions will be lifted, but most will be as we iterate on the design.

  • Only full refresh is supported. This will be addressed in the future.
  • The read method does not accept a cursor_field. Streams must be internally aware of the cursor field to use. User-defined cursor fields can be implemented by modifying the connector's main method to instantiate the streams with the configured cursor field.
  • Streams cannot return user-friendly messages by overriding Stream.get_error_display_message. This will be addressed in the future.
  • The Stream's behavior cannot depend on a namespace.
  • TypeTransformer is not supported. This will be addressed in the future.
  • Nested cursor and primary keys are not supported.
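
To make the wiring concrete, here is a minimal sketch of assembling a DefaultStream. The InMemoryPartition, InMemoryPartitionGenerator, and _NoopCursor classes below are illustrative stand-ins, not CDK classes: they duck-type only the methods this module actually calls (Partition.read(), PartitionGenerator.generate(), and the Cursor hooks), so consult the real ABCs for the full contracts.

    import logging
    from typing import Any, Iterable, List, Mapping

    from airbyte_cdk.sources.streams.concurrent.default_stream import DefaultStream


    class InMemoryPartition:
        # Stand-in for Partition: DefaultStream only needs read() in this sketch.
        def __init__(self, records: List[Mapping[str, Any]]) -> None:
            self._records = records

        def read(self) -> Iterable[Mapping[str, Any]]:
            yield from self._records


    class InMemoryPartitionGenerator:
        # Stand-in for PartitionGenerator: generate() yields the partitions to read.
        def __init__(self, partitions: List[InMemoryPartition]) -> None:
            self._partitions = partitions

        def generate(self) -> Iterable[InMemoryPartition]:
            yield from self._partitions


    class _NoopCursor:
        # Hypothetical stub standing in for a concrete Cursor implementation.
        def observe(self, record: Any) -> None:
            pass

        def close_partition(self, partition: Any) -> None:
            pass


    stream = DefaultStream(
        partition_generator=InMemoryPartitionGenerator(
            [InMemoryPartition([{"id": "1", "updated_at": "2023-01-01"}])]
        ),
        name="users",
        json_schema={"type": "object", "properties": {"id": {"type": "string"}}},
        primary_key=["id"],
        cursor_field="updated_at",  # or None for a full-refresh-only stream
        logger=logging.getLogger("airbyte"),
        cursor=_NoopCursor(),  # type: ignore[arg-type]  # stand-in, not a real Cursor
        namespace=None,
        supports_file_transfer=False,
    )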
DefaultStream(
    partition_generator: airbyte_cdk.sources.streams.concurrent.partitions.partition_generator.PartitionGenerator,
    name: str,
    json_schema: Union[Mapping[str, Any], Callable[[], Mapping[str, Any]]],
    primary_key: List[str],
    cursor_field: Optional[str],
    logger: logging.Logger,
    cursor: airbyte_cdk.Cursor,
    namespace: Optional[str] = None,
    supports_file_transfer: bool = False
)
    def __init__(
        self,
        partition_generator: PartitionGenerator,
        name: str,
        json_schema: Union[Mapping[str, Any], Callable[[], Mapping[str, Any]]],
        primary_key: List[str],
        cursor_field: Optional[str],
        logger: Logger,
        cursor: Cursor,
        namespace: Optional[str] = None,
        supports_file_transfer: bool = False,
    ) -> None:
        self._stream_partition_generator = partition_generator
        self._name = name
        self._json_schema = json_schema
        self._primary_key = primary_key
        self._cursor_field = cursor_field
        self._logger = logger
        self._cursor = cursor
        self._namespace = namespace
        self._supports_file_transfer = supports_file_transfer
def generate_partitions(self) -> Iterable[airbyte_cdk.sources.streams.concurrent.partitions.partition.Partition]:
    def generate_partitions(self) -> Iterable[Partition]:
        yield from self._stream_partition_generator.generate()

Generates the partitions that will be read by this stream.

Returns

An iterable of partitions.
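Continuing the earlier sketch, partitions are produced lazily and each one is read independently:

    # Each partition's read() yields the records belonging to that partition.
    for partition in stream.generate_partitions():
        for record in partition.read():
            print(record)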

name: str
    @property
    def name(self) -> str:
        return self._name
Returns

The stream name.

namespace: Optional[str]
    @property
    def namespace(self) -> Optional[str]:
        return self._namespace
cursor_field: Optional[str]
    @property
    def cursor_field(self) -> Optional[str]:
        return self._cursor_field

Override to return the default cursor field used by this stream; e.g., an API entity might always use created_at as the cursor field.

Returns

The name of the field used as a cursor. Nested cursor fields are not supported.

def get_json_schema(self) -> Mapping[str, Any]:
    def get_json_schema(self) -> Mapping[str, Any]:
        return self._json_schema() if callable(self._json_schema) else self._json_schema
Returns

A dict of the JSON schema representing this stream.
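Because json_schema may be either a mapping or a zero-argument callable, the schema can be supplied lazily: the callable is invoked on each get_json_schema() call rather than at construction time. A small sketch (the schema file path is hypothetical):

    import json
    from pathlib import Path
    from typing import Any, Mapping


    def load_users_schema() -> Mapping[str, Any]:
        # Read the schema from disk only when get_json_schema() is called.
        return json.loads(Path("schemas/users.json").read_text())

    # Pass as `json_schema=load_users_schema` (no parentheses) so the loader
    # runs lazily each time the schema is requested.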

def as_airbyte_stream(self) -> airbyte_protocol_dataclasses.models.airbyte_protocol.AirbyteStream:
    def as_airbyte_stream(self) -> AirbyteStream:
        stream = AirbyteStream(
            name=self.name,
            json_schema=dict(self.get_json_schema()),
            supported_sync_modes=[SyncMode.full_refresh],
            is_resumable=False,
            is_file_based=self._supports_file_transfer,
        )

        if self._namespace:
            stream.namespace = self._namespace

        if self._cursor_field:
            stream.source_defined_cursor = True
            stream.is_resumable = True
            stream.supported_sync_modes.append(SyncMode.incremental)
            stream.default_cursor_field = [self._cursor_field]

        keys = self._primary_key
        if keys and len(keys) > 0:
            stream.source_defined_primary_key = [[key] for key in keys]

        return stream
Returns

An AirbyteStream object representing this stream.
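The catalog entry follows directly from the constructor arguments, as the source above shows. Continuing the earlier sketch (cursor_field="updated_at", primary_key=["id"]):

    airbyte_stream = stream.as_airbyte_stream()

    # full_refresh is always supported; incremental is appended because a
    # cursor field was provided, which also makes the stream resumable.
    assert airbyte_stream.source_defined_cursor is True
    assert airbyte_stream.is_resumable is True
    assert airbyte_stream.default_cursor_field == ["updated_at"]
    assert airbyte_stream.source_defined_primary_key == [["id"]]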

def log_stream_sync_configuration(self) -> None:
    def log_stream_sync_configuration(self) -> None:
        self._logger.debug(
            f"Syncing stream instance: {self.name}",
            extra={
                "primary_key": self._primary_key,
                "cursor_field": self.cursor_field,
            },
        )

Logs the stream's configuration for debugging purposes.

cursor: airbyte_cdk.Cursor
    @property
    def cursor(self) -> Cursor:
        return self._cursor
Returns

The cursor associated with this stream.

def check_availability(self) -> StreamAvailability:
    def check_availability(self) -> StreamAvailability:
        """
        Check stream availability by attempting to read the first record of the stream.
        """
        try:
            partition = next(iter(self.generate_partitions()))
        except StopIteration:
            # NOTE: Carried over from the legacy implementation; its continued relevance is uncertain.
            # stream_slices may have no `next()` item (note: this is different from stream_slices returning [None]!).
            # This can happen when a substream's `stream_slices` method does a `for record in parent_records: yield <something>`
            # without accounting for the case in which the parent stream is empty.
            return StreamAvailability.unavailable(
                f"Cannot attempt to connect to stream {self.name} - no stream slices were found"
            )
        except AirbyteTracedException as error:
            return StreamAvailability.unavailable(
                error.message or error.internal_message or "<no error message>"
            )

        try:
            next(iter(partition.read()))
            return StreamAvailability.available()
        except StopIteration:
            self._logger.info(f"Successfully connected to stream {self.name}, but got 0 records.")
            return StreamAvailability.available()
        except AirbyteTracedException as error:
            return StreamAvailability.unavailable(
                error.message or error.internal_message or "<no error message>"
            )

Check stream availability by attempting to read the first record of the stream.
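
A hedged usage sketch, continuing the earlier example. The accessors on the returned StreamAvailability are assumptions (some CDK versions expose is_available() and message() methods), so verify against the availability_strategy module in your version:

    availability = stream.check_availability()

    # is_available() / message() are assumed accessors; check your CDK version.
    if availability.is_available():
        print(f"Stream {stream.name} is reachable.")
    else:
        print(f"Stream {stream.name} is unavailable: {availability.message()}")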