airbyte_cdk.sources.file_based.stream

from airbyte_cdk.sources.file_based.stream.abstract_file_based_stream import AbstractFileBasedStream
from airbyte_cdk.sources.file_based.stream.default_file_based_stream import DefaultFileBasedStream
from airbyte_cdk.sources.file_based.stream.identities_stream import FileIdentitiesStream
from airbyte_cdk.sources.file_based.stream.permissions_file_based_stream import (
    PermissionsFileBasedStream,
)

__all__ = [
    "AbstractFileBasedStream",
    "DefaultFileBasedStream",
    "FileIdentitiesStream",
    "PermissionsFileBasedStream",
]
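Per the `__all__` above, these classes can be imported directly from the package:

from airbyte_cdk.sources.file_based.stream import (
    AbstractFileBasedStream,
    DefaultFileBasedStream,
    FileIdentitiesStream,
    PermissionsFileBasedStream,
)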
class AbstractFileBasedStream(airbyte_cdk.sources.streams.core.Stream):
class AbstractFileBasedStream(Stream):
    """
    A file-based stream in an Airbyte source.

    In addition to the base Stream attributes, a file-based stream has:
    - A config object (derived from the corresponding stream section in source config).
      This contains the globs defining the stream's files.
    - A StreamReader, which knows how to list and open files in the stream.
    - A FileBasedAvailabilityStrategy, which knows how to verify that we can list and open
      files in the stream.
    - A DiscoveryPolicy that controls the number of concurrent requests sent to the source
      during discover, and the number of files used for schema discovery.
    - A dictionary of FileType:Parser that holds all the file types that can be handled
      by the stream.
    """

    def __init__(
        self,
        config: FileBasedStreamConfig,
        catalog_schema: Optional[Mapping[str, Any]],
        stream_reader: AbstractFileBasedStreamReader,
        availability_strategy: AbstractFileBasedAvailabilityStrategy,
        discovery_policy: AbstractDiscoveryPolicy,
        parsers: Dict[Type[Any], FileTypeParser],
        validation_policy: AbstractSchemaValidationPolicy,
        errors_collector: FileBasedErrorsCollector,
        cursor: AbstractFileBasedCursor,
    ):
        super().__init__()
        self.config = config
        self.catalog_schema = catalog_schema
        self.validation_policy = validation_policy
        self.stream_reader = stream_reader
        self._discovery_policy = discovery_policy
        self._availability_strategy = availability_strategy
        self._parsers = parsers
        self.errors_collector = errors_collector
        self._cursor = cursor

    @property
    @abstractmethod
    def primary_key(self) -> PrimaryKeyType: ...

    @cache
    def list_files(self) -> List[RemoteFile]:
        """
        List all files that belong to the stream.

        The output of this method is cached so we don't need to list the files more than once.
        This means we won't pick up changes to the files during a sync. This method uses the
        get_files method which is implemented by the concrete stream class.
        """
        return list(self.get_files())

    @abstractmethod
    def get_files(self) -> Iterable[RemoteFile]:
        """
        List all files that belong to the stream as defined by the stream's globs.
        """
        ...

    def read_records(
        self,
        sync_mode: SyncMode,
        cursor_field: Optional[List[str]] = None,
        stream_slice: Optional[StreamSlice] = None,
        stream_state: Optional[Mapping[str, Any]] = None,
    ) -> Iterable[Mapping[str, Any] | AirbyteMessage]:
        """
        Yield all records from all remote files in `list_files_for_this_sync`.
        This method acts as an adapter between the generic Stream interface and the file-based
        stream, since file-based streams manage their own state.
        """
        if stream_slice is None:
            raise ValueError("stream_slice must be set")
        return self.read_records_from_slice(stream_slice)

    @abstractmethod
    def read_records_from_slice(
        self, stream_slice: StreamSlice
    ) -> Iterable[Mapping[str, Any] | AirbyteMessage]:
        """
        Yield all records from all remote files in `list_files_for_this_sync`.
        """
        ...

    def stream_slices(
        self,
        *,
        sync_mode: SyncMode,
        cursor_field: Optional[List[str]] = None,
        stream_state: Optional[Mapping[str, Any]] = None,
    ) -> Iterable[Optional[Mapping[str, Any]]]:
        """
        This method acts as an adapter between the generic Stream interface and the file-based
        stream, since file-based streams manage their own state.
        """
        return self.compute_slices()

    @abstractmethod
    def compute_slices(self) -> Iterable[Optional[StreamSlice]]:
        """
        Return a list of slices that will be used to read files in the current sync.
        :return: The slices to use for the current sync.
        """
        ...

    @abstractmethod
    @lru_cache(maxsize=None)
    def get_json_schema(self) -> Mapping[str, Any]:
        """
        Return the JSON Schema for a stream.
        """
        ...

    @abstractmethod
    def infer_schema(self, files: List[RemoteFile]) -> Mapping[str, Any]:
        """
        Infer the schema for files in the stream.
        """
        ...

    def get_parser(self) -> FileTypeParser:
        try:
            return self._parsers[type(self.config.format)]
        except KeyError:
            raise UndefinedParserError(
                FileBasedSourceError.UNDEFINED_PARSER,
                stream=self.name,
                format=type(self.config.format),
            )

    def record_passes_validation_policy(self, record: Mapping[str, Any]) -> bool:
        if self.validation_policy:
            return self.validation_policy.record_passes_validation_policy(
                record=record, schema=self.catalog_schema
            )
        else:
            raise RecordParseError(
                FileBasedSourceError.UNDEFINED_VALIDATION_POLICY,
                stream=self.name,
                validation_policy=self.config.validation_policy,
            )

    @cached_property
    def availability_strategy(self) -> AbstractFileBasedAvailabilityStrategy:
        return self._availability_strategy

    @property
    def name(self) -> str:
        return self.config.name

    def get_cursor(self) -> Optional[Cursor]:
        """
        This is a temporary hack. Because file-based, declarative, and concurrent have
        _slightly_ different cursor implementations, the file-based cursor isn't compatible
        with the cursor-based iteration flow in the top-level CDK (core.py). By setting this
        to None, we defer to the regular incremental checkpoint flow. Once all cursors are
        consolidated under a common interface, this override can be removed.
        """
        return None

A file-based stream in an Airbyte source.

In addition to the base Stream attributes, a file-based stream has the following (a minimal subclass sketch follows this list):

  • A config object (derived from the corresponding stream section in source config). This contains the globs defining the stream's files.
  • A StreamReader, which knows how to list and open files in the stream.
  • A FileBasedAvailabilityStrategy, which knows how to verify that we can list and open files in the stream.
  • A DiscoveryPolicy that controls the number of concurrent requests sent to the source during discover, and the number of files used for schema discovery.
  • A dictionary of FileType:Parser that holds all the file types that can be handled by the stream.
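
For orientation, here is a minimal, hypothetical subclass showing how those pieces are typically wired together. This is an illustrative sketch, not CDK code: the class name and method bodies are placeholders, and real implementations (see DefaultFileBasedStream below) do considerably more.

from typing import Any, Iterable, List, Mapping, Optional

from airbyte_cdk.sources.file_based import RemoteFile
from airbyte_cdk.sources.file_based.stream import AbstractFileBasedStream


class MinimalFileBasedStream(AbstractFileBasedStream):
    """Illustrative only: each override leans on an injected collaborator."""

    @property
    def primary_key(self) -> Optional[str]:
        return None  # raw files rarely have a natural primary key

    def get_files(self) -> Iterable[RemoteFile]:
        # The StreamReader lists files matching the config's globs.
        return self.stream_reader.get_matching_files(
            self.config.globs or [], self.config.legacy_prefix, self.logger
        )

    def compute_slices(self) -> Iterable[Optional[Mapping[str, Any]]]:
        # Naive slicing: one slice with every file; the default stream
        # groups files by last_modified instead.
        return [{"files": self.list_files()}]

    def read_records_from_slice(self, stream_slice: Mapping[str, Any]) -> Iterable[Mapping[str, Any]]:
        # The parser is resolved from the FileType:Parser mapping.
        parser = self.get_parser()
        for file in stream_slice["files"]:
            yield from parser.parse_records(
                self.config, file, self.stream_reader, self.logger, self.catalog_schema
            )

    def get_json_schema(self) -> Mapping[str, Any]:
        return {"type": "object", "properties": {}}

    def infer_schema(self, files: List[RemoteFile]) -> Mapping[str, Any]:
        return {}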
config
catalog_schema
validation_policy
stream_reader
errors_collector
primary_key: Union[str, List[str], NoneType]
Returns

string if single primary key, list of strings if composite primary key, list of list of strings if composite primary key consisting of nested fields. If the stream has no primary keys, return None.

@cache
def list_files(self) -> List[airbyte_cdk.sources.file_based.RemoteFile]:

List all files that belong to the stream.

The output of this method is cached so we don't need to list the files more than once. This means we won't pick up changes to the files during a sync. This method uses the get_files method which is implemented by the concrete stream class.
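
The caching comes from functools.cache; the standalone snippet below (independent of the CDK classes) demonstrates the behavior: the underlying listing runs once per instance, and later calls return the same cached object.

from functools import cache


class Lister:
    calls = 0

    @cache
    def list_files(self):
        Lister.calls += 1          # pretend this is an expensive remote listing
        return ["a.csv", "b.csv"]


lister = Lister()
assert lister.list_files() is lister.list_files()  # same cached object
assert Lister.calls == 1                           # listing ran only once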

@abstractmethod
def get_files(self) -> Iterable[airbyte_cdk.sources.file_based.RemoteFile]:

List all files that belong to the stream as defined by the stream's globs.

def read_records(self, sync_mode: airbyte_protocol_dataclasses.models.airbyte_protocol.SyncMode, cursor_field: Optional[List[str]] = None, stream_slice: Optional[Mapping[str, Any]] = None, stream_state: Optional[Mapping[str, Any]] = None) -> Iterable[Union[Mapping[str, Any], airbyte_cdk.AirbyteMessage]]:

Yield all records from all remote files in list_files_for_this_sync. This method acts as an adapter between the generic Stream interface and the file-based stream, since file-based streams manage their own state.

@abstractmethod
def read_records_from_slice(self, stream_slice: Mapping[str, Any]) -> Iterable[Union[Mapping[str, Any], airbyte_cdk.AirbyteMessage]]:

Yield all records from all remote files in list_files_for_this_sync.

def stream_slices(self, *, sync_mode: airbyte_protocol_dataclasses.models.airbyte_protocol.SyncMode, cursor_field: Optional[List[str]] = None, stream_state: Optional[Mapping[str, Any]] = None) -> Iterable[Optional[Mapping[str, Any]]]:

This method acts as an adapter between the generic Stream interface and the file-based stream, since file-based streams manage their own state.

@abstractmethod
def compute_slices(self) -> Iterable[Optional[Mapping[str, Any]]]:

Return a list of slices that will be used to read files in the current sync.

Returns

The slices to use for the current sync.

@abstractmethod
@lru_cache(maxsize=None)
def get_json_schema(self) -> Mapping[str, Any]:

Return the JSON Schema for a stream.

@abstractmethod
def infer_schema(self, files: List[airbyte_cdk.sources.file_based.RemoteFile]) -> Mapping[str, Any]:

Infer the schema for files in the stream.

def get_parser(self) -> FileTypeParser:
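
For context, the parsers mapping injected into the constructor is keyed by the type of the stream's format config object, which is why the lookup uses type(self.config.format). A hedged wiring sketch (import paths assumed from the CDK's file_based package, not shown on this page):

from airbyte_cdk.sources.file_based.config.csv_format import CsvFormat  # assumed path
from airbyte_cdk.sources.file_based.file_types.csv_parser import CsvParser  # assumed path

# FileType -> Parser mapping passed to AbstractFileBasedStream(parsers=...)
parsers = {CsvFormat: CsvParser()}

# get_parser() then resolves parsers[type(self.config.format)], i.e. the
# CsvParser above when the stream's format is a CsvFormat instance.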
def record_passes_validation_policy(self, record: Mapping[str, Any]) -> bool:
availability_strategy: AbstractFileBasedAvailabilityStrategy
name: str
Returns

Stream name. By default this is the implementing class name, but it can be overridden as needed.

def get_cursor(self) -> Optional[airbyte_cdk.sources.streams.checkpoint.Cursor]:

This is a temporary hack. Because file-based, declarative, and concurrent have _slightly_ different cursor implementations, the file-based cursor isn't compatible with the cursor-based iteration flow in the top-level CDK (core.py). By setting this to None, we defer to the regular incremental checkpoint flow. Once all cursors are consolidated under a common interface, this override can be removed.

class DefaultFileBasedStream(AbstractFileBasedStream, IncrementalMixin):
    """
    The default file-based stream.
    """

    FILE_TRANSFER_KW = "use_file_transfer"
    PRESERVE_DIRECTORY_STRUCTURE_KW = "preserve_directory_structure"
    FILES_KEY = "files"
    DATE_TIME_FORMAT = "%Y-%m-%dT%H:%M:%S.%fZ"
    ab_last_mod_col = "_ab_source_file_last_modified"
    ab_file_name_col = "_ab_source_file_url"
    modified = "modified"
    source_file_url = "source_file_url"
    airbyte_columns = [ab_last_mod_col, ab_file_name_col]
    use_file_transfer = False
    preserve_directory_structure = True
    _file_transfer = FileTransfer()

    def __init__(self, **kwargs: Any):
        if self.FILE_TRANSFER_KW in kwargs:
            self.use_file_transfer = kwargs.pop(self.FILE_TRANSFER_KW, False)
        if self.PRESERVE_DIRECTORY_STRUCTURE_KW in kwargs:
            self.preserve_directory_structure = kwargs.pop(
                self.PRESERVE_DIRECTORY_STRUCTURE_KW, True
            )
        super().__init__(**kwargs)

    @property
    def state(self) -> MutableMapping[str, Any]:
        return self._cursor.get_state()

    @state.setter
    def state(self, value: MutableMapping[str, Any]) -> None:
        """State setter, accept state serialized by state getter."""
        self._cursor.set_initial_state(value)

    @property  # type: ignore # mypy complains wrong type, but AbstractFileBasedCursor is parent of file-based cursors
    def cursor(self) -> Optional[AbstractFileBasedCursor]:
        return self._cursor

    @cursor.setter
    def cursor(self, value: AbstractFileBasedCursor) -> None:
        if self._cursor is not None:
            raise RuntimeError(
                f"Cursor for stream {self.name} is already set. This is unexpected. Please contact Support."
            )
        self._cursor = value

    @property
    def primary_key(self) -> PrimaryKeyType:
        return self.config.primary_key or self.get_parser().get_parser_defined_primary_key(
            self.config
        )

    def _duplicated_files_names(
        self, slices: List[dict[str, List[RemoteFile]]]
    ) -> List[dict[str, List[str]]]:
        seen_file_names: Dict[str, List[str]] = defaultdict(list)
        for file_slice in slices:
            for file_found in file_slice[self.FILES_KEY]:
                file_name = path.basename(file_found.uri)
                seen_file_names[file_name].append(file_found.uri)
        return [
            {file_name: paths} for file_name, paths in seen_file_names.items() if len(paths) > 1
        ]

    def compute_slices(self) -> Iterable[Optional[Mapping[str, Any]]]:
        # Sort files by last_modified, uri and return them grouped by last_modified
        all_files = self.list_files()
        files_to_read = self._cursor.get_files_to_sync(all_files, self.logger)
        sorted_files_to_read = sorted(files_to_read, key=lambda f: (f.last_modified, f.uri))
        slices = [
            {self.FILES_KEY: list(group[1])}
            for group in itertools.groupby(sorted_files_to_read, lambda f: f.last_modified)
        ]
        if slices and not self.preserve_directory_structure:
            duplicated_files_names = self._duplicated_files_names(slices)
            if duplicated_files_names:
                raise DuplicatedFilesError(
                    stream=self.name, duplicated_files_names=duplicated_files_names
                )
        return slices

    def transform_record(
        self, record: dict[str, Any], file: RemoteFile, last_updated: str
    ) -> dict[str, Any]:
        # adds _ab_source_file_last_modified and _ab_source_file_url to the record
        record[self.ab_last_mod_col] = last_updated
        record[self.ab_file_name_col] = file.uri
        return record

    def read_records_from_slice(self, stream_slice: StreamSlice) -> Iterable[AirbyteMessage]:
        """
        Yield all records from all remote files in `list_files_for_this_sync`.

        If an error is encountered reading records from a file, log a message and do not attempt
        to sync the rest of the file.
        """
        schema = self.catalog_schema
        if schema is None:
            # On read requests we should always have the catalog available
            raise MissingSchemaError(FileBasedSourceError.MISSING_SCHEMA, stream=self.name)
        # The stream only supports a single file type, so we can use the same parser for all files
        parser = self.get_parser()
        for file in stream_slice["files"]:
            # only serialize the datetime once
            file_datetime_string = file.last_modified.strftime(self.DATE_TIME_FORMAT)
            n_skipped = line_no = 0

            try:
                if self.use_file_transfer:
                    for file_record_data, file_reference in self._file_transfer.upload(
                        file=file, stream_reader=self.stream_reader, logger=self.logger
                    ):
                        yield stream_data_to_airbyte_message(
                            self.name,
                            file_record_data.dict(exclude_none=True),
                            file_reference=file_reference,
                        )
                else:
                    for record in parser.parse_records(
                        self.config, file, self.stream_reader, self.logger, schema
                    ):
                        line_no += 1
                        if self.config.schemaless:
                            record = {"data": record}
                        elif not self.record_passes_validation_policy(record):
                            n_skipped += 1
                            continue
                        record = self.transform_record(record, file, file_datetime_string)
                        yield stream_data_to_airbyte_message(self.name, record)
                self._cursor.add_file(file)

            except StopSyncPerValidationPolicy:
                yield AirbyteMessage(
                    type=MessageType.LOG,
                    log=AirbyteLogMessage(
                        level=Level.WARN,
                        message=f"Stopping sync in accordance with the configured validation policy. Records in file did not conform to the schema. stream={self.name} file={file.uri} validation_policy={self.config.validation_policy.value} n_skipped={n_skipped}",
                    ),
                )
                break

            except RecordParseError:
                # Increment line_no because the exception was raised before we could increment it
                line_no += 1
                self.errors_collector.collect(
                    AirbyteMessage(
                        type=MessageType.LOG,
                        log=AirbyteLogMessage(
                            level=Level.ERROR,
                            message=f"{FileBasedSourceError.ERROR_PARSING_RECORD.value} stream={self.name} file={file.uri} line_no={line_no} n_skipped={n_skipped}",
                            stack_trace=traceback.format_exc(),
                        ),
                    ),
                )

            except AirbyteTracedException as exc:
                # Re-raise the exception to stop the whole sync immediately as this is a fatal error
                raise exc

            except Exception:
                yield AirbyteMessage(
                    type=MessageType.LOG,
                    log=AirbyteLogMessage(
                        level=Level.ERROR,
                        message=f"{FileBasedSourceError.ERROR_PARSING_RECORD.value} stream={self.name} file={file.uri} line_no={line_no} n_skipped={n_skipped}",
                        stack_trace=traceback.format_exc(),
                    ),
                )

            finally:
                if n_skipped:
                    yield AirbyteMessage(
                        type=MessageType.LOG,
                        log=AirbyteLogMessage(
                            level=Level.WARN,
                            message=f"Records in file did not pass validation policy. stream={self.name} file={file.uri} n_skipped={n_skipped} validation_policy={self.validation_policy.name}",
                        ),
                    )

    @property
    def cursor_field(self) -> Union[str, List[str]]:
        """
        Override to return the default cursor field used by this stream, e.g. an API entity might always use created_at as the cursor field.
        :return: The name of the field used as a cursor. If the cursor is nested, return an array consisting of the path to the cursor.
        """
        return self.ab_last_mod_col

    @cache
    def get_json_schema(self) -> JsonSchema:  # type: ignore
        if self.use_file_transfer:
            return file_transfer_schema
        extra_fields = {
            self.ab_last_mod_col: {"type": "string"},
            self.ab_file_name_col: {"type": "string"},
        }
        try:
            schema = self._get_raw_json_schema()
        except InvalidSchemaError as config_exception:
            raise AirbyteTracedException(
                internal_message="Please check the logged errors for more information.",
                message=FileBasedSourceError.SCHEMA_INFERENCE_ERROR.value,
                exception=AirbyteTracedException(exception=config_exception),
                failure_type=FailureType.config_error,
            )
        except EmptyFileSchemaInferenceError as exc:
            self._raise_schema_inference_error(exc)
        except AirbyteTracedException as ate:
            raise ate
        except Exception as exc:
            self._raise_schema_inference_error(exc)
        else:
            return {"type": "object", "properties": {**extra_fields, **schema["properties"]}}

    def _get_raw_json_schema(self) -> JsonSchema:
        if self.config.input_schema:
            return self.config.get_input_schema()  # type: ignore
        elif self.config.schemaless:
            return schemaless_schema
        else:
            files = self.list_files()
            first_n_files = len(files)

            if self.config.recent_n_files_to_read_for_schema_discovery:
                self.logger.info(
                    msg=(
                        f"Only first {self.config.recent_n_files_to_read_for_schema_discovery} files will be used to infer schema "
                        f"for stream {self.name} due to limitation in config."
                    )
                )
                first_n_files = self.config.recent_n_files_to_read_for_schema_discovery

        if first_n_files == 0:
            self.logger.warning(
                msg=f"No files were identified in the stream {self.name}. Setting default schema for the stream."
            )
            return schemaless_schema

        max_n_files_for_schema_inference = (
            self._discovery_policy.get_max_n_files_for_schema_inference(self.get_parser())
        )

        if first_n_files > max_n_files_for_schema_inference:
            # Use the most recent files for schema inference, so we pick up schema changes during discovery.
            self.logger.warning(
                msg=f"Refusing to infer schema for {first_n_files} files; using {max_n_files_for_schema_inference} files."
            )
            first_n_files = max_n_files_for_schema_inference

        files = sorted(files, key=lambda x: x.last_modified, reverse=True)[:first_n_files]

        inferred_schema = self.infer_schema(files)

        if not inferred_schema:
            raise InvalidSchemaError(
                FileBasedSourceError.INVALID_SCHEMA_ERROR,
                details=f"Empty schema. Please check that the files are valid for format {self.config.format}",
                stream=self.name,
            )

        schema = {"type": "object", "properties": inferred_schema}

        return schema

    def get_files(self) -> Iterable[RemoteFile]:
        """
        Return all files that belong to the stream as defined by the stream's globs.
        """
        return self.stream_reader.get_matching_files(
            self.config.globs or [], self.config.legacy_prefix, self.logger
        )

    def as_airbyte_stream(self) -> AirbyteStream:
        file_stream = super().as_airbyte_stream()
        file_stream.is_file_based = self.use_file_transfer
        return file_stream

    def infer_schema(self, files: List[RemoteFile]) -> Mapping[str, Any]:
        loop = asyncio.get_event_loop()
        schema = loop.run_until_complete(self._infer_schema(files))
        # infer_schema returns a Mapping that is assumed to be immutable, so we deepcopy it to avoid modifying the caller's reference
        return self._fill_nulls(deepcopy(schema))

    @staticmethod
    def _fill_nulls(schema: Mapping[str, Any]) -> Mapping[str, Any]:
        if isinstance(schema, dict):
            for k, v in schema.items():
                if k == "type":
                    if isinstance(v, list):
                        if "null" not in v:
                            schema[k] = ["null"] + v
                    elif v != "null":
                        if isinstance(v, (str, list)):
                            schema[k] = ["null", v]
                        else:
                            DefaultFileBasedStream._fill_nulls(v)
                else:
                    DefaultFileBasedStream._fill_nulls(v)
        elif isinstance(schema, list):
            for item in schema:
                DefaultFileBasedStream._fill_nulls(item)
        return schema

    async def _infer_schema(self, files: List[RemoteFile]) -> Mapping[str, Any]:
        """
        Infer the schema for a stream.

        Each file type has a corresponding `infer_schema` handler.
        Dispatch on file type.
        """
        base_schema: SchemaType = {}
        pending_tasks: Set[asyncio.tasks.Task[SchemaType]] = set()

        n_started, n_files = 0, len(files)
        files_iterator = iter(files)
        while pending_tasks or n_started < n_files:
            while len(pending_tasks) <= self._discovery_policy.n_concurrent_requests and (
                file := next(files_iterator, None)
            ):
                pending_tasks.add(asyncio.create_task(self._infer_file_schema(file)))
                n_started += 1
            # Return when the first task is completed so that we can enqueue a new task as soon as the
            # number of concurrent tasks drops below the number allowed.
            done, pending_tasks = await asyncio.wait(
                pending_tasks, return_when=asyncio.FIRST_COMPLETED
            )
            for task in done:
                try:
                    base_schema = merge_schemas(base_schema, task.result())
                except AirbyteTracedException as ate:
                    raise ate
                except Exception as exc:
                    self.logger.error(
                        f"An error occurred inferring the schema. \n {traceback.format_exc()}",
                        exc_info=exc,
                    )

        return base_schema

    async def _infer_file_schema(self, file: RemoteFile) -> SchemaType:  # type: ignore
        try:
            return await self.get_parser().infer_schema(
                self.config, file, self.stream_reader, self.logger
            )
        except EmptyFileSchemaInferenceError as exc:
            self._raise_schema_inference_error(exc, file)
        except AirbyteTracedException as ate:
            raise ate
        except Exception as exc:
            self._raise_schema_inference_error(exc, file)

    def _raise_schema_inference_error(
        self, exc: Exception, file: Optional[RemoteFile] = None
    ) -> NoReturn:
        raise SchemaInferenceError(
            FileBasedSourceError.SCHEMA_INFERENCE_ERROR,
            file=file.uri if file else None,
            format=str(self.config.format) if self.config.format else None,
            stream=self.name,
        ) from exc

The default file-based stream.

DefaultFileBasedStream(**kwargs: Any)
FILE_TRANSFER_KW = 'use_file_transfer'
PRESERVE_DIRECTORY_STRUCTURE_KW = 'preserve_directory_structure'
FILES_KEY = 'files'
DATE_TIME_FORMAT = '%Y-%m-%dT%H:%M:%S.%fZ'
ab_last_mod_col = '_ab_source_file_last_modified'
ab_file_name_col = '_ab_source_file_url'
modified = 'modified'
source_file_url = 'source_file_url'
airbyte_columns = ['_ab_source_file_last_modified', '_ab_source_file_url']
use_file_transfer = False
preserve_directory_structure = True
state: MutableMapping[str, Any]

State getter; it should return the state in a form that can be serialized to a string and sent to the output as a STATE AirbyteMessage.

A good example of a state is a cursor_value: { self.cursor_field: "cursor_value" }

State should try to be as small as possible, but at the same time descriptive enough to restore the syncing process from the point where it stopped.
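
For a file-based stream using the default cursor, the serialized state takes roughly the shape below. This is a hedged illustration; the exact field set is defined by the cursor implementation, which is not shown on this page.

state = {
    # URIs of files already synced, with their last-modified timestamps
    "history": {
        "data/2024-01-01.csv": "2024-01-01T00:00:00.000000Z",
    },
    # the stream's cursor value (timestamp plus URI of the newest synced file)
    "_ab_source_file_last_modified": "2024-01-01T00:00:00.000000Z_data/2024-01-01.csv",
}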

cursor: Optional[AbstractFileBasedCursor]
primary_key: Union[str, List[str], NoneType]
Returns

string if single primary key, list of strings if composite primary key, list of list of strings if composite primary key consisting of nested fields. If the stream has no primary keys, return None.

def compute_slices(self) -> Iterable[Optional[Mapping[str, Any]]]:

Return a list of slices that will be used to read files in the current sync.

Returns

The slices to use for the current sync.
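
The sort-and-group step above is easy to reproduce standalone. The sketch below uses a stand-in dataclass instead of RemoteFile:

import itertools
from dataclasses import dataclass
from datetime import datetime


@dataclass(frozen=True)
class FileStub:  # stand-in for RemoteFile
    uri: str
    last_modified: datetime


files = [
    FileStub("b.csv", datetime(2024, 1, 2)),
    FileStub("a.csv", datetime(2024, 1, 1)),
    FileStub("c.csv", datetime(2024, 1, 2)),
]

sorted_files = sorted(files, key=lambda f: (f.last_modified, f.uri))
slices = [
    {"files": list(group)}
    for _, group in itertools.groupby(sorted_files, lambda f: f.last_modified)
]
# Two slices result -- one per distinct last_modified value, files ordered
# by (last_modified, uri): [{"files": [a.csv]}, {"files": [b.csv, c.csv]}]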

def transform_record(self, record: dict[str, typing.Any], file: airbyte_cdk.sources.file_based.RemoteFile, last_updated: str) -> dict[str, typing.Any]:
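
transform_record simply decorates each parsed record with the two bookkeeping columns; for example:

record = {"id": 1}
record["_ab_source_file_last_modified"] = "2024-01-01T00:00:00.000000Z"  # last_updated
record["_ab_source_file_url"] = "data/2024-01-01.csv"                    # file.uri
# -> {"id": 1, "_ab_source_file_last_modified": "...", "_ab_source_file_url": "..."}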
def read_records_from_slice(self, stream_slice: Mapping[str, Any]) -> Iterable[airbyte_cdk.AirbyteMessage]:

Yield all records from all remote files in list_files_for_this_sync.

If an error is encountered reading records from a file, log a message and do not attempt to sync the rest of the file.

cursor_field: Union[str, List[str]]

Override to return the default cursor field used by this stream, e.g. an API entity might always use created_at as the cursor field.

Returns

The name of the field used as a cursor. If the cursor is nested, return an array consisting of the path to the cursor.

@cache
def get_json_schema(self) -> Mapping[str, Any]:

Return the JSON Schema for a stream.
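
Concretely, the returned schema merges the two Airbyte columns with the properties of the raw schema. An illustrative result (the id field stands in for whatever was inferred or configured):

{
    "type": "object",
    "properties": {
        "_ab_source_file_last_modified": {"type": "string"},
        "_ab_source_file_url": {"type": "string"},
        "id": {"type": ["null", "integer"]},  # hypothetical inferred field
    },
}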

def get_files(self) -> Iterable[airbyte_cdk.sources.file_based.RemoteFile]:

Return all files that belong to the stream as defined by the stream's globs.
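
A hedged example of the config fields this method consumes, written as a plain mapping (field names follow FileBasedStreamConfig as used above; the exact set depends on the connector):

stream_config = {
    "name": "users",
    "globs": ["data/**/*.csv"],      # patterns passed to get_matching_files
    "format": {"filetype": "csv"},   # also selects the parser via type(config.format)
    "validation_policy": "Emit Record",  # illustrative value
}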

def as_airbyte_stream(self) -> airbyte_protocol_dataclasses.models.airbyte_protocol.AirbyteStream:
def infer_schema(self, files: List[airbyte_cdk.sources.file_based.RemoteFile]) -> Mapping[str, Any]:

Infer the schema for files in the stream.
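
The null-filling step can be demonstrated standalone. The function below is a simplified restatement of _fill_nulls (it omits the nested-mapping case the real method also handles):

from copy import deepcopy
from typing import Any


def fill_nulls(schema: Any) -> Any:
    """Simplified restatement: make every "type" entry nullable."""
    if isinstance(schema, dict):
        for key, value in schema.items():
            if key == "type":
                if isinstance(value, list) and "null" not in value:
                    schema[key] = ["null"] + value
                elif isinstance(value, str) and value != "null":
                    schema[key] = ["null", value]
            else:
                fill_nulls(value)
    elif isinstance(schema, list):
        for item in schema:
            fill_nulls(item)
    return schema


inferred = {"id": {"type": "integer"}, "tags": {"type": ["array"]}}
print(fill_nulls(deepcopy(inferred)))
# -> {'id': {'type': ['null', 'integer']}, 'tags': {'type': ['null', 'array']}}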

class FileIdentitiesStream(IdentitiesStream):
    """
    The identities stream. A full refresh stream to sync identities from a certain domain.
    The stream reader manages the logic to fetch this data, which is implemented on the connector side.
    """

    is_resumable = False

    def __init__(
        self,
        catalog_schema: Optional[Mapping[str, Any]],
        stream_permissions_reader: AbstractFileBasedStreamPermissionsReader,
        discovery_policy: AbstractDiscoveryPolicy,
        errors_collector: FileBasedErrorsCollector,
    ) -> None:
        super().__init__()
        self.catalog_schema = catalog_schema
        self.stream_permissions_reader = stream_permissions_reader
        self._discovery_policy = discovery_policy
        self.errors_collector = errors_collector
        self._cursor: MutableMapping[str, Any] = {}

    @property
    def primary_key(self) -> PrimaryKeyType:
        return None

    def load_identity_groups(self) -> Iterable[Dict[str, Any]]:
        return self.stream_permissions_reader.load_identity_groups(logger=self.logger)

    @cache
    def get_json_schema(self) -> JsonSchema:
        return self.stream_permissions_reader.identities_schema

The identities stream. A full refresh stream to sync identities from a certain domain. The stream reader manages the logic to fetch this data, which is implemented on the connector side.

FileIdentitiesStream(catalog_schema: Optional[Mapping[str, Any]], stream_permissions_reader: AbstractFileBasedStreamPermissionsReader, discovery_policy: AbstractDiscoveryPolicy, errors_collector: FileBasedErrorsCollector)
is_resumable = False
Returns

True if this stream allows the checkpointing of sync progress and can resume from it on subsequent attempts. This differs from supports_incremental because certain kinds of streams like those supporting resumable full refresh can checkpoint progress in between attempts for improved fault tolerance. However, they will start from the beginning on the next sync job.

catalog_schema
stream_permissions_reader
errors_collector
primary_key: Union[str, List[str], NoneType]
Returns

string if single primary key, list of strings if composite primary key, list of list of strings if composite primary key consisting of nested fields. If the stream has no primary keys, return None.

def load_identity_groups(self) -> Iterable[Dict[str, Any]]:
@cache
def get_json_schema(self) -> Mapping[str, Any]:
Returns

A dict of the JSON schema representing this stream.

The default implementation of this method looks for a JSONSchema file with the same name as this stream's "name" property. Override as needed.

class PermissionsFileBasedStream(airbyte_cdk.sources.file_based.stream.DefaultFileBasedStream):
class PermissionsFileBasedStream(DefaultFileBasedStream):
    """
    A specialized stream for handling file-based ACL permissions.

    This stream works with the stream_reader to:
    1. Fetch ACL permissions for each file in the source
    2. Transform permissions into a standardized format
    3. Generate records containing permission information

    The stream_reader is responsible for the actual implementation of permission retrieval
    and schema definition, while this class handles the streaming interface.
    """

    def __init__(
        self, stream_permissions_reader: AbstractFileBasedStreamPermissionsReader, **kwargs: Any
    ):
        super().__init__(**kwargs)
        self.stream_permissions_reader = stream_permissions_reader

    def _filter_schema_invalid_properties(
        self, configured_catalog_json_schema: Dict[str, Any]
    ) -> Dict[str, Any]:
        return self.stream_permissions_reader.file_permissions_schema

    def read_records_from_slice(self, stream_slice: StreamSlice) -> Iterable[AirbyteMessage]:
        """
        Yield permissions records from all remote files.
        """

        for file in stream_slice["files"]:
            no_permissions = False
            file_datetime_string = file.last_modified.strftime(self.DATE_TIME_FORMAT)
            try:
                permissions_record = self.stream_permissions_reader.get_file_acl_permissions(
                    file, logger=self.logger
                )
                if not permissions_record:
                    no_permissions = True
                    self.logger.warning(
                        f"Unable to fetch permissions. stream={self.name} file={file.uri}"
                    )
                    continue
                permissions_record = self.transform_record(
                    permissions_record, file, file_datetime_string
                )
                yield stream_data_to_airbyte_message(self.name, permissions_record)
            except Exception as e:
                self.logger.error(f"Failed to retrieve permissions for file {file.uri}: {str(e)}")
                yield AirbyteMessage(
                    type=MessageType.LOG,
                    log=AirbyteLogMessage(
                        level=Level.ERROR,
                        message=f"Error retrieving files permissions: stream={self.name} file={file.uri}",
                        stack_trace=traceback.format_exc(),
                    ),
                )
            finally:
                if no_permissions:
                    yield AirbyteMessage(
                        type=MessageType.LOG,
                        log=AirbyteLogMessage(
                            level=Level.WARN,
                            message=f"Unable to fetch permissions. stream={self.name} file={file.uri}",
                        ),
                    )

    def _get_raw_json_schema(self) -> JsonSchema:
        """
        Retrieve the raw JSON schema for file permissions from the stream reader.

        Returns:
           The file permissions schema that defines the structure of permission records
        """
        return self.stream_permissions_reader.file_permissions_schema

A specialized stream for handling file-based ACL permissions.

This stream works with the stream_reader to:

  1. Fetch ACL permissions for each file in the source
  2. Transform permissions into a standardized format
  3. Generate records containing permission information

The stream_reader is responsible for the actual implementation of permission retrieval and schema definition, while this class handles the streaming interface.
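
As a hedged sketch of what flows through this stream: the reader returns one permissions mapping per file, which the stream decorates with the standard bookkeeping columns before emitting. The permission field names below are assumptions; the real shape is defined by the connector's file_permissions_schema.

permissions_record = {
    "allowed_identity_remote_ids": ["user@example.com"],  # assumed field name
    "publicly_accessible": False,                          # assumed field name
    "_ab_source_file_last_modified": "2024-01-01T00:00:00.000000Z",
    "_ab_source_file_url": "data/report.csv",
}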

PermissionsFileBasedStream(stream_permissions_reader: airbyte_cdk.sources.file_based.file_based_stream_permissions_reader.AbstractFileBasedStreamPermissionsReader, **kwargs: Any)
stream_permissions_reader
def read_records_from_slice(self, stream_slice: Mapping[str, Any]) -> Iterable[airbyte_cdk.AirbyteMessage]:

Yield permissions records from all remote files.