airbyte_cdk.sources.streams.concurrent.default_stream

#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#

from functools import lru_cache
from logging import Logger
from typing import Any, Iterable, List, Mapping, Optional

from airbyte_cdk.models import AirbyteStream, SyncMode
from airbyte_cdk.sources.streams.concurrent.abstract_stream import AbstractStream
from airbyte_cdk.sources.streams.concurrent.availability_strategy import (
    AbstractAvailabilityStrategy,
    StreamAvailability,
)
from airbyte_cdk.sources.streams.concurrent.cursor import Cursor
from airbyte_cdk.sources.streams.concurrent.partitions.partition import Partition
from airbyte_cdk.sources.streams.concurrent.partitions.partition_generator import PartitionGenerator


class DefaultStream(AbstractStream):
    def __init__(
        self,
        partition_generator: PartitionGenerator,
        name: str,
        json_schema: Mapping[str, Any],
        availability_strategy: AbstractAvailabilityStrategy,
        primary_key: List[str],
        cursor_field: Optional[str],
        logger: Logger,
        cursor: Cursor,
        namespace: Optional[str] = None,
    ) -> None:
        self._stream_partition_generator = partition_generator
        self._name = name
        self._json_schema = json_schema
        self._availability_strategy = availability_strategy
        self._primary_key = primary_key
        self._cursor_field = cursor_field
        self._logger = logger
        self._cursor = cursor
        self._namespace = namespace

    def generate_partitions(self) -> Iterable[Partition]:
        yield from self._stream_partition_generator.generate()

    @property
    def name(self) -> str:
        return self._name

    @property
    def namespace(self) -> Optional[str]:
        return self._namespace

    def check_availability(self) -> StreamAvailability:
        return self._availability_strategy.check_availability(self._logger)

    @property
    def cursor_field(self) -> Optional[str]:
        return self._cursor_field

    @lru_cache(maxsize=None)
    def get_json_schema(self) -> Mapping[str, Any]:
        return self._json_schema

    def as_airbyte_stream(self) -> AirbyteStream:
        stream = AirbyteStream(
            name=self.name,
            json_schema=dict(self._json_schema),
            supported_sync_modes=[SyncMode.full_refresh],
            is_resumable=False,
        )

        if self._namespace:
            stream.namespace = self._namespace

        if self._cursor_field:
            stream.source_defined_cursor = True
            stream.is_resumable = True
            stream.supported_sync_modes.append(SyncMode.incremental)
            stream.default_cursor_field = [self._cursor_field]

        keys = self._primary_key
        if keys and len(keys) > 0:
            stream.source_defined_primary_key = [[key] for key in keys]

        return stream

    def log_stream_sync_configuration(self) -> None:
        self._logger.debug(
            f"Syncing stream instance: {self.name}",
            extra={
                "primary_key": self._primary_key,
                "cursor_field": self.cursor_field,
            },
        )

    @property
    def cursor(self) -> Cursor:
        return self._cursor

AbstractStream is an experimental interface for streams developed as part of the Concurrent CDK. This interface is not yet stable and may change in the future. Use at your own risk.

Why create a new interface instead of adding concurrency capabilities to the existing Stream? We learnt a lot since the initial design of the Stream interface, and we wanted to take the opportunity to improve it.

At a high level, the changes we are targeting are:

  • Removing superfluous or leaky parameters from the methods' interfaces
  • Using composition instead of inheritance to add new capabilities

To allow us to iterate fast while ensuring backwards compatibility, we are creating a new interface with a facade object that will bridge the old and the new interfaces. Source connectors that wish to leverage concurrency need to implement this new interface. An example will be available shortly.

Sources that implement this interface are currently subject to the following restrictions. Not all of these restrictions will be lifted, but most will as we iterate on the design:

  • Only full refresh is supported. This will be addressed in the future.
  • The read method does not accept a cursor_field. Streams must be internally aware of the cursor field to use. User-defined cursor fields can be implemented by modifying the connector's main method to instantiate the streams with the configured cursor field.
  • Streams cannot return user-friendly messages by overriding Stream.get_error_display_message. This will be addressed in the future.
  • The Stream's behavior cannot depend on a namespace.
  • TypeTransformer is not supported. This will be addressed in the future.
  • Nested cursor and primary keys are not supported.
DefaultStream( partition_generator: airbyte_cdk.sources.streams.concurrent.partitions.partition_generator.PartitionGenerator, name: str, json_schema: Mapping[str, Any], availability_strategy: airbyte_cdk.sources.streams.concurrent.availability_strategy.AbstractAvailabilityStrategy, primary_key: List[str], cursor_field: Optional[str], logger: logging.Logger, cursor: airbyte_cdk.Cursor, namespace: Optional[str] = None)
    def __init__(
        self,
        partition_generator: PartitionGenerator,
        name: str,
        json_schema: Mapping[str, Any],
        availability_strategy: AbstractAvailabilityStrategy,
        primary_key: List[str],
        cursor_field: Optional[str],
        logger: Logger,
        cursor: Cursor,
        namespace: Optional[str] = None,
    ) -> None:
        self._stream_partition_generator = partition_generator
        self._name = name
        self._json_schema = json_schema
        self._availability_strategy = availability_strategy
        self._primary_key = primary_key
        self._cursor_field = cursor_field
        self._logger = logger
        self._cursor = cursor
        self._namespace = namespace
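
For illustration only, wiring up a DefaultStream might look roughly like the sketch below. MyPartitionGenerator, MyAvailabilityStrategy, and my_cursor are hypothetical stand-ins for connector-specific implementations of PartitionGenerator, AbstractAvailabilityStrategy, and Cursor; they are not part of this module.

import logging

from airbyte_cdk.sources.streams.concurrent.default_stream import DefaultStream

# MyPartitionGenerator, MyAvailabilityStrategy, and my_cursor are hypothetical
# connector-specific implementations of PartitionGenerator,
# AbstractAvailabilityStrategy, and Cursor.
stream = DefaultStream(
    partition_generator=MyPartitionGenerator(),
    name="users",
    json_schema={"type": "object", "properties": {"id": {"type": "string"}}},
    availability_strategy=MyAvailabilityStrategy(),
    primary_key=["id"],
    cursor_field="updated_at",
    logger=logging.getLogger("airbyte"),
    cursor=my_cursor,
)
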
def generate_partitions(self) -> Iterable[airbyte_cdk.sources.streams.concurrent.partitions.partition.Partition]:
    def generate_partitions(self) -> Iterable[Partition]:
        yield from self._stream_partition_generator.generate()

Generates the partitions that will be read by this stream.

Returns

An iterable of partitions.
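
A rough usage sketch, assuming each Partition exposes a read() method that yields record objects with a data attribute, as in the concurrent CDK's partition interface. In a real sync the concurrent source schedules partition reads on worker threads rather than looping sequentially.

# Sequential illustration only; the concurrent framework normally reads
# partitions from a thread pool.
for partition in stream.generate_partitions():
    for record in partition.read():
        print(record.data)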

name: str
    @property
    def name(self) -> str:
        return self._name
Returns

The stream name

namespace: Optional[str]
    @property
    def namespace(self) -> Optional[str]:
        return self._namespace

def check_availability(self) -> airbyte_cdk.sources.streams.concurrent.availability_strategy.StreamAvailability:
    def check_availability(self) -> StreamAvailability:
        return self._availability_strategy.check_availability(self._logger)
Returns

The stream's availability
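
A stream should typically only be read if this check passes. A minimal sketch, assuming StreamAvailability exposes is_available() and message() as in the concurrent availability_strategy module:

import logging

logger = logging.getLogger("airbyte")

availability = stream.check_availability()
if not availability.is_available():
    # Skip the stream and surface the reason reported by the availability strategy.
    logger.warning(f"Skipping stream {stream.name}: {availability.message()}")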

cursor_field: Optional[str]
    @property
    def cursor_field(self) -> Optional[str]:
        return self._cursor_field

Override to return the default cursor field used by this stream, e.g. an API entity might always use created_at as the cursor field.

Returns

The name of the field used as a cursor. Nested cursor fields are not supported.

@lru_cache(maxsize=None)
def get_json_schema(self) -> Mapping[str, Any]:
    @lru_cache(maxsize=None)
    def get_json_schema(self) -> Mapping[str, Any]:
        return self._json_schema
Returns

A dict of the JSON schema representing this stream.

def as_airbyte_stream(self) -> airbyte_protocol_dataclasses.models.airbyte_protocol.AirbyteStream:
    def as_airbyte_stream(self) -> AirbyteStream:
        stream = AirbyteStream(
            name=self.name,
            json_schema=dict(self._json_schema),
            supported_sync_modes=[SyncMode.full_refresh],
            is_resumable=False,
        )

        if self._namespace:
            stream.namespace = self._namespace

        if self._cursor_field:
            stream.source_defined_cursor = True
            stream.is_resumable = True
            stream.supported_sync_modes.append(SyncMode.incremental)
            stream.default_cursor_field = [self._cursor_field]

        keys = self._primary_key
        if keys and len(keys) > 0:
            stream.source_defined_primary_key = [[key] for key in keys]

        return stream
Returns

An AirbyteStream representation of this stream, including its supported sync modes, cursor field, and primary key.
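
The values below follow directly from the method body above: for a stream constructed with cursor_field="updated_at" and primary_key=["id"] (as in the earlier hypothetical construction sketch), the emitted AirbyteStream satisfies:

from airbyte_cdk.models import SyncMode

airbyte_stream = stream.as_airbyte_stream()
assert airbyte_stream.supported_sync_modes == [SyncMode.full_refresh, SyncMode.incremental]
assert airbyte_stream.is_resumable is True
assert airbyte_stream.source_defined_cursor is True
assert airbyte_stream.default_cursor_field == ["updated_at"]
assert airbyte_stream.source_defined_primary_key == [["id"]]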

def log_stream_sync_configuration(self) -> None:
    def log_stream_sync_configuration(self) -> None:
        self._logger.debug(
            f"Syncing stream instance: {self.name}",
            extra={
                "primary_key": self._primary_key,
                "cursor_field": self.cursor_field,
            },
        )

Logs the stream's configuration for debugging purposes.

cursor: airbyte_cdk.Cursor
    @property
    def cursor(self) -> Cursor:
        return self._cursor
Returns

The cursor associated with this stream.