airbyte_cdk.sources.file_based.stream
from airbyte_cdk.sources.file_based.stream.abstract_file_based_stream import AbstractFileBasedStream
from airbyte_cdk.sources.file_based.stream.default_file_based_stream import DefaultFileBasedStream
from airbyte_cdk.sources.file_based.stream.identities_stream import FileIdentitiesStream
from airbyte_cdk.sources.file_based.stream.permissions_file_based_stream import (
    PermissionsFileBasedStream,
)

__all__ = [
    "AbstractFileBasedStream",
    "DefaultFileBasedStream",
    "FileIdentitiesStream",
    "PermissionsFileBasedStream",
]
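Because these classes are re-exported in __all__, connectors can import them directly from the package root, for example:

from airbyte_cdk.sources.file_based.stream import (
    AbstractFileBasedStream,
    DefaultFileBasedStream,
)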
38class AbstractFileBasedStream(Stream): 39 """ 40 A file-based stream in an Airbyte source. 41 42 In addition to the base Stream attributes, a file-based stream has 43 - A config object (derived from the corresponding stream section in source config). 44 This contains the globs defining the stream's files. 45 - A StreamReader, which knows how to list and open files in the stream. 46 - A FileBasedAvailabilityStrategy, which knows how to verify that we can list and open 47 files in the stream. 48 - A DiscoveryPolicy that controls the number of concurrent requests sent to the source 49 during discover, and the number of files used for schema discovery. 50 - A dictionary of FileType:Parser that holds all the file types that can be handled 51 by the stream. 52 """ 53 54 def __init__( 55 self, 56 config: FileBasedStreamConfig, 57 catalog_schema: Optional[Mapping[str, Any]], 58 stream_reader: AbstractFileBasedStreamReader, 59 availability_strategy: AbstractFileBasedAvailabilityStrategy, 60 discovery_policy: AbstractDiscoveryPolicy, 61 parsers: Dict[Type[Any], FileTypeParser], 62 validation_policy: AbstractSchemaValidationPolicy, 63 errors_collector: FileBasedErrorsCollector, 64 cursor: AbstractFileBasedCursor, 65 ): 66 super().__init__() 67 self.config = config 68 self.catalog_schema = catalog_schema 69 self.validation_policy = validation_policy 70 self.stream_reader = stream_reader 71 self._discovery_policy = discovery_policy 72 self._availability_strategy = availability_strategy 73 self._parsers = parsers 74 self.errors_collector = errors_collector 75 self._cursor = cursor 76 77 @property 78 @abstractmethod 79 def primary_key(self) -> PrimaryKeyType: ... 80 81 @cache 82 def list_files(self) -> List[RemoteFile]: 83 """ 84 List all files that belong to the stream. 85 86 The output of this method is cached so we don't need to list the files more than once. 87 This means we won't pick up changes to the files during a sync. This method uses the 88 get_files method which is implemented by the concrete stream class. 89 """ 90 return list(self.get_files()) 91 92 @abstractmethod 93 def get_files(self) -> Iterable[RemoteFile]: 94 """ 95 List all files that belong to the stream as defined by the stream's globs. 96 """ 97 ... 98 99 def read_records( 100 self, 101 sync_mode: SyncMode, 102 cursor_field: Optional[List[str]] = None, 103 stream_slice: Optional[StreamSlice] = None, 104 stream_state: Optional[Mapping[str, Any]] = None, 105 ) -> Iterable[Mapping[str, Any] | AirbyteMessage]: 106 """ 107 Yield all records from all remote files in `list_files_for_this_sync`. 108 This method acts as an adapter between the generic Stream interface and the file-based's 109 stream since file-based streams manage their own states. 110 """ 111 if stream_slice is None: 112 raise ValueError("stream_slice must be set") 113 return self.read_records_from_slice(stream_slice) 114 115 @abstractmethod 116 def read_records_from_slice( 117 self, stream_slice: StreamSlice 118 ) -> Iterable[Mapping[str, Any] | AirbyteMessage]: 119 """ 120 Yield all records from all remote files in `list_files_for_this_sync`. 121 """ 122 ... 123 124 def stream_slices( 125 self, 126 *, 127 sync_mode: SyncMode, 128 cursor_field: Optional[List[str]] = None, 129 stream_state: Optional[Mapping[str, Any]] = None, 130 ) -> Iterable[Optional[Mapping[str, Any]]]: 131 """ 132 This method acts as an adapter between the generic Stream interface and the file-based's 133 stream since file-based streams manage their own states. 
134 """ 135 return self.compute_slices() 136 137 @abstractmethod 138 def compute_slices(self) -> Iterable[Optional[StreamSlice]]: 139 """ 140 Return a list of slices that will be used to read files in the current sync. 141 :return: The slices to use for the current sync. 142 """ 143 ... 144 145 @abstractmethod 146 @lru_cache(maxsize=None) 147 def get_json_schema(self) -> Mapping[str, Any]: 148 """ 149 Return the JSON Schema for a stream. 150 """ 151 ... 152 153 @abstractmethod 154 def infer_schema(self, files: List[RemoteFile]) -> Mapping[str, Any]: 155 """ 156 Infer the schema for files in the stream. 157 """ 158 ... 159 160 def get_parser(self) -> FileTypeParser: 161 try: 162 return self._parsers[type(self.config.format)] 163 except KeyError: 164 raise UndefinedParserError( 165 FileBasedSourceError.UNDEFINED_PARSER, 166 stream=self.name, 167 format=type(self.config.format), 168 ) 169 170 def record_passes_validation_policy(self, record: Mapping[str, Any]) -> bool: 171 if self.validation_policy: 172 return self.validation_policy.record_passes_validation_policy( 173 record=record, schema=self.catalog_schema 174 ) 175 else: 176 raise RecordParseError( 177 FileBasedSourceError.UNDEFINED_VALIDATION_POLICY, 178 stream=self.name, 179 validation_policy=self.config.validation_policy, 180 ) 181 182 @cached_property 183 @deprecated("Deprecated as of CDK version 3.7.0.") 184 def availability_strategy(self) -> AbstractFileBasedAvailabilityStrategy: 185 return self._availability_strategy 186 187 @property 188 def name(self) -> str: 189 return self.config.name 190 191 def get_cursor(self) -> Optional[Cursor]: 192 """ 193 This is a temporary hack. Because file-based, declarative, and concurrent have _slightly_ different cursor implementations 194 the file-based cursor isn't compatible with the cursor-based iteration flow in core.py top-level CDK. By setting this to 195 None, we defer to the regular incremental checkpoint flow. Once all cursors are consolidated under a common interface 196 then this override can be removed. 197 """ 198 return None
A file-based stream in an Airbyte source.
In addition to the base Stream attributes, a file-based stream has the following (a minimal subclass sketch is shown after this list):
- A config object (derived from the corresponding stream section in source config). This contains the globs defining the stream's files.
- A StreamReader, which knows how to list and open files in the stream.
- A FileBasedAvailabilityStrategy, which knows how to verify that we can list and open files in the stream.
- A DiscoveryPolicy that controls the number of concurrent requests sent to the source during discover, and the number of files used for schema discovery.
- A dictionary of FileType:Parser that holds all the file types that can be handled by the stream.
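For orientation, here is a hypothetical, minimal subclass sketch showing which abstract members a concrete file-based stream must provide. It is not a working connector: the bodies are placeholders, the import paths for RemoteFile and AirbyteMessage are assumptions, and real implementations (such as DefaultFileBasedStream below) do considerably more.

from typing import Any, Iterable, List, Mapping, Optional

from airbyte_cdk.models import AirbyteMessage
from airbyte_cdk.sources.file_based.remote_file import RemoteFile
from airbyte_cdk.sources.file_based.stream import AbstractFileBasedStream


class MyMinimalFileBasedStream(AbstractFileBasedStream):
    @property
    def primary_key(self):
        return None  # no primary key for this illustrative stream

    def get_files(self) -> Iterable[RemoteFile]:
        # Delegate to the stream reader, filtered by the globs in the stream config.
        return self.stream_reader.get_matching_files(self.config.globs or [], None, self.logger)

    def compute_slices(self) -> Iterable[Optional[Mapping[str, Any]]]:
        # One slice containing every file; real implementations group files by last_modified.
        return [{"files": self.list_files()}]

    def read_records_from_slice(self, stream_slice: Mapping[str, Any]) -> Iterable[Mapping[str, Any] | AirbyteMessage]:
        # StreamSlice in the real signature; parse every file in the slice with the configured parser.
        parser = self.get_parser()
        for file in stream_slice["files"]:
            yield from parser.parse_records(self.config, file, self.stream_reader, self.logger, self.catalog_schema)

    def get_json_schema(self) -> Mapping[str, Any]:
        return {"type": "object", "properties": {}}

    def infer_schema(self, files: List[RemoteFile]) -> Mapping[str, Any]:
        return {}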
Returns
string if single primary key, list of strings if composite primary key, list of list of strings if composite primary key consisting of nested fields. If the stream has no primary keys, return None.
    @cache
    def list_files(self) -> List[RemoteFile]:
        """
        List all files that belong to the stream.

        The output of this method is cached so we don't need to list the files more than once.
        This means we won't pick up changes to the files during a sync. This method uses the
        get_files method which is implemented by the concrete stream class.
        """
        return list(self.get_files())
List all files that belong to the stream.
The output of this method is cached so we don't need to list the files more than once. This means we won't pick up changes to the files during a sync. This method uses the get_files method which is implemented by the concrete stream class.
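A short standalone illustration (not CDK code) of what this caching implies: functools.cache memoizes per instance, so a second call during the same sync reuses the first listing and files added mid-sync are not picked up.

from functools import cache

class _FakeStream:
    def __init__(self) -> None:
        self.listings = 0

    @cache
    def list_files(self) -> list:
        # Counts how many times the remote listing is actually performed.
        self.listings += 1
        return ["a.csv", "b.csv"]

stream = _FakeStream()
stream.list_files()
stream.list_files()
assert stream.listings == 1  # the second call is served from the cache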
    @abstractmethod
    def get_files(self) -> Iterable[RemoteFile]:
        """
        List all files that belong to the stream as defined by the stream's globs.
        """
        ...
List all files that belong to the stream as defined by the stream's globs.
    def read_records(
        self,
        sync_mode: SyncMode,
        cursor_field: Optional[List[str]] = None,
        stream_slice: Optional[StreamSlice] = None,
        stream_state: Optional[Mapping[str, Any]] = None,
    ) -> Iterable[Mapping[str, Any] | AirbyteMessage]:
        """
        Yield all records from all remote files in `list_files_for_this_sync`.
        This method acts as an adapter between the generic Stream interface and the file-based's
        stream since file-based streams manage their own states.
        """
        if stream_slice is None:
            raise ValueError("stream_slice must be set")
        return self.read_records_from_slice(stream_slice)
Yield all records from all remote files in `list_files_for_this_sync`.
This method acts as an adapter between the generic Stream interface and the file-based stream, since file-based streams manage their own state.
    @abstractmethod
    def read_records_from_slice(
        self, stream_slice: StreamSlice
    ) -> Iterable[Mapping[str, Any] | AirbyteMessage]:
        """
        Yield all records from all remote files in `list_files_for_this_sync`.
        """
        ...
Yield all records from all remote files in `list_files_for_this_sync`.
    def stream_slices(
        self,
        *,
        sync_mode: SyncMode,
        cursor_field: Optional[List[str]] = None,
        stream_state: Optional[Mapping[str, Any]] = None,
    ) -> Iterable[Optional[Mapping[str, Any]]]:
        """
        This method acts as an adapter between the generic Stream interface and the file-based's
        stream since file-based streams manage their own states.
        """
        return self.compute_slices()
This method acts as an adapter between the generic Stream interface and the file-based stream, since file-based streams manage their own state.
    @abstractmethod
    def compute_slices(self) -> Iterable[Optional[StreamSlice]]:
        """
        Return a list of slices that will be used to read files in the current sync.
        :return: The slices to use for the current sync.
        """
        ...
Return a list of slices that will be used to read files in the current sync.
Returns
The slices to use for the current sync.
    @abstractmethod
    @lru_cache(maxsize=None)
    def get_json_schema(self) -> Mapping[str, Any]:
        """
        Return the JSON Schema for a stream.
        """
        ...
Return the JSON Schema for a stream.
    @abstractmethod
    def infer_schema(self, files: List[RemoteFile]) -> Mapping[str, Any]:
        """
        Infer the schema for files in the stream.
        """
        ...
Infer the schema for files in the stream.
    def record_passes_validation_policy(self, record: Mapping[str, Any]) -> bool:
        if self.validation_policy:
            return self.validation_policy.record_passes_validation_policy(
                record=record, schema=self.catalog_schema
            )
        else:
            raise RecordParseError(
                FileBasedSourceError.UNDEFINED_VALIDATION_POLICY,
                stream=self.name,
                validation_policy=self.config.validation_policy,
            )
Returns
Stream name. By default this is the implementing class name, but it can be overridden as needed.
    def get_cursor(self) -> Optional[Cursor]:
        """
        This is a temporary hack. Because file-based, declarative, and concurrent have _slightly_ different cursor implementations
        the file-based cursor isn't compatible with the cursor-based iteration flow in core.py top-level CDK. By setting this to
        None, we defer to the regular incremental checkpoint flow. Once all cursors are consolidated under a common interface
        then this override can be removed.
        """
        return None
This is a temporary hack. Because file-based, declarative, and concurrent streams have slightly different cursor implementations, the file-based cursor isn't compatible with the cursor-based iteration flow in the top-level CDK's core.py. By returning None, we defer to the regular incremental checkpoint flow. Once all cursors are consolidated under a common interface, this override can be removed.
Inherited Members
- airbyte_cdk.sources.streams.core.Stream
- logger
- transformer
- cursor
- has_multiple_slices
- get_error_display_message
- read
- read_only_records
- as_airbyte_stream
- supports_incremental
- is_resumable
- cursor_field
- namespace
- source_defined_cursor
- exit_on_rate_limit
- state_checkpoint_interval
- get_updated_state
- log_stream_sync_configuration
- configured_json_schema
44class DefaultFileBasedStream(AbstractFileBasedStream, IncrementalMixin): 45 """ 46 The default file-based stream. 47 """ 48 49 FILE_TRANSFER_KW = "use_file_transfer" 50 PRESERVE_DIRECTORY_STRUCTURE_KW = "preserve_directory_structure" 51 FILES_KEY = "files" 52 DATE_TIME_FORMAT = "%Y-%m-%dT%H:%M:%S.%fZ" 53 ab_last_mod_col = "_ab_source_file_last_modified" 54 ab_file_name_col = "_ab_source_file_url" 55 modified = "modified" 56 source_file_url = "source_file_url" 57 airbyte_columns = [ab_last_mod_col, ab_file_name_col] 58 use_file_transfer = False 59 preserve_directory_structure = True 60 61 def __init__(self, **kwargs: Any): 62 if self.FILE_TRANSFER_KW in kwargs: 63 self.use_file_transfer = kwargs.pop(self.FILE_TRANSFER_KW, False) 64 if self.PRESERVE_DIRECTORY_STRUCTURE_KW in kwargs: 65 self.preserve_directory_structure = kwargs.pop( 66 self.PRESERVE_DIRECTORY_STRUCTURE_KW, True 67 ) 68 super().__init__(**kwargs) 69 70 @property 71 def state(self) -> MutableMapping[str, Any]: 72 return self._cursor.get_state() 73 74 @state.setter 75 def state(self, value: MutableMapping[str, Any]) -> None: 76 """State setter, accept state serialized by state getter.""" 77 self._cursor.set_initial_state(value) 78 79 @property # type: ignore # mypy complains wrong type, but AbstractFileBasedCursor is parent of file-based cursors 80 def cursor(self) -> Optional[AbstractFileBasedCursor]: 81 return self._cursor 82 83 @cursor.setter 84 def cursor(self, value: AbstractFileBasedCursor) -> None: 85 if self._cursor is not None: 86 raise RuntimeError( 87 f"Cursor for stream {self.name} is already set. This is unexpected. Please contact Support." 88 ) 89 self._cursor = value 90 91 @property 92 def primary_key(self) -> PrimaryKeyType: 93 return self.config.primary_key or self.get_parser().get_parser_defined_primary_key( 94 self.config 95 ) 96 97 def _filter_schema_invalid_properties( 98 self, configured_catalog_json_schema: Dict[str, Any] 99 ) -> Dict[str, Any]: 100 if self.use_file_transfer: 101 return { 102 "type": "object", 103 "properties": { 104 "file_path": {"type": "string"}, 105 "file_size": {"type": "string"}, 106 self.ab_file_name_col: {"type": "string"}, 107 }, 108 } 109 else: 110 return super()._filter_schema_invalid_properties(configured_catalog_json_schema) 111 112 def _duplicated_files_names( 113 self, slices: List[dict[str, List[RemoteFile]]] 114 ) -> List[dict[str, List[str]]]: 115 seen_file_names: Dict[str, List[str]] = defaultdict(list) 116 for file_slice in slices: 117 for file_found in file_slice[self.FILES_KEY]: 118 file_name = path.basename(file_found.uri) 119 seen_file_names[file_name].append(file_found.uri) 120 return [ 121 {file_name: paths} for file_name, paths in seen_file_names.items() if len(paths) > 1 122 ] 123 124 def compute_slices(self) -> Iterable[Optional[Mapping[str, Any]]]: 125 # Sort files by last_modified, uri and return them grouped by last_modified 126 all_files = self.list_files() 127 files_to_read = self._cursor.get_files_to_sync(all_files, self.logger) 128 sorted_files_to_read = sorted(files_to_read, key=lambda f: (f.last_modified, f.uri)) 129 slices = [ 130 {self.FILES_KEY: list(group[1])} 131 for group in itertools.groupby(sorted_files_to_read, lambda f: f.last_modified) 132 ] 133 if slices and not self.preserve_directory_structure: 134 duplicated_files_names = self._duplicated_files_names(slices) 135 if duplicated_files_names: 136 raise DuplicatedFilesError( 137 stream=self.name, duplicated_files_names=duplicated_files_names 138 ) 139 return slices 140 141 def 
transform_record( 142 self, record: dict[str, Any], file: RemoteFile, last_updated: str 143 ) -> dict[str, Any]: 144 # adds _ab_source_file_last_modified and _ab_source_file_url to the record 145 record[self.ab_last_mod_col] = last_updated 146 record[self.ab_file_name_col] = file.uri 147 return record 148 149 def transform_record_for_file_transfer( 150 self, record: dict[str, Any], file: RemoteFile 151 ) -> dict[str, Any]: 152 # timstamp() returns a float representing the number of seconds since the unix epoch 153 record[self.modified] = int(file.last_modified.timestamp()) * 1000 154 record[self.source_file_url] = file.uri 155 return record 156 157 def read_records_from_slice(self, stream_slice: StreamSlice) -> Iterable[AirbyteMessage]: 158 """ 159 Yield all records from all remote files in `list_files_for_this_sync`. 160 161 If an error is encountered reading records from a file, log a message and do not attempt 162 to sync the rest of the file. 163 """ 164 schema = self.catalog_schema 165 if schema is None: 166 # On read requests we should always have the catalog available 167 raise MissingSchemaError(FileBasedSourceError.MISSING_SCHEMA, stream=self.name) 168 # The stream only supports a single file type, so we can use the same parser for all files 169 parser = self.get_parser() 170 for file in stream_slice["files"]: 171 # only serialize the datetime once 172 file_datetime_string = file.last_modified.strftime(self.DATE_TIME_FORMAT) 173 n_skipped = line_no = 0 174 175 try: 176 if self.use_file_transfer: 177 self.logger.info(f"{self.name}: {file} file-based syncing") 178 # todo: complete here the code to not rely on local parser 179 file_transfer = FileTransfer() 180 for record in file_transfer.get_file( 181 self.config, file, self.stream_reader, self.logger 182 ): 183 line_no += 1 184 if not self.record_passes_validation_policy(record): 185 n_skipped += 1 186 continue 187 record = self.transform_record_for_file_transfer(record, file) 188 yield stream_data_to_airbyte_message( 189 self.name, record, is_file_transfer_message=True 190 ) 191 else: 192 for record in parser.parse_records( 193 self.config, file, self.stream_reader, self.logger, schema 194 ): 195 line_no += 1 196 if self.config.schemaless: 197 record = {"data": record} 198 elif not self.record_passes_validation_policy(record): 199 n_skipped += 1 200 continue 201 record = self.transform_record(record, file, file_datetime_string) 202 yield stream_data_to_airbyte_message(self.name, record) 203 self._cursor.add_file(file) 204 205 except StopSyncPerValidationPolicy: 206 yield AirbyteMessage( 207 type=MessageType.LOG, 208 log=AirbyteLogMessage( 209 level=Level.WARN, 210 message=f"Stopping sync in accordance with the configured validation policy. Records in file did not conform to the schema. 
stream={self.name} file={file.uri} validation_policy={self.config.validation_policy.value} n_skipped={n_skipped}", 211 ), 212 ) 213 break 214 215 except RecordParseError: 216 # Increment line_no because the exception was raised before we could increment it 217 line_no += 1 218 self.errors_collector.collect( 219 AirbyteMessage( 220 type=MessageType.LOG, 221 log=AirbyteLogMessage( 222 level=Level.ERROR, 223 message=f"{FileBasedSourceError.ERROR_PARSING_RECORD.value} stream={self.name} file={file.uri} line_no={line_no} n_skipped={n_skipped}", 224 stack_trace=traceback.format_exc(), 225 ), 226 ), 227 ) 228 229 except AirbyteTracedException as exc: 230 # Re-raise the exception to stop the whole sync immediately as this is a fatal error 231 raise exc 232 233 except Exception: 234 yield AirbyteMessage( 235 type=MessageType.LOG, 236 log=AirbyteLogMessage( 237 level=Level.ERROR, 238 message=f"{FileBasedSourceError.ERROR_PARSING_RECORD.value} stream={self.name} file={file.uri} line_no={line_no} n_skipped={n_skipped}", 239 stack_trace=traceback.format_exc(), 240 ), 241 ) 242 243 finally: 244 if n_skipped: 245 yield AirbyteMessage( 246 type=MessageType.LOG, 247 log=AirbyteLogMessage( 248 level=Level.WARN, 249 message=f"Records in file did not pass validation policy. stream={self.name} file={file.uri} n_skipped={n_skipped} validation_policy={self.validation_policy.name}", 250 ), 251 ) 252 253 @property 254 def cursor_field(self) -> Union[str, List[str]]: 255 """ 256 Override to return the default cursor field used by this stream e.g: an API entity might always use created_at as the cursor field. 257 :return: The name of the field used as a cursor. If the cursor is nested, return an array consisting of the path to the cursor. 258 """ 259 return self.ab_last_mod_col 260 261 @cache 262 def get_json_schema(self) -> JsonSchema: 263 extra_fields = { 264 self.ab_last_mod_col: {"type": "string"}, 265 self.ab_file_name_col: {"type": "string"}, 266 } 267 try: 268 schema = self._get_raw_json_schema() 269 except InvalidSchemaError as config_exception: 270 raise AirbyteTracedException( 271 internal_message="Please check the logged errors for more information.", 272 message=FileBasedSourceError.SCHEMA_INFERENCE_ERROR.value, 273 exception=AirbyteTracedException(exception=config_exception), 274 failure_type=FailureType.config_error, 275 ) 276 except AirbyteTracedException as ate: 277 raise ate 278 except Exception as exc: 279 raise SchemaInferenceError( 280 FileBasedSourceError.SCHEMA_INFERENCE_ERROR, stream=self.name 281 ) from exc 282 else: 283 return {"type": "object", "properties": {**extra_fields, **schema["properties"]}} 284 285 def _get_raw_json_schema(self) -> JsonSchema: 286 if self.use_file_transfer: 287 return file_transfer_schema 288 elif self.config.input_schema: 289 return self.config.get_input_schema() # type: ignore 290 elif self.config.schemaless: 291 return schemaless_schema 292 else: 293 files = self.list_files() 294 first_n_files = len(files) 295 296 if self.config.recent_n_files_to_read_for_schema_discovery: 297 self.logger.info( 298 msg=( 299 f"Only first {self.config.recent_n_files_to_read_for_schema_discovery} files will be used to infer schema " 300 f"for stream {self.name} due to limitation in config." 301 ) 302 ) 303 first_n_files = self.config.recent_n_files_to_read_for_schema_discovery 304 305 if first_n_files == 0: 306 self.logger.warning( 307 msg=f"No files were identified in the stream {self.name}. Setting default schema for the stream." 
308 ) 309 return schemaless_schema 310 311 max_n_files_for_schema_inference = ( 312 self._discovery_policy.get_max_n_files_for_schema_inference(self.get_parser()) 313 ) 314 315 if first_n_files > max_n_files_for_schema_inference: 316 # Use the most recent files for schema inference, so we pick up schema changes during discovery. 317 self.logger.warning( 318 msg=f"Refusing to infer schema for {first_n_files} files; using {max_n_files_for_schema_inference} files." 319 ) 320 first_n_files = max_n_files_for_schema_inference 321 322 files = sorted(files, key=lambda x: x.last_modified, reverse=True)[:first_n_files] 323 324 inferred_schema = self.infer_schema(files) 325 326 if not inferred_schema: 327 raise InvalidSchemaError( 328 FileBasedSourceError.INVALID_SCHEMA_ERROR, 329 details=f"Empty schema. Please check that the files are valid for format {self.config.format}", 330 stream=self.name, 331 ) 332 333 schema = {"type": "object", "properties": inferred_schema} 334 335 return schema 336 337 def get_files(self) -> Iterable[RemoteFile]: 338 """ 339 Return all files that belong to the stream as defined by the stream's globs. 340 """ 341 return self.stream_reader.get_matching_files( 342 self.config.globs or [], self.config.legacy_prefix, self.logger 343 ) 344 345 def infer_schema(self, files: List[RemoteFile]) -> Mapping[str, Any]: 346 loop = asyncio.get_event_loop() 347 schema = loop.run_until_complete(self._infer_schema(files)) 348 # as infer schema returns a Mapping that is assumed to be immutable, we need to create a deepcopy to avoid modifying the reference 349 return self._fill_nulls(deepcopy(schema)) 350 351 @staticmethod 352 def _fill_nulls(schema: Mapping[str, Any]) -> Mapping[str, Any]: 353 if isinstance(schema, dict): 354 for k, v in schema.items(): 355 if k == "type": 356 if isinstance(v, list): 357 if "null" not in v: 358 schema[k] = ["null"] + v 359 elif v != "null": 360 if isinstance(v, (str, list)): 361 schema[k] = ["null", v] 362 else: 363 DefaultFileBasedStream._fill_nulls(v) 364 else: 365 DefaultFileBasedStream._fill_nulls(v) 366 elif isinstance(schema, list): 367 for item in schema: 368 DefaultFileBasedStream._fill_nulls(item) 369 return schema 370 371 async def _infer_schema(self, files: List[RemoteFile]) -> Mapping[str, Any]: 372 """ 373 Infer the schema for a stream. 374 375 Each file type has a corresponding `infer_schema` handler. 376 Dispatch on file type. 377 """ 378 base_schema: SchemaType = {} 379 pending_tasks: Set[asyncio.tasks.Task[SchemaType]] = set() 380 381 n_started, n_files = 0, len(files) 382 files_iterator = iter(files) 383 while pending_tasks or n_started < n_files: 384 while len(pending_tasks) <= self._discovery_policy.n_concurrent_requests and ( 385 file := next(files_iterator, None) 386 ): 387 pending_tasks.add(asyncio.create_task(self._infer_file_schema(file))) 388 n_started += 1 389 # Return when the first task is completed so that we can enqueue a new task as soon as the 390 # number of concurrent tasks drops below the number allowed. 391 done, pending_tasks = await asyncio.wait( 392 pending_tasks, return_when=asyncio.FIRST_COMPLETED 393 ) 394 for task in done: 395 try: 396 base_schema = merge_schemas(base_schema, task.result()) 397 except AirbyteTracedException as ate: 398 raise ate 399 except Exception as exc: 400 self.logger.error( 401 f"An error occurred inferring the schema. 
\n {traceback.format_exc()}", 402 exc_info=exc, 403 ) 404 405 return base_schema 406 407 async def _infer_file_schema(self, file: RemoteFile) -> SchemaType: 408 try: 409 return await self.get_parser().infer_schema( 410 self.config, file, self.stream_reader, self.logger 411 ) 412 except AirbyteTracedException as ate: 413 raise ate 414 except Exception as exc: 415 raise SchemaInferenceError( 416 FileBasedSourceError.SCHEMA_INFERENCE_ERROR, 417 file=file.uri, 418 format=str(self.config.format), 419 stream=self.name, 420 ) from exc
The default file-based stream.
    def __init__(self, **kwargs: Any):
        if self.FILE_TRANSFER_KW in kwargs:
            self.use_file_transfer = kwargs.pop(self.FILE_TRANSFER_KW, False)
        if self.PRESERVE_DIRECTORY_STRUCTURE_KW in kwargs:
            self.preserve_directory_structure = kwargs.pop(
                self.PRESERVE_DIRECTORY_STRUCTURE_KW, True
            )
        super().__init__(**kwargs)
State getter; should return state in a form that can be serialized to a string and sent to the output as a STATE AirbyteMessage.
A good example of a state is a cursor value: { self.cursor_field: "cursor_value" }
State should be as small as possible while remaining descriptive enough to restore the syncing process from the point where it stopped.
    @property
    def primary_key(self) -> PrimaryKeyType:
        return self.config.primary_key or self.get_parser().get_parser_defined_primary_key(
            self.config
        )
Returns
string if single primary key, list of strings if composite primary key, list of list of strings if composite primary key consisting of nested fields. If the stream has no primary keys, return None.
    def compute_slices(self) -> Iterable[Optional[Mapping[str, Any]]]:
        # Sort files by last_modified, uri and return them grouped by last_modified
        all_files = self.list_files()
        files_to_read = self._cursor.get_files_to_sync(all_files, self.logger)
        sorted_files_to_read = sorted(files_to_read, key=lambda f: (f.last_modified, f.uri))
        slices = [
            {self.FILES_KEY: list(group[1])}
            for group in itertools.groupby(sorted_files_to_read, lambda f: f.last_modified)
        ]
        if slices and not self.preserve_directory_structure:
            duplicated_files_names = self._duplicated_files_names(slices)
            if duplicated_files_names:
                raise DuplicatedFilesError(
                    stream=self.name, duplicated_files_names=duplicated_files_names
                )
        return slices
Return a list of slices that will be used to read files in the current sync.
Returns
The slices to use for the current sync.
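A standalone illustration (not CDK code) of the grouping performed above: files that share a last_modified timestamp end up in the same slice.

import itertools
from dataclasses import dataclass
from datetime import datetime


@dataclass(frozen=True)
class FakeFile:
    uri: str
    last_modified: datetime


files = [
    FakeFile("b.csv", datetime(2024, 1, 2)),
    FakeFile("a.csv", datetime(2024, 1, 2)),
    FakeFile("c.csv", datetime(2024, 1, 1)),
]

sorted_files = sorted(files, key=lambda f: (f.last_modified, f.uri))
slices = [
    {"files": list(group)}
    for _, group in itertools.groupby(sorted_files, lambda f: f.last_modified)
]
# Two slices: [c.csv] for 2024-01-01 and [a.csv, b.csv] for 2024-01-02.
assert [len(s["files"]) for s in slices] == [1, 2]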
    def transform_record(
        self, record: dict[str, Any], file: RemoteFile, last_updated: str
    ) -> dict[str, Any]:
        # adds _ab_source_file_last_modified and _ab_source_file_url to the record
        record[self.ab_last_mod_col] = last_updated
        record[self.ab_file_name_col] = file.uri
        return record
    def transform_record_for_file_transfer(
        self, record: dict[str, Any], file: RemoteFile
    ) -> dict[str, Any]:
        # timestamp() returns a float representing the number of seconds since the unix epoch
        record[self.modified] = int(file.last_modified.timestamp()) * 1000
        record[self.source_file_url] = file.uri
        return record
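Illustrative output of these helpers, with invented values: transform_record appends the two _ab_* columns to each data record, while transform_record_for_file_transfer adds a millisecond modified timestamp and the source URL.

# Hypothetical record before and after transform_record (values are made up):
record_in = {"id": 1, "name": "Ada"}
record_out = {
    "id": 1,
    "name": "Ada",
    "_ab_source_file_last_modified": "2024-01-02T03:04:05.000000Z",
    "_ab_source_file_url": "users/2024/01/users.csv",
}

# Fields added by transform_record_for_file_transfer, assuming a UTC last_modified:
file_transfer_fields = {
    "modified": 1704164645000,  # epoch milliseconds of the file's last_modified time
    "source_file_url": "users/2024/01/users.csv",
}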
    def read_records_from_slice(self, stream_slice: StreamSlice) -> Iterable[AirbyteMessage]:
        """
        Yield all records from all remote files in `list_files_for_this_sync`.

        If an error is encountered reading records from a file, log a message and do not attempt
        to sync the rest of the file.
        """
        schema = self.catalog_schema
        if schema is None:
            # On read requests we should always have the catalog available
            raise MissingSchemaError(FileBasedSourceError.MISSING_SCHEMA, stream=self.name)
        # The stream only supports a single file type, so we can use the same parser for all files
        parser = self.get_parser()
        for file in stream_slice["files"]:
            # only serialize the datetime once
            file_datetime_string = file.last_modified.strftime(self.DATE_TIME_FORMAT)
            n_skipped = line_no = 0

            try:
                if self.use_file_transfer:
                    self.logger.info(f"{self.name}: {file} file-based syncing")
                    # todo: complete here the code to not rely on local parser
                    file_transfer = FileTransfer()
                    for record in file_transfer.get_file(
                        self.config, file, self.stream_reader, self.logger
                    ):
                        line_no += 1
                        if not self.record_passes_validation_policy(record):
                            n_skipped += 1
                            continue
                        record = self.transform_record_for_file_transfer(record, file)
                        yield stream_data_to_airbyte_message(
                            self.name, record, is_file_transfer_message=True
                        )
                else:
                    for record in parser.parse_records(
                        self.config, file, self.stream_reader, self.logger, schema
                    ):
                        line_no += 1
                        if self.config.schemaless:
                            record = {"data": record}
                        elif not self.record_passes_validation_policy(record):
                            n_skipped += 1
                            continue
                        record = self.transform_record(record, file, file_datetime_string)
                        yield stream_data_to_airbyte_message(self.name, record)
                self._cursor.add_file(file)

            except StopSyncPerValidationPolicy:
                yield AirbyteMessage(
                    type=MessageType.LOG,
                    log=AirbyteLogMessage(
                        level=Level.WARN,
                        message=f"Stopping sync in accordance with the configured validation policy. Records in file did not conform to the schema. stream={self.name} file={file.uri} validation_policy={self.config.validation_policy.value} n_skipped={n_skipped}",
                    ),
                )
                break

            except RecordParseError:
                # Increment line_no because the exception was raised before we could increment it
                line_no += 1
                self.errors_collector.collect(
                    AirbyteMessage(
                        type=MessageType.LOG,
                        log=AirbyteLogMessage(
                            level=Level.ERROR,
                            message=f"{FileBasedSourceError.ERROR_PARSING_RECORD.value} stream={self.name} file={file.uri} line_no={line_no} n_skipped={n_skipped}",
                            stack_trace=traceback.format_exc(),
                        ),
                    ),
                )

            except AirbyteTracedException as exc:
                # Re-raise the exception to stop the whole sync immediately as this is a fatal error
                raise exc

            except Exception:
                yield AirbyteMessage(
                    type=MessageType.LOG,
                    log=AirbyteLogMessage(
                        level=Level.ERROR,
                        message=f"{FileBasedSourceError.ERROR_PARSING_RECORD.value} stream={self.name} file={file.uri} line_no={line_no} n_skipped={n_skipped}",
                        stack_trace=traceback.format_exc(),
                    ),
                )

            finally:
                if n_skipped:
                    yield AirbyteMessage(
                        type=MessageType.LOG,
                        log=AirbyteLogMessage(
                            level=Level.WARN,
                            message=f"Records in file did not pass validation policy. stream={self.name} file={file.uri} n_skipped={n_skipped} validation_policy={self.validation_policy.name}",
                        ),
                    )
Yield all records from all remote files in `list_files_for_this_sync`.
If an error is encountered reading records from a file, log a message and do not attempt to sync the rest of the file.
    @property
    def cursor_field(self) -> Union[str, List[str]]:
        """
        Override to return the default cursor field used by this stream e.g: an API entity might always use created_at as the cursor field.
        :return: The name of the field used as a cursor. If the cursor is nested, return an array consisting of the path to the cursor.
        """
        return self.ab_last_mod_col
Override to return the default cursor field used by this stream e.g: an API entity might always use created_at as the cursor field.
Returns
The name of the field used as a cursor. If the cursor is nested, return an array consisting of the path to the cursor.
    @cache
    def get_json_schema(self) -> JsonSchema:
        extra_fields = {
            self.ab_last_mod_col: {"type": "string"},
            self.ab_file_name_col: {"type": "string"},
        }
        try:
            schema = self._get_raw_json_schema()
        except InvalidSchemaError as config_exception:
            raise AirbyteTracedException(
                internal_message="Please check the logged errors for more information.",
                message=FileBasedSourceError.SCHEMA_INFERENCE_ERROR.value,
                exception=AirbyteTracedException(exception=config_exception),
                failure_type=FailureType.config_error,
            )
        except AirbyteTracedException as ate:
            raise ate
        except Exception as exc:
            raise SchemaInferenceError(
                FileBasedSourceError.SCHEMA_INFERENCE_ERROR, stream=self.name
            ) from exc
        else:
            return {"type": "object", "properties": {**extra_fields, **schema["properties"]}}
Return the JSON Schema for a stream.
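For example, if schema inference produced a single id column, the advertised stream schema would look roughly like this (illustrative only):

{
    "type": "object",
    "properties": {
        "_ab_source_file_last_modified": {"type": "string"},
        "_ab_source_file_url": {"type": "string"},
        "id": {"type": ["null", "integer"]},
    },
}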
    def get_files(self) -> Iterable[RemoteFile]:
        """
        Return all files that belong to the stream as defined by the stream's globs.
        """
        return self.stream_reader.get_matching_files(
            self.config.globs or [], self.config.legacy_prefix, self.logger
        )
Return all files that belong to the stream as defined by the stream's globs.
    def infer_schema(self, files: List[RemoteFile]) -> Mapping[str, Any]:
        loop = asyncio.get_event_loop()
        schema = loop.run_until_complete(self._infer_schema(files))
        # as infer schema returns a Mapping that is assumed to be immutable, we need to create a deepcopy to avoid modifying the reference
        return self._fill_nulls(deepcopy(schema))
Infer the schema for files in the stream.
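The _fill_nulls step shown in the class source makes every inferred type nullable. A simplified standalone illustration (not CDK code) of that transformation:

def fill_nulls(schema):
    # Recursively prepend "null" to every "type" entry, mirroring the behavior described above.
    if isinstance(schema, dict):
        for key, value in schema.items():
            if key == "type":
                if isinstance(value, list):
                    if "null" not in value:
                        schema[key] = ["null"] + value
                elif value != "null":
                    schema[key] = ["null", value]
            else:
                fill_nulls(value)
    elif isinstance(schema, list):
        for item in schema:
            fill_nulls(item)
    return schema

assert fill_nulls({"id": {"type": "integer"}}) == {"id": {"type": ["null", "integer"]}}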
Inherited Members
- AbstractFileBasedStream
- config
- catalog_schema
- validation_policy
- stream_reader
- errors_collector
- list_files
- read_records
- stream_slices
- get_parser
- record_passes_validation_policy
- availability_strategy
- name
- get_cursor
- airbyte_cdk.sources.streams.core.Stream
- logger
- transformer
- has_multiple_slices
- get_error_display_message
- read
- read_only_records
- as_airbyte_stream
- supports_incremental
- is_resumable
- namespace
- source_defined_cursor
- exit_on_rate_limit
- state_checkpoint_interval
- get_updated_state
- log_stream_sync_configuration
- configured_json_schema
class FileIdentitiesStream(IdentitiesStream):
    """
    The identities stream. A full refresh stream to sync identities from a certain domain.
    The stream reader manage the logic to get such data, which is implemented on connector side.
    """

    is_resumable = False

    def __init__(
        self,
        catalog_schema: Optional[Mapping[str, Any]],
        stream_permissions_reader: AbstractFileBasedStreamPermissionsReader,
        discovery_policy: AbstractDiscoveryPolicy,
        errors_collector: FileBasedErrorsCollector,
    ) -> None:
        super().__init__()
        self.catalog_schema = catalog_schema
        self.stream_permissions_reader = stream_permissions_reader
        self._discovery_policy = discovery_policy
        self.errors_collector = errors_collector
        self._cursor: MutableMapping[str, Any] = {}

    @property
    def primary_key(self) -> PrimaryKeyType:
        return None

    def load_identity_groups(self) -> Iterable[Dict[str, Any]]:
        return self.stream_permissions_reader.load_identity_groups(logger=self.logger)

    @cache
    def get_json_schema(self) -> JsonSchema:
        return self.stream_permissions_reader.identities_schema
The identities stream: a full refresh stream that syncs identities from a given domain. The stream reader manages the logic for retrieving this data, which is implemented on the connector side.
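The heavy lifting is delegated to the permissions reader. A hedged sketch of what a connector-side reader might provide for this stream; in a real connector it would subclass AbstractFileBasedStreamPermissionsReader, and the record fields shown here are hypothetical.

import logging
from typing import Any, Dict, Iterable


class MyPermissionsReader:
    @property
    def identities_schema(self) -> Dict[str, Any]:
        # Used as-is by FileIdentitiesStream.get_json_schema().
        return {
            "type": "object",
            "properties": {"id": {"type": "string"}, "name": {"type": "string"}},
        }

    def load_identity_groups(self, logger: logging.Logger) -> Iterable[Dict[str, Any]]:
        # Yield one record per identity or group known to the source.
        logger.info("Listing identities")
        yield {"id": "user-1", "name": "Ada Lovelace"}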
    def __init__(
        self,
        catalog_schema: Optional[Mapping[str, Any]],
        stream_permissions_reader: AbstractFileBasedStreamPermissionsReader,
        discovery_policy: AbstractDiscoveryPolicy,
        errors_collector: FileBasedErrorsCollector,
    ) -> None:
        super().__init__()
        self.catalog_schema = catalog_schema
        self.stream_permissions_reader = stream_permissions_reader
        self._discovery_policy = discovery_policy
        self.errors_collector = errors_collector
        self._cursor: MutableMapping[str, Any] = {}
Returns
True if this stream allows the checkpointing of sync progress and can resume from it on subsequent attempts. This differs from supports_incremental because certain kinds of streams like those supporting resumable full refresh can checkpoint progress in between attempts for improved fault tolerance. However, they will start from the beginning on the next sync job.
Returns
string if single primary key, list of strings if composite primary key, list of list of strings if composite primary key consisting of nested fields. If the stream has no primary keys, return None.
    @cache
    def get_json_schema(self) -> JsonSchema:
        return self.stream_permissions_reader.identities_schema
Returns
A dict of the JSON schema representing this stream.
The default implementation of this method looks for a JSONSchema file with the same name as this stream's "name" property. Override as needed.
Inherited Members
- airbyte_cdk.sources.streams.permissions.identities_stream.IdentitiesStream
- IDENTITIES_STREAM_NAME
- state
- read_records
- name
- get_cursor
- airbyte_cdk.sources.streams.core.Stream
- logger
- transformer
- cursor
- has_multiple_slices
- get_error_display_message
- read
- read_only_records
- as_airbyte_stream
- supports_incremental
- cursor_field
- namespace
- source_defined_cursor
- exit_on_rate_limit
- stream_slices
- state_checkpoint_interval
- get_updated_state
- log_stream_sync_configuration
- configured_json_schema
20class PermissionsFileBasedStream(DefaultFileBasedStream): 21 """ 22 A specialized stream for handling file-based ACL permissions. 23 24 This stream works with the stream_reader to: 25 1. Fetch ACL permissions for each file in the source 26 2. Transform permissions into a standardized format 27 3. Generate records containing permission information 28 29 The stream_reader is responsible for the actual implementation of permission retrieval 30 and schema definition, while this class handles the streaming interface. 31 """ 32 33 def __init__( 34 self, stream_permissions_reader: AbstractFileBasedStreamPermissionsReader, **kwargs: Any 35 ): 36 super().__init__(**kwargs) 37 self.stream_permissions_reader = stream_permissions_reader 38 39 def _filter_schema_invalid_properties( 40 self, configured_catalog_json_schema: Dict[str, Any] 41 ) -> Dict[str, Any]: 42 return self.stream_permissions_reader.file_permissions_schema 43 44 def read_records_from_slice(self, stream_slice: StreamSlice) -> Iterable[AirbyteMessage]: 45 """ 46 Yield permissions records from all remote files 47 """ 48 49 for file in stream_slice["files"]: 50 no_permissions = False 51 file_datetime_string = file.last_modified.strftime(self.DATE_TIME_FORMAT) 52 try: 53 permissions_record = self.stream_permissions_reader.get_file_acl_permissions( 54 file, logger=self.logger 55 ) 56 if not permissions_record: 57 no_permissions = True 58 self.logger.warning( 59 f"Unable to fetch permissions. stream={self.name} file={file.uri}" 60 ) 61 continue 62 permissions_record = self.transform_record( 63 permissions_record, file, file_datetime_string 64 ) 65 yield stream_data_to_airbyte_message( 66 self.name, permissions_record, is_file_transfer_message=False 67 ) 68 except Exception as e: 69 self.logger.error(f"Failed to retrieve permissions for file {file.uri}: {str(e)}") 70 yield AirbyteMessage( 71 type=MessageType.LOG, 72 log=AirbyteLogMessage( 73 level=Level.ERROR, 74 message=f"Error retrieving files permissions: stream={self.name} file={file.uri}", 75 stack_trace=traceback.format_exc(), 76 ), 77 ) 78 finally: 79 if no_permissions: 80 yield AirbyteMessage( 81 type=MessageType.LOG, 82 log=AirbyteLogMessage( 83 level=Level.WARN, 84 message=f"Unable to fetch permissions. stream={self.name} file={file.uri}", 85 ), 86 ) 87 88 def _get_raw_json_schema(self) -> JsonSchema: 89 """ 90 Retrieve the raw JSON schema for file permissions from the stream reader. 91 92 Returns: 93 The file permissions schema that defines the structure of permission records 94 """ 95 return self.stream_permissions_reader.file_permissions_schema
A specialized stream for handling file-based ACL permissions.
This stream works with the stream_reader to:
- Fetch ACL permissions for each file in the source
- Transform permissions into a standardized format
- Generate records containing permission information
The stream_reader is responsible for the actual implementation of permission retrieval and schema definition, while this class handles the streaming interface.
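As with the identities stream, the ACL logic lives in the permissions reader. A hedged sketch of the two members this stream calls on it, get_file_acl_permissions and file_permissions_schema; in a real connector the class would subclass AbstractFileBasedStreamPermissionsReader, and the record fields are illustrative only.

import logging
from typing import Any, Dict, Optional

from airbyte_cdk.sources.file_based.remote_file import RemoteFile


class MyAclReader:
    @property
    def file_permissions_schema(self) -> Dict[str, Any]:
        # Returned as-is by PermissionsFileBasedStream._get_raw_json_schema().
        return {
            "type": "object",
            "properties": {
                "id": {"type": "string"},
                "allowed_identity_remote_ids": {"type": "array", "items": {"type": "string"}},
            },
        }

    def get_file_acl_permissions(self, file: RemoteFile, logger: logging.Logger) -> Optional[Dict[str, Any]]:
        # Return one permissions record per file, or an empty result when ACLs cannot be fetched.
        logger.info(f"Fetching ACLs for {file.uri}")
        return {"id": file.uri, "allowed_identity_remote_ids": ["user-1"]}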
    def read_records_from_slice(self, stream_slice: StreamSlice) -> Iterable[AirbyteMessage]:
        """
        Yield permissions records from all remote files
        """

        for file in stream_slice["files"]:
            no_permissions = False
            file_datetime_string = file.last_modified.strftime(self.DATE_TIME_FORMAT)
            try:
                permissions_record = self.stream_permissions_reader.get_file_acl_permissions(
                    file, logger=self.logger
                )
                if not permissions_record:
                    no_permissions = True
                    self.logger.warning(
                        f"Unable to fetch permissions. stream={self.name} file={file.uri}"
                    )
                    continue
                permissions_record = self.transform_record(
                    permissions_record, file, file_datetime_string
                )
                yield stream_data_to_airbyte_message(
                    self.name, permissions_record, is_file_transfer_message=False
                )
            except Exception as e:
                self.logger.error(f"Failed to retrieve permissions for file {file.uri}: {str(e)}")
                yield AirbyteMessage(
                    type=MessageType.LOG,
                    log=AirbyteLogMessage(
                        level=Level.ERROR,
                        message=f"Error retrieving files permissions: stream={self.name} file={file.uri}",
                        stack_trace=traceback.format_exc(),
                    ),
                )
            finally:
                if no_permissions:
                    yield AirbyteMessage(
                        type=MessageType.LOG,
                        log=AirbyteLogMessage(
                            level=Level.WARN,
                            message=f"Unable to fetch permissions. stream={self.name} file={file.uri}",
                        ),
                    )
Yield permissions records from all remote files in the slice.
Inherited Members
- DefaultFileBasedStream
- FILE_TRANSFER_KW
- PRESERVE_DIRECTORY_STRUCTURE_KW
- FILES_KEY
- DATE_TIME_FORMAT
- ab_last_mod_col
- ab_file_name_col
- modified
- source_file_url
- airbyte_columns
- use_file_transfer
- preserve_directory_structure
- state
- cursor
- primary_key
- compute_slices
- transform_record
- transform_record_for_file_transfer
- cursor_field
- get_json_schema
- get_files
- infer_schema
- AbstractFileBasedStream
- config
- catalog_schema
- validation_policy
- stream_reader
- errors_collector
- list_files
- read_records
- stream_slices
- get_parser
- record_passes_validation_policy
- availability_strategy
- name
- get_cursor
- airbyte_cdk.sources.streams.core.Stream
- logger
- transformer
- has_multiple_slices
- get_error_display_message
- read
- read_only_records
- as_airbyte_stream
- supports_incremental
- is_resumable
- namespace
- source_defined_cursor
- exit_on_rate_limit
- state_checkpoint_interval
- get_updated_state
- log_stream_sync_configuration
- configured_json_schema