airbyte_cdk.sources.streams.concurrent.default_stream

  1#
  2# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
  3#
  4
  5from logging import Logger
  6from typing import Any, Callable, Iterable, List, Mapping, Optional, Union
  7
  8from airbyte_cdk.models import AirbyteStream, SyncMode
  9from airbyte_cdk.sources.declarative.incremental.concurrent_partition_cursor import (
 10    ConcurrentPerPartitionCursor,
 11)
 12from airbyte_cdk.sources.declarative.partition_routers.partition_router import PartitionRouter
 13from airbyte_cdk.sources.declarative.stream_slicers.declarative_partition_generator import (
 14    StreamSlicerPartitionGenerator,
 15)
 16from airbyte_cdk.sources.streams.concurrent.abstract_stream import AbstractStream
 17from airbyte_cdk.sources.streams.concurrent.availability_strategy import StreamAvailability
 18from airbyte_cdk.sources.streams.concurrent.cursor import Cursor, CursorField
 19from airbyte_cdk.sources.streams.concurrent.partitions.partition import Partition
 20from airbyte_cdk.sources.streams.concurrent.partitions.partition_generator import PartitionGenerator
 21from airbyte_cdk.utils.traced_exception import AirbyteTracedException
 22
 23
 24class DefaultStream(AbstractStream):
 25    def __init__(
 26        self,
 27        partition_generator: PartitionGenerator,
 28        name: str,
 29        json_schema: Union[Mapping[str, Any], Callable[[], Mapping[str, Any]]],
 30        primary_key: List[str],
 31        cursor_field: Optional[CursorField],
 32        logger: Logger,
 33        cursor: Cursor,
 34        namespace: Optional[str] = None,
 35        supports_file_transfer: bool = False,
 36        block_simultaneous_read: str = "",
 37    ) -> None:
 38        self._stream_partition_generator = partition_generator
 39        self._name = name
 40        self._json_schema = json_schema
 41        self._primary_key = primary_key
 42        self._cursor_field = cursor_field
 43        self._logger = logger
 44        self._cursor = cursor
 45        self._namespace = namespace
 46        self._supports_file_transfer = supports_file_transfer
 47        self._block_simultaneous_read = block_simultaneous_read
 48
 49    def generate_partitions(self) -> Iterable[Partition]:
 50        yield from self._stream_partition_generator.generate()
 51
 52    @property
 53    def name(self) -> str:
 54        return self._name
 55
 56    @property
 57    def namespace(self) -> Optional[str]:
 58        return self._namespace
 59
 60    @property
 61    def cursor_field(self) -> Optional[str]:
 62        return self._cursor_field.cursor_field_key if self._cursor_field else None
 63
 64    def get_json_schema(self) -> Mapping[str, Any]:
 65        return self._json_schema() if callable(self._json_schema) else self._json_schema
 66
 67    def as_airbyte_stream(self) -> AirbyteStream:
 68        stream = AirbyteStream(
 69            name=self.name,
 70            json_schema=dict(self.get_json_schema()),
 71            supported_sync_modes=[SyncMode.full_refresh],
 72            is_resumable=False,
 73            is_file_based=self._supports_file_transfer,
 74        )
 75
 76        if self._namespace:
 77            stream.namespace = self._namespace
 78
 79        if self._cursor_field:
 80            stream.source_defined_cursor = (
 81                not self._cursor_field.supports_catalog_defined_cursor_field
 82            )
 83            stream.is_resumable = True
 84            stream.supported_sync_modes.append(SyncMode.incremental)
 85            stream.default_cursor_field = [self._cursor_field.cursor_field_key]
 86
 87        keys = self._primary_key
 88        if keys and len(keys) > 0:
 89            stream.source_defined_primary_key = [[key] for key in keys]
 90
 91        return stream
 92
 93    def log_stream_sync_configuration(self) -> None:
 94        self._logger.debug(
 95            f"Syncing stream instance: {self.name}",
 96            extra={
 97                "primary_key": self._primary_key,
 98                "cursor_field": self.cursor_field,
 99            },
100        )
101
102    @property
103    def cursor(self) -> Cursor:
104        return self._cursor
105
106    @property
107    def block_simultaneous_read(self) -> str:
108        """Returns the blocking group name for this stream, or empty string if no blocking"""
109        return self._block_simultaneous_read
110
111    @block_simultaneous_read.setter
112    def block_simultaneous_read(self, value: str) -> None:
113        self._block_simultaneous_read = value
114
115    def get_partition_router(self) -> PartitionRouter | None:
116        """Return the partition router for this stream, or None if not available."""
117        if not isinstance(self._stream_partition_generator, StreamSlicerPartitionGenerator):
118            return None
119        stream_slicer = self._stream_partition_generator._stream_slicer
120        if not isinstance(stream_slicer, ConcurrentPerPartitionCursor):
121            return None
122        return stream_slicer._partition_router
123
124    def check_availability(self) -> StreamAvailability:
125        """
126        Check stream availability by attempting to read the first record of the stream.
127        """
128        try:
129            partition = next(iter(self.generate_partitions()))
130        except StopIteration:
131            # NOTE: The following comment was copied from legacy stuff and I don't know how relevant it is:
132            # If stream_slices has no `next()` item (Note - this is different from stream_slices returning [None]!)
133            # This can happen when a substream's `stream_slices` method does a `for record in parent_records: yield <something>`
134            # without accounting for the case in which the parent stream is empty.
135            return StreamAvailability.unavailable(
136                f"Cannot attempt to connect to stream {self.name} - no stream slices were found"
137            )
138        except AirbyteTracedException as error:
139            return StreamAvailability.unavailable(
140                error.message or error.internal_message or "<no error message>"
141            )
142
143        try:
144            next(iter(partition.read()))
145            return StreamAvailability.available()
146        except StopIteration:
147            self._logger.info(f"Successfully connected to stream {self.name}, but got 0 records.")
148            return StreamAvailability.available()
149        except AirbyteTracedException as error:
150            return StreamAvailability.unavailable(
151                error.message or error.internal_message or "<no error message>"
152            )
 25class DefaultStream(AbstractStream):
 26    def __init__(
 27        self,
 28        partition_generator: PartitionGenerator,
 29        name: str,
 30        json_schema: Union[Mapping[str, Any], Callable[[], Mapping[str, Any]]],
 31        primary_key: List[str],
 32        cursor_field: Optional[CursorField],
 33        logger: Logger,
 34        cursor: Cursor,
 35        namespace: Optional[str] = None,
 36        supports_file_transfer: bool = False,
 37        block_simultaneous_read: str = "",
 38    ) -> None:
 39        self._stream_partition_generator = partition_generator
 40        self._name = name
 41        self._json_schema = json_schema
 42        self._primary_key = primary_key
 43        self._cursor_field = cursor_field
 44        self._logger = logger
 45        self._cursor = cursor
 46        self._namespace = namespace
 47        self._supports_file_transfer = supports_file_transfer
 48        self._block_simultaneous_read = block_simultaneous_read
 49
 50    def generate_partitions(self) -> Iterable[Partition]:
 51        yield from self._stream_partition_generator.generate()
 52
 53    @property
 54    def name(self) -> str:
 55        return self._name
 56
 57    @property
 58    def namespace(self) -> Optional[str]:
 59        return self._namespace
 60
 61    @property
 62    def cursor_field(self) -> Optional[str]:
 63        return self._cursor_field.cursor_field_key if self._cursor_field else None
 64
 65    def get_json_schema(self) -> Mapping[str, Any]:
 66        return self._json_schema() if callable(self._json_schema) else self._json_schema
 67
 68    def as_airbyte_stream(self) -> AirbyteStream:
 69        stream = AirbyteStream(
 70            name=self.name,
 71            json_schema=dict(self.get_json_schema()),
 72            supported_sync_modes=[SyncMode.full_refresh],
 73            is_resumable=False,
 74            is_file_based=self._supports_file_transfer,
 75        )
 76
 77        if self._namespace:
 78            stream.namespace = self._namespace
 79
 80        if self._cursor_field:
 81            stream.source_defined_cursor = (
 82                not self._cursor_field.supports_catalog_defined_cursor_field
 83            )
 84            stream.is_resumable = True
 85            stream.supported_sync_modes.append(SyncMode.incremental)
 86            stream.default_cursor_field = [self._cursor_field.cursor_field_key]
 87
 88        keys = self._primary_key
 89        if keys and len(keys) > 0:
 90            stream.source_defined_primary_key = [[key] for key in keys]
 91
 92        return stream
 93
 94    def log_stream_sync_configuration(self) -> None:
 95        self._logger.debug(
 96            f"Syncing stream instance: {self.name}",
 97            extra={
 98                "primary_key": self._primary_key,
 99                "cursor_field": self.cursor_field,
100            },
101        )
102
103    @property
104    def cursor(self) -> Cursor:
105        return self._cursor
106
107    @property
108    def block_simultaneous_read(self) -> str:
109        """Returns the blocking group name for this stream, or empty string if no blocking"""
110        return self._block_simultaneous_read
111
112    @block_simultaneous_read.setter
113    def block_simultaneous_read(self, value: str) -> None:
114        self._block_simultaneous_read = value
115
116    def get_partition_router(self) -> PartitionRouter | None:
117        """Return the partition router for this stream, or None if not available."""
118        if not isinstance(self._stream_partition_generator, StreamSlicerPartitionGenerator):
119            return None
120        stream_slicer = self._stream_partition_generator._stream_slicer
121        if not isinstance(stream_slicer, ConcurrentPerPartitionCursor):
122            return None
123        return stream_slicer._partition_router
124
125    def check_availability(self) -> StreamAvailability:
126        """
127        Check stream availability by attempting to read the first record of the stream.
128        """
129        try:
130            partition = next(iter(self.generate_partitions()))
131        except StopIteration:
132            # NOTE: The following comment was copied from legacy stuff and I don't know how relevant it is:
133            # If stream_slices has no `next()` item (Note - this is different from stream_slices returning [None]!)
134            # This can happen when a substream's `stream_slices` method does a `for record in parent_records: yield <something>`
135            # without accounting for the case in which the parent stream is empty.
136            return StreamAvailability.unavailable(
137                f"Cannot attempt to connect to stream {self.name} - no stream slices were found"
138            )
139        except AirbyteTracedException as error:
140            return StreamAvailability.unavailable(
141                error.message or error.internal_message or "<no error message>"
142            )
143
144        try:
145            next(iter(partition.read()))
146            return StreamAvailability.available()
147        except StopIteration:
148            self._logger.info(f"Successfully connected to stream {self.name}, but got 0 records.")
149            return StreamAvailability.available()
150        except AirbyteTracedException as error:
151            return StreamAvailability.unavailable(
152                error.message or error.internal_message or "<no error message>"
153            )

AbstractStream is an experimental interface for streams developed as part of the Concurrent CDK. This interface is not yet stable and may change in the future. Use at your own risk.

Why create a new interface instead of adding concurrency capabilities to the existing Stream? We learned a lot since the initial design of the Stream interface, and we wanted to take the opportunity to improve.

High level, the changes we are targeting are:

  • Removing superfluous or leaky parameters from the methods' interfaces
  • Using composition instead of inheritance to add new capabilities

To allow us to iterate fast while ensuring backwards compatibility, we are creating a new interface with a facade object that will bridge the old and the new interfaces. Source connectors that wish to leverage concurrency need to implement this new interface. An example will be available shortly

Current restrictions on sources that implement this interface. Not all of these restrictions will be lifted in the future, but most will as we iterate on the design.

  • Only full refresh is supported. This will be addressed in the future.
  • The read method does not accept a cursor_field. Streams must be internally aware of the cursor field to use. User-defined cursor fields can be implemented by modifying the connector's main method to instantiate the streams with the configured cursor field.
  • Streams cannot return user-friendly messages by overriding Stream.get_error_display_message. This will be addressed in the future.
  • The Stream's behavior cannot depend on a namespace
  • TypeTransformer is not supported. This will be addressed in the future.
  • Nested cursor and primary keys are not supported
DefaultStream( partition_generator: airbyte_cdk.sources.streams.concurrent.partitions.partition_generator.PartitionGenerator, name: str, json_schema: Union[Mapping[str, Any], Callable[[], Mapping[str, Any]]], primary_key: List[str], cursor_field: Optional[airbyte_cdk.CursorField], logger: logging.Logger, cursor: airbyte_cdk.Cursor, namespace: Optional[str] = None, supports_file_transfer: bool = False, block_simultaneous_read: str = '')
26    def __init__(
27        self,
28        partition_generator: PartitionGenerator,
29        name: str,
30        json_schema: Union[Mapping[str, Any], Callable[[], Mapping[str, Any]]],
31        primary_key: List[str],
32        cursor_field: Optional[CursorField],
33        logger: Logger,
34        cursor: Cursor,
35        namespace: Optional[str] = None,
36        supports_file_transfer: bool = False,
37        block_simultaneous_read: str = "",
38    ) -> None:
39        self._stream_partition_generator = partition_generator
40        self._name = name
41        self._json_schema = json_schema
42        self._primary_key = primary_key
43        self._cursor_field = cursor_field
44        self._logger = logger
45        self._cursor = cursor
46        self._namespace = namespace
47        self._supports_file_transfer = supports_file_transfer
48        self._block_simultaneous_read = block_simultaneous_read
def generate_partitions( self) -> Iterable[airbyte_cdk.sources.streams.concurrent.partitions.partition.Partition]:
50    def generate_partitions(self) -> Iterable[Partition]:
51        yield from self._stream_partition_generator.generate()

Generates the partitions that will be read by this stream.

Returns

An iterable of partitions.

name: str
53    @property
54    def name(self) -> str:
55        return self._name
Returns

The stream name

namespace: Optional[str]
57    @property
58    def namespace(self) -> Optional[str]:
59        return self._namespace
cursor_field: Optional[str]
61    @property
62    def cursor_field(self) -> Optional[str]:
63        return self._cursor_field.cursor_field_key if self._cursor_field else None

Override to return the default cursor field used by this stream e.g: an API entity might always use created_at as the cursor field.

Returns

The name of the field used as a cursor. Nested cursor fields are not supported.

def get_json_schema(self) -> Mapping[str, Any]:
65    def get_json_schema(self) -> Mapping[str, Any]:
66        return self._json_schema() if callable(self._json_schema) else self._json_schema
Returns

A dict of the JSON schema representing this stream.

def as_airbyte_stream( self) -> airbyte_protocol_dataclasses.models.airbyte_protocol.AirbyteStream:
68    def as_airbyte_stream(self) -> AirbyteStream:
69        stream = AirbyteStream(
70            name=self.name,
71            json_schema=dict(self.get_json_schema()),
72            supported_sync_modes=[SyncMode.full_refresh],
73            is_resumable=False,
74            is_file_based=self._supports_file_transfer,
75        )
76
77        if self._namespace:
78            stream.namespace = self._namespace
79
80        if self._cursor_field:
81            stream.source_defined_cursor = (
82                not self._cursor_field.supports_catalog_defined_cursor_field
83            )
84            stream.is_resumable = True
85            stream.supported_sync_modes.append(SyncMode.incremental)
86            stream.default_cursor_field = [self._cursor_field.cursor_field_key]
87
88        keys = self._primary_key
89        if keys and len(keys) > 0:
90            stream.source_defined_primary_key = [[key] for key in keys]
91
92        return stream
Returns

An AirbyteStream message describing this stream for the catalog.

def log_stream_sync_configuration(self) -> None:
 94    def log_stream_sync_configuration(self) -> None:
 95        self._logger.debug(
 96            f"Syncing stream instance: {self.name}",
 97            extra={
 98                "primary_key": self._primary_key,
 99                "cursor_field": self.cursor_field,
100            },
101        )

Logs the stream's configuration for debugging purposes.

cursor: airbyte_cdk.Cursor
103    @property
104    def cursor(self) -> Cursor:
105        return self._cursor
Returns

The cursor associated with this stream.

block_simultaneous_read: str
107    @property
108    def block_simultaneous_read(self) -> str:
109        """Returns the blocking group name for this stream, or empty string if no blocking"""
110        return self._block_simultaneous_read

Returns the blocking group name for this stream, or empty string if no blocking

def get_partition_router( self) -> airbyte_cdk.sources.declarative.partition_routers.PartitionRouter | None:
116    def get_partition_router(self) -> PartitionRouter | None:
117        """Return the partition router for this stream, or None if not available."""
118        if not isinstance(self._stream_partition_generator, StreamSlicerPartitionGenerator):
119            return None
120        stream_slicer = self._stream_partition_generator._stream_slicer
121        if not isinstance(stream_slicer, ConcurrentPerPartitionCursor):
122            return None
123        return stream_slicer._partition_router

Return the partition router for this stream, or None if not available.

125    def check_availability(self) -> StreamAvailability:
126        """
127        Check stream availability by attempting to read the first record of the stream.
128        """
129        try:
130            partition = next(iter(self.generate_partitions()))
131        except StopIteration:
132            # NOTE: The following comment was copied from legacy stuff and I don't know how relevant it is:
133            # If stream_slices has no `next()` item (Note - this is different from stream_slices returning [None]!)
134            # This can happen when a substream's `stream_slices` method does a `for record in parent_records: yield <something>`
135            # without accounting for the case in which the parent stream is empty.
136            return StreamAvailability.unavailable(
137                f"Cannot attempt to connect to stream {self.name} - no stream slices were found"
138            )
139        except AirbyteTracedException as error:
140            return StreamAvailability.unavailable(
141                error.message or error.internal_message or "<no error message>"
142            )
143
144        try:
145            next(iter(partition.read()))
146            return StreamAvailability.available()
147        except StopIteration:
148            self._logger.info(f"Successfully connected to stream {self.name}, but got 0 records.")
149            return StreamAvailability.available()
150        except AirbyteTracedException as error:
151            return StreamAvailability.unavailable(
152                error.message or error.internal_message or "<no error message>"
153            )

Check stream availability by attempting to read the first record of the stream.