airbyte_cdk.sources.streams.concurrent.default_stream
```python
#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#

from functools import lru_cache
from logging import Logger
from typing import Any, Iterable, List, Mapping, Optional

from airbyte_cdk.models import AirbyteStream, SyncMode
from airbyte_cdk.sources.streams.concurrent.abstract_stream import AbstractStream
from airbyte_cdk.sources.streams.concurrent.availability_strategy import (
    AbstractAvailabilityStrategy,
    StreamAvailability,
)
from airbyte_cdk.sources.streams.concurrent.cursor import Cursor
from airbyte_cdk.sources.streams.concurrent.partitions.partition import Partition
from airbyte_cdk.sources.streams.concurrent.partitions.partition_generator import PartitionGenerator


class DefaultStream(AbstractStream):
    def __init__(
        self,
        partition_generator: PartitionGenerator,
        name: str,
        json_schema: Mapping[str, Any],
        availability_strategy: AbstractAvailabilityStrategy,
        primary_key: List[str],
        cursor_field: Optional[str],
        logger: Logger,
        cursor: Cursor,
        namespace: Optional[str] = None,
        supports_file_transfer: bool = False,
    ) -> None:
        self._stream_partition_generator = partition_generator
        self._name = name
        self._json_schema = json_schema
        self._availability_strategy = availability_strategy
        self._primary_key = primary_key
        self._cursor_field = cursor_field
        self._logger = logger
        self._cursor = cursor
        self._namespace = namespace
        self._supports_file_transfer = supports_file_transfer

    def generate_partitions(self) -> Iterable[Partition]:
        yield from self._stream_partition_generator.generate()

    @property
    def name(self) -> str:
        return self._name

    @property
    def namespace(self) -> Optional[str]:
        return self._namespace

    def check_availability(self) -> StreamAvailability:
        return self._availability_strategy.check_availability(self._logger)

    @property
    def cursor_field(self) -> Optional[str]:
        return self._cursor_field

    @lru_cache(maxsize=None)
    def get_json_schema(self) -> Mapping[str, Any]:
        return self._json_schema

    def as_airbyte_stream(self) -> AirbyteStream:
        stream = AirbyteStream(
            name=self.name,
            json_schema=dict(self._json_schema),
            supported_sync_modes=[SyncMode.full_refresh],
            is_resumable=False,
            is_file_based=self._supports_file_transfer,
        )

        if self._namespace:
            stream.namespace = self._namespace

        if self._cursor_field:
            stream.source_defined_cursor = True
            stream.is_resumable = True
            stream.supported_sync_modes.append(SyncMode.incremental)
            stream.default_cursor_field = [self._cursor_field]

        keys = self._primary_key
        if keys and len(keys) > 0:
            stream.source_defined_primary_key = [[key] for key in keys]

        return stream

    def log_stream_sync_configuration(self) -> None:
        self._logger.debug(
            f"Syncing stream instance: {self.name}",
            extra={
                "primary_key": self._primary_key,
                "cursor_field": self.cursor_field,
            },
        )

    @property
    def cursor(self) -> Cursor:
        return self._cursor
```
class DefaultStream(AbstractStream)
AbstractStream is an experimental interface for streams developed as part of the Concurrent CDK. This interface is not yet stable and may change in the future. Use at your own risk.
Why create a new interface instead of adding concurrency capabilities to the existing Stream? We learned a lot since the initial design of the Stream interface, and we wanted to take the opportunity to improve it.
At a high level, the changes we are targeting are:
- Removing superfluous or leaky parameters from the methods' interfaces
- Using composition instead of inheritance to add new capabilities
To allow us to iterate quickly while ensuring backwards compatibility, we are creating a new interface with a facade object that bridges the old and the new interfaces. Source connectors that wish to leverage concurrency need to implement this new interface. An example will be available shortly; in the meantime, a rough sketch follows below.
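The hypothetical `run_stream` helper below illustrates the shape of the new interface from the framework's side. It is a sketch, not CDK code, and it only calls methods documented on this page; note that no sync-time parameters (catalog, state, cursor field) leak into the calls, per the design goals above.

```python
from airbyte_cdk.sources.streams.concurrent.abstract_stream import AbstractStream


def run_stream(stream: AbstractStream) -> None:
    """Hypothetical driver: the stream carries all of its own configuration."""
    stream.log_stream_sync_configuration()
    for partition in stream.generate_partitions():
        # The concurrent framework would dispatch each partition to a worker
        # thread; reading records from a partition is out of scope here.
        ...
```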
Sources that implement this interface are currently subject to the following restrictions. Most of these restrictions, though not all, will be lifted as we iterate on the design.
- Only full refresh is supported. This will be addressed in the future.
- The read method does not accept a cursor_field. Streams must be internally aware of the cursor field to use. User-defined cursor fields can be implemented by modifying the connector's main method to instantiate the streams with the configured cursor field.
- Streams cannot return user-friendly messages by overriding Stream.get_error_display_message. This will be addressed in the future.
- The Stream's behavior cannot depend on a namespace.
- TypeTransformer is not supported. This will be addressed in the future.
- Nested cursor and primary keys are not supported.
```python
def __init__(
    self,
    partition_generator: PartitionGenerator,
    name: str,
    json_schema: Mapping[str, Any],
    availability_strategy: AbstractAvailabilityStrategy,
    primary_key: List[str],
    cursor_field: Optional[str],
    logger: Logger,
    cursor: Cursor,
    namespace: Optional[str] = None,
    supports_file_transfer: bool = False,
) -> None:
    self._stream_partition_generator = partition_generator
    self._name = name
    self._json_schema = json_schema
    self._availability_strategy = availability_strategy
    self._primary_key = primary_key
    self._cursor_field = cursor_field
    self._logger = logger
    self._cursor = cursor
    self._namespace = namespace
    self._supports_file_transfer = supports_file_transfer
```
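A minimal construction sketch follows. The `PrebuiltPartitionGenerator` is invented for illustration (it assumes `generate` is the only abstract method on PartitionGenerator), and the availability strategy and cursor are stubbed with `...` placeholders since their concrete implementations are out of scope here; because `__init__` only stores its arguments, the snippet runs as written. Later snippets on this page reuse this `stream` object.

```python
from logging import getLogger
from typing import Iterable, List

from airbyte_cdk.sources.streams.concurrent.default_stream import DefaultStream
from airbyte_cdk.sources.streams.concurrent.partitions.partition import Partition
from airbyte_cdk.sources.streams.concurrent.partitions.partition_generator import PartitionGenerator


class PrebuiltPartitionGenerator(PartitionGenerator):
    """Hypothetical generator that replays a fixed list of partitions."""

    def __init__(self, partitions: List[Partition]) -> None:
        self._partitions = partitions

    def generate(self) -> Iterable[Partition]:
        yield from self._partitions


stream = DefaultStream(
    partition_generator=PrebuiltPartitionGenerator([]),
    name="users",
    json_schema={"type": "object", "properties": {"id": {"type": "string"}}},
    availability_strategy=...,  # placeholder for an AbstractAvailabilityStrategy
    primary_key=["id"],
    cursor_field="updated_at",
    logger=getLogger("airbyte"),
    cursor=...,  # placeholder for a Cursor implementation
)
```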
```python
def generate_partitions(self) -> Iterable[Partition]:
    yield from self._stream_partition_generator.generate()
```
Generates the partitions that will be read by this stream.
Returns
An iterable of partitions.
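Because the method delegates via `yield from`, partition generation is lazy: nothing is produced until the iterable is consumed. Continuing with the `stream` built in the constructor sketch above:

```python
partitions = stream.generate_partitions()  # returns a generator; no work happens yet
for partition in partitions:  # partitions are produced on demand during iteration
    print(partition)
```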
```python
def check_availability(self) -> StreamAvailability:
    return self._availability_strategy.check_availability(self._logger)
```
Returns
The stream's availability.
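A typical caller would gate the sync on this result. The helper below is a sketch; the `is_available()` and `message()` accessors are assumptions about the StreamAvailability API rather than something shown on this page.

```python
from airbyte_cdk.sources.streams.concurrent.abstract_stream import AbstractStream


def should_sync(stream: AbstractStream) -> bool:
    """Hypothetical helper: skip streams that report themselves unavailable."""
    availability = stream.check_availability()
    if not availability.is_available():  # assumed StreamAvailability accessor
        print(f"Skipping {stream.name}: {availability.message()}")
        return False
    return True
```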
Override to return the default cursor field used by this stream, e.g. an API entity might always use created_at as the cursor field.
Returns
The name of the field used as a cursor. Nested cursor fields are not supported.
```python
@lru_cache(maxsize=None)
def get_json_schema(self) -> Mapping[str, Any]:
    return self._json_schema
```
Returns
A dict of the JSON schema representing this stream.
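Note that the method is wrapped in `lru_cache`, so the schema is resolved once per instance and the memoized mapping is returned on every subsequent call; callers should treat it as read-only. Continuing with the `stream` built above:

```python
first = stream.get_json_schema()
second = stream.get_json_schema()
assert first is second  # lru_cache returns the same memoized mapping
```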
```python
def as_airbyte_stream(self) -> AirbyteStream:
    stream = AirbyteStream(
        name=self.name,
        json_schema=dict(self._json_schema),
        supported_sync_modes=[SyncMode.full_refresh],
        is_resumable=False,
        is_file_based=self._supports_file_transfer,
    )

    if self._namespace:
        stream.namespace = self._namespace

    if self._cursor_field:
        stream.source_defined_cursor = True
        stream.is_resumable = True
        stream.supported_sync_modes.append(SyncMode.incremental)
        stream.default_cursor_field = [self._cursor_field]

    keys = self._primary_key
    if keys and len(keys) > 0:
        stream.source_defined_primary_key = [[key] for key in keys]

    return stream
```
Returns
The AirbyteStream representation of this stream, as it would appear in the connector's catalog.
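The branches above are easy to trace with the `stream` built in the constructor sketch: because it has a cursor field, incremental mode is appended and the stream becomes resumable, and each primary-key component is wrapped in its own list.

```python
from airbyte_cdk.models import SyncMode

entry = stream.as_airbyte_stream()
assert entry.supported_sync_modes == [SyncMode.full_refresh, SyncMode.incremental]
assert entry.source_defined_cursor is True
assert entry.is_resumable is True
assert entry.default_cursor_field == ["updated_at"]
assert entry.source_defined_primary_key == [["id"]]
```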
```python
def log_stream_sync_configuration(self) -> None:
    self._logger.debug(
        f"Syncing stream instance: {self.name}",
        extra={
            "primary_key": self._primary_key,
            "cursor_field": self.cursor_field,
        },
    )
```
Logs the stream's configuration for debugging purposes.