airbyte_cdk.sources.streams.concurrent.default_stream

  1#
  2# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
  3#
  4
  5from functools import lru_cache
  6from logging import Logger
  7from typing import Any, Iterable, List, Mapping, Optional
  8
  9from airbyte_cdk.models import AirbyteStream, SyncMode
 10from airbyte_cdk.sources.streams.concurrent.abstract_stream import AbstractStream
 11from airbyte_cdk.sources.streams.concurrent.availability_strategy import (
 12    AbstractAvailabilityStrategy,
 13    StreamAvailability,
 14)
 15from airbyte_cdk.sources.streams.concurrent.cursor import Cursor
 16from airbyte_cdk.sources.streams.concurrent.partitions.partition import Partition
 17from airbyte_cdk.sources.streams.concurrent.partitions.partition_generator import PartitionGenerator
 18
 19
 20class DefaultStream(AbstractStream):
 21    def __init__(
 22        self,
 23        partition_generator: PartitionGenerator,
 24        name: str,
 25        json_schema: Mapping[str, Any],
 26        availability_strategy: AbstractAvailabilityStrategy,
 27        primary_key: List[str],
 28        cursor_field: Optional[str],
 29        logger: Logger,
 30        cursor: Cursor,
 31        namespace: Optional[str] = None,
 32        supports_file_transfer: bool = False,
 33    ) -> None:
 34        self._stream_partition_generator = partition_generator
 35        self._name = name
 36        self._json_schema = json_schema
 37        self._availability_strategy = availability_strategy
 38        self._primary_key = primary_key
 39        self._cursor_field = cursor_field
 40        self._logger = logger
 41        self._cursor = cursor
 42        self._namespace = namespace
 43        self._supports_file_transfer = supports_file_transfer
 44
 45    def generate_partitions(self) -> Iterable[Partition]:
 46        yield from self._stream_partition_generator.generate()
 47
 48    @property
 49    def name(self) -> str:
 50        return self._name
 51
 52    @property
 53    def namespace(self) -> Optional[str]:
 54        return self._namespace
 55
 56    def check_availability(self) -> StreamAvailability:
 57        return self._availability_strategy.check_availability(self._logger)
 58
 59    @property
 60    def cursor_field(self) -> Optional[str]:
 61        return self._cursor_field
 62
 63    @lru_cache(maxsize=None)
 64    def get_json_schema(self) -> Mapping[str, Any]:
 65        return self._json_schema
 66
 67    def as_airbyte_stream(self) -> AirbyteStream:
 68        stream = AirbyteStream(
 69            name=self.name,
 70            json_schema=dict(self._json_schema),
 71            supported_sync_modes=[SyncMode.full_refresh],
 72            is_resumable=False,
 73            is_file_based=self._supports_file_transfer,
 74        )
 75
 76        if self._namespace:
 77            stream.namespace = self._namespace
 78
 79        if self._cursor_field:
 80            stream.source_defined_cursor = True
 81            stream.is_resumable = True
 82            stream.supported_sync_modes.append(SyncMode.incremental)
 83            stream.default_cursor_field = [self._cursor_field]
 84
 85        keys = self._primary_key
 86        if keys and len(keys) > 0:
 87            stream.source_defined_primary_key = [[key] for key in keys]
 88
 89        return stream
 90
 91    def log_stream_sync_configuration(self) -> None:
 92        self._logger.debug(
 93            f"Syncing stream instance: {self.name}",
 94            extra={
 95                "primary_key": self._primary_key,
 96                "cursor_field": self.cursor_field,
 97            },
 98        )
 99
100    @property
101    def cursor(self) -> Cursor:
102        return self._cursor
 21class DefaultStream(AbstractStream):
 22    def __init__(
 23        self,
 24        partition_generator: PartitionGenerator,
 25        name: str,
 26        json_schema: Mapping[str, Any],
 27        availability_strategy: AbstractAvailabilityStrategy,
 28        primary_key: List[str],
 29        cursor_field: Optional[str],
 30        logger: Logger,
 31        cursor: Cursor,
 32        namespace: Optional[str] = None,
 33        supports_file_transfer: bool = False,
 34    ) -> None:
 35        self._stream_partition_generator = partition_generator
 36        self._name = name
 37        self._json_schema = json_schema
 38        self._availability_strategy = availability_strategy
 39        self._primary_key = primary_key
 40        self._cursor_field = cursor_field
 41        self._logger = logger
 42        self._cursor = cursor
 43        self._namespace = namespace
 44        self._supports_file_transfer = supports_file_transfer
 45
 46    def generate_partitions(self) -> Iterable[Partition]:
 47        yield from self._stream_partition_generator.generate()
 48
 49    @property
 50    def name(self) -> str:
 51        return self._name
 52
 53    @property
 54    def namespace(self) -> Optional[str]:
 55        return self._namespace
 56
 57    def check_availability(self) -> StreamAvailability:
 58        return self._availability_strategy.check_availability(self._logger)
 59
 60    @property
 61    def cursor_field(self) -> Optional[str]:
 62        return self._cursor_field
 63
 64    @lru_cache(maxsize=None)
 65    def get_json_schema(self) -> Mapping[str, Any]:
 66        return self._json_schema
 67
 68    def as_airbyte_stream(self) -> AirbyteStream:
 69        stream = AirbyteStream(
 70            name=self.name,
 71            json_schema=dict(self._json_schema),
 72            supported_sync_modes=[SyncMode.full_refresh],
 73            is_resumable=False,
 74            is_file_based=self._supports_file_transfer,
 75        )
 76
 77        if self._namespace:
 78            stream.namespace = self._namespace
 79
 80        if self._cursor_field:
 81            stream.source_defined_cursor = True
 82            stream.is_resumable = True
 83            stream.supported_sync_modes.append(SyncMode.incremental)
 84            stream.default_cursor_field = [self._cursor_field]
 85
 86        keys = self._primary_key
 87        if keys and len(keys) > 0:
 88            stream.source_defined_primary_key = [[key] for key in keys]
 89
 90        return stream
 91
 92    def log_stream_sync_configuration(self) -> None:
 93        self._logger.debug(
 94            f"Syncing stream instance: {self.name}",
 95            extra={
 96                "primary_key": self._primary_key,
 97                "cursor_field": self.cursor_field,
 98            },
 99        )
100
101    @property
102    def cursor(self) -> Cursor:
103        return self._cursor

AbstractStream is an experimental interface for streams developed as part of the Concurrent CDK. This interface is not yet stable and may change in the future. Use at your own risk.

Why create a new interface instead of adding concurrency capabilities to the existing Stream? We have learned a lot since the initial design of the Stream interface, and we wanted to take the opportunity to improve it.

At a high level, the changes we are targeting are:

  • Removing superfluous or leaky parameters from the methods' interfaces
  • Using composition instead of inheritance to add new capabilities

To allow us to iterate fast while ensuring backwards compatibility, we are creating a new interface with a facade object that will bridge the old and the new interfaces. Source connectors that wish to leverage concurrency need to implement this new interface. An example will be available shortly.

The following restrictions currently apply to sources that implement this interface. Not all of these restrictions will be lifted in the future, but most will be as we iterate on the design.

  • Only full refresh is supported. This will be addressed in the future.
  • The read method does not accept a cursor_field. Streams must be internally aware of the cursor field to use. User-defined cursor fields can be implemented by modifying the connector's main method to instantiate the streams with the configured cursor field.
  • Streams cannot return user-friendly messages by overriding Stream.get_error_display_message. This will be addressed in the future.
  • The Stream's behavior cannot depend on a namespace.
  • TypeTransformer is not supported. This will be addressed in the future.
  • Nested cursor fields and primary keys are not supported.
DefaultStream( partition_generator: airbyte_cdk.sources.streams.concurrent.partitions.partition_generator.PartitionGenerator, name: str, json_schema: Mapping[str, Any], availability_strategy: airbyte_cdk.sources.streams.concurrent.availability_strategy.AbstractAvailabilityStrategy, primary_key: List[str], cursor_field: Optional[str], logger: logging.Logger, cursor: airbyte_cdk.Cursor, namespace: Optional[str] = None, supports_file_transfer: bool = False)
22    def __init__(
23        self,
24        partition_generator: PartitionGenerator,
25        name: str,
26        json_schema: Mapping[str, Any],
27        availability_strategy: AbstractAvailabilityStrategy,
28        primary_key: List[str],
29        cursor_field: Optional[str],
30        logger: Logger,
31        cursor: Cursor,
32        namespace: Optional[str] = None,
33        supports_file_transfer: bool = False,
34    ) -> None:
35        self._stream_partition_generator = partition_generator
36        self._name = name
37        self._json_schema = json_schema
38        self._availability_strategy = availability_strategy
39        self._primary_key = primary_key
40        self._cursor_field = cursor_field
41        self._logger = logger
42        self._cursor = cursor
43        self._namespace = namespace
44        self._supports_file_transfer = supports_file_transfer
def generate_partitions( self) -> Iterable[airbyte_cdk.sources.streams.concurrent.partitions.partition.Partition]:
46    def generate_partitions(self) -> Iterable[Partition]:
47        yield from self._stream_partition_generator.generate()

Generates the partitions that will be read by this stream.

Returns

An iterable of partitions.

name: str
49    @property
50    def name(self) -> str:
51        return self._name
Returns

The stream name

namespace: Optional[str]
53    @property
54    def namespace(self) -> Optional[str]:
55        return self._namespace
57    def check_availability(self) -> StreamAvailability:
58        return self._availability_strategy.check_availability(self._logger)
Returns

The stream's availability

cursor_field: Optional[str]
60    @property
61    def cursor_field(self) -> Optional[str]:
62        return self._cursor_field

Override to return the default cursor field used by this stream, e.g., an API entity might always use created_at as the cursor field.

Returns

The name of the field used as a cursor. Nested cursor fields are not supported.

@lru_cache(maxsize=None)
def get_json_schema(self) -> Mapping[str, Any]:
64    @lru_cache(maxsize=None)
65    def get_json_schema(self) -> Mapping[str, Any]:
66        return self._json_schema
Returns

A dict of the JSON schema representing this stream.

def as_airbyte_stream( self) -> airbyte_protocol_dataclasses.models.airbyte_protocol.AirbyteStream:
68    def as_airbyte_stream(self) -> AirbyteStream:
69        stream = AirbyteStream(
70            name=self.name,
71            json_schema=dict(self._json_schema),
72            supported_sync_modes=[SyncMode.full_refresh],
73            is_resumable=False,
74            is_file_based=self._supports_file_transfer,
75        )
76
77        if self._namespace:
78            stream.namespace = self._namespace
79
80        if self._cursor_field:
81            stream.source_defined_cursor = True
82            stream.is_resumable = True
83            stream.supported_sync_modes.append(SyncMode.incremental)
84            stream.default_cursor_field = [self._cursor_field]
85
86        keys = self._primary_key
87        if keys and len(keys) > 0:
88            stream.source_defined_primary_key = [[key] for key in keys]
89
90        return stream
Returns

The AirbyteStream catalog entry for this stream (name, JSON schema, supported sync modes, cursor, and primary key).

def log_stream_sync_configuration(self) -> None:
92    def log_stream_sync_configuration(self) -> None:
93        self._logger.debug(
94            f"Syncing stream instance: {self.name}",
95            extra={
96                "primary_key": self._primary_key,
97                "cursor_field": self.cursor_field,
98            },
99        )

Logs the stream's configuration for debugging purposes.

cursor: airbyte_cdk.Cursor
101    @property
102    def cursor(self) -> Cursor:
103        return self._cursor
Returns

The cursor associated with this stream.