airbyte_cdk.sources.streams.concurrent.default_stream

#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#

from functools import lru_cache
from logging import Logger
from typing import Any, Iterable, List, Mapping, Optional

from airbyte_cdk.models import AirbyteStream, SyncMode
from airbyte_cdk.sources.streams.concurrent.abstract_stream import AbstractStream
from airbyte_cdk.sources.streams.concurrent.availability_strategy import StreamAvailability
from airbyte_cdk.sources.streams.concurrent.cursor import Cursor
from airbyte_cdk.sources.streams.concurrent.partitions.partition import Partition
from airbyte_cdk.sources.streams.concurrent.partitions.partition_generator import PartitionGenerator
from airbyte_cdk.utils.traced_exception import AirbyteTracedException


class DefaultStream(AbstractStream):
    def __init__(
        self,
        partition_generator: PartitionGenerator,
        name: str,
        json_schema: Mapping[str, Any],
        primary_key: List[str],
        cursor_field: Optional[str],
        logger: Logger,
        cursor: Cursor,
        namespace: Optional[str] = None,
        supports_file_transfer: bool = False,
    ) -> None:
        self._stream_partition_generator = partition_generator
        self._name = name
        self._json_schema = json_schema
        self._primary_key = primary_key
        self._cursor_field = cursor_field
        self._logger = logger
        self._cursor = cursor
        self._namespace = namespace
        self._supports_file_transfer = supports_file_transfer

    def generate_partitions(self) -> Iterable[Partition]:
        yield from self._stream_partition_generator.generate()

    @property
    def name(self) -> str:
        return self._name

    @property
    def namespace(self) -> Optional[str]:
        return self._namespace

    @property
    def cursor_field(self) -> Optional[str]:
        return self._cursor_field

    @lru_cache(maxsize=None)
    def get_json_schema(self) -> Mapping[str, Any]:
        return self._json_schema

    def as_airbyte_stream(self) -> AirbyteStream:
        stream = AirbyteStream(
            name=self.name,
            json_schema=dict(self._json_schema),
            supported_sync_modes=[SyncMode.full_refresh],
            is_resumable=False,
            is_file_based=self._supports_file_transfer,
        )

        if self._namespace:
            stream.namespace = self._namespace

        if self._cursor_field:
            stream.source_defined_cursor = True
            stream.is_resumable = True
            stream.supported_sync_modes.append(SyncMode.incremental)
            stream.default_cursor_field = [self._cursor_field]

        keys = self._primary_key
        if keys and len(keys) > 0:
            stream.source_defined_primary_key = [[key] for key in keys]

        return stream

    def log_stream_sync_configuration(self) -> None:
        self._logger.debug(
            f"Syncing stream instance: {self.name}",
            extra={
                "primary_key": self._primary_key,
                "cursor_field": self.cursor_field,
            },
        )

    @property
    def cursor(self) -> Cursor:
        return self._cursor

    def check_availability(self) -> StreamAvailability:
        """
        Check stream availability by attempting to read the first record of the stream.
        """
        try:
            partition = next(iter(self.generate_partitions()))
        except StopIteration:
            # No partitions were generated. Note that this is different from `stream_slices` returning [None]:
            # it can happen when a substream's `stream_slices` method does a
            # `for record in parent_records: yield <something>` without accounting
            # for the case in which the parent stream is empty.
            return StreamAvailability.unavailable(
                f"Cannot attempt to connect to stream {self.name} - no stream slices were found"
            )
        except AirbyteTracedException as error:
            return StreamAvailability.unavailable(
                error.message or error.internal_message or "<no error message>"
            )

        try:
            next(iter(partition.read()))
            return StreamAvailability.available()
        except StopIteration:
            self._logger.info(f"Successfully connected to stream {self.name}, but got 0 records.")
            return StreamAvailability.available()
        except AirbyteTracedException as error:
            return StreamAvailability.unavailable(
                error.message or error.internal_message or "<no error message>"
            )

class DefaultStream(AbstractStream):

AbstractStream is an experimental interface for streams developed as part of the Concurrent CDK. This interface is not yet stable and may change in the future. Use at your own risk.

Why create a new interface instead of adding concurrency capabilities to the existing Stream? We learned a lot since the initial design of the Stream interface, and we wanted to take the opportunity to improve it.

At a high level, the changes we are targeting are:

  • Removing superfluous or leaky parameters from the methods' interfaces
  • Using composition instead of inheritance to add new capabilities

To allow us to iterate quickly while ensuring backwards compatibility, we are creating a new interface with a facade object that bridges the old and new interfaces. Source connectors that wish to leverage concurrency need to implement this new interface. An example will be available shortly.
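
Until that example lands, here is a minimal, hypothetical sketch of wiring up a DefaultStream. SinglePartition, SinglePartitionGenerator, and NoopCursor are illustrative stand-ins (duck-typed, not subclasses of the real Partition, PartitionGenerator, and Cursor interfaces), so a type checker would flag them even though the snippet runs:

import logging
from typing import Any, Iterable, Mapping

from airbyte_cdk.sources.streams.concurrent.default_stream import DefaultStream


class SinglePartition:
    # Illustrative stand-in for a Partition: yields one hard-coded record.
    def read(self) -> Iterable[Mapping[str, Any]]:
        yield {"id": "1", "updated_at": "2023-01-01T00:00:00Z"}


class SinglePartitionGenerator:
    # Illustrative stand-in for a PartitionGenerator.
    def generate(self) -> Iterable[SinglePartition]:
        yield SinglePartition()


class NoopCursor:
    # Illustrative stand-in for a Cursor; real cursors track per-partition state.
    pass


stream = DefaultStream(
    partition_generator=SinglePartitionGenerator(),  # hypothetical stand-in
    name="users",
    json_schema={"type": "object", "properties": {"id": {"type": "string"}}},
    primary_key=["id"],
    cursor_field="updated_at",
    logger=logging.getLogger("airbyte"),
    cursor=NoopCursor(),  # hypothetical stand-in
)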

Sources that implement this interface are currently subject to the following restrictions. Not all of these restrictions will be lifted, but most will be as we iterate on the design:

  • Only full refresh is supported. This will be addressed in the future.
  • The read method does not accept a cursor_field. Streams must be internally aware of the cursor field to use. User-defined cursor fields can be implemented by modifying the connector's main method to instantiate the streams with the configured cursor field.
  • Streams cannot return user-friendly messages by overriding Stream.get_error_display_message. This will be addressed in the future.
  • The Stream's behavior cannot depend on a namespace.
  • TypeTransformer is not supported. This will be addressed in the future.
  • Nested cursor fields and nested primary keys are not supported.
DefaultStream(partition_generator: airbyte_cdk.sources.streams.concurrent.partitions.partition_generator.PartitionGenerator, name: str, json_schema: Mapping[str, Any], primary_key: List[str], cursor_field: Optional[str], logger: logging.Logger, cursor: airbyte_cdk.Cursor, namespace: Optional[str] = None, supports_file_transfer: bool = False)

def generate_partitions(self) -> Iterable[airbyte_cdk.sources.streams.concurrent.partitions.partition.Partition]:

Generates the partitions that will be read by this stream.

Returns

An iterable of partitions.
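
As an illustration, a caller could drain every partition of the hypothetical stream built above sequentially (the concurrent framework would normally distribute this work across worker threads):

for partition in stream.generate_partitions():
    for record in partition.read():
        print(record)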

name: str
Returns

The stream name.

namespace: Optional[str]
cursor_field: Optional[str]

Override to return the default cursor field used by this stream, e.g. an API entity might always use created_at as the cursor field.

Returns

The name of the field used as a cursor. Nested cursor fields are not supported.

@lru_cache(maxsize=None)
def get_json_schema(self) -> Mapping[str, Any]:
Returns

A dict of the JSON schema representing this stream.
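
Note that the method is wrapped in @lru_cache, so repeated calls are served from the cache; in this implementation the same stored mapping comes back either way. Continuing the hypothetical stream from above:

schema_a = stream.get_json_schema()
schema_b = stream.get_json_schema()
assert schema_a is schema_b  # the stored schema, served from lru_cache after the first call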

def as_airbyte_stream(self) -> airbyte_protocol_dataclasses.models.airbyte_protocol.AirbyteStream:
Returns

The AirbyteStream representation of this stream.
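
As the source above shows, supplying a cursor_field switches the advertised catalog entry from full-refresh-only to incremental-capable. Continuing the hypothetical stream from above (primary_key=["id"], cursor_field="updated_at"):

from airbyte_cdk.models import SyncMode

catalog_entry = stream.as_airbyte_stream()
assert catalog_entry.supported_sync_modes == [SyncMode.full_refresh, SyncMode.incremental]
assert catalog_entry.source_defined_cursor is True
assert catalog_entry.is_resumable is True
assert catalog_entry.default_cursor_field == ["updated_at"]
assert catalog_entry.source_defined_primary_key == [["id"]]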

def log_stream_sync_configuration(self) -> None:

Logs the stream's configuration for debugging purposes.

cursor: airbyte_cdk.Cursor
Returns

The cursor associated with this stream.

def check_availability(self) -> airbyte_cdk.sources.streams.concurrent.availability_strategy.StreamAvailability:

Check stream availability by attempting to read the first record of the stream.
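
A typical pre-sync gate might look like the sketch below. The is_available() and message() accessors are assumptions about the StreamAvailability API (which is not documented on this page), so verify them against your CDK version:

availability = stream.check_availability()
if not availability.is_available():  # assumed accessor
    logging.getLogger("airbyte").warning(
        f"Skipping stream {stream.name}: {availability.message()}"  # assumed accessor
    )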