airbyte_cdk.sources.file_based.stream.abstract_file_based_stream
#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#

from abc import abstractmethod
from functools import cache, cached_property, lru_cache
from typing import Any, Dict, Iterable, List, Mapping, Optional, Type

from typing_extensions import deprecated

from airbyte_cdk import AirbyteMessage
from airbyte_cdk.models import SyncMode
from airbyte_cdk.sources.file_based.availability_strategy import (
    AbstractFileBasedAvailabilityStrategy,
)
from airbyte_cdk.sources.file_based.config.file_based_stream_config import (
    FileBasedStreamConfig,
    PrimaryKeyType,
)
from airbyte_cdk.sources.file_based.discovery_policy import AbstractDiscoveryPolicy
from airbyte_cdk.sources.file_based.exceptions import (
    FileBasedErrorsCollector,
    FileBasedSourceError,
    RecordParseError,
    UndefinedParserError,
)
from airbyte_cdk.sources.file_based.file_based_stream_reader import AbstractFileBasedStreamReader
from airbyte_cdk.sources.file_based.file_types.file_type_parser import FileTypeParser
from airbyte_cdk.sources.file_based.remote_file import RemoteFile
from airbyte_cdk.sources.file_based.schema_validation_policies import AbstractSchemaValidationPolicy
from airbyte_cdk.sources.file_based.stream.cursor import AbstractFileBasedCursor
from airbyte_cdk.sources.file_based.types import StreamSlice
from airbyte_cdk.sources.streams import Stream
from airbyte_cdk.sources.streams.checkpoint import Cursor


class AbstractFileBasedStream(Stream):
    """
    A file-based stream in an Airbyte source.

    In addition to the base Stream attributes, a file-based stream has
    - A config object (derived from the corresponding stream section in source config).
        This contains the globs defining the stream's files.
    - A StreamReader, which knows how to list and open files in the stream.
    - A FileBasedAvailabilityStrategy, which knows how to verify that we can list and open
        files in the stream.
    - A DiscoveryPolicy that controls the number of concurrent requests sent to the source
        during discover, and the number of files used for schema discovery.
    - A dictionary of FileType:Parser that holds all the file types that can be handled
        by the stream.
    """

    def __init__(
        self,
        config: FileBasedStreamConfig,
        catalog_schema: Optional[Mapping[str, Any]],
        stream_reader: AbstractFileBasedStreamReader,
        availability_strategy: AbstractFileBasedAvailabilityStrategy,
        discovery_policy: AbstractDiscoveryPolicy,
        parsers: Dict[Type[Any], FileTypeParser],
        validation_policy: AbstractSchemaValidationPolicy,
        errors_collector: FileBasedErrorsCollector,
        cursor: AbstractFileBasedCursor,
    ):
        """
        :param config: Stream section of the source config; provides the stream name,
            format and validation policy name used below.
        :param catalog_schema: Schema passed to the validation policy when checking records.
        :param stream_reader: Reader used to list and open the stream's files.
        :param availability_strategy: Returned by the deprecated `availability_strategy` property.
        :param discovery_policy: Stored as `_discovery_policy`; not used by this base class.
        :param parsers: Maps a format-config type to the parser handling that format.
        :param validation_policy: Policy deciding whether a record passes against `catalog_schema`.
        :param errors_collector: Stored as `errors_collector`; not used by this base class.
        :param cursor: Stored as `_cursor`; not used by this base class.
        """
        super().__init__()
        self.config = config
        self.catalog_schema = catalog_schema
        self.validation_policy = validation_policy
        self.stream_reader = stream_reader
        self._discovery_policy = discovery_policy
        self._availability_strategy = availability_strategy
        self._parsers = parsers
        self.errors_collector = errors_collector
        self._cursor = cursor

    @property
    @abstractmethod
    def primary_key(self) -> PrimaryKeyType:
        """Primary key of the stream; concrete streams must define it."""
        ...

    # NOTE: `functools.cache` keys on `self`, so the cache keeps a strong reference to every
    # stream instance it has listed files for, for as long as the process lives.
    @cache
    def list_files(self) -> List[RemoteFile]:
        """
        List all files that belong to the stream.

        The output of this method is cached so we don't need to list the files more than once.
        This means we won't pick up changes to the files during a sync. This method uses the
        get_files method which is implemented by the concrete stream class.
        """
        return list(self.get_files())

    @abstractmethod
    def get_files(self) -> Iterable[RemoteFile]:
        """
        List all files that belong to the stream as defined by the stream's globs.
        """
        ...

    def read_records(
        self,
        sync_mode: SyncMode,
        cursor_field: Optional[List[str]] = None,
        stream_slice: Optional[StreamSlice] = None,
        stream_state: Optional[Mapping[str, Any]] = None,
    ) -> Iterable[Mapping[str, Any] | AirbyteMessage]:
        """
        Yield all records from the remote files in `stream_slice`.

        This method acts as an adapter between the generic Stream interface and the file-based
        stream, since file-based streams manage their own state: `sync_mode`, `cursor_field`
        and `stream_state` are ignored, and reading is delegated to `read_records_from_slice`.

        :raises ValueError: if `stream_slice` is None.
        """
        if stream_slice is None:
            raise ValueError("stream_slice must be set")
        return self.read_records_from_slice(stream_slice)

    @abstractmethod
    def read_records_from_slice(
        self, stream_slice: StreamSlice
    ) -> Iterable[Mapping[str, Any] | AirbyteMessage]:
        """
        Yield all records (or Airbyte messages) from the remote files in `stream_slice`.
        """
        ...

    def stream_slices(
        self,
        *,
        sync_mode: SyncMode,
        cursor_field: Optional[List[str]] = None,
        stream_state: Optional[Mapping[str, Any]] = None,
    ) -> Iterable[Optional[Mapping[str, Any]]]:
        """
        This method acts as an adapter between the generic Stream interface and the file-based
        stream, since file-based streams manage their own state: all arguments are ignored and
        slicing is delegated to `compute_slices`.
        """
        return self.compute_slices()

    @abstractmethod
    def compute_slices(self) -> Iterable[Optional[StreamSlice]]:
        """
        Return a list of slices that will be used to read files in the current sync.
        :return: The slices to use for the current sync.
        """
        ...

    @abstractmethod
    def get_json_schema(self) -> Mapping[str, Any]:
        """
        Return the JSON Schema for a stream.

        Caching, if desired, belongs on the concrete implementation. A cache decorator on this
        abstract declaration does not apply to overriding methods, and `lru_cache` on a method
        keeps every instance it has seen alive.
        """
        ...

    @abstractmethod
    def infer_schema(self, files: List[RemoteFile]) -> Mapping[str, Any]:
        """
        Infer the schema for files in the stream.
        """
        ...

    def get_parser(self) -> FileTypeParser:
        """
        Return the parser registered for the type of this stream's format config.

        :raises UndefinedParserError: if no parser is registered for `type(self.config.format)`.
        """
        try:
            return self._parsers[type(self.config.format)]
        except KeyError:
            raise UndefinedParserError(
                FileBasedSourceError.UNDEFINED_PARSER,
                stream=self.name,
                format=type(self.config.format),
            )

    def record_passes_validation_policy(self, record: Mapping[str, Any]) -> bool:
        """
        Return whether `record` passes the stream's validation policy against `catalog_schema`.

        :raises RecordParseError: if no (truthy) validation policy is set.
        """
        if self.validation_policy:
            return self.validation_policy.record_passes_validation_policy(
                record=record, schema=self.catalog_schema
            )
        else:
            raise RecordParseError(
                FileBasedSourceError.UNDEFINED_VALIDATION_POLICY,
                stream=self.name,
                validation_policy=self.config.validation_policy,
            )

    @cached_property
    @deprecated("Deprecated as of CDK version 3.7.0.")
    def availability_strategy(self) -> AbstractFileBasedAvailabilityStrategy:
        """Deprecated accessor for the availability strategy passed to `__init__`."""
        return self._availability_strategy

    @property
    def name(self) -> str:
        """Stream name, taken from the stream config."""
        return self.config.name

    def get_cursor(self) -> Optional[Cursor]:
        """
        This is a temporary hack. Because file-based, declarative, and concurrent have _slightly_ different cursor implementations
        the file-based cursor isn't compatible with the cursor-based iteration flow in core.py top-level CDK. By setting this to
        None, we defer to the regular incremental checkpoint flow. Once all cursors are consolidated under a common interface
        then this override can be removed.
        """
        return None
class AbstractFileBasedStream(Stream):
    """
    A file-based stream in an Airbyte source.

    In addition to the base Stream attributes, a file-based stream has
    - A config object (derived from the corresponding stream section in source config).
        This contains the globs defining the stream's files.
    - A StreamReader, which knows how to list and open files in the stream.
    - A FileBasedAvailabilityStrategy, which knows how to verify that we can list and open
        files in the stream.
    - A DiscoveryPolicy that controls the number of concurrent requests sent to the source
        during discover, and the number of files used for schema discovery.
    - A dictionary of FileType:Parser that holds all the file types that can be handled
        by the stream.
    """

    def __init__(
        self,
        config: FileBasedStreamConfig,
        catalog_schema: Optional[Mapping[str, Any]],
        stream_reader: AbstractFileBasedStreamReader,
        availability_strategy: AbstractFileBasedAvailabilityStrategy,
        discovery_policy: AbstractDiscoveryPolicy,
        parsers: Dict[Type[Any], FileTypeParser],
        validation_policy: AbstractSchemaValidationPolicy,
        errors_collector: FileBasedErrorsCollector,
        cursor: AbstractFileBasedCursor,
    ):
        """
        :param config: Stream section of the source config; provides the stream name,
            format and validation policy name used below.
        :param catalog_schema: Schema passed to the validation policy when checking records.
        :param stream_reader: Reader used to list and open the stream's files.
        :param availability_strategy: Returned by the deprecated `availability_strategy` property.
        :param discovery_policy: Stored as `_discovery_policy`; not used by this base class.
        :param parsers: Maps a format-config type to the parser handling that format.
        :param validation_policy: Policy deciding whether a record passes against `catalog_schema`.
        :param errors_collector: Stored as `errors_collector`; not used by this base class.
        :param cursor: Stored as `_cursor`; not used by this base class.
        """
        super().__init__()
        self.config = config
        self.catalog_schema = catalog_schema
        self.validation_policy = validation_policy
        self.stream_reader = stream_reader
        self._discovery_policy = discovery_policy
        self._availability_strategy = availability_strategy
        self._parsers = parsers
        self.errors_collector = errors_collector
        self._cursor = cursor

    @property
    @abstractmethod
    def primary_key(self) -> PrimaryKeyType:
        """Primary key of the stream; concrete streams must define it."""
        ...

    # NOTE: `functools.cache` keys on `self`, so the cache keeps a strong reference to every
    # stream instance it has listed files for, for as long as the process lives.
    @cache
    def list_files(self) -> List[RemoteFile]:
        """
        List all files that belong to the stream.

        The output of this method is cached so we don't need to list the files more than once.
        This means we won't pick up changes to the files during a sync. This method uses the
        get_files method which is implemented by the concrete stream class.
        """
        return list(self.get_files())

    @abstractmethod
    def get_files(self) -> Iterable[RemoteFile]:
        """
        List all files that belong to the stream as defined by the stream's globs.
        """
        ...

    def read_records(
        self,
        sync_mode: SyncMode,
        cursor_field: Optional[List[str]] = None,
        stream_slice: Optional[StreamSlice] = None,
        stream_state: Optional[Mapping[str, Any]] = None,
    ) -> Iterable[Mapping[str, Any] | AirbyteMessage]:
        """
        Yield all records from the remote files in `stream_slice`.

        This method acts as an adapter between the generic Stream interface and the file-based
        stream, since file-based streams manage their own state: `sync_mode`, `cursor_field`
        and `stream_state` are ignored, and reading is delegated to `read_records_from_slice`.

        :raises ValueError: if `stream_slice` is None.
        """
        if stream_slice is None:
            raise ValueError("stream_slice must be set")
        return self.read_records_from_slice(stream_slice)

    @abstractmethod
    def read_records_from_slice(
        self, stream_slice: StreamSlice
    ) -> Iterable[Mapping[str, Any] | AirbyteMessage]:
        """
        Yield all records (or Airbyte messages) from the remote files in `stream_slice`.
        """
        ...

    def stream_slices(
        self,
        *,
        sync_mode: SyncMode,
        cursor_field: Optional[List[str]] = None,
        stream_state: Optional[Mapping[str, Any]] = None,
    ) -> Iterable[Optional[Mapping[str, Any]]]:
        """
        This method acts as an adapter between the generic Stream interface and the file-based
        stream, since file-based streams manage their own state: all arguments are ignored and
        slicing is delegated to `compute_slices`.
        """
        return self.compute_slices()

    @abstractmethod
    def compute_slices(self) -> Iterable[Optional[StreamSlice]]:
        """
        Return a list of slices that will be used to read files in the current sync.
        :return: The slices to use for the current sync.
        """
        ...

    @abstractmethod
    def get_json_schema(self) -> Mapping[str, Any]:
        """
        Return the JSON Schema for a stream.

        Caching, if desired, belongs on the concrete implementation. A cache decorator on this
        abstract declaration does not apply to overriding methods, and `lru_cache` on a method
        keeps every instance it has seen alive.
        """
        ...

    @abstractmethod
    def infer_schema(self, files: List[RemoteFile]) -> Mapping[str, Any]:
        """
        Infer the schema for files in the stream.
        """
        ...

    def get_parser(self) -> FileTypeParser:
        """
        Return the parser registered for the type of this stream's format config.

        :raises UndefinedParserError: if no parser is registered for `type(self.config.format)`.
        """
        try:
            return self._parsers[type(self.config.format)]
        except KeyError:
            raise UndefinedParserError(
                FileBasedSourceError.UNDEFINED_PARSER,
                stream=self.name,
                format=type(self.config.format),
            )

    def record_passes_validation_policy(self, record: Mapping[str, Any]) -> bool:
        """
        Return whether `record` passes the stream's validation policy against `catalog_schema`.

        :raises RecordParseError: if no (truthy) validation policy is set.
        """
        if self.validation_policy:
            return self.validation_policy.record_passes_validation_policy(
                record=record, schema=self.catalog_schema
            )
        else:
            raise RecordParseError(
                FileBasedSourceError.UNDEFINED_VALIDATION_POLICY,
                stream=self.name,
                validation_policy=self.config.validation_policy,
            )

    @cached_property
    @deprecated("Deprecated as of CDK version 3.7.0.")
    def availability_strategy(self) -> AbstractFileBasedAvailabilityStrategy:
        """Deprecated accessor for the availability strategy passed to `__init__`."""
        return self._availability_strategy

    @property
    def name(self) -> str:
        """Stream name, taken from the stream config."""
        return self.config.name

    def get_cursor(self) -> Optional[Cursor]:
        """
        This is a temporary hack. Because file-based, declarative, and concurrent have _slightly_ different cursor implementations
        the file-based cursor isn't compatible with the cursor-based iteration flow in core.py top-level CDK. By setting this to
        None, we defer to the regular incremental checkpoint flow. Once all cursors are consolidated under a common interface
        then this override can be removed.
        """
        return None
A file-based stream in an Airbyte source.
In addition to the base Stream attributes, a file-based stream has
- A config object (derived from the corresponding stream section in source config). This contains the globs defining the stream's files.
- A StreamReader, which knows how to list and open files in the stream.
- A FileBasedAvailabilityStrategy, which knows how to verify that we can list and open files in the stream.
- A DiscoveryPolicy that controls the number of concurrent requests sent to the source during discover, and the number of files used for schema discovery.
- A dictionary of FileType:Parser that holds all the file types that can be handled by the stream.
Returns
A string for a single primary key, a list of strings for a composite primary key, or a list of lists of strings for a composite primary key made of nested fields. If the stream has no primary key, return None.
@cache
def list_files(self) -> List[RemoteFile]:
    """
    Return every file belonging to the stream as a list.

    The result is memoized by `functools.cache`, so the files are listed only once per stream
    object; changes made to the files during a sync are therefore not picked up. The listing
    itself is delegated to `get_files`, which the concrete stream class implements.
    """
    return [*self.get_files()]
List all files that belong to the stream.
The output of this method is cached so we don't need to list the files more than once. This means we won't pick up changes to the files during a sync. This method uses the get_files method which is implemented by the concrete stream class.
@abstractmethod
def get_files(self) -> Iterable[RemoteFile]:
    """
    Produce the files matched by the stream's globs.

    Abstract: concrete streams must provide the implementation.
    """
    pass
List all files that belong to the stream as defined by the stream's globs.
def read_records(
    self,
    sync_mode: SyncMode,
    cursor_field: Optional[List[str]] = None,
    stream_slice: Optional[StreamSlice] = None,
    stream_state: Optional[Mapping[str, Any]] = None,
) -> Iterable[Mapping[str, Any] | AirbyteMessage]:
    """
    Bridge the generic `Stream.read_records` call to the file-based reading flow.

    File-based streams manage their own state, so `sync_mode`, `cursor_field` and
    `stream_state` are ignored; only `stream_slice` is used, and it is handed to
    `read_records_from_slice`.

    :raises ValueError: if no `stream_slice` is given.
    """
    if stream_slice is not None:
        return self.read_records_from_slice(stream_slice)
    raise ValueError("stream_slice must be set")
Yield all records from the remote files in the given stream slice.
This method acts as an adapter between the generic Stream interface and the file-based stream, since file-based streams manage their own state.
@abstractmethod
def read_records_from_slice(
    self, stream_slice: StreamSlice
) -> Iterable[Mapping[str, Any] | AirbyteMessage]:
    """
    Yield the records (or Airbyte messages) read from the files in `stream_slice`.

    Abstract: concrete streams implement the actual reading.
    """
    pass
Yield all records from the remote files in the given stream slice.
def stream_slices(
    self,
    *,
    sync_mode: SyncMode,
    cursor_field: Optional[List[str]] = None,
    stream_state: Optional[Mapping[str, Any]] = None,
) -> Iterable[Optional[Mapping[str, Any]]]:
    """
    Bridge the generic `Stream.stream_slices` call to the file-based slicing flow.

    File-based streams manage their own state, so every argument is ignored and slicing
    is delegated entirely to `compute_slices`.
    """
    slices = self.compute_slices()
    return slices
This method acts as an adapter between the generic Stream interface and the file-based stream, since file-based streams manage their own state.
@abstractmethod
def compute_slices(self) -> Iterable[Optional[StreamSlice]]:
    """
    Compute the slices used to read files during the current sync.

    :return: the slices to read in the current sync.
    """
    pass
Return a list of slices that will be used to read files in the current sync.
Returns
The slices to use for the current sync.
@abstractmethod
def get_json_schema(self) -> Mapping[str, Any]:
    """
    Return the JSON Schema for a stream.

    Caching, if desired, belongs on the concrete implementation. A cache decorator on this
    abstract declaration does not apply to overriding methods, and `lru_cache` on a method
    keeps every instance it has seen alive.
    """
    ...
Return the JSON Schema for a stream.
@abstractmethod
def infer_schema(self, files: List[RemoteFile]) -> Mapping[str, Any]:
    """
    Infer a schema for the stream from the given files.

    :param files: the remote files to infer the schema from.
    """
    pass
Infer the schema for files in the stream.
def record_passes_validation_policy(self, record: Mapping[str, Any]) -> bool:
    """
    Check `record` against the stream's validation policy and catalog schema.

    :param record: the parsed record to check.
    :return: the result of the policy's `record_passes_validation_policy` for this record.
    :raises RecordParseError: if the stream has no (truthy) validation policy configured.
    """
    policy = self.validation_policy
    if not policy:
        raise RecordParseError(
            FileBasedSourceError.UNDEFINED_VALIDATION_POLICY,
            stream=self.name,
            validation_policy=self.config.validation_policy,
        )
    return policy.record_passes_validation_policy(record=record, schema=self.catalog_schema)
Returns
Stream name. For file-based streams this is the `name` field of the stream's config.
def get_cursor(self) -> Optional[Cursor]:
    """
    Opt out of the CDK's cursor-based iteration flow by returning no cursor.

    This is a temporary hack: file-based, declarative and concurrent sources have slightly
    different cursor implementations, and the file-based cursor isn't compatible with the
    cursor-based iteration flow in the top-level CDK `core.py`. Returning None defers to the
    regular incremental checkpoint flow. Once all cursors share a common interface, this
    override can be removed.
    """
    return None
This is a temporary hack. Because file-based, declarative, and concurrent sources have _slightly_ different cursor implementations, the file-based cursor isn't compatible with the cursor-based iteration flow in the top-level CDK core.py. By setting this to None, we defer to the regular incremental checkpoint flow. Once all cursors are consolidated under a common interface, this override can be removed.
Inherited Members
- airbyte_cdk.sources.streams.core.Stream
- logger
- transformer
- cursor
- has_multiple_slices
- get_error_display_message
- read
- read_only_records
- as_airbyte_stream
- supports_incremental
- is_resumable
- cursor_field
- namespace
- source_defined_cursor
- exit_on_rate_limit
- state_checkpoint_interval
- get_updated_state
- log_stream_sync_configuration
- configured_json_schema