airbyte_cdk.sources.file_based.stream
from airbyte_cdk.sources.file_based.stream.abstract_file_based_stream import AbstractFileBasedStream
from airbyte_cdk.sources.file_based.stream.default_file_based_stream import DefaultFileBasedStream
from airbyte_cdk.sources.file_based.stream.identities_stream import FileIdentitiesStream
from airbyte_cdk.sources.file_based.stream.permissions_file_based_stream import (
    PermissionsFileBasedStream,
)

__all__ = [
    "AbstractFileBasedStream",
    "DefaultFileBasedStream",
    "FileIdentitiesStream",
    "PermissionsFileBasedStream",
]
class AbstractFileBasedStream(Stream):
    """
    A file-based stream in an Airbyte source.

    In addition to the base Stream attributes, a file-based stream has
    - A config object (derived from the corresponding stream section in source config).
      This contains the globs defining the stream's files.
    - A StreamReader, which knows how to list and open files in the stream.
    - A FileBasedAvailabilityStrategy, which knows how to verify that we can list and open
      files in the stream.
    - A DiscoveryPolicy that controls the number of concurrent requests sent to the source
      during discover, and the number of files used for schema discovery.
    - A dictionary of FileType:Parser that holds all the file types that can be handled
      by the stream.
    """

    def __init__(
        self,
        config: FileBasedStreamConfig,
        catalog_schema: Optional[Mapping[str, Any]],
        stream_reader: AbstractFileBasedStreamReader,
        availability_strategy: AbstractFileBasedAvailabilityStrategy,
        discovery_policy: AbstractDiscoveryPolicy,
        parsers: Dict[Type[Any], FileTypeParser],
        validation_policy: AbstractSchemaValidationPolicy,
        errors_collector: FileBasedErrorsCollector,
        cursor: AbstractFileBasedCursor,
    ):
        super().__init__()
        self.config = config
        self.catalog_schema = catalog_schema
        self.validation_policy = validation_policy
        self.stream_reader = stream_reader
        self._discovery_policy = discovery_policy
        self._availability_strategy = availability_strategy
        self._parsers = parsers
        self.errors_collector = errors_collector
        self._cursor = cursor

    @property
    @abstractmethod
    def primary_key(self) -> PrimaryKeyType: ...

    @cache
    def list_files(self) -> List[RemoteFile]:
        """
        List all files that belong to the stream.

        The output of this method is cached so we don't need to list the files more than once.
        This means we won't pick up changes to the files during a sync. This method uses the
        get_files method which is implemented by the concrete stream class.
        """
        return list(self.get_files())

    @abstractmethod
    def get_files(self) -> Iterable[RemoteFile]:
        """
        List all files that belong to the stream as defined by the stream's globs.
        """
        ...

    def read_records(
        self,
        sync_mode: SyncMode,
        cursor_field: Optional[List[str]] = None,
        stream_slice: Optional[StreamSlice] = None,
        stream_state: Optional[Mapping[str, Any]] = None,
    ) -> Iterable[Mapping[str, Any] | AirbyteMessage]:
        """
        Yield all records from all remote files in `list_files_for_this_sync`.
        This method acts as an adapter between the generic Stream interface and the file-based's
        stream since file-based streams manage their own states.
        """
        if stream_slice is None:
            raise ValueError("stream_slice must be set")
        return self.read_records_from_slice(stream_slice)

    @abstractmethod
    def read_records_from_slice(
        self, stream_slice: StreamSlice
    ) -> Iterable[Mapping[str, Any] | AirbyteMessage]:
        """
        Yield all records from all remote files in `list_files_for_this_sync`.
        """
        ...

    def stream_slices(
        self,
        *,
        sync_mode: SyncMode,
        cursor_field: Optional[List[str]] = None,
        stream_state: Optional[Mapping[str, Any]] = None,
    ) -> Iterable[Optional[Mapping[str, Any]]]:
        """
        This method acts as an adapter between the generic Stream interface and the file-based's
        stream since file-based streams manage their own states.
        """
        return self.compute_slices()

    @abstractmethod
    def compute_slices(self) -> Iterable[Optional[StreamSlice]]:
        """
        Return a list of slices that will be used to read files in the current sync.
        :return: The slices to use for the current sync.
        """
        ...

    @abstractmethod
    @lru_cache(maxsize=None)
    def get_json_schema(self) -> Mapping[str, Any]:
        """
        Return the JSON Schema for a stream.
        """
        ...

    @abstractmethod
    def infer_schema(self, files: List[RemoteFile]) -> Mapping[str, Any]:
        """
        Infer the schema for files in the stream.
        """
        ...

    def get_parser(self) -> FileTypeParser:
        try:
            return self._parsers[type(self.config.format)]
        except KeyError:
            raise UndefinedParserError(
                FileBasedSourceError.UNDEFINED_PARSER,
                stream=self.name,
                format=type(self.config.format),
            )

    def record_passes_validation_policy(self, record: Mapping[str, Any]) -> bool:
        if self.validation_policy:
            return self.validation_policy.record_passes_validation_policy(
                record=record, schema=self.catalog_schema
            )
        else:
            raise RecordParseError(
                FileBasedSourceError.UNDEFINED_VALIDATION_POLICY,
                stream=self.name,
                validation_policy=self.config.validation_policy,
            )

    @cached_property
    @deprecated("Deprecated as of CDK version 3.7.0.")
    def availability_strategy(self) -> AbstractFileBasedAvailabilityStrategy:
        return self._availability_strategy

    @property
    def name(self) -> str:
        return self.config.name

    def get_cursor(self) -> Optional[Cursor]:
        """
        This is a temporary hack. Because file-based, declarative, and concurrent have _slightly_ different cursor implementations
        the file-based cursor isn't compatible with the cursor-based iteration flow in core.py top-level CDK. By setting this to
        None, we defer to the regular incremental checkpoint flow. Once all cursors are consolidated under a common interface
        then this override can be removed.
        """
        return None
A file-based stream in an Airbyte source.
In addition to the base Stream attributes, a file-based stream has
- A config object (derived from the corresponding stream section in source config). This contains the globs defining the stream's files.
- A StreamReader, which knows how to list and open files in the stream.
- A FileBasedAvailabilityStrategy, which knows how to verify that we can list and open files in the stream.
- A DiscoveryPolicy that controls the number of concurrent requests sent to the source during discover, and the number of files used for schema discovery.
- A dictionary of FileType:Parser that holds all the file types that can be handled by the stream.
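To make the FileType:Parser mapping concrete, here is a minimal, self-contained sketch of the lookup performed by get_parser. CsvFormat and CsvParser are illustrative stand-ins defined locally, not imports from the CDK.

from dataclasses import dataclass
from typing import Any, Dict, List, Type


@dataclass
class CsvFormat:
    """Illustrative stand-in for a format model."""
    delimiter: str = ","


class CsvParser:
    """Illustrative stand-in for a FileTypeParser."""

    def parse(self, raw: str) -> List[Dict[str, Any]]:
        header, *rows = [line.split(",") for line in raw.strip().splitlines()]
        return [dict(zip(header, row)) for row in rows]


# The stream keeps a mapping of format *type* to parser instance and resolves the
# parser with type(self.config.format), mirroring get_parser in the class above.
parsers: Dict[Type[Any], CsvParser] = {CsvFormat: CsvParser()}
configured_format = CsvFormat(delimiter=",")
parser = parsers[type(configured_format)]
print(parser.parse("id,name\n1,alice"))  # [{'id': '1', 'name': 'alice'}]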
Returns
string if single primary key, list of strings if composite primary key, list of list of strings if composite primary key consisting of nested fields. If the stream has no primary keys, return None.
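For illustration, the three shapes described above look like this (the field names are hypothetical):

primary_key = "id"                                  # single primary key
primary_key = ["tenant_id", "id"]                   # composite primary key
primary_key = [["user", "id"], ["user", "email"]]   # composite primary key of nested fields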
    @cache
    def list_files(self) -> List[RemoteFile]:
        """
        List all files that belong to the stream.

        The output of this method is cached so we don't need to list the files more than once.
        This means we won't pick up changes to the files during a sync. This method uses the
        get_files method which is implemented by the concrete stream class.
        """
        return list(self.get_files())
List all files that belong to the stream.
The output of this method is cached so we don't need to list the files more than once. This means we won't pick up changes to the files during a sync. This method uses the get_files method which is implemented by the concrete stream class.
    @abstractmethod
    def get_files(self) -> Iterable[RemoteFile]:
        """
        List all files that belong to the stream as defined by the stream's globs.
        """
        ...
List all files that belong to the stream as defined by the stream's globs.
    def read_records(
        self,
        sync_mode: SyncMode,
        cursor_field: Optional[List[str]] = None,
        stream_slice: Optional[StreamSlice] = None,
        stream_state: Optional[Mapping[str, Any]] = None,
    ) -> Iterable[Mapping[str, Any] | AirbyteMessage]:
        """
        Yield all records from all remote files in `list_files_for_this_sync`.
        This method acts as an adapter between the generic Stream interface and the file-based's
        stream since file-based streams manage their own states.
        """
        if stream_slice is None:
            raise ValueError("stream_slice must be set")
        return self.read_records_from_slice(stream_slice)
Yield all records from all remote files in list_files_for_this_sync.
This method acts as an adapter between the generic Stream interface and the file-based stream, since file-based streams manage their own state.
    @abstractmethod
    def read_records_from_slice(
        self, stream_slice: StreamSlice
    ) -> Iterable[Mapping[str, Any] | AirbyteMessage]:
        """
        Yield all records from all remote files in `list_files_for_this_sync`.
        """
        ...
Yield all records from all remote files in list_files_for_this_sync.
    def stream_slices(
        self,
        *,
        sync_mode: SyncMode,
        cursor_field: Optional[List[str]] = None,
        stream_state: Optional[Mapping[str, Any]] = None,
    ) -> Iterable[Optional[Mapping[str, Any]]]:
        """
        This method acts as an adapter between the generic Stream interface and the file-based's
        stream since file-based streams manage their own states.
        """
        return self.compute_slices()
This method acts as an adapter between the generic Stream interface and the file-based stream, since file-based streams manage their own state.
    @abstractmethod
    def compute_slices(self) -> Iterable[Optional[StreamSlice]]:
        """
        Return a list of slices that will be used to read files in the current sync.
        :return: The slices to use for the current sync.
        """
        ...
Return a list of slices that will be used to read files in the current sync.
Returns
The slices to use for the current sync.
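As a rough illustration of the slice shape produced by a concrete implementation such as DefaultFileBasedStream, each slice is a mapping with a "files" key grouping the files that share a last-modified timestamp. Real slices hold RemoteFile objects; the plain dicts and file names below are hypothetical.

stream_slice = {
    "files": [
        {"uri": "reports/2024-01-01a.csv", "last_modified": "2024-01-01T00:00:00.000000Z"},
        {"uri": "reports/2024-01-01b.csv", "last_modified": "2024-01-01T00:00:00.000000Z"},
    ]
}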
    @abstractmethod
    @lru_cache(maxsize=None)
    def get_json_schema(self) -> Mapping[str, Any]:
        """
        Return the JSON Schema for a stream.
        """
        ...
Return the JSON Schema for a stream.
    @abstractmethod
    def infer_schema(self, files: List[RemoteFile]) -> Mapping[str, Any]:
        """
        Infer the schema for files in the stream.
        """
        ...
Infer the schema for files in the stream.
    def record_passes_validation_policy(self, record: Mapping[str, Any]) -> bool:
        if self.validation_policy:
            return self.validation_policy.record_passes_validation_policy(
                record=record, schema=self.catalog_schema
            )
        else:
            raise RecordParseError(
                FileBasedSourceError.UNDEFINED_VALIDATION_POLICY,
                stream=self.name,
                validation_policy=self.config.validation_policy,
            )
Returns
Stream name. By default this is the implementing class name, but it can be overridden as needed.
    def get_cursor(self) -> Optional[Cursor]:
        """
        This is a temporary hack. Because file-based, declarative, and concurrent have _slightly_ different cursor implementations
        the file-based cursor isn't compatible with the cursor-based iteration flow in core.py top-level CDK. By setting this to
        None, we defer to the regular incremental checkpoint flow. Once all cursors are consolidated under a common interface
        then this override can be removed.
        """
        return None
This is a temporary hack. Because file-based, declarative, and concurrent streams have slightly different cursor implementations, the file-based cursor isn't compatible with the cursor-based iteration flow in the top-level CDK's core.py. By setting this to None, we defer to the regular incremental checkpoint flow. Once all cursors are consolidated under a common interface, this override can be removed.
Inherited Members
- airbyte_cdk.sources.streams.core.Stream
- logger
- transformer
- cursor
- has_multiple_slices
- get_error_display_message
- read
- read_only_records
- as_airbyte_stream
- supports_incremental
- is_resumable
- cursor_field
- namespace
- source_defined_cursor
- exit_on_rate_limit
- state_checkpoint_interval
- get_updated_state
- log_stream_sync_configuration
- configured_json_schema
class DefaultFileBasedStream(AbstractFileBasedStream, IncrementalMixin):
    """
    The default file-based stream.
    """

    FILE_TRANSFER_KW = "use_file_transfer"
    PRESERVE_DIRECTORY_STRUCTURE_KW = "preserve_directory_structure"
    FILES_KEY = "files"
    DATE_TIME_FORMAT = "%Y-%m-%dT%H:%M:%S.%fZ"
    ab_last_mod_col = "_ab_source_file_last_modified"
    ab_file_name_col = "_ab_source_file_url"
    modified = "modified"
    source_file_url = "source_file_url"
    airbyte_columns = [ab_last_mod_col, ab_file_name_col]
    use_file_transfer = False
    preserve_directory_structure = True
    _file_transfer = FileTransfer()

    def __init__(self, **kwargs: Any):
        if self.FILE_TRANSFER_KW in kwargs:
            self.use_file_transfer = kwargs.pop(self.FILE_TRANSFER_KW, False)
        if self.PRESERVE_DIRECTORY_STRUCTURE_KW in kwargs:
            self.preserve_directory_structure = kwargs.pop(
                self.PRESERVE_DIRECTORY_STRUCTURE_KW, True
            )
        super().__init__(**kwargs)

    @property
    def state(self) -> MutableMapping[str, Any]:
        return self._cursor.get_state()

    @state.setter
    def state(self, value: MutableMapping[str, Any]) -> None:
        """State setter, accept state serialized by state getter."""
        self._cursor.set_initial_state(value)

    @property  # type: ignore # mypy complains wrong type, but AbstractFileBasedCursor is parent of file-based cursors
    def cursor(self) -> Optional[AbstractFileBasedCursor]:
        return self._cursor

    @cursor.setter
    def cursor(self, value: AbstractFileBasedCursor) -> None:
        if self._cursor is not None:
            raise RuntimeError(
                f"Cursor for stream {self.name} is already set. This is unexpected. Please contact Support."
            )
        self._cursor = value

    @property
    def primary_key(self) -> PrimaryKeyType:
        return self.config.primary_key or self.get_parser().get_parser_defined_primary_key(
            self.config
        )

    def _duplicated_files_names(
        self, slices: List[dict[str, List[RemoteFile]]]
    ) -> List[dict[str, List[str]]]:
        seen_file_names: Dict[str, List[str]] = defaultdict(list)
        for file_slice in slices:
            for file_found in file_slice[self.FILES_KEY]:
                file_name = path.basename(file_found.uri)
                seen_file_names[file_name].append(file_found.uri)
        return [
            {file_name: paths} for file_name, paths in seen_file_names.items() if len(paths) > 1
        ]

    def compute_slices(self) -> Iterable[Optional[Mapping[str, Any]]]:
        # Sort files by last_modified, uri and return them grouped by last_modified
        all_files = self.list_files()
        files_to_read = self._cursor.get_files_to_sync(all_files, self.logger)
        sorted_files_to_read = sorted(files_to_read, key=lambda f: (f.last_modified, f.uri))
        slices = [
            {self.FILES_KEY: list(group[1])}
            for group in itertools.groupby(sorted_files_to_read, lambda f: f.last_modified)
        ]
        if slices and not self.preserve_directory_structure:
            duplicated_files_names = self._duplicated_files_names(slices)
            if duplicated_files_names:
                raise DuplicatedFilesError(
                    stream=self.name, duplicated_files_names=duplicated_files_names
                )
        return slices

    def transform_record(
        self, record: dict[str, Any], file: RemoteFile, last_updated: str
    ) -> dict[str, Any]:
        # adds _ab_source_file_last_modified and _ab_source_file_url to the record
        record[self.ab_last_mod_col] = last_updated
        record[self.ab_file_name_col] = file.uri
        return record

    def read_records_from_slice(self, stream_slice: StreamSlice) -> Iterable[AirbyteMessage]:
        """
        Yield all records from all remote files in `list_files_for_this_sync`.

        If an error is encountered reading records from a file, log a message and do not attempt
        to sync the rest of the file.
        """
        schema = self.catalog_schema
        if schema is None:
            # On read requests we should always have the catalog available
            raise MissingSchemaError(FileBasedSourceError.MISSING_SCHEMA, stream=self.name)
        # The stream only supports a single file type, so we can use the same parser for all files
        parser = self.get_parser()
        for file in stream_slice["files"]:
            # only serialize the datetime once
            file_datetime_string = file.last_modified.strftime(self.DATE_TIME_FORMAT)
            n_skipped = line_no = 0

            try:
                if self.use_file_transfer:
                    for file_record_data, file_reference in self._file_transfer.upload(
                        file=file, stream_reader=self.stream_reader, logger=self.logger
                    ):
                        yield stream_data_to_airbyte_message(
                            self.name,
                            file_record_data.dict(exclude_none=True),
                            file_reference=file_reference,
                        )
                else:
                    for record in parser.parse_records(
                        self.config, file, self.stream_reader, self.logger, schema
                    ):
                        line_no += 1
                        if self.config.schemaless:
                            record = {"data": record}
                        elif not self.record_passes_validation_policy(record):
                            n_skipped += 1
                            continue
                        record = self.transform_record(record, file, file_datetime_string)
                        yield stream_data_to_airbyte_message(self.name, record)
                    self._cursor.add_file(file)

            except StopSyncPerValidationPolicy:
                yield AirbyteMessage(
                    type=MessageType.LOG,
                    log=AirbyteLogMessage(
                        level=Level.WARN,
                        message=f"Stopping sync in accordance with the configured validation policy. Records in file did not conform to the schema. stream={self.name} file={file.uri} validation_policy={self.config.validation_policy.value} n_skipped={n_skipped}",
                    ),
                )
                break

            except RecordParseError:
                # Increment line_no because the exception was raised before we could increment it
                line_no += 1
                self.errors_collector.collect(
                    AirbyteMessage(
                        type=MessageType.LOG,
                        log=AirbyteLogMessage(
                            level=Level.ERROR,
                            message=f"{FileBasedSourceError.ERROR_PARSING_RECORD.value} stream={self.name} file={file.uri} line_no={line_no} n_skipped={n_skipped}",
                            stack_trace=traceback.format_exc(),
                        ),
                    ),
                )

            except AirbyteTracedException as exc:
                # Re-raise the exception to stop the whole sync immediately as this is a fatal error
                raise exc

            except Exception:
                yield AirbyteMessage(
                    type=MessageType.LOG,
                    log=AirbyteLogMessage(
                        level=Level.ERROR,
                        message=f"{FileBasedSourceError.ERROR_PARSING_RECORD.value} stream={self.name} file={file.uri} line_no={line_no} n_skipped={n_skipped}",
                        stack_trace=traceback.format_exc(),
                    ),
                )

            finally:
                if n_skipped:
                    yield AirbyteMessage(
                        type=MessageType.LOG,
                        log=AirbyteLogMessage(
                            level=Level.WARN,
                            message=f"Records in file did not pass validation policy. stream={self.name} file={file.uri} n_skipped={n_skipped} validation_policy={self.validation_policy.name}",
                        ),
                    )

    @property
    def cursor_field(self) -> Union[str, List[str]]:
        """
        Override to return the default cursor field used by this stream e.g: an API entity might always use created_at as the cursor field.
        :return: The name of the field used as a cursor. If the cursor is nested, return an array consisting of the path to the cursor.
        """
        return self.ab_last_mod_col

    @cache
    def get_json_schema(self) -> JsonSchema:
        if self.use_file_transfer:
            return file_transfer_schema
        extra_fields = {
            self.ab_last_mod_col: {"type": "string"},
            self.ab_file_name_col: {"type": "string"},
        }
        try:
            schema = self._get_raw_json_schema()
        except InvalidSchemaError as config_exception:
            raise AirbyteTracedException(
                internal_message="Please check the logged errors for more information.",
                message=FileBasedSourceError.SCHEMA_INFERENCE_ERROR.value,
                exception=AirbyteTracedException(exception=config_exception),
                failure_type=FailureType.config_error,
            )
        except AirbyteTracedException as ate:
            raise ate
        except Exception as exc:
            raise SchemaInferenceError(
                FileBasedSourceError.SCHEMA_INFERENCE_ERROR, stream=self.name
            ) from exc
        else:
            return {"type": "object", "properties": {**extra_fields, **schema["properties"]}}

    def _get_raw_json_schema(self) -> JsonSchema:
        if self.config.input_schema:
            return self.config.get_input_schema()  # type: ignore
        elif self.config.schemaless:
            return schemaless_schema
        else:
            files = self.list_files()
            first_n_files = len(files)

            if self.config.recent_n_files_to_read_for_schema_discovery:
                self.logger.info(
                    msg=(
                        f"Only first {self.config.recent_n_files_to_read_for_schema_discovery} files will be used to infer schema "
                        f"for stream {self.name} due to limitation in config."
                    )
                )
                first_n_files = self.config.recent_n_files_to_read_for_schema_discovery

            if first_n_files == 0:
                self.logger.warning(
                    msg=f"No files were identified in the stream {self.name}. Setting default schema for the stream."
                )
                return schemaless_schema

            max_n_files_for_schema_inference = (
                self._discovery_policy.get_max_n_files_for_schema_inference(self.get_parser())
            )

            if first_n_files > max_n_files_for_schema_inference:
                # Use the most recent files for schema inference, so we pick up schema changes during discovery.
                self.logger.warning(
                    msg=f"Refusing to infer schema for {first_n_files} files; using {max_n_files_for_schema_inference} files."
                )
                first_n_files = max_n_files_for_schema_inference

            files = sorted(files, key=lambda x: x.last_modified, reverse=True)[:first_n_files]

            inferred_schema = self.infer_schema(files)

            if not inferred_schema:
                raise InvalidSchemaError(
                    FileBasedSourceError.INVALID_SCHEMA_ERROR,
                    details=f"Empty schema. Please check that the files are valid for format {self.config.format}",
                    stream=self.name,
                )

            schema = {"type": "object", "properties": inferred_schema}

            return schema

    def get_files(self) -> Iterable[RemoteFile]:
        """
        Return all files that belong to the stream as defined by the stream's globs.
        """
        return self.stream_reader.get_matching_files(
            self.config.globs or [], self.config.legacy_prefix, self.logger
        )

    def as_airbyte_stream(self) -> AirbyteStream:
        file_stream = super().as_airbyte_stream()
        file_stream.is_file_based = self.use_file_transfer
        return file_stream

    def infer_schema(self, files: List[RemoteFile]) -> Mapping[str, Any]:
        loop = asyncio.get_event_loop()
        schema = loop.run_until_complete(self._infer_schema(files))
        # as infer schema returns a Mapping that is assumed to be immutable, we need to create a deepcopy to avoid modifying the reference
        return self._fill_nulls(deepcopy(schema))

    @staticmethod
    def _fill_nulls(schema: Mapping[str, Any]) -> Mapping[str, Any]:
        if isinstance(schema, dict):
            for k, v in schema.items():
                if k == "type":
                    if isinstance(v, list):
                        if "null" not in v:
                            schema[k] = ["null"] + v
                    elif v != "null":
                        if isinstance(v, (str, list)):
                            schema[k] = ["null", v]
                        else:
                            DefaultFileBasedStream._fill_nulls(v)
                else:
                    DefaultFileBasedStream._fill_nulls(v)
        elif isinstance(schema, list):
            for item in schema:
                DefaultFileBasedStream._fill_nulls(item)
        return schema

    async def _infer_schema(self, files: List[RemoteFile]) -> Mapping[str, Any]:
        """
        Infer the schema for a stream.

        Each file type has a corresponding `infer_schema` handler.
        Dispatch on file type.
        """
        base_schema: SchemaType = {}
        pending_tasks: Set[asyncio.tasks.Task[SchemaType]] = set()

        n_started, n_files = 0, len(files)
        files_iterator = iter(files)
        while pending_tasks or n_started < n_files:
            while len(pending_tasks) <= self._discovery_policy.n_concurrent_requests and (
                file := next(files_iterator, None)
            ):
                pending_tasks.add(asyncio.create_task(self._infer_file_schema(file)))
                n_started += 1
            # Return when the first task is completed so that we can enqueue a new task as soon as the
            # number of concurrent tasks drops below the number allowed.
            done, pending_tasks = await asyncio.wait(
                pending_tasks, return_when=asyncio.FIRST_COMPLETED
            )
            for task in done:
                try:
                    base_schema = merge_schemas(base_schema, task.result())
                except AirbyteTracedException as ate:
                    raise ate
                except Exception as exc:
                    self.logger.error(
                        f"An error occurred inferring the schema. \n {traceback.format_exc()}",
                        exc_info=exc,
                    )

        return base_schema

    async def _infer_file_schema(self, file: RemoteFile) -> SchemaType:
        try:
            return await self.get_parser().infer_schema(
                self.config, file, self.stream_reader, self.logger
            )
        except AirbyteTracedException as ate:
            raise ate
        except Exception as exc:
            raise SchemaInferenceError(
                FileBasedSourceError.SCHEMA_INFERENCE_ERROR,
                file=file.uri,
                format=str(self.config.format),
                stream=self.name,
            ) from exc
The default file-based stream.
    def __init__(self, **kwargs: Any):
        if self.FILE_TRANSFER_KW in kwargs:
            self.use_file_transfer = kwargs.pop(self.FILE_TRANSFER_KW, False)
        if self.PRESERVE_DIRECTORY_STRUCTURE_KW in kwargs:
            self.preserve_directory_structure = kwargs.pop(
                self.PRESERVE_DIRECTORY_STRUCTURE_KW, True
            )
        super().__init__(**kwargs)
State getter, should return state in a form that can be serialized to a string and sent to the output as a STATE AirbyteMessage.
A good example of a state is a cursor_value: { self.cursor_field: "cursor_value" }
State should try to be as small as possible but at the same time descriptive enough to restore the syncing process from the point where it stopped.
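For a file-based stream, the state is whatever the configured AbstractFileBasedCursor serializes; the default cursor tracks which files have already been synced and their last-modified timestamps. The shape below is purely illustrative and not a guaranteed wire format.

# Illustrative only; the exact keys and value formats are defined by the cursor implementation.
state = {
    "history": {
        "reports/2024-01-01.csv": "2024-01-01T00:00:00.000000Z",
        "reports/2024-01-02.csv": "2024-01-02T09:30:00.000000Z",
    },
    "_ab_source_file_last_modified": "2024-01-02T09:30:00.000000Z",
}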
    @property
    def primary_key(self) -> PrimaryKeyType:
        return self.config.primary_key or self.get_parser().get_parser_defined_primary_key(
            self.config
        )
Returns
string if single primary key, list of strings if composite primary key, list of list of strings if composite primary key consisting of nested fields. If the stream has no primary keys, return None.
    def compute_slices(self) -> Iterable[Optional[Mapping[str, Any]]]:
        # Sort files by last_modified, uri and return them grouped by last_modified
        all_files = self.list_files()
        files_to_read = self._cursor.get_files_to_sync(all_files, self.logger)
        sorted_files_to_read = sorted(files_to_read, key=lambda f: (f.last_modified, f.uri))
        slices = [
            {self.FILES_KEY: list(group[1])}
            for group in itertools.groupby(sorted_files_to_read, lambda f: f.last_modified)
        ]
        if slices and not self.preserve_directory_structure:
            duplicated_files_names = self._duplicated_files_names(slices)
            if duplicated_files_names:
                raise DuplicatedFilesError(
                    stream=self.name, duplicated_files_names=duplicated_files_names
                )
        return slices
Return a list of slices that will be used to read files in the current sync.
Returns
The slices to use for the current sync.
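A self-contained sketch of the grouping performed above, using a stand-in dataclass instead of the CDK's RemoteFile; file names and timestamps are invented.

import itertools
from dataclasses import dataclass
from datetime import datetime


@dataclass
class FakeRemoteFile:  # stand-in for airbyte_cdk's RemoteFile
    uri: str
    last_modified: datetime


files = [
    FakeRemoteFile("b.csv", datetime(2024, 1, 1)),
    FakeRemoteFile("a.csv", datetime(2024, 1, 1)),
    FakeRemoteFile("c.csv", datetime(2024, 1, 2)),
]

# Sort by (last_modified, uri), then group by last_modified, as compute_slices does.
sorted_files = sorted(files, key=lambda f: (f.last_modified, f.uri))
slices = [
    {"files": list(group)}
    for _, group in itertools.groupby(sorted_files, key=lambda f: f.last_modified)
]
# -> two slices: [a.csv, b.csv] (2024-01-01) and [c.csv] (2024-01-02)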
    def transform_record(
        self, record: dict[str, Any], file: RemoteFile, last_updated: str
    ) -> dict[str, Any]:
        # adds _ab_source_file_last_modified and _ab_source_file_url to the record
        record[self.ab_last_mod_col] = last_updated
        record[self.ab_file_name_col] = file.uri
        return record
    def read_records_from_slice(self, stream_slice: StreamSlice) -> Iterable[AirbyteMessage]:
        """
        Yield all records from all remote files in `list_files_for_this_sync`.

        If an error is encountered reading records from a file, log a message and do not attempt
        to sync the rest of the file.
        """
        schema = self.catalog_schema
        if schema is None:
            # On read requests we should always have the catalog available
            raise MissingSchemaError(FileBasedSourceError.MISSING_SCHEMA, stream=self.name)
        # The stream only supports a single file type, so we can use the same parser for all files
        parser = self.get_parser()
        for file in stream_slice["files"]:
            # only serialize the datetime once
            file_datetime_string = file.last_modified.strftime(self.DATE_TIME_FORMAT)
            n_skipped = line_no = 0

            try:
                if self.use_file_transfer:
                    for file_record_data, file_reference in self._file_transfer.upload(
                        file=file, stream_reader=self.stream_reader, logger=self.logger
                    ):
                        yield stream_data_to_airbyte_message(
                            self.name,
                            file_record_data.dict(exclude_none=True),
                            file_reference=file_reference,
                        )
                else:
                    for record in parser.parse_records(
                        self.config, file, self.stream_reader, self.logger, schema
                    ):
                        line_no += 1
                        if self.config.schemaless:
                            record = {"data": record}
                        elif not self.record_passes_validation_policy(record):
                            n_skipped += 1
                            continue
                        record = self.transform_record(record, file, file_datetime_string)
                        yield stream_data_to_airbyte_message(self.name, record)
                    self._cursor.add_file(file)

            except StopSyncPerValidationPolicy:
                yield AirbyteMessage(
                    type=MessageType.LOG,
                    log=AirbyteLogMessage(
                        level=Level.WARN,
                        message=f"Stopping sync in accordance with the configured validation policy. Records in file did not conform to the schema. stream={self.name} file={file.uri} validation_policy={self.config.validation_policy.value} n_skipped={n_skipped}",
                    ),
                )
                break

            except RecordParseError:
                # Increment line_no because the exception was raised before we could increment it
                line_no += 1
                self.errors_collector.collect(
                    AirbyteMessage(
                        type=MessageType.LOG,
                        log=AirbyteLogMessage(
                            level=Level.ERROR,
                            message=f"{FileBasedSourceError.ERROR_PARSING_RECORD.value} stream={self.name} file={file.uri} line_no={line_no} n_skipped={n_skipped}",
                            stack_trace=traceback.format_exc(),
                        ),
                    ),
                )

            except AirbyteTracedException as exc:
                # Re-raise the exception to stop the whole sync immediately as this is a fatal error
                raise exc

            except Exception:
                yield AirbyteMessage(
                    type=MessageType.LOG,
                    log=AirbyteLogMessage(
                        level=Level.ERROR,
                        message=f"{FileBasedSourceError.ERROR_PARSING_RECORD.value} stream={self.name} file={file.uri} line_no={line_no} n_skipped={n_skipped}",
                        stack_trace=traceback.format_exc(),
                    ),
                )

            finally:
                if n_skipped:
                    yield AirbyteMessage(
                        type=MessageType.LOG,
                        log=AirbyteLogMessage(
                            level=Level.WARN,
                            message=f"Records in file did not pass validation policy. stream={self.name} file={file.uri} n_skipped={n_skipped} validation_policy={self.validation_policy.name}",
                        ),
                    )
Yield all records from all remote files in list_files_for_this_sync.
If an error is encountered reading records from a file, log a message and do not attempt to sync the rest of the file.
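As a sketch of what each emitted record looks like after transform_record has run, the two _ab_ columns are always present; all other fields come from the parsed file, and the values below are invented.

record = {
    "id": "1",
    "name": "alice",
    "_ab_source_file_last_modified": "2024-01-01T00:00:00.000000Z",
    "_ab_source_file_url": "reports/2024-01-01.csv",
}

# In schemaless mode the parsed row is wrapped in a "data" key first, then the same columns are added:
schemaless_record = {
    "data": {"id": "1", "name": "alice"},
    "_ab_source_file_last_modified": "2024-01-01T00:00:00.000000Z",
    "_ab_source_file_url": "reports/2024-01-01.csv",
}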
    @property
    def cursor_field(self) -> Union[str, List[str]]:
        """
        Override to return the default cursor field used by this stream e.g: an API entity might always use created_at as the cursor field.
        :return: The name of the field used as a cursor. If the cursor is nested, return an array consisting of the path to the cursor.
        """
        return self.ab_last_mod_col
Override to return the default cursor field used by this stream, e.g., an API entity might always use created_at as the cursor field.
Returns
The name of the field used as a cursor. If the cursor is nested, return an array consisting of the path to the cursor.
    @cache
    def get_json_schema(self) -> JsonSchema:
        if self.use_file_transfer:
            return file_transfer_schema
        extra_fields = {
            self.ab_last_mod_col: {"type": "string"},
            self.ab_file_name_col: {"type": "string"},
        }
        try:
            schema = self._get_raw_json_schema()
        except InvalidSchemaError as config_exception:
            raise AirbyteTracedException(
                internal_message="Please check the logged errors for more information.",
                message=FileBasedSourceError.SCHEMA_INFERENCE_ERROR.value,
                exception=AirbyteTracedException(exception=config_exception),
                failure_type=FailureType.config_error,
            )
        except AirbyteTracedException as ate:
            raise ate
        except Exception as exc:
            raise SchemaInferenceError(
                FileBasedSourceError.SCHEMA_INFERENCE_ERROR, stream=self.name
            ) from exc
        else:
            return {"type": "object", "properties": {**extra_fields, **schema["properties"]}}
Return the JSON Schema for a stream.
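Illustrative output of the merge performed above, assuming an inferred schema with two string columns (column names are hypothetical); the two _ab_ columns are always merged into the properties.

json_schema = {
    "type": "object",
    "properties": {
        "_ab_source_file_last_modified": {"type": "string"},
        "_ab_source_file_url": {"type": "string"},
        "id": {"type": ["null", "string"]},
        "name": {"type": ["null", "string"]},
    },
}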
    def get_files(self) -> Iterable[RemoteFile]:
        """
        Return all files that belong to the stream as defined by the stream's globs.
        """
        return self.stream_reader.get_matching_files(
            self.config.globs or [], self.config.legacy_prefix, self.logger
        )
Return all files that belong to the stream as defined by the stream's globs.
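The glob patterns come from the stream's config and are matched by the stream reader against the source's file listing; the patterns below are purely illustrative.

globs = ["reports/**/*.csv", "exports/2024/*.jsonl"]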
    def infer_schema(self, files: List[RemoteFile]) -> Mapping[str, Any]:
        loop = asyncio.get_event_loop()
        schema = loop.run_until_complete(self._infer_schema(files))
        # as infer schema returns a Mapping that is assumed to be immutable, we need to create a deepcopy to avoid modifying the reference
        return self._fill_nulls(deepcopy(schema))
Infer the schema for files in the stream.
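The inferred schema is post-processed by _fill_nulls (shown in the class source above) so that every declared type also accepts null. A simplified, self-contained restatement of that behaviour, for illustration:

from copy import deepcopy
from typing import Any


def fill_nulls(schema: Any) -> Any:
    """Simplified sketch of DefaultFileBasedStream._fill_nulls."""
    if isinstance(schema, dict):
        for key, value in schema.items():
            if key == "type":
                if isinstance(value, list):
                    if "null" not in value:
                        schema[key] = ["null"] + value
                elif value != "null":
                    schema[key] = ["null", value]
            else:
                fill_nulls(value)
    elif isinstance(schema, list):
        for item in schema:
            fill_nulls(item)
    return schema


inferred = {"id": {"type": "integer"}, "tags": {"type": "array", "items": {"type": "string"}}}
print(fill_nulls(deepcopy(inferred)))
# {'id': {'type': ['null', 'integer']},
#  'tags': {'type': ['null', 'array'], 'items': {'type': ['null', 'string']}}}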
Inherited Members
- AbstractFileBasedStream
- config
- catalog_schema
- validation_policy
- stream_reader
- errors_collector
- list_files
- read_records
- stream_slices
- get_parser
- record_passes_validation_policy
- availability_strategy
- name
- get_cursor
- airbyte_cdk.sources.streams.core.Stream
- logger
- transformer
- has_multiple_slices
- get_error_display_message
- read
- read_only_records
- supports_incremental
- is_resumable
- namespace
- source_defined_cursor
- exit_on_rate_limit
- state_checkpoint_interval
- get_updated_state
- log_stream_sync_configuration
- configured_json_schema
class FileIdentitiesStream(IdentitiesStream):
    """
    The identities stream. A full refresh stream to sync identities from a certain domain.
    The stream reader manage the logic to get such data, which is implemented on connector side.
    """

    is_resumable = False

    def __init__(
        self,
        catalog_schema: Optional[Mapping[str, Any]],
        stream_permissions_reader: AbstractFileBasedStreamPermissionsReader,
        discovery_policy: AbstractDiscoveryPolicy,
        errors_collector: FileBasedErrorsCollector,
    ) -> None:
        super().__init__()
        self.catalog_schema = catalog_schema
        self.stream_permissions_reader = stream_permissions_reader
        self._discovery_policy = discovery_policy
        self.errors_collector = errors_collector
        self._cursor: MutableMapping[str, Any] = {}

    @property
    def primary_key(self) -> PrimaryKeyType:
        return None

    def load_identity_groups(self) -> Iterable[Dict[str, Any]]:
        return self.stream_permissions_reader.load_identity_groups(logger=self.logger)

    @cache
    def get_json_schema(self) -> JsonSchema:
        return self.stream_permissions_reader.identities_schema
The identities stream: a full refresh stream that syncs identities from a given domain. The stream reader manages the logic for fetching this data, which is implemented on the connector side.
    def __init__(
        self,
        catalog_schema: Optional[Mapping[str, Any]],
        stream_permissions_reader: AbstractFileBasedStreamPermissionsReader,
        discovery_policy: AbstractDiscoveryPolicy,
        errors_collector: FileBasedErrorsCollector,
    ) -> None:
        super().__init__()
        self.catalog_schema = catalog_schema
        self.stream_permissions_reader = stream_permissions_reader
        self._discovery_policy = discovery_policy
        self.errors_collector = errors_collector
        self._cursor: MutableMapping[str, Any] = {}
Returns
True if this stream allows the checkpointing of sync progress and can resume from it on subsequent attempts. This differs from supports_incremental because certain kinds of streams like those supporting resumable full refresh can checkpoint progress in between attempts for improved fault tolerance. However, they will start from the beginning on the next sync job.
Returns
string if single primary key, list of strings if composite primary key, list of list of strings if composite primary key consisting of nested fields. If the stream has no primary keys, return None.
    @cache
    def get_json_schema(self) -> JsonSchema:
        return self.stream_permissions_reader.identities_schema
Returns
A dict of the JSON schema representing this stream.
The default implementation of this method looks for a JSONSchema file with the same name as this stream's "name" property. Override as needed.
Inherited Members
- airbyte_cdk.sources.streams.permissions.identities_stream.IdentitiesStream
- IDENTITIES_STREAM_NAME
- state
- read_records
- name
- get_cursor
- airbyte_cdk.sources.streams.core.Stream
- logger
- transformer
- cursor
- has_multiple_slices
- get_error_display_message
- read
- read_only_records
- as_airbyte_stream
- supports_incremental
- cursor_field
- namespace
- source_defined_cursor
- exit_on_rate_limit
- stream_slices
- state_checkpoint_interval
- get_updated_state
- log_stream_sync_configuration
- configured_json_schema
class PermissionsFileBasedStream(DefaultFileBasedStream):
    """
    A specialized stream for handling file-based ACL permissions.

    This stream works with the stream_reader to:
    1. Fetch ACL permissions for each file in the source
    2. Transform permissions into a standardized format
    3. Generate records containing permission information

    The stream_reader is responsible for the actual implementation of permission retrieval
    and schema definition, while this class handles the streaming interface.
    """

    def __init__(
        self, stream_permissions_reader: AbstractFileBasedStreamPermissionsReader, **kwargs: Any
    ):
        super().__init__(**kwargs)
        self.stream_permissions_reader = stream_permissions_reader

    def _filter_schema_invalid_properties(
        self, configured_catalog_json_schema: Dict[str, Any]
    ) -> Dict[str, Any]:
        return self.stream_permissions_reader.file_permissions_schema

    def read_records_from_slice(self, stream_slice: StreamSlice) -> Iterable[AirbyteMessage]:
        """
        Yield permissions records from all remote files
        """

        for file in stream_slice["files"]:
            no_permissions = False
            file_datetime_string = file.last_modified.strftime(self.DATE_TIME_FORMAT)
            try:
                permissions_record = self.stream_permissions_reader.get_file_acl_permissions(
                    file, logger=self.logger
                )
                if not permissions_record:
                    no_permissions = True
                    self.logger.warning(
                        f"Unable to fetch permissions. stream={self.name} file={file.uri}"
                    )
                    continue
                permissions_record = self.transform_record(
                    permissions_record, file, file_datetime_string
                )
                yield stream_data_to_airbyte_message(self.name, permissions_record)
            except Exception as e:
                self.logger.error(f"Failed to retrieve permissions for file {file.uri}: {str(e)}")
                yield AirbyteMessage(
                    type=MessageType.LOG,
                    log=AirbyteLogMessage(
                        level=Level.ERROR,
                        message=f"Error retrieving files permissions: stream={self.name} file={file.uri}",
                        stack_trace=traceback.format_exc(),
                    ),
                )
            finally:
                if no_permissions:
                    yield AirbyteMessage(
                        type=MessageType.LOG,
                        log=AirbyteLogMessage(
                            level=Level.WARN,
                            message=f"Unable to fetch permissions. stream={self.name} file={file.uri}",
                        ),
                    )

    def _get_raw_json_schema(self) -> JsonSchema:
        """
        Retrieve the raw JSON schema for file permissions from the stream reader.

        Returns:
            The file permissions schema that defines the structure of permission records
        """
        return self.stream_permissions_reader.file_permissions_schema
A specialized stream for handling file-based ACL permissions.
This stream works with the stream_reader to:
- Fetch ACL permissions for each file in the source
- Transform permissions into a standardized format
- Generate records containing permission information
The stream_reader is responsible for the actual implementation of permission retrieval and schema definition, while this class handles the streaming interface.
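What a permission record contains is defined by the connector's permissions schema, so the fields below are purely hypothetical; only the two _ab_ columns added by transform_record are guaranteed by this class.

permissions_record = {
    "id": "file-123",                                  # hypothetical connector-defined fields
    "allowed_identity_remote_ids": ["user@example.com"],
    "publicly_accessible": False,
    "_ab_source_file_last_modified": "2024-01-01T00:00:00.000000Z",
    "_ab_source_file_url": "reports/2024-01-01.csv",
}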
    def read_records_from_slice(self, stream_slice: StreamSlice) -> Iterable[AirbyteMessage]:
        """
        Yield permissions records from all remote files
        """

        for file in stream_slice["files"]:
            no_permissions = False
            file_datetime_string = file.last_modified.strftime(self.DATE_TIME_FORMAT)
            try:
                permissions_record = self.stream_permissions_reader.get_file_acl_permissions(
                    file, logger=self.logger
                )
                if not permissions_record:
                    no_permissions = True
                    self.logger.warning(
                        f"Unable to fetch permissions. stream={self.name} file={file.uri}"
                    )
                    continue
                permissions_record = self.transform_record(
                    permissions_record, file, file_datetime_string
                )
                yield stream_data_to_airbyte_message(self.name, permissions_record)
            except Exception as e:
                self.logger.error(f"Failed to retrieve permissions for file {file.uri}: {str(e)}")
                yield AirbyteMessage(
                    type=MessageType.LOG,
                    log=AirbyteLogMessage(
                        level=Level.ERROR,
                        message=f"Error retrieving files permissions: stream={self.name} file={file.uri}",
                        stack_trace=traceback.format_exc(),
                    ),
                )
            finally:
                if no_permissions:
                    yield AirbyteMessage(
                        type=MessageType.LOG,
                        log=AirbyteLogMessage(
                            level=Level.WARN,
                            message=f"Unable to fetch permissions. stream={self.name} file={file.uri}",
                        ),
                    )
Yield permissions records from all remote files
Inherited Members
- DefaultFileBasedStream
- FILE_TRANSFER_KW
- PRESERVE_DIRECTORY_STRUCTURE_KW
- FILES_KEY
- DATE_TIME_FORMAT
- ab_last_mod_col
- ab_file_name_col
- modified
- source_file_url
- airbyte_columns
- use_file_transfer
- preserve_directory_structure
- state
- cursor
- primary_key
- compute_slices
- transform_record
- cursor_field
- get_json_schema
- get_files
- as_airbyte_stream
- infer_schema
- AbstractFileBasedStream
- config
- catalog_schema
- validation_policy
- stream_reader
- errors_collector
- list_files
- read_records
- stream_slices
- get_parser
- record_passes_validation_policy
- availability_strategy
- name
- get_cursor
- airbyte_cdk.sources.streams.core.Stream
- logger
- transformer
- has_multiple_slices
- get_error_display_message
- read
- read_only_records
- supports_incremental
- is_resumable
- namespace
- source_defined_cursor
- exit_on_rate_limit
- state_checkpoint_interval
- get_updated_state
- log_stream_sync_configuration
- configured_json_schema