airbyte_cdk.sources.file_based.stream.abstract_file_based_stream

  1#
  2# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
  3#
  4
  5from abc import abstractmethod
  6from functools import cache, cached_property, lru_cache
  7from typing import Any, Dict, Iterable, List, Mapping, Optional, Type
  8
  9from typing_extensions import deprecated
 10
 11from airbyte_cdk import AirbyteMessage
 12from airbyte_cdk.models import SyncMode
 13from airbyte_cdk.sources.file_based.availability_strategy import (
 14    AbstractFileBasedAvailabilityStrategy,
 15)
 16from airbyte_cdk.sources.file_based.config.file_based_stream_config import (
 17    FileBasedStreamConfig,
 18    PrimaryKeyType,
 19)
 20from airbyte_cdk.sources.file_based.discovery_policy import AbstractDiscoveryPolicy
 21from airbyte_cdk.sources.file_based.exceptions import (
 22    FileBasedErrorsCollector,
 23    FileBasedSourceError,
 24    RecordParseError,
 25    UndefinedParserError,
 26)
 27from airbyte_cdk.sources.file_based.file_based_stream_reader import AbstractFileBasedStreamReader
 28from airbyte_cdk.sources.file_based.file_types.file_type_parser import FileTypeParser
 29from airbyte_cdk.sources.file_based.remote_file import RemoteFile
 30from airbyte_cdk.sources.file_based.schema_validation_policies import AbstractSchemaValidationPolicy
 31from airbyte_cdk.sources.file_based.stream.cursor import AbstractFileBasedCursor
 32from airbyte_cdk.sources.file_based.types import StreamSlice
 33from airbyte_cdk.sources.streams import Stream
 34from airbyte_cdk.sources.streams.checkpoint import Cursor
 35
 36
 37class AbstractFileBasedStream(Stream):
 38    """
 39    A file-based stream in an Airbyte source.
 40
 41    In addition to the base Stream attributes, a file-based stream has
 42    - A config object (derived from the corresponding stream section in source config).
 43      This contains the globs defining the stream's files.
 44    - A StreamReader, which knows how to list and open files in the stream.
 45    - A FileBasedAvailabilityStrategy, which knows how to verify that we can list and open
 46      files in the stream.
 47    - A DiscoveryPolicy that controls the number of concurrent requests sent to the source
 48      during discover, and the number of files used for schema discovery.
 49    - A dictionary of FileType:Parser that holds all the file types that can be handled
 50      by the stream.
 51    """
 52
 53    def __init__(
 54        self,
 55        config: FileBasedStreamConfig,
 56        catalog_schema: Optional[Mapping[str, Any]],
 57        stream_reader: AbstractFileBasedStreamReader,
 58        availability_strategy: AbstractFileBasedAvailabilityStrategy,
 59        discovery_policy: AbstractDiscoveryPolicy,
 60        parsers: Dict[Type[Any], FileTypeParser],
 61        validation_policy: AbstractSchemaValidationPolicy,
 62        errors_collector: FileBasedErrorsCollector,
 63        cursor: AbstractFileBasedCursor,
 64    ):
 65        super().__init__()
 66        self.config = config
 67        self.catalog_schema = catalog_schema
 68        self.validation_policy = validation_policy
 69        self.stream_reader = stream_reader
 70        self._discovery_policy = discovery_policy
 71        self._availability_strategy = availability_strategy
 72        self._parsers = parsers
 73        self.errors_collector = errors_collector
 74        self._cursor = cursor
 75
 76    @property
 77    @abstractmethod
 78    def primary_key(self) -> PrimaryKeyType: ...
 79
 80    @cache
 81    def list_files(self) -> List[RemoteFile]:
 82        """
 83        List all files that belong to the stream.
 84
 85        The output of this method is cached so we don't need to list the files more than once.
 86        This means we won't pick up changes to the files during a sync. This method uses the
 87        get_files method which is implemented by the concrete stream class.
 88        """
 89        return list(self.get_files())
 90
 91    @abstractmethod
 92    def get_files(self) -> Iterable[RemoteFile]:
 93        """
 94        List all files that belong to the stream as defined by the stream's globs.
 95        """
 96        ...
 97
 98    def read_records(
 99        self,
100        sync_mode: SyncMode,
101        cursor_field: Optional[List[str]] = None,
102        stream_slice: Optional[StreamSlice] = None,
103        stream_state: Optional[Mapping[str, Any]] = None,
104    ) -> Iterable[Mapping[str, Any] | AirbyteMessage]:
105        """
106        Yield all records from all remote files in `list_files_for_this_sync`.
107        This method acts as an adapter between the generic Stream interface and the file-based's
108        stream since file-based streams manage their own states.
109        """
110        if stream_slice is None:
111            raise ValueError("stream_slice must be set")
112        return self.read_records_from_slice(stream_slice)
113
114    @abstractmethod
115    def read_records_from_slice(
116        self, stream_slice: StreamSlice
117    ) -> Iterable[Mapping[str, Any] | AirbyteMessage]:
118        """
119        Yield all records from all remote files in `list_files_for_this_sync`.
120        """
121        ...
122
123    def stream_slices(
124        self,
125        *,
126        sync_mode: SyncMode,
127        cursor_field: Optional[List[str]] = None,
128        stream_state: Optional[Mapping[str, Any]] = None,
129    ) -> Iterable[Optional[Mapping[str, Any]]]:
130        """
131        This method acts as an adapter between the generic Stream interface and the file-based's
132        stream since file-based streams manage their own states.
133        """
134        return self.compute_slices()
135
136    @abstractmethod
137    def compute_slices(self) -> Iterable[Optional[StreamSlice]]:
138        """
139        Return a list of slices that will be used to read files in the current sync.
140        :return: The slices to use for the current sync.
141        """
142        ...
143
144    @abstractmethod
145    @lru_cache(maxsize=None)
146    def get_json_schema(self) -> Mapping[str, Any]:
147        """
148        Return the JSON Schema for a stream.
149        """
150        ...
151
152    @abstractmethod
153    def infer_schema(self, files: List[RemoteFile]) -> Mapping[str, Any]:
154        """
155        Infer the schema for files in the stream.
156        """
157        ...
158
159    def get_parser(self) -> FileTypeParser:
160        try:
161            return self._parsers[type(self.config.format)]
162        except KeyError:
163            raise UndefinedParserError(
164                FileBasedSourceError.UNDEFINED_PARSER,
165                stream=self.name,
166                format=type(self.config.format),
167            )
168
169    def record_passes_validation_policy(self, record: Mapping[str, Any]) -> bool:
170        if self.validation_policy:
171            return self.validation_policy.record_passes_validation_policy(
172                record=record, schema=self.catalog_schema
173            )
174        else:
175            raise RecordParseError(
176                FileBasedSourceError.UNDEFINED_VALIDATION_POLICY,
177                stream=self.name,
178                validation_policy=self.config.validation_policy,
179            )
180
181    @cached_property
182    @deprecated("Deprecated as of CDK version 3.7.0.")
183    def availability_strategy(self) -> AbstractFileBasedAvailabilityStrategy:
184        return self._availability_strategy
185
186    @property
187    def name(self) -> str:
188        return self.config.name
189
190    def get_cursor(self) -> Optional[Cursor]:
191        """
192        This is a temporary hack. Because file-based, declarative, and concurrent have _slightly_ different cursor implementations
193        the file-based cursor isn't compatible with the cursor-based iteration flow in core.py top-level CDK. By setting this to
194        None, we defer to the regular incremental checkpoint flow. Once all cursors are consolidated under a common interface
195        then this override can be removed.
196        """
197        return None
class AbstractFileBasedStream(airbyte_cdk.sources.streams.core.Stream):
 38class AbstractFileBasedStream(Stream):
 39    """
 40    A file-based stream in an Airbyte source.
 41
 42    In addition to the base Stream attributes, a file-based stream has
 43    - A config object (derived from the corresponding stream section in source config).
 44      This contains the globs defining the stream's files.
 45    - A StreamReader, which knows how to list and open files in the stream.
 46    - A FileBasedAvailabilityStrategy, which knows how to verify that we can list and open
 47      files in the stream.
 48    - A DiscoveryPolicy that controls the number of concurrent requests sent to the source
 49      during discover, and the number of files used for schema discovery.
 50    - A dictionary of FileType:Parser that holds all the file types that can be handled
 51      by the stream.
 52    """
 53
 54    def __init__(
 55        self,
 56        config: FileBasedStreamConfig,
 57        catalog_schema: Optional[Mapping[str, Any]],
 58        stream_reader: AbstractFileBasedStreamReader,
 59        availability_strategy: AbstractFileBasedAvailabilityStrategy,
 60        discovery_policy: AbstractDiscoveryPolicy,
 61        parsers: Dict[Type[Any], FileTypeParser],
 62        validation_policy: AbstractSchemaValidationPolicy,
 63        errors_collector: FileBasedErrorsCollector,
 64        cursor: AbstractFileBasedCursor,
 65    ):
 66        super().__init__()
 67        self.config = config
 68        self.catalog_schema = catalog_schema
 69        self.validation_policy = validation_policy
 70        self.stream_reader = stream_reader
 71        self._discovery_policy = discovery_policy
 72        self._availability_strategy = availability_strategy
 73        self._parsers = parsers
 74        self.errors_collector = errors_collector
 75        self._cursor = cursor
 76
 77    @property
 78    @abstractmethod
 79    def primary_key(self) -> PrimaryKeyType: ...
 80
 81    @cache
 82    def list_files(self) -> List[RemoteFile]:
 83        """
 84        List all files that belong to the stream.
 85
 86        The output of this method is cached so we don't need to list the files more than once.
 87        This means we won't pick up changes to the files during a sync. This method uses the
 88        get_files method which is implemented by the concrete stream class.
 89        """
 90        return list(self.get_files())
 91
 92    @abstractmethod
 93    def get_files(self) -> Iterable[RemoteFile]:
 94        """
 95        List all files that belong to the stream as defined by the stream's globs.
 96        """
 97        ...
 98
 99    def read_records(
100        self,
101        sync_mode: SyncMode,
102        cursor_field: Optional[List[str]] = None,
103        stream_slice: Optional[StreamSlice] = None,
104        stream_state: Optional[Mapping[str, Any]] = None,
105    ) -> Iterable[Mapping[str, Any] | AirbyteMessage]:
106        """
107        Yield all records from all remote files in `list_files_for_this_sync`.
108        This method acts as an adapter between the generic Stream interface and the file-based's
109        stream since file-based streams manage their own states.
110        """
111        if stream_slice is None:
112            raise ValueError("stream_slice must be set")
113        return self.read_records_from_slice(stream_slice)
114
115    @abstractmethod
116    def read_records_from_slice(
117        self, stream_slice: StreamSlice
118    ) -> Iterable[Mapping[str, Any] | AirbyteMessage]:
119        """
120        Yield all records from all remote files in `list_files_for_this_sync`.
121        """
122        ...
123
124    def stream_slices(
125        self,
126        *,
127        sync_mode: SyncMode,
128        cursor_field: Optional[List[str]] = None,
129        stream_state: Optional[Mapping[str, Any]] = None,
130    ) -> Iterable[Optional[Mapping[str, Any]]]:
131        """
132        This method acts as an adapter between the generic Stream interface and the file-based's
133        stream since file-based streams manage their own states.
134        """
135        return self.compute_slices()
136
137    @abstractmethod
138    def compute_slices(self) -> Iterable[Optional[StreamSlice]]:
139        """
140        Return a list of slices that will be used to read files in the current sync.
141        :return: The slices to use for the current sync.
142        """
143        ...
144
145    @abstractmethod
146    @lru_cache(maxsize=None)
147    def get_json_schema(self) -> Mapping[str, Any]:
148        """
149        Return the JSON Schema for a stream.
150        """
151        ...
152
153    @abstractmethod
154    def infer_schema(self, files: List[RemoteFile]) -> Mapping[str, Any]:
155        """
156        Infer the schema for files in the stream.
157        """
158        ...
159
160    def get_parser(self) -> FileTypeParser:
161        try:
162            return self._parsers[type(self.config.format)]
163        except KeyError:
164            raise UndefinedParserError(
165                FileBasedSourceError.UNDEFINED_PARSER,
166                stream=self.name,
167                format=type(self.config.format),
168            )
169
170    def record_passes_validation_policy(self, record: Mapping[str, Any]) -> bool:
171        if self.validation_policy:
172            return self.validation_policy.record_passes_validation_policy(
173                record=record, schema=self.catalog_schema
174            )
175        else:
176            raise RecordParseError(
177                FileBasedSourceError.UNDEFINED_VALIDATION_POLICY,
178                stream=self.name,
179                validation_policy=self.config.validation_policy,
180            )
181
182    @cached_property
183    @deprecated("Deprecated as of CDK version 3.7.0.")
184    def availability_strategy(self) -> AbstractFileBasedAvailabilityStrategy:
185        return self._availability_strategy
186
187    @property
188    def name(self) -> str:
189        return self.config.name
190
191    def get_cursor(self) -> Optional[Cursor]:
192        """
193        This is a temporary hack. Because file-based, declarative, and concurrent have _slightly_ different cursor implementations
194        the file-based cursor isn't compatible with the cursor-based iteration flow in core.py top-level CDK. By setting this to
195        None, we defer to the regular incremental checkpoint flow. Once all cursors are consolidated under a common interface
196        then this override can be removed.
197        """
198        return None

A file-based stream in an Airbyte source.

In addition to the base Stream attributes, a file-based stream has

  • A config object (derived from the corresponding stream section in source config). This contains the globs defining the stream's files.
  • A StreamReader, which knows how to list and open files in the stream.
  • A FileBasedAvailabilityStrategy, which knows how to verify that we can list and open files in the stream.
  • A DiscoveryPolicy that controls the number of concurrent requests sent to the source during discover, and the number of files used for schema discovery.
  • A dictionary of FileType:Parser that holds all the file types that can be handled by the stream.
config
catalog_schema
validation_policy
stream_reader
errors_collector
primary_key: Union[str, List[str], NoneType]
77    @property
78    @abstractmethod
79    def primary_key(self) -> PrimaryKeyType: ...
Returns

string if single primary key, list of strings if composite primary key, list of list of strings if composite primary key consisting of nested fields. If the stream has no primary keys, return None.

@cache
def list_files(self) -> List[airbyte_cdk.sources.file_based.RemoteFile]:
81    @cache
82    def list_files(self) -> List[RemoteFile]:
83        """
84        List all files that belong to the stream.
85
86        The output of this method is cached so we don't need to list the files more than once.
87        This means we won't pick up changes to the files during a sync. This method uses the
88        get_files method which is implemented by the concrete stream class.
89        """
90        return list(self.get_files())

List all files that belong to the stream.

The output of this method is cached so we don't need to list the files more than once. This means we won't pick up changes to the files during a sync. This method uses the get_files method which is implemented by the concrete stream class.

@abstractmethod
def get_files(self) -> Iterable[airbyte_cdk.sources.file_based.RemoteFile]:
92    @abstractmethod
93    def get_files(self) -> Iterable[RemoteFile]:
94        """
95        List all files that belong to the stream as defined by the stream's globs.
96        """
97        ...

List all files that belong to the stream as defined by the stream's globs.

def read_records( self, sync_mode: airbyte_protocol_dataclasses.models.airbyte_protocol.SyncMode, cursor_field: Optional[List[str]] = None, stream_slice: Optional[Mapping[str, Any]] = None, stream_state: Optional[Mapping[str, Any]] = None) -> Iterable[Union[Mapping[str, Any], airbyte_cdk.AirbyteMessage]]:
 99    def read_records(
100        self,
101        sync_mode: SyncMode,
102        cursor_field: Optional[List[str]] = None,
103        stream_slice: Optional[StreamSlice] = None,
104        stream_state: Optional[Mapping[str, Any]] = None,
105    ) -> Iterable[Mapping[str, Any] | AirbyteMessage]:
106        """
107        Yield all records from all remote files in `list_files_for_this_sync`.
108        This method acts as an adapter between the generic Stream interface and the file-based's
109        stream since file-based streams manage their own states.
110        """
111        if stream_slice is None:
112            raise ValueError("stream_slice must be set")
113        return self.read_records_from_slice(stream_slice)

Yield all records from all remote files in list_files_for_this_sync. This method acts as an adapter between the generic Stream interface and the file-based stream, since file-based streams manage their own state.

@abstractmethod
def read_records_from_slice( self, stream_slice: Mapping[str, Any]) -> Iterable[Union[Mapping[str, Any], airbyte_cdk.AirbyteMessage]]:
115    @abstractmethod
116    def read_records_from_slice(
117        self, stream_slice: StreamSlice
118    ) -> Iterable[Mapping[str, Any] | AirbyteMessage]:
119        """
120        Yield all records from all remote files in `list_files_for_this_sync`.
121        """
122        ...

Yield all records from all remote files in list_files_for_this_sync.

def stream_slices( self, *, sync_mode: airbyte_protocol_dataclasses.models.airbyte_protocol.SyncMode, cursor_field: Optional[List[str]] = None, stream_state: Optional[Mapping[str, Any]] = None) -> Iterable[Optional[Mapping[str, Any]]]:
124    def stream_slices(
125        self,
126        *,
127        sync_mode: SyncMode,
128        cursor_field: Optional[List[str]] = None,
129        stream_state: Optional[Mapping[str, Any]] = None,
130    ) -> Iterable[Optional[Mapping[str, Any]]]:
131        """
132        This method acts as an adapter between the generic Stream interface and the file-based's
133        stream since file-based streams manage their own states.
134        """
135        return self.compute_slices()

This method acts as an adapter between the generic Stream interface and the file-based stream, since file-based streams manage their own state.

@abstractmethod
def compute_slices(self) -> Iterable[Optional[Mapping[str, Any]]]:
137    @abstractmethod
138    def compute_slices(self) -> Iterable[Optional[StreamSlice]]:
139        """
140        Return a list of slices that will be used to read files in the current sync.
141        :return: The slices to use for the current sync.
142        """
143        ...

Return a list of slices that will be used to read files in the current sync.

Returns

The slices to use for the current sync.

@abstractmethod
@lru_cache(maxsize=None)
def get_json_schema(self) -> Mapping[str, Any]:
145    @abstractmethod
146    @lru_cache(maxsize=None)
147    def get_json_schema(self) -> Mapping[str, Any]:
148        """
149        Return the JSON Schema for a stream.
150        """
151        ...

Return the JSON Schema for a stream.

@abstractmethod
def infer_schema( self, files: List[airbyte_cdk.sources.file_based.RemoteFile]) -> Mapping[str, Any]:
153    @abstractmethod
154    def infer_schema(self, files: List[RemoteFile]) -> Mapping[str, Any]:
155        """
156        Infer the schema for files in the stream.
157        """
158        ...

Infer the schema for files in the stream.

160    def get_parser(self) -> FileTypeParser:
161        try:
162            return self._parsers[type(self.config.format)]
163        except KeyError:
164            raise UndefinedParserError(
165                FileBasedSourceError.UNDEFINED_PARSER,
166                stream=self.name,
167                format=type(self.config.format),
168            )
def record_passes_validation_policy(self, record: Mapping[str, Any]) -> bool:
170    def record_passes_validation_policy(self, record: Mapping[str, Any]) -> bool:
171        if self.validation_policy:
172            return self.validation_policy.record_passes_validation_policy(
173                record=record, schema=self.catalog_schema
174            )
175        else:
176            raise RecordParseError(
177                FileBasedSourceError.UNDEFINED_VALIDATION_POLICY,
178                stream=self.name,
179                validation_policy=self.config.validation_policy,
180            )
182    @cached_property
183    @deprecated("Deprecated as of CDK version 3.7.0.")
184    def availability_strategy(self) -> AbstractFileBasedAvailabilityStrategy:
185        return self._availability_strategy
name: str
187    @property
188    def name(self) -> str:
189        return self.config.name
Returns

Stream name. By default this is the implementing class name, but it can be overridden as needed.

def get_cursor(self) -> Optional[airbyte_cdk.sources.streams.checkpoint.Cursor]:
191    def get_cursor(self) -> Optional[Cursor]:
192        """
193        This is a temporary hack. Because file-based, declarative, and concurrent have _slightly_ different cursor implementations
194        the file-based cursor isn't compatible with the cursor-based iteration flow in core.py top-level CDK. By setting this to
195        None, we defer to the regular incremental checkpoint flow. Once all cursors are consolidated under a common interface
196        then this override can be removed.
197        """
198        return None

This is a temporary hack. Because file-based, declarative, and concurrent streams have _slightly_ different cursor implementations, the file-based cursor isn't compatible with the cursor-based iteration flow in the top-level CDK core.py. By returning None, we defer to the regular incremental checkpoint flow. Once all cursors are consolidated under a common interface, this override can be removed.