airbyte_cdk.sources.streams.concurrent.default_stream

  1#
  2# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
  3#
  4
  5from logging import Logger
  6from typing import Any, Callable, Iterable, List, Mapping, Optional, Union
  7
  8from airbyte_cdk.models import AirbyteStream, SyncMode
  9from airbyte_cdk.sources.streams.concurrent.abstract_stream import AbstractStream
 10from airbyte_cdk.sources.streams.concurrent.availability_strategy import StreamAvailability
 11from airbyte_cdk.sources.streams.concurrent.cursor import Cursor, CursorField
 12from airbyte_cdk.sources.streams.concurrent.partitions.partition import Partition
 13from airbyte_cdk.sources.streams.concurrent.partitions.partition_generator import PartitionGenerator
 14from airbyte_cdk.utils.traced_exception import AirbyteTracedException
 15
 16
 17class DefaultStream(AbstractStream):
 18    def __init__(
 19        self,
 20        partition_generator: PartitionGenerator,
 21        name: str,
 22        json_schema: Union[Mapping[str, Any], Callable[[], Mapping[str, Any]]],
 23        primary_key: List[str],
 24        cursor_field: Optional[CursorField],
 25        logger: Logger,
 26        cursor: Cursor,
 27        namespace: Optional[str] = None,
 28        supports_file_transfer: bool = False,
 29    ) -> None:
 30        self._stream_partition_generator = partition_generator
 31        self._name = name
 32        self._json_schema = json_schema
 33        self._primary_key = primary_key
 34        self._cursor_field = cursor_field
 35        self._logger = logger
 36        self._cursor = cursor
 37        self._namespace = namespace
 38        self._supports_file_transfer = supports_file_transfer
 39
 40    def generate_partitions(self) -> Iterable[Partition]:
 41        yield from self._stream_partition_generator.generate()
 42
 43    @property
 44    def name(self) -> str:
 45        return self._name
 46
 47    @property
 48    def namespace(self) -> Optional[str]:
 49        return self._namespace
 50
 51    @property
 52    def cursor_field(self) -> Optional[str]:
 53        return self._cursor_field.cursor_field_key if self._cursor_field else None
 54
 55    def get_json_schema(self) -> Mapping[str, Any]:
 56        return self._json_schema() if callable(self._json_schema) else self._json_schema
 57
 58    def as_airbyte_stream(self) -> AirbyteStream:
 59        stream = AirbyteStream(
 60            name=self.name,
 61            json_schema=dict(self.get_json_schema()),
 62            supported_sync_modes=[SyncMode.full_refresh],
 63            is_resumable=False,
 64            is_file_based=self._supports_file_transfer,
 65        )
 66
 67        if self._namespace:
 68            stream.namespace = self._namespace
 69
 70        if self._cursor_field:
 71            stream.source_defined_cursor = (
 72                not self._cursor_field.supports_catalog_defined_cursor_field
 73            )
 74            stream.is_resumable = True
 75            stream.supported_sync_modes.append(SyncMode.incremental)
 76            stream.default_cursor_field = [self._cursor_field.cursor_field_key]
 77
 78        keys = self._primary_key
 79        if keys and len(keys) > 0:
 80            stream.source_defined_primary_key = [[key] for key in keys]
 81
 82        return stream
 83
 84    def log_stream_sync_configuration(self) -> None:
 85        self._logger.debug(
 86            f"Syncing stream instance: {self.name}",
 87            extra={
 88                "primary_key": self._primary_key,
 89                "cursor_field": self.cursor_field,
 90            },
 91        )
 92
 93    @property
 94    def cursor(self) -> Cursor:
 95        return self._cursor
 96
 97    def check_availability(self) -> StreamAvailability:
 98        """
 99        Check stream availability by attempting to read the first record of the stream.
100        """
101        try:
102            partition = next(iter(self.generate_partitions()))
103        except StopIteration:
104            # NOTE: The following comment was copied from legacy stuff and I don't know how relevant it is:
105            # If stream_slices has no `next()` item (Note - this is different from stream_slices returning [None]!)
106            # This can happen when a substream's `stream_slices` method does a `for record in parent_records: yield <something>`
107            # without accounting for the case in which the parent stream is empty.
108            return StreamAvailability.unavailable(
109                f"Cannot attempt to connect to stream {self.name} - no stream slices were found"
110            )
111        except AirbyteTracedException as error:
112            return StreamAvailability.unavailable(
113                error.message or error.internal_message or "<no error message>"
114            )
115
116        try:
117            next(iter(partition.read()))
118            return StreamAvailability.available()
119        except StopIteration:
120            self._logger.info(f"Successfully connected to stream {self.name}, but got 0 records.")
121            return StreamAvailability.available()
122        except AirbyteTracedException as error:
123            return StreamAvailability.unavailable(
124                error.message or error.internal_message or "<no error message>"
125            )
class DefaultStream(AbstractStream):

AbstractStream is an experimental interface for streams developed as part of the Concurrent CDK. This interface is not yet stable and may change in the future. Use at your own risk.

Why create a new interface instead of adding concurrency capabilities to the existing Stream? We learned a lot since the initial design of the Stream interface, and we wanted to take the opportunity to improve it.

At a high level, the changes we are targeting are:

  • Removing superfluous or leaky parameters from the methods' interfaces
  • Using composition instead of inheritance to add new capabilities

To allow us to iterate quickly while ensuring backward compatibility, we are creating a new interface with a facade object that will bridge the old and the new interfaces. Source connectors that wish to leverage concurrency need to implement this new interface. An example will be available shortly.

The current restrictions on sources that implement this interface are listed below. Not all of these restrictions will be lifted, but most will be as we iterate on the design.

  • Only full refresh is supported. This will be addressed in the future.
  • The read method does not accept a cursor_field. Streams must be internally aware of the cursor field to use. User-defined cursor fields can be implemented by modifying the connector's main method to instantiate the streams with the configured cursor field (see the sketch below this list).
  • Streams cannot return user-friendly messages by overriding Stream.get_error_display_message. This will be addressed in the future.
  • The Stream's behavior cannot depend on a namespace.
  • TypeTransformer is not supported. This will be addressed in the future.
  • Nested cursor and primary keys are not supported.
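
As a sketch of the cursor-field restriction above, a connector's entrypoint could read the cursor field from the user-supplied config and pass it to the stream at construction time. This is a minimal, hedged example: MyPartitionGenerator, my_cursor, and the config shape are hypothetical placeholders, not part of the CDK.

from logging import getLogger

from airbyte_cdk.sources.streams.concurrent.cursor import CursorField
from airbyte_cdk.sources.streams.concurrent.default_stream import DefaultStream

config = {"cursor_field": "updated_at"}  # hypothetical user-supplied config

stream = DefaultStream(
    partition_generator=MyPartitionGenerator(config),  # hypothetical, connector-specific
    name="users",
    json_schema={"type": "object", "properties": {"id": {"type": "string"}}},
    primary_key=["id"],
    cursor_field=CursorField(config["cursor_field"]),
    logger=getLogger("airbyte"),
    cursor=my_cursor,  # any Cursor implementation the connector builds from its state
)
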
DefaultStream(
    partition_generator: airbyte_cdk.sources.streams.concurrent.partitions.partition_generator.PartitionGenerator,
    name: str,
    json_schema: Union[Mapping[str, Any], Callable[[], Mapping[str, Any]]],
    primary_key: List[str],
    cursor_field: Optional[airbyte_cdk.CursorField],
    logger: logging.Logger,
    cursor: airbyte_cdk.Cursor,
    namespace: Optional[str] = None,
    supports_file_transfer: bool = False,
)
19    def __init__(
20        self,
21        partition_generator: PartitionGenerator,
22        name: str,
23        json_schema: Union[Mapping[str, Any], Callable[[], Mapping[str, Any]]],
24        primary_key: List[str],
25        cursor_field: Optional[CursorField],
26        logger: Logger,
27        cursor: Cursor,
28        namespace: Optional[str] = None,
29        supports_file_transfer: bool = False,
30    ) -> None:
31        self._stream_partition_generator = partition_generator
32        self._name = name
33        self._json_schema = json_schema
34        self._primary_key = primary_key
35        self._cursor_field = cursor_field
36        self._logger = logger
37        self._cursor = cursor
38        self._namespace = namespace
39        self._supports_file_transfer = supports_file_transfer
def generate_partitions(self) -> Iterable[airbyte_cdk.sources.streams.concurrent.partitions.partition.Partition]:
41    def generate_partitions(self) -> Iterable[Partition]:
42        yield from self._stream_partition_generator.generate()

Generates the partitions that will be read by this stream.

Returns

An iterable of partitions.
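
For context, here is a minimal, hedged sketch of a PartitionGenerator that could feed this method; generate() is the only method the source above relies on. The generator below yields a fixed, prebuilt set of partitions, while a real one would typically construct a connector-specific Partition subclass per slice of work:

from typing import Iterable, List

from airbyte_cdk.sources.streams.concurrent.partitions.partition import Partition
from airbyte_cdk.sources.streams.concurrent.partitions.partition_generator import PartitionGenerator


class StaticPartitionGenerator(PartitionGenerator):
    """Hypothetical generator that yields a fixed set of prebuilt partitions."""

    def __init__(self, partitions: List[Partition]) -> None:
        self._partitions = partitions

    def generate(self) -> Iterable[Partition]:
        # DefaultStream.generate_partitions delegates straight to this method.
        yield from self._partitions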

name: str
44    @property
45    def name(self) -> str:
46        return self._name
Returns

The stream name.

namespace: Optional[str]
48    @property
49    def namespace(self) -> Optional[str]:
50        return self._namespace
cursor_field: Optional[str]
52    @property
53    def cursor_field(self) -> Optional[str]:
54        return self._cursor_field.cursor_field_key if self._cursor_field else None

Override to return the default cursor field used by this stream, e.g. an API entity might always use created_at as the cursor field.

Returns

The name of the field used as a cursor. Nested cursor fields are not supported.

def get_json_schema(self) -> Mapping[str, Any]:
56    def get_json_schema(self) -> Mapping[str, Any]:
57        return self._json_schema() if callable(self._json_schema) else self._json_schema
Returns

A dict of the JSON schema representing this stream.
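
Because json_schema may be either a mapping or a zero-argument callable, a connector can defer expensive schema resolution (for example, reading a file from disk) until the schema is first requested. A small, hedged sketch; the schema path and helper name are hypothetical:

import json
from typing import Any, Mapping


def load_users_schema() -> Mapping[str, Any]:
    # Hypothetical helper: parse the schema file only when the schema is needed.
    with open("schemas/users.json") as f:
        return json.load(f)


# Passing the callable itself (not its result) defers the file read until
# get_json_schema() is first called; passing a plain mapping resolves it eagerly.
# e.g. DefaultStream(..., json_schema=load_users_schema, ...)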

def as_airbyte_stream(self) -> airbyte_protocol_dataclasses.models.airbyte_protocol.AirbyteStream:
59    def as_airbyte_stream(self) -> AirbyteStream:
60        stream = AirbyteStream(
61            name=self.name,
62            json_schema=dict(self.get_json_schema()),
63            supported_sync_modes=[SyncMode.full_refresh],
64            is_resumable=False,
65            is_file_based=self._supports_file_transfer,
66        )
67
68        if self._namespace:
69            stream.namespace = self._namespace
70
71        if self._cursor_field:
72            stream.source_defined_cursor = (
73                not self._cursor_field.supports_catalog_defined_cursor_field
74            )
75            stream.is_resumable = True
76            stream.supported_sync_modes.append(SyncMode.incremental)
77            stream.default_cursor_field = [self._cursor_field.cursor_field_key]
78
79        keys = self._primary_key
80        if keys and len(keys) > 0:
81            stream.source_defined_primary_key = [[key] for key in keys]
82
83        return stream
Returns

An AirbyteStream object describing this stream for the catalog.
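
To make the mapping concrete, here is a hedged sketch reusing the stream from the constructor example above. With a cursor field configured, the catalog entry advertises incremental support, becomes resumable, and publishes the default cursor field. The assertions mirror the logic in the source above rather than a guaranteed public contract:

from airbyte_cdk.models import SyncMode

catalog_stream = stream.as_airbyte_stream()
assert SyncMode.incremental in catalog_stream.supported_sync_modes
assert catalog_stream.is_resumable is True
assert catalog_stream.default_cursor_field == ["updated_at"]
assert catalog_stream.source_defined_primary_key == [["id"]]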

def log_stream_sync_configuration(self) -> None:
85    def log_stream_sync_configuration(self) -> None:
86        self._logger.debug(
87            f"Syncing stream instance: {self.name}",
88            extra={
89                "primary_key": self._primary_key,
90                "cursor_field": self.cursor_field,
91            },
92        )

Logs the stream's configuration for debugging purposes.

cursor: airbyte_cdk.Cursor
94    @property
95    def cursor(self) -> Cursor:
96        return self._cursor
Returns

The cursor associated with this stream.

def check_availability(self) -> airbyte_cdk.sources.streams.concurrent.availability_strategy.StreamAvailability:
 98    def check_availability(self) -> StreamAvailability:
 99        """
100        Check stream availability by attempting to read the first record of the stream.
101        """
102        try:
103            partition = next(iter(self.generate_partitions()))
104        except StopIteration:
105            # NOTE: The following comment was copied from legacy stuff and I don't know how relevant it is:
106            # If stream_slices has no `next()` item (Note - this is different from stream_slices returning [None]!)
107            # This can happen when a substream's `stream_slices` method does a `for record in parent_records: yield <something>`
108            # without accounting for the case in which the parent stream is empty.
109            return StreamAvailability.unavailable(
110                f"Cannot attempt to connect to stream {self.name} - no stream slices were found"
111            )
112        except AirbyteTracedException as error:
113            return StreamAvailability.unavailable(
114                error.message or error.internal_message or "<no error message>"
115            )
116
117        try:
118            next(iter(partition.read()))
119            return StreamAvailability.available()
120        except StopIteration:
121            self._logger.info(f"Successfully connected to stream {self.name}, but got 0 records.")
122            return StreamAvailability.available()
123        except AirbyteTracedException as error:
124            return StreamAvailability.unavailable(
125                error.message or error.internal_message or "<no error message>"
126            )

Check stream availability by attempting to read the first record of the stream.
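
A hedged usage sketch, reusing the stream from the constructor example above. The available() and unavailable() constructors appear in the source; the is_available and reason accessors on StreamAvailability are assumptions here and may differ between CDK versions:

availability = stream.check_availability()
if availability.is_available:  # assumed accessor
    print(f"Stream {stream.name} is reachable; proceeding with the sync.")
else:
    # The reason carries the failure detail, e.g. "no stream slices were found"
    # or the message of a traced exception raised while reading the first record.
    print(f"Stream {stream.name} is unavailable: {availability.reason}")  # assumed accessor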