airbyte_cdk.sources.file_based.stream
from airbyte_cdk.sources.file_based.stream.abstract_file_based_stream import AbstractFileBasedStream
from airbyte_cdk.sources.file_based.stream.default_file_based_stream import DefaultFileBasedStream
from airbyte_cdk.sources.file_based.stream.identities_stream import FileIdentitiesStream
from airbyte_cdk.sources.file_based.stream.permissions_file_based_stream import (
    PermissionsFileBasedStream,
)

__all__ = [
    "AbstractFileBasedStream",
    "DefaultFileBasedStream",
    "FileIdentitiesStream",
    "PermissionsFileBasedStream",
]
class AbstractFileBasedStream(Stream):
    """
    A file-based stream in an Airbyte source.

    In addition to the base Stream attributes, a file-based stream has
    - A config object (derived from the corresponding stream section in source config).
      This contains the globs defining the stream's files.
    - A StreamReader, which knows how to list and open files in the stream.
    - A FileBasedAvailabilityStrategy, which knows how to verify that we can list and open
      files in the stream.
    - A DiscoveryPolicy that controls the number of concurrent requests sent to the source
      during discover, and the number of files used for schema discovery.
    - A dictionary of FileType:Parser that holds all the file types that can be handled
      by the stream.
    """

    def __init__(
        self,
        config: FileBasedStreamConfig,
        catalog_schema: Optional[Mapping[str, Any]],
        stream_reader: AbstractFileBasedStreamReader,
        availability_strategy: AbstractFileBasedAvailabilityStrategy,
        discovery_policy: AbstractDiscoveryPolicy,
        parsers: Dict[Type[Any], FileTypeParser],
        validation_policy: AbstractSchemaValidationPolicy,
        errors_collector: FileBasedErrorsCollector,
        cursor: AbstractFileBasedCursor,
    ):
        super().__init__()
        self.config = config
        self.catalog_schema = catalog_schema
        self.validation_policy = validation_policy
        self.stream_reader = stream_reader
        self._discovery_policy = discovery_policy
        self._availability_strategy = availability_strategy
        self._parsers = parsers
        self.errors_collector = errors_collector
        self._cursor = cursor

    @property
    @abstractmethod
    def primary_key(self) -> PrimaryKeyType: ...

    @cache
    def list_files(self) -> List[RemoteFile]:
        """
        List all files that belong to the stream.

        The output of this method is cached so we don't need to list the files more than once.
        This means we won't pick up changes to the files during a sync. This method uses the
        get_files method which is implemented by the concrete stream class.
        """
        return list(self.get_files())

    @abstractmethod
    def get_files(self) -> Iterable[RemoteFile]:
        """
        List all files that belong to the stream as defined by the stream's globs.
        """
        ...

    def read_records(
        self,
        sync_mode: SyncMode,
        cursor_field: Optional[List[str]] = None,
        stream_slice: Optional[StreamSlice] = None,
        stream_state: Optional[Mapping[str, Any]] = None,
    ) -> Iterable[Mapping[str, Any] | AirbyteMessage]:
        """
        Yield all records from all remote files in `list_files_for_this_sync`.
        This method acts as an adapter between the generic Stream interface and the file-based's
        stream since file-based streams manage their own states.
        """
        if stream_slice is None:
            raise ValueError("stream_slice must be set")
        return self.read_records_from_slice(stream_slice)

    @abstractmethod
    def read_records_from_slice(
        self, stream_slice: StreamSlice
    ) -> Iterable[Mapping[str, Any] | AirbyteMessage]:
        """
        Yield all records from all remote files in `list_files_for_this_sync`.
        """
        ...

    def stream_slices(
        self,
        *,
        sync_mode: SyncMode,
        cursor_field: Optional[List[str]] = None,
        stream_state: Optional[Mapping[str, Any]] = None,
    ) -> Iterable[Optional[Mapping[str, Any]]]:
        """
        This method acts as an adapter between the generic Stream interface and the file-based's
        stream since file-based streams manage their own states.
        """
        return self.compute_slices()

    @abstractmethod
    def compute_slices(self) -> Iterable[Optional[StreamSlice]]:
        """
        Return a list of slices that will be used to read files in the current sync.
        :return: The slices to use for the current sync.
        """
        ...

    @abstractmethod
    @lru_cache(maxsize=None)
    def get_json_schema(self) -> Mapping[str, Any]:
        """
        Return the JSON Schema for a stream.
        """
        ...

    @abstractmethod
    def infer_schema(self, files: List[RemoteFile]) -> Mapping[str, Any]:
        """
        Infer the schema for files in the stream.
        """
        ...

    def get_parser(self) -> FileTypeParser:
        try:
            return self._parsers[type(self.config.format)]
        except KeyError:
            raise UndefinedParserError(
                FileBasedSourceError.UNDEFINED_PARSER,
                stream=self.name,
                format=type(self.config.format),
            )

    def record_passes_validation_policy(self, record: Mapping[str, Any]) -> bool:
        if self.validation_policy:
            return self.validation_policy.record_passes_validation_policy(
                record=record, schema=self.catalog_schema
            )
        else:
            raise RecordParseError(
                FileBasedSourceError.UNDEFINED_VALIDATION_POLICY,
                stream=self.name,
                validation_policy=self.config.validation_policy,
            )

    @cached_property
    def availability_strategy(self) -> AbstractFileBasedAvailabilityStrategy:
        return self._availability_strategy

    @property
    def name(self) -> str:
        return self.config.name

    def get_cursor(self) -> Optional[Cursor]:
        """
        This is a temporary hack. Because file-based, declarative, and concurrent have _slightly_ different cursor implementations
        the file-based cursor isn't compatible with the cursor-based iteration flow in core.py top-level CDK. By setting this to
        None, we defer to the regular incremental checkpoint flow. Once all cursors are consolidated under a common interface
        then this override can be removed.
        """
        return None
A file-based stream in an Airbyte source.
In addition to the base Stream attributes, a file-based stream has
- A config object (derived from the corresponding stream section in source config). This contains the globs defining the stream's files.
- A StreamReader, which knows how to list and open files in the stream.
- A FileBasedAvailabilityStrategy, which knows how to verify that we can list and open files in the stream.
- A DiscoveryPolicy that controls the number of concurrent requests sent to the source during discover, and the number of files used for schema discovery.
- A dictionary of FileType:Parser that holds all the file types that can be handled by the stream.
Returns
string if single primary key, list of strings if composite primary key, list of list of strings if composite primary key consisting of nested fields. If the stream has no primary keys, return None.
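For illustration, the three accepted shapes map onto Python values like the following (field names are hypothetical):

# Hypothetical primary-key values matching the accepted shapes described above:
single_key = "id"                                             # single primary key
composite_key = ["id", "updated_at"]                          # composite primary key
nested_composite_key = [["data", "id"], ["data", "region"]]   # composite key of nested fields
no_key = None                                                 # stream has no primary key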
    @cache
    def list_files(self) -> List[RemoteFile]:
        """
        List all files that belong to the stream.

        The output of this method is cached so we don't need to list the files more than once.
        This means we won't pick up changes to the files during a sync. This method uses the
        get_files method which is implemented by the concrete stream class.
        """
        return list(self.get_files())
List all files that belong to the stream.
The output of this method is cached so we don't need to list the files more than once. This means we won't pick up changes to the files during a sync. This method uses the get_files method which is implemented by the concrete stream class.
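Because the method is decorated with functools.cache, repeated calls on the same stream instance reuse the first listing. A minimal, standalone sketch of that behaviour (a stand-in class, not the CDK stream itself):

from functools import cache


class _ListingSketch:
    """Stand-in illustrating the caching behaviour of list_files()."""

    calls = 0

    @cache
    def list_files(self):
        # In the real stream this delegates to get_files(); here we just count calls.
        _ListingSketch.calls += 1
        return ["s3://bucket/a.csv", "s3://bucket/b.csv"]


sketch = _ListingSketch()
sketch.list_files()
sketch.list_files()
assert _ListingSketch.calls == 1  # the second call is served from the cache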
    @abstractmethod
    def get_files(self) -> Iterable[RemoteFile]:
        """
        List all files that belong to the stream as defined by the stream's globs.
        """
        ...
List all files that belong to the stream as defined by the stream's globs.
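Concrete streams resolve the configured globs against the remote store through the stream reader (see DefaultFileBasedStream.get_files further down). The standalone snippet below only illustrates glob semantics with fnmatch; the paths and globs are invented for illustration and this is not the CDK's actual matching code:

from fnmatch import fnmatch

# Hypothetical listing of the remote store and globs from the stream config.
remote_uris = ["inbox/2024/orders.csv", "inbox/2024/orders.json", "archive/old.csv"]
globs = ["inbox/*/*.csv"]

matched = [uri for uri in remote_uris if any(fnmatch(uri, g) for g in globs)]
print(matched)  # ['inbox/2024/orders.csv']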
    def read_records(
        self,
        sync_mode: SyncMode,
        cursor_field: Optional[List[str]] = None,
        stream_slice: Optional[StreamSlice] = None,
        stream_state: Optional[Mapping[str, Any]] = None,
    ) -> Iterable[Mapping[str, Any] | AirbyteMessage]:
        """
        Yield all records from all remote files in `list_files_for_this_sync`.
        This method acts as an adapter between the generic Stream interface and the file-based's
        stream since file-based streams manage their own states.
        """
        if stream_slice is None:
            raise ValueError("stream_slice must be set")
        return self.read_records_from_slice(stream_slice)
Yield all records from all remote files in list_files_for_this_sync.
This method acts as an adapter between the generic Stream interface and the file-based stream, since file-based streams manage their own state.
    @abstractmethod
    def read_records_from_slice(
        self, stream_slice: StreamSlice
    ) -> Iterable[Mapping[str, Any] | AirbyteMessage]:
        """
        Yield all records from all remote files in `list_files_for_this_sync`.
        """
        ...
Yield all records from all remote files in list_files_for_this_sync.
    def stream_slices(
        self,
        *,
        sync_mode: SyncMode,
        cursor_field: Optional[List[str]] = None,
        stream_state: Optional[Mapping[str, Any]] = None,
    ) -> Iterable[Optional[Mapping[str, Any]]]:
        """
        This method acts as an adapter between the generic Stream interface and the file-based's
        stream since file-based streams manage their own states.
        """
        return self.compute_slices()
This method acts as an adapter between the generic Stream interface and the file-based stream, since file-based streams manage their own state.
    @abstractmethod
    def compute_slices(self) -> Iterable[Optional[StreamSlice]]:
        """
        Return a list of slices that will be used to read files in the current sync.
        :return: The slices to use for the current sync.
        """
        ...
Return a list of slices that will be used to read files in the current sync.
Returns
The slices to use for the current sync.
    @abstractmethod
    @lru_cache(maxsize=None)
    def get_json_schema(self) -> Mapping[str, Any]:
        """
        Return the JSON Schema for a stream.
        """
        ...
Return the JSON Schema for a stream.
    @abstractmethod
    def infer_schema(self, files: List[RemoteFile]) -> Mapping[str, Any]:
        """
        Infer the schema for files in the stream.
        """
        ...
Infer the schema for files in the stream.
    def record_passes_validation_policy(self, record: Mapping[str, Any]) -> bool:
        if self.validation_policy:
            return self.validation_policy.record_passes_validation_policy(
                record=record, schema=self.catalog_schema
            )
        else:
            raise RecordParseError(
                FileBasedSourceError.UNDEFINED_VALIDATION_POLICY,
                stream=self.name,
                validation_policy=self.config.validation_policy,
            )
Returns
Stream name. By default this is the implementing class name, but it can be overridden as needed.
    def get_cursor(self) -> Optional[Cursor]:
        """
        This is a temporary hack. Because file-based, declarative, and concurrent have _slightly_ different cursor implementations
        the file-based cursor isn't compatible with the cursor-based iteration flow in core.py top-level CDK. By setting this to
        None, we defer to the regular incremental checkpoint flow. Once all cursors are consolidated under a common interface
        then this override can be removed.
        """
        return None
This is a temporary hack. Because file-based, declarative, and concurrent streams have slightly different cursor implementations, the file-based cursor isn't compatible with the cursor-based iteration flow in the top-level CDK's core.py. By setting this to None, we defer to the regular incremental checkpoint flow. Once all cursors are consolidated under a common interface, this override can be removed.
Inherited Members
- airbyte_cdk.sources.streams.core.Stream
- logger
- transformer
- cursor
- has_multiple_slices
- get_error_display_message
- read
- read_only_records
- as_airbyte_stream
- supports_incremental
- is_resumable
- cursor_field
- namespace
- source_defined_cursor
- exit_on_rate_limit
- state_checkpoint_interval
- get_updated_state
- log_stream_sync_configuration
- configured_json_schema
class DefaultFileBasedStream(AbstractFileBasedStream, IncrementalMixin):
    """
    The default file-based stream.
    """

    FILE_TRANSFER_KW = "use_file_transfer"
    PRESERVE_DIRECTORY_STRUCTURE_KW = "preserve_directory_structure"
    FILES_KEY = "files"
    DATE_TIME_FORMAT = "%Y-%m-%dT%H:%M:%S.%fZ"
    ab_last_mod_col = "_ab_source_file_last_modified"
    ab_file_name_col = "_ab_source_file_url"
    modified = "modified"
    source_file_url = "source_file_url"
    airbyte_columns = [ab_last_mod_col, ab_file_name_col]
    use_file_transfer = False
    preserve_directory_structure = True
    _file_transfer = FileTransfer()

    def __init__(self, **kwargs: Any):
        if self.FILE_TRANSFER_KW in kwargs:
            self.use_file_transfer = kwargs.pop(self.FILE_TRANSFER_KW, False)
        if self.PRESERVE_DIRECTORY_STRUCTURE_KW in kwargs:
            self.preserve_directory_structure = kwargs.pop(
                self.PRESERVE_DIRECTORY_STRUCTURE_KW, True
            )
        super().__init__(**kwargs)

    @property
    def state(self) -> MutableMapping[str, Any]:
        return self._cursor.get_state()

    @state.setter
    def state(self, value: MutableMapping[str, Any]) -> None:
        """State setter, accept state serialized by state getter."""
        self._cursor.set_initial_state(value)

    @property  # type: ignore # mypy complains wrong type, but AbstractFileBasedCursor is parent of file-based cursors
    def cursor(self) -> Optional[AbstractFileBasedCursor]:
        return self._cursor

    @cursor.setter
    def cursor(self, value: AbstractFileBasedCursor) -> None:
        if self._cursor is not None:
            raise RuntimeError(
                f"Cursor for stream {self.name} is already set. This is unexpected. Please contact Support."
            )
        self._cursor = value

    @property
    def primary_key(self) -> PrimaryKeyType:
        return self.config.primary_key or self.get_parser().get_parser_defined_primary_key(
            self.config
        )

    def _duplicated_files_names(
        self, slices: List[dict[str, List[RemoteFile]]]
    ) -> List[dict[str, List[str]]]:
        seen_file_names: Dict[str, List[str]] = defaultdict(list)
        for file_slice in slices:
            for file_found in file_slice[self.FILES_KEY]:
                file_name = path.basename(file_found.uri)
                seen_file_names[file_name].append(file_found.uri)
        return [
            {file_name: paths} for file_name, paths in seen_file_names.items() if len(paths) > 1
        ]

    def compute_slices(self) -> Iterable[Optional[Mapping[str, Any]]]:
        # Sort files by last_modified, uri and return them grouped by last_modified
        all_files = self.list_files()
        files_to_read = self._cursor.get_files_to_sync(all_files, self.logger)
        sorted_files_to_read = sorted(files_to_read, key=lambda f: (f.last_modified, f.uri))
        slices = [
            {self.FILES_KEY: list(group[1])}
            for group in itertools.groupby(sorted_files_to_read, lambda f: f.last_modified)
        ]
        if slices and not self.preserve_directory_structure:
            duplicated_files_names = self._duplicated_files_names(slices)
            if duplicated_files_names:
                raise DuplicatedFilesError(
                    stream=self.name, duplicated_files_names=duplicated_files_names
                )
        return slices

    def transform_record(
        self, record: dict[str, Any], file: RemoteFile, last_updated: str
    ) -> dict[str, Any]:
        # adds _ab_source_file_last_modified and _ab_source_file_url to the record
        record[self.ab_last_mod_col] = last_updated
        record[self.ab_file_name_col] = file.uri
        return record

    def read_records_from_slice(self, stream_slice: StreamSlice) -> Iterable[AirbyteMessage]:
        """
        Yield all records from all remote files in `list_files_for_this_sync`.

        If an error is encountered reading records from a file, log a message and do not attempt
        to sync the rest of the file.
        """
        schema = self.catalog_schema
        if schema is None:
            # On read requests we should always have the catalog available
            raise MissingSchemaError(FileBasedSourceError.MISSING_SCHEMA, stream=self.name)
        # The stream only supports a single file type, so we can use the same parser for all files
        parser = self.get_parser()
        for file in stream_slice["files"]:
            # only serialize the datetime once
            file_datetime_string = file.last_modified.strftime(self.DATE_TIME_FORMAT)
            n_skipped = line_no = 0

            try:
                if self.use_file_transfer:
                    for file_record_data, file_reference in self._file_transfer.upload(
                        file=file, stream_reader=self.stream_reader, logger=self.logger
                    ):
                        yield stream_data_to_airbyte_message(
                            self.name,
                            file_record_data.dict(exclude_none=True),
                            file_reference=file_reference,
                        )
                else:
                    for record in parser.parse_records(
                        self.config, file, self.stream_reader, self.logger, schema
                    ):
                        line_no += 1
                        if self.config.schemaless:
                            record = {"data": record}
                        elif not self.record_passes_validation_policy(record):
                            n_skipped += 1
                            continue
                        record = self.transform_record(record, file, file_datetime_string)
                        yield stream_data_to_airbyte_message(self.name, record)
                self._cursor.add_file(file)

            except StopSyncPerValidationPolicy:
                yield AirbyteMessage(
                    type=MessageType.LOG,
                    log=AirbyteLogMessage(
                        level=Level.WARN,
                        message=f"Stopping sync in accordance with the configured validation policy. Records in file did not conform to the schema. stream={self.name} file={file.uri} validation_policy={self.config.validation_policy.value} n_skipped={n_skipped}",
                    ),
                )
                break

            except RecordParseError:
                # Increment line_no because the exception was raised before we could increment it
                line_no += 1
                self.errors_collector.collect(
                    AirbyteMessage(
                        type=MessageType.LOG,
                        log=AirbyteLogMessage(
                            level=Level.ERROR,
                            message=f"{FileBasedSourceError.ERROR_PARSING_RECORD.value} stream={self.name} file={file.uri} line_no={line_no} n_skipped={n_skipped}",
                            stack_trace=traceback.format_exc(),
                        ),
                    ),
                )

            except AirbyteTracedException as exc:
                # Re-raise the exception to stop the whole sync immediately as this is a fatal error
                raise exc

            except Exception:
                yield AirbyteMessage(
                    type=MessageType.LOG,
                    log=AirbyteLogMessage(
                        level=Level.ERROR,
                        message=f"{FileBasedSourceError.ERROR_PARSING_RECORD.value} stream={self.name} file={file.uri} line_no={line_no} n_skipped={n_skipped}",
                        stack_trace=traceback.format_exc(),
                    ),
                )

            finally:
                if n_skipped:
                    yield AirbyteMessage(
                        type=MessageType.LOG,
                        log=AirbyteLogMessage(
                            level=Level.WARN,
                            message=f"Records in file did not pass validation policy. stream={self.name} file={file.uri} n_skipped={n_skipped} validation_policy={self.validation_policy.name}",
                        ),
                    )

    @property
    def cursor_field(self) -> Union[str, List[str]]:
        """
        Override to return the default cursor field used by this stream e.g: an API entity might always use created_at as the cursor field.
        :return: The name of the field used as a cursor. If the cursor is nested, return an array consisting of the path to the cursor.
        """
        return self.ab_last_mod_col

    @cache
    def get_json_schema(self) -> JsonSchema:  # type: ignore
        if self.use_file_transfer:
            return file_transfer_schema
        extra_fields = {
            self.ab_last_mod_col: {"type": "string"},
            self.ab_file_name_col: {"type": "string"},
        }
        try:
            schema = self._get_raw_json_schema()
        except InvalidSchemaError as config_exception:
            raise AirbyteTracedException(
                internal_message="Please check the logged errors for more information.",
                message=FileBasedSourceError.SCHEMA_INFERENCE_ERROR.value,
                exception=AirbyteTracedException(exception=config_exception),
                failure_type=FailureType.config_error,
            )
        except EmptyFileSchemaInferenceError as exc:
            self._raise_schema_inference_error(exc)
        except AirbyteTracedException as ate:
            raise ate
        except Exception as exc:
            self._raise_schema_inference_error(exc)
        else:
            return {"type": "object", "properties": {**extra_fields, **schema["properties"]}}

    def _get_raw_json_schema(self) -> JsonSchema:
        if self.config.input_schema:
            return self.config.get_input_schema()  # type: ignore
        elif self.config.schemaless:
            return schemaless_schema
        else:
            files = self.list_files()
            first_n_files = len(files)

            if self.config.recent_n_files_to_read_for_schema_discovery:
                self.logger.info(
                    msg=(
                        f"Only first {self.config.recent_n_files_to_read_for_schema_discovery} files will be used to infer schema "
                        f"for stream {self.name} due to limitation in config."
                    )
                )
                first_n_files = self.config.recent_n_files_to_read_for_schema_discovery

            if first_n_files == 0:
                self.logger.warning(
                    msg=f"No files were identified in the stream {self.name}. Setting default schema for the stream."
                )
                return schemaless_schema

            max_n_files_for_schema_inference = (
                self._discovery_policy.get_max_n_files_for_schema_inference(self.get_parser())
            )

            if first_n_files > max_n_files_for_schema_inference:
                # Use the most recent files for schema inference, so we pick up schema changes during discovery.
                self.logger.warning(
                    msg=f"Refusing to infer schema for {first_n_files} files; using {max_n_files_for_schema_inference} files."
                )
                first_n_files = max_n_files_for_schema_inference

            files = sorted(files, key=lambda x: x.last_modified, reverse=True)[:first_n_files]

            inferred_schema = self.infer_schema(files)

            if not inferred_schema:
                raise InvalidSchemaError(
                    FileBasedSourceError.INVALID_SCHEMA_ERROR,
                    details=f"Empty schema. Please check that the files are valid for format {self.config.format}",
                    stream=self.name,
                )

            schema = {"type": "object", "properties": inferred_schema}

        return schema

    def get_files(self) -> Iterable[RemoteFile]:
        """
        Return all files that belong to the stream as defined by the stream's globs.
        """
        return self.stream_reader.get_matching_files(
            self.config.globs or [], self.config.legacy_prefix, self.logger
        )

    def as_airbyte_stream(self) -> AirbyteStream:
        file_stream = super().as_airbyte_stream()
        file_stream.is_file_based = self.use_file_transfer
        return file_stream

    def infer_schema(self, files: List[RemoteFile]) -> Mapping[str, Any]:
        loop = asyncio.get_event_loop()
        schema = loop.run_until_complete(self._infer_schema(files))
        # as infer schema returns a Mapping that is assumed to be immutable, we need to create a deepcopy to avoid modifying the reference
        return self._fill_nulls(deepcopy(schema))

    @staticmethod
    def _fill_nulls(schema: Mapping[str, Any]) -> Mapping[str, Any]:
        if isinstance(schema, dict):
            for k, v in schema.items():
                if k == "type":
                    if isinstance(v, list):
                        if "null" not in v:
                            schema[k] = ["null"] + v
                    elif v != "null":
                        if isinstance(v, (str, list)):
                            schema[k] = ["null", v]
                        else:
                            DefaultFileBasedStream._fill_nulls(v)
                else:
                    DefaultFileBasedStream._fill_nulls(v)
        elif isinstance(schema, list):
            for item in schema:
                DefaultFileBasedStream._fill_nulls(item)
        return schema

    async def _infer_schema(self, files: List[RemoteFile]) -> Mapping[str, Any]:
        """
        Infer the schema for a stream.

        Each file type has a corresponding `infer_schema` handler.
        Dispatch on file type.
        """
        base_schema: SchemaType = {}
        pending_tasks: Set[asyncio.tasks.Task[SchemaType]] = set()

        n_started, n_files = 0, len(files)
        files_iterator = iter(files)
        while pending_tasks or n_started < n_files:
            while len(pending_tasks) <= self._discovery_policy.n_concurrent_requests and (
                file := next(files_iterator, None)
            ):
                pending_tasks.add(asyncio.create_task(self._infer_file_schema(file)))
                n_started += 1
            # Return when the first task is completed so that we can enqueue a new task as soon as the
            # number of concurrent tasks drops below the number allowed.
            done, pending_tasks = await asyncio.wait(
                pending_tasks, return_when=asyncio.FIRST_COMPLETED
            )
            for task in done:
                try:
                    base_schema = merge_schemas(base_schema, task.result())
                except AirbyteTracedException as ate:
                    raise ate
                except Exception as exc:
                    self.logger.error(
                        f"An error occurred inferring the schema. \n {traceback.format_exc()}",
                        exc_info=exc,
                    )

        return base_schema

    async def _infer_file_schema(self, file: RemoteFile) -> SchemaType:  # type: ignore
        try:
            return await self.get_parser().infer_schema(
                self.config, file, self.stream_reader, self.logger
            )
        except EmptyFileSchemaInferenceError as exc:
            self._raise_schema_inference_error(exc, file)
        except AirbyteTracedException as ate:
            raise ate
        except Exception as exc:
            self._raise_schema_inference_error(exc, file)

    def _raise_schema_inference_error(
        self, exc: Exception, file: Optional[RemoteFile] = None
    ) -> NoReturn:
        raise SchemaInferenceError(
            FileBasedSourceError.SCHEMA_INFERENCE_ERROR,
            file=file.uri if file else None,
            format=str(self.config.format) if self.config.format else None,
            stream=self.name,
        ) from exc
The default file-based stream.
    def __init__(self, **kwargs: Any):
        if self.FILE_TRANSFER_KW in kwargs:
            self.use_file_transfer = kwargs.pop(self.FILE_TRANSFER_KW, False)
        if self.PRESERVE_DIRECTORY_STRUCTURE_KW in kwargs:
            self.preserve_directory_structure = kwargs.pop(
                self.PRESERVE_DIRECTORY_STRUCTURE_KW, True
            )
        super().__init__(**kwargs)
State getter; it should return state in a form that can be serialized to a string and sent to the output as a STATE AirbyteMessage.
A good example of a state is a cursor value: { self.cursor_field: "cursor_value" }
State should be as small as possible while remaining descriptive enough to resume the sync from the point where it stopped.
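For a file-based stream the cursor field is _ab_source_file_last_modified, so an emitted state blob might look roughly like the following. This is illustrative only: the exact layout is owned by the configured AbstractFileBasedCursor, and the "history" key shown here is an assumption, not a documented contract.

# Illustrative state payload only; the real shape is defined by the cursor implementation.
state_example = {
    "_ab_source_file_last_modified": "2024-05-01T12:34:56.000000Z_inbox/orders.csv",
    "history": {
        "inbox/orders.csv": "2024-05-01T12:34:56.000000Z",
    },
}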
    @property
    def primary_key(self) -> PrimaryKeyType:
        return self.config.primary_key or self.get_parser().get_parser_defined_primary_key(
            self.config
        )
Returns
string if single primary key, list of strings if composite primary key, list of list of strings if composite primary key consisting of nested fields. If the stream has no primary keys, return None.
    def compute_slices(self) -> Iterable[Optional[Mapping[str, Any]]]:
        # Sort files by last_modified, uri and return them grouped by last_modified
        all_files = self.list_files()
        files_to_read = self._cursor.get_files_to_sync(all_files, self.logger)
        sorted_files_to_read = sorted(files_to_read, key=lambda f: (f.last_modified, f.uri))
        slices = [
            {self.FILES_KEY: list(group[1])}
            for group in itertools.groupby(sorted_files_to_read, lambda f: f.last_modified)
        ]
        if slices and not self.preserve_directory_structure:
            duplicated_files_names = self._duplicated_files_names(slices)
            if duplicated_files_names:
                raise DuplicatedFilesError(
                    stream=self.name, duplicated_files_names=duplicated_files_names
                )
        return slices
Return a list of slices that will be used to read files in the current sync.
Returns
The slices to use for the current sync.
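A self-contained sketch of the grouping step used above, with a stand-in dataclass in place of RemoteFile (the real implementation also consults the cursor to decide which files still need syncing):

import itertools
from dataclasses import dataclass
from datetime import datetime


@dataclass(frozen=True)
class _File:  # stand-in for RemoteFile
    uri: str
    last_modified: datetime


files = [
    _File("b.csv", datetime(2024, 5, 1)),
    _File("a.csv", datetime(2024, 5, 1)),
    _File("c.csv", datetime(2024, 5, 2)),
]

sorted_files = sorted(files, key=lambda f: (f.last_modified, f.uri))
slices = [
    {"files": list(group)}
    for _, group in itertools.groupby(sorted_files, key=lambda f: f.last_modified)
]
# Two slices: ["a.csv", "b.csv"] for 2024-05-01 and ["c.csv"] for 2024-05-02.
print([[f.uri for f in s["files"]] for s in slices])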
    def transform_record(
        self, record: dict[str, Any], file: RemoteFile, last_updated: str
    ) -> dict[str, Any]:
        # adds _ab_source_file_last_modified and _ab_source_file_url to the record
        record[self.ab_last_mod_col] = last_updated
        record[self.ab_file_name_col] = file.uri
        return record
    def read_records_from_slice(self, stream_slice: StreamSlice) -> Iterable[AirbyteMessage]:
        """
        Yield all records from all remote files in `list_files_for_this_sync`.

        If an error is encountered reading records from a file, log a message and do not attempt
        to sync the rest of the file.
        """
        schema = self.catalog_schema
        if schema is None:
            # On read requests we should always have the catalog available
            raise MissingSchemaError(FileBasedSourceError.MISSING_SCHEMA, stream=self.name)
        # The stream only supports a single file type, so we can use the same parser for all files
        parser = self.get_parser()
        for file in stream_slice["files"]:
            # only serialize the datetime once
            file_datetime_string = file.last_modified.strftime(self.DATE_TIME_FORMAT)
            n_skipped = line_no = 0

            try:
                if self.use_file_transfer:
                    for file_record_data, file_reference in self._file_transfer.upload(
                        file=file, stream_reader=self.stream_reader, logger=self.logger
                    ):
                        yield stream_data_to_airbyte_message(
                            self.name,
                            file_record_data.dict(exclude_none=True),
                            file_reference=file_reference,
                        )
                else:
                    for record in parser.parse_records(
                        self.config, file, self.stream_reader, self.logger, schema
                    ):
                        line_no += 1
                        if self.config.schemaless:
                            record = {"data": record}
                        elif not self.record_passes_validation_policy(record):
                            n_skipped += 1
                            continue
                        record = self.transform_record(record, file, file_datetime_string)
                        yield stream_data_to_airbyte_message(self.name, record)
                self._cursor.add_file(file)

            except StopSyncPerValidationPolicy:
                yield AirbyteMessage(
                    type=MessageType.LOG,
                    log=AirbyteLogMessage(
                        level=Level.WARN,
                        message=f"Stopping sync in accordance with the configured validation policy. Records in file did not conform to the schema. stream={self.name} file={file.uri} validation_policy={self.config.validation_policy.value} n_skipped={n_skipped}",
                    ),
                )
                break

            except RecordParseError:
                # Increment line_no because the exception was raised before we could increment it
                line_no += 1
                self.errors_collector.collect(
                    AirbyteMessage(
                        type=MessageType.LOG,
                        log=AirbyteLogMessage(
                            level=Level.ERROR,
                            message=f"{FileBasedSourceError.ERROR_PARSING_RECORD.value} stream={self.name} file={file.uri} line_no={line_no} n_skipped={n_skipped}",
                            stack_trace=traceback.format_exc(),
                        ),
                    ),
                )

            except AirbyteTracedException as exc:
                # Re-raise the exception to stop the whole sync immediately as this is a fatal error
                raise exc

            except Exception:
                yield AirbyteMessage(
                    type=MessageType.LOG,
                    log=AirbyteLogMessage(
                        level=Level.ERROR,
                        message=f"{FileBasedSourceError.ERROR_PARSING_RECORD.value} stream={self.name} file={file.uri} line_no={line_no} n_skipped={n_skipped}",
                        stack_trace=traceback.format_exc(),
                    ),
                )

            finally:
                if n_skipped:
                    yield AirbyteMessage(
                        type=MessageType.LOG,
                        log=AirbyteLogMessage(
                            level=Level.WARN,
                            message=f"Records in file did not pass validation policy. stream={self.name} file={file.uri} n_skipped={n_skipped} validation_policy={self.validation_policy.name}",
                        ),
                    )
Yield all records from all remote files in list_files_for_this_sync.
If an error is encountered reading records from a file, log a message and do not attempt to sync the rest of the file.
    @property
    def cursor_field(self) -> Union[str, List[str]]:
        """
        Override to return the default cursor field used by this stream e.g: an API entity might always use created_at as the cursor field.
        :return: The name of the field used as a cursor. If the cursor is nested, return an array consisting of the path to the cursor.
        """
        return self.ab_last_mod_col
Override to return the default cursor field used by this stream, e.g., an API entity might always use created_at as the cursor field.
Returns
The name of the field used as a cursor. If the cursor is nested, return an array consisting of the path to the cursor.
    @cache
    def get_json_schema(self) -> JsonSchema:  # type: ignore
        if self.use_file_transfer:
            return file_transfer_schema
        extra_fields = {
            self.ab_last_mod_col: {"type": "string"},
            self.ab_file_name_col: {"type": "string"},
        }
        try:
            schema = self._get_raw_json_schema()
        except InvalidSchemaError as config_exception:
            raise AirbyteTracedException(
                internal_message="Please check the logged errors for more information.",
                message=FileBasedSourceError.SCHEMA_INFERENCE_ERROR.value,
                exception=AirbyteTracedException(exception=config_exception),
                failure_type=FailureType.config_error,
            )
        except EmptyFileSchemaInferenceError as exc:
            self._raise_schema_inference_error(exc)
        except AirbyteTracedException as ate:
            raise ate
        except Exception as exc:
            self._raise_schema_inference_error(exc)
        else:
            return {"type": "object", "properties": {**extra_fields, **schema["properties"]}}
Return the JSON Schema for a stream.
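Concretely, for a stream whose inferred columns happen to be id and name (hypothetical example), the schema returned above would have the following shape, with the two Airbyte metadata columns merged in; the per-column types are illustrative:

expected_schema = {
    "type": "object",
    "properties": {
        "_ab_source_file_last_modified": {"type": "string"},
        "_ab_source_file_url": {"type": "string"},
        "id": {"type": ["null", "integer"]},
        "name": {"type": ["null", "string"]},
    },
}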
    def get_files(self) -> Iterable[RemoteFile]:
        """
        Return all files that belong to the stream as defined by the stream's globs.
        """
        return self.stream_reader.get_matching_files(
            self.config.globs or [], self.config.legacy_prefix, self.logger
        )
Return all files that belong to the stream as defined by the stream's globs.
    def infer_schema(self, files: List[RemoteFile]) -> Mapping[str, Any]:
        loop = asyncio.get_event_loop()
        schema = loop.run_until_complete(self._infer_schema(files))
        # as infer schema returns a Mapping that is assumed to be immutable, we need to create a deepcopy to avoid modifying the reference
        return self._fill_nulls(deepcopy(schema))
Infer the schema for files in the stream.
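The inferred schema is post-processed by the _fill_nulls helper shown in the class source, which makes every type nullable. A small sketch of the effect, assuming airbyte-cdk with the file-based extra is installed (the input properties are invented for illustration):

from airbyte_cdk.sources.file_based.stream.default_file_based_stream import (
    DefaultFileBasedStream,
)

inferred = {"id": {"type": "integer"}, "name": {"type": ["string"]}}
print(DefaultFileBasedStream._fill_nulls(inferred))
# {'id': {'type': ['null', 'integer']}, 'name': {'type': ['null', 'string']}}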
Inherited Members
- AbstractFileBasedStream
- config
- catalog_schema
- validation_policy
- stream_reader
- errors_collector
- list_files
- read_records
- stream_slices
- get_parser
- record_passes_validation_policy
- availability_strategy
- name
- get_cursor
- airbyte_cdk.sources.streams.core.Stream
- logger
- transformer
- has_multiple_slices
- get_error_display_message
- read
- read_only_records
- supports_incremental
- is_resumable
- namespace
- source_defined_cursor
- exit_on_rate_limit
- state_checkpoint_interval
- get_updated_state
- log_stream_sync_configuration
- configured_json_schema
class FileIdentitiesStream(IdentitiesStream):
    """
    The identities stream. A full refresh stream to sync identities from a certain domain.
    The stream reader manage the logic to get such data, which is implemented on connector side.
    """

    is_resumable = False

    def __init__(
        self,
        catalog_schema: Optional[Mapping[str, Any]],
        stream_permissions_reader: AbstractFileBasedStreamPermissionsReader,
        discovery_policy: AbstractDiscoveryPolicy,
        errors_collector: FileBasedErrorsCollector,
    ) -> None:
        super().__init__()
        self.catalog_schema = catalog_schema
        self.stream_permissions_reader = stream_permissions_reader
        self._discovery_policy = discovery_policy
        self.errors_collector = errors_collector
        self._cursor: MutableMapping[str, Any] = {}

    @property
    def primary_key(self) -> PrimaryKeyType:
        return None

    def load_identity_groups(self) -> Iterable[Dict[str, Any]]:
        return self.stream_permissions_reader.load_identity_groups(logger=self.logger)

    @cache
    def get_json_schema(self) -> JsonSchema:
        return self.stream_permissions_reader.identities_schema
The identities stream: a full refresh stream that syncs identities from a given domain. The stream permissions reader manages the logic to retrieve this data, which is implemented on the connector side.
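A rough wiring sketch, assuming airbyte-cdk with the file-based extra is installed. The reader, discovery policy, and errors collector below are duck-typed stand-ins for illustration only; a real connector passes its own AbstractFileBasedStreamPermissionsReader implementation and the CDK's concrete collaborators.

from types import SimpleNamespace

from airbyte_cdk.sources.file_based.stream import FileIdentitiesStream

# Stand-in exposing only the members this stream uses (identities_schema, load_identity_groups).
fake_reader = SimpleNamespace(
    identities_schema={"type": "object", "properties": {}},
    load_identity_groups=lambda logger: iter([{"id": "group-1", "name": "Engineering"}]),
)

stream = FileIdentitiesStream(
    catalog_schema=None,
    stream_permissions_reader=fake_reader,   # type: ignore[arg-type]
    discovery_policy=SimpleNamespace(),      # type: ignore[arg-type]  # not exercised in this sketch
    errors_collector=SimpleNamespace(),      # type: ignore[arg-type]  # real code passes a FileBasedErrorsCollector
)
print(stream.get_json_schema())
print(list(stream.load_identity_groups()))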
    def __init__(
        self,
        catalog_schema: Optional[Mapping[str, Any]],
        stream_permissions_reader: AbstractFileBasedStreamPermissionsReader,
        discovery_policy: AbstractDiscoveryPolicy,
        errors_collector: FileBasedErrorsCollector,
    ) -> None:
        super().__init__()
        self.catalog_schema = catalog_schema
        self.stream_permissions_reader = stream_permissions_reader
        self._discovery_policy = discovery_policy
        self.errors_collector = errors_collector
        self._cursor: MutableMapping[str, Any] = {}
Returns
True if this stream allows the checkpointing of sync progress and can resume from it on subsequent attempts. This differs from supports_incremental because certain kinds of streams like those supporting resumable full refresh can checkpoint progress in between attempts for improved fault tolerance. However, they will start from the beginning on the next sync job.
Returns
string if single primary key, list of strings if composite primary key, list of list of strings if composite primary key consisting of nested fields. If the stream has no primary keys, return None.
    @cache
    def get_json_schema(self) -> JsonSchema:
        return self.stream_permissions_reader.identities_schema
Returns
A dict of the JSON schema representing this stream.
The default implementation of this method looks for a JSONSchema file with the same name as this stream's "name" property. Override as needed.
Inherited Members
- airbyte_cdk.sources.streams.permissions.identities_stream.IdentitiesStream
- IDENTITIES_STREAM_NAME
- state
- read_records
- name
- get_cursor
- airbyte_cdk.sources.streams.core.Stream
- logger
- transformer
- cursor
- has_multiple_slices
- get_error_display_message
- read
- read_only_records
- as_airbyte_stream
- supports_incremental
- cursor_field
- namespace
- source_defined_cursor
- exit_on_rate_limit
- stream_slices
- state_checkpoint_interval
- get_updated_state
- log_stream_sync_configuration
- configured_json_schema
class PermissionsFileBasedStream(DefaultFileBasedStream):
    """
    A specialized stream for handling file-based ACL permissions.

    This stream works with the stream_reader to:
    1. Fetch ACL permissions for each file in the source
    2. Transform permissions into a standardized format
    3. Generate records containing permission information

    The stream_reader is responsible for the actual implementation of permission retrieval
    and schema definition, while this class handles the streaming interface.
    """

    def __init__(
        self, stream_permissions_reader: AbstractFileBasedStreamPermissionsReader, **kwargs: Any
    ):
        super().__init__(**kwargs)
        self.stream_permissions_reader = stream_permissions_reader

    def _filter_schema_invalid_properties(
        self, configured_catalog_json_schema: Dict[str, Any]
    ) -> Dict[str, Any]:
        return self.stream_permissions_reader.file_permissions_schema

    def read_records_from_slice(self, stream_slice: StreamSlice) -> Iterable[AirbyteMessage]:
        """
        Yield permissions records from all remote files
        """

        for file in stream_slice["files"]:
            no_permissions = False
            file_datetime_string = file.last_modified.strftime(self.DATE_TIME_FORMAT)
            try:
                permissions_record = self.stream_permissions_reader.get_file_acl_permissions(
                    file, logger=self.logger
                )
                if not permissions_record:
                    no_permissions = True
                    self.logger.warning(
                        f"Unable to fetch permissions. stream={self.name} file={file.uri}"
                    )
                    continue
                permissions_record = self.transform_record(
                    permissions_record, file, file_datetime_string
                )
                yield stream_data_to_airbyte_message(self.name, permissions_record)
            except Exception as e:
                self.logger.error(f"Failed to retrieve permissions for file {file.uri}: {str(e)}")
                yield AirbyteMessage(
                    type=MessageType.LOG,
                    log=AirbyteLogMessage(
                        level=Level.ERROR,
                        message=f"Error retrieving files permissions: stream={self.name} file={file.uri}",
                        stack_trace=traceback.format_exc(),
                    ),
                )
            finally:
                if no_permissions:
                    yield AirbyteMessage(
                        type=MessageType.LOG,
                        log=AirbyteLogMessage(
                            level=Level.WARN,
                            message=f"Unable to fetch permissions. stream={self.name} file={file.uri}",
                        ),
                    )

    def _get_raw_json_schema(self) -> JsonSchema:
        """
        Retrieve the raw JSON schema for file permissions from the stream reader.

        Returns:
            The file permissions schema that defines the structure of permission records
        """
        return self.stream_permissions_reader.file_permissions_schema
A specialized stream for handling file-based ACL permissions.
This stream works with the stream_reader to:
- Fetch ACL permissions for each file in the source
- Transform permissions into a standardized format
- Generate records containing permission information
The stream_reader is responsible for the actual implementation of permission retrieval and schema definition, while this class handles the streaming interface.
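The reader side of that contract is small: per the source below, the stream only calls get_file_acl_permissions(file, logger=...) and reads file_permissions_schema. A hypothetical, simplified reader might expose them like this; the permission field names are illustrative, not the CDK's canonical permissions schema, and a real connector subclasses AbstractFileBasedStreamPermissionsReader rather than duck-typing.

import logging
from typing import Any, Dict, Optional


class MyPermissionsReader:  # sketch only; not the CDK's abstract base class
    @property
    def file_permissions_schema(self) -> Dict[str, Any]:
        return {
            "type": "object",
            "properties": {
                "id": {"type": "string"},
                "allowed_identities": {"type": "array", "items": {"type": "string"}},
                "publicly_accessible": {"type": "boolean"},
            },
        }

    def get_file_acl_permissions(
        self, file: Any, logger: logging.Logger
    ) -> Optional[Dict[str, Any]]:
        # Query the source system's ACL API here; returning a falsy value makes
        # PermissionsFileBasedStream log a warning and skip the file.
        return {
            "id": file.uri,
            "allowed_identities": ["user:alice@example.com"],
            "publicly_accessible": False,
        }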
    def read_records_from_slice(self, stream_slice: StreamSlice) -> Iterable[AirbyteMessage]:
        """
        Yield permissions records from all remote files
        """

        for file in stream_slice["files"]:
            no_permissions = False
            file_datetime_string = file.last_modified.strftime(self.DATE_TIME_FORMAT)
            try:
                permissions_record = self.stream_permissions_reader.get_file_acl_permissions(
                    file, logger=self.logger
                )
                if not permissions_record:
                    no_permissions = True
                    self.logger.warning(
                        f"Unable to fetch permissions. stream={self.name} file={file.uri}"
                    )
                    continue
                permissions_record = self.transform_record(
                    permissions_record, file, file_datetime_string
                )
                yield stream_data_to_airbyte_message(self.name, permissions_record)
            except Exception as e:
                self.logger.error(f"Failed to retrieve permissions for file {file.uri}: {str(e)}")
                yield AirbyteMessage(
                    type=MessageType.LOG,
                    log=AirbyteLogMessage(
                        level=Level.ERROR,
                        message=f"Error retrieving files permissions: stream={self.name} file={file.uri}",
                        stack_trace=traceback.format_exc(),
                    ),
                )
            finally:
                if no_permissions:
                    yield AirbyteMessage(
                        type=MessageType.LOG,
                        log=AirbyteLogMessage(
                            level=Level.WARN,
                            message=f"Unable to fetch permissions. stream={self.name} file={file.uri}",
                        ),
                    )
Yield permission records for all remote files in the slice.
Inherited Members
- DefaultFileBasedStream
- FILE_TRANSFER_KW
- PRESERVE_DIRECTORY_STRUCTURE_KW
- FILES_KEY
- DATE_TIME_FORMAT
- ab_last_mod_col
- ab_file_name_col
- modified
- source_file_url
- airbyte_columns
- use_file_transfer
- preserve_directory_structure
- state
- cursor
- primary_key
- compute_slices
- transform_record
- cursor_field
- get_json_schema
- get_files
- as_airbyte_stream
- infer_schema
- AbstractFileBasedStream
- config
- catalog_schema
- validation_policy
- stream_reader
- errors_collector
- list_files
- read_records
- stream_slices
- get_parser
- record_passes_validation_policy
- availability_strategy
- name
- get_cursor
- airbyte_cdk.sources.streams.core.Stream
- logger
- transformer
- has_multiple_slices
- get_error_display_message
- read
- read_only_records
- supports_incremental
- is_resumable
- namespace
- source_defined_cursor
- exit_on_rate_limit
- state_checkpoint_interval
- get_updated_state
- log_stream_sync_configuration
- configured_json_schema