airbyte_cdk.sources.file_based.stream

 1from airbyte_cdk.sources.file_based.stream.abstract_file_based_stream import AbstractFileBasedStream
 2from airbyte_cdk.sources.file_based.stream.default_file_based_stream import DefaultFileBasedStream
 3from airbyte_cdk.sources.file_based.stream.identities_stream import FileIdentitiesStream
 4from airbyte_cdk.sources.file_based.stream.permissions_file_based_stream import (
 5    PermissionsFileBasedStream,
 6)
 7
 8__all__ = [
 9    "AbstractFileBasedStream",
10    "DefaultFileBasedStream",
11    "FileIdentitiesStream",
12    "PermissionsFileBasedStream",
13]
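
The public surface of this subpackage is the four classes re-exported above. A minimal import sketch, assuming the airbyte-cdk package with its file-based dependencies is installed:

# Importing the classes re-exported by airbyte_cdk.sources.file_based.stream.
from airbyte_cdk.sources.file_based.stream import (
    AbstractFileBasedStream,
    DefaultFileBasedStream,
    FileIdentitiesStream,
    PermissionsFileBasedStream,
)
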
class AbstractFileBasedStream(airbyte_cdk.sources.streams.core.Stream):
 38class AbstractFileBasedStream(Stream):
 39    """
 40    A file-based stream in an Airbyte source.
 41
 42    In addition to the base Stream attributes, a file-based stream has
 43    - A config object (derived from the corresponding stream section in source config).
 44      This contains the globs defining the stream's files.
 45    - A StreamReader, which knows how to list and open files in the stream.
 46    - A FileBasedAvailabilityStrategy, which knows how to verify that we can list and open
 47      files in the stream.
 48    - A DiscoveryPolicy that controls the number of concurrent requests sent to the source
 49      during discover, and the number of files used for schema discovery.
 50    - A dictionary of FileType:Parser that holds all the file types that can be handled
 51      by the stream.
 52    """
 53
 54    def __init__(
 55        self,
 56        config: FileBasedStreamConfig,
 57        catalog_schema: Optional[Mapping[str, Any]],
 58        stream_reader: AbstractFileBasedStreamReader,
 59        availability_strategy: AbstractFileBasedAvailabilityStrategy,
 60        discovery_policy: AbstractDiscoveryPolicy,
 61        parsers: Dict[Type[Any], FileTypeParser],
 62        validation_policy: AbstractSchemaValidationPolicy,
 63        errors_collector: FileBasedErrorsCollector,
 64        cursor: AbstractFileBasedCursor,
 65    ):
 66        super().__init__()
 67        self.config = config
 68        self.catalog_schema = catalog_schema
 69        self.validation_policy = validation_policy
 70        self.stream_reader = stream_reader
 71        self._discovery_policy = discovery_policy
 72        self._availability_strategy = availability_strategy
 73        self._parsers = parsers
 74        self.errors_collector = errors_collector
 75        self._cursor = cursor
 76
 77    @property
 78    @abstractmethod
 79    def primary_key(self) -> PrimaryKeyType: ...
 80
 81    @cache
 82    def list_files(self) -> List[RemoteFile]:
 83        """
 84        List all files that belong to the stream.
 85
 86        The output of this method is cached so we don't need to list the files more than once.
 87        This means we won't pick up changes to the files during a sync. This method uses the
 88        get_files method which is implemented by the concrete stream class.
 89        """
 90        return list(self.get_files())
 91
 92    @abstractmethod
 93    def get_files(self) -> Iterable[RemoteFile]:
 94        """
 95        List all files that belong to the stream as defined by the stream's globs.
 96        """
 97        ...
 98
 99    def read_records(
100        self,
101        sync_mode: SyncMode,
102        cursor_field: Optional[List[str]] = None,
103        stream_slice: Optional[StreamSlice] = None,
104        stream_state: Optional[Mapping[str, Any]] = None,
105    ) -> Iterable[Mapping[str, Any] | AirbyteMessage]:
106        """
107        Yield all records from all remote files in `list_files_for_this_sync`.
108        This method acts as an adapter between the generic Stream interface and the file-based's
109        stream since file-based streams manage their own states.
110        """
111        if stream_slice is None:
112            raise ValueError("stream_slice must be set")
113        return self.read_records_from_slice(stream_slice)
114
115    @abstractmethod
116    def read_records_from_slice(
117        self, stream_slice: StreamSlice
118    ) -> Iterable[Mapping[str, Any] | AirbyteMessage]:
119        """
120        Yield all records from all remote files in `list_files_for_this_sync`.
121        """
122        ...
123
124    def stream_slices(
125        self,
126        *,
127        sync_mode: SyncMode,
128        cursor_field: Optional[List[str]] = None,
129        stream_state: Optional[Mapping[str, Any]] = None,
130    ) -> Iterable[Optional[Mapping[str, Any]]]:
131        """
132        This method acts as an adapter between the generic Stream interface and the file-based's
133        stream since file-based streams manage their own states.
134        """
135        return self.compute_slices()
136
137    @abstractmethod
138    def compute_slices(self) -> Iterable[Optional[StreamSlice]]:
139        """
140        Return a list of slices that will be used to read files in the current sync.
141        :return: The slices to use for the current sync.
142        """
143        ...
144
145    @abstractmethod
146    @lru_cache(maxsize=None)
147    def get_json_schema(self) -> Mapping[str, Any]:
148        """
149        Return the JSON Schema for a stream.
150        """
151        ...
152
153    @abstractmethod
154    def infer_schema(self, files: List[RemoteFile]) -> Mapping[str, Any]:
155        """
156        Infer the schema for files in the stream.
157        """
158        ...
159
160    def get_parser(self) -> FileTypeParser:
161        try:
162            return self._parsers[type(self.config.format)]
163        except KeyError:
164            raise UndefinedParserError(
165                FileBasedSourceError.UNDEFINED_PARSER,
166                stream=self.name,
167                format=type(self.config.format),
168            )
169
170    def record_passes_validation_policy(self, record: Mapping[str, Any]) -> bool:
171        if self.validation_policy:
172            return self.validation_policy.record_passes_validation_policy(
173                record=record, schema=self.catalog_schema
174            )
175        else:
176            raise RecordParseError(
177                FileBasedSourceError.UNDEFINED_VALIDATION_POLICY,
178                stream=self.name,
179                validation_policy=self.config.validation_policy,
180            )
181
182    @cached_property
183    @deprecated("Deprecated as of CDK version 3.7.0.")
184    def availability_strategy(self) -> AbstractFileBasedAvailabilityStrategy:
185        return self._availability_strategy
186
187    @property
188    def name(self) -> str:
189        return self.config.name
190
191    def get_cursor(self) -> Optional[Cursor]:
192        """
193        This is a temporary hack. Because file-based, declarative, and concurrent have _slightly_ different cursor implementations
194        the file-based cursor isn't compatible with the cursor-based iteration flow in core.py top-level CDK. By setting this to
195        None, we defer to the regular incremental checkpoint flow. Once all cursors are consolidated under a common interface
196        then this override can be removed.
197        """
198        return None

A file-based stream in an Airbyte source.

In addition to the base Stream attributes, a file-based stream has

  • A config object (derived from the corresponding stream section in source config). This contains the globs defining the stream's files.
  • A StreamReader, which knows how to list and open files in the stream.
  • A FileBasedAvailabilityStrategy, which knows how to verify that we can list and open files in the stream.
  • A DiscoveryPolicy that controls the number of concurrent requests sent to the source during discover, and the number of files used for schema discovery.
  • A dictionary of FileType:Parser that holds all the file types that can be handled by the stream.
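
The abstract members below (primary_key, get_files, read_records_from_slice, compute_slices, get_json_schema, infer_schema) are what a concrete subclass must supply. A minimal sketch of such a subclass follows; the class name and stub bodies are hypothetical, and the constructor dependencies (config, stream_reader, cursor, and so on) are assumed to be wired up by the source as usual.

# Hypothetical sketch of a concrete file-based stream. Real implementations
# delegate schema inference and record parsing to the configured parser.
from typing import Any, Iterable, List, Mapping, Optional, Union

from airbyte_cdk.sources.file_based import RemoteFile
from airbyte_cdk.sources.file_based.stream import AbstractFileBasedStream

class GlobBackedStream(AbstractFileBasedStream):
    @property
    def primary_key(self) -> Union[str, List[str], None]:
        return None  # this hypothetical stream has no primary key

    def get_files(self) -> Iterable[RemoteFile]:
        # Delegate glob matching to the injected stream reader.
        return self.stream_reader.get_matching_files(self.config.globs or [], None, self.logger)

    def compute_slices(self) -> Iterable[Optional[Mapping[str, Any]]]:
        # A single slice containing every file; DefaultFileBasedStream groups by last_modified.
        return [{"files": self.list_files()}]

    def read_records_from_slice(self, stream_slice: Mapping[str, Any]) -> Iterable[Mapping[str, Any]]:
        parser = self.get_parser()
        for file in stream_slice["files"]:
            yield from parser.parse_records(
                self.config, file, self.stream_reader, self.logger, self.catalog_schema
            )

    def get_json_schema(self) -> Mapping[str, Any]:
        return {"type": "object", "properties": {}}  # stub; real streams infer or load a schema

    def infer_schema(self, files: List[RemoteFile]) -> Mapping[str, Any]:
        return {}  # stub; real streams dispatch to the parser's schema inference
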
config
catalog_schema
validation_policy
stream_reader
errors_collector
primary_key: Union[str, List[str], NoneType]
77    @property
78    @abstractmethod
79    def primary_key(self) -> PrimaryKeyType: ...
Returns

A string if there is a single primary key, a list of strings if there is a composite primary key, or a list of lists of strings if the composite primary key consists of nested fields. If the stream has no primary key, return None.

@cache
def list_files(self) -> List[airbyte_cdk.sources.file_based.RemoteFile]:
81    @cache
82    def list_files(self) -> List[RemoteFile]:
83        """
84        List all files that belong to the stream.
85
86        The output of this method is cached so we don't need to list the files more than once.
87        This means we won't pick up changes to the files during a sync. This method uses the
88        get_files method which is implemented by the concrete stream class.
89        """
90        return list(self.get_files())

List all files that belong to the stream.

The output of this method is cached so we don't need to list the files more than once. This means we won't pick up changes to the files during a sync. This method uses the get_files method which is implemented by the concrete stream class.

@abstractmethod
def get_files(self) -> Iterable[airbyte_cdk.sources.file_based.RemoteFile]:
92    @abstractmethod
93    def get_files(self) -> Iterable[RemoteFile]:
94        """
95        List all files that belong to the stream as defined by the stream's globs.
96        """
97        ...

List all files that belong to the stream as defined by the stream's globs.

def read_records( self, sync_mode: airbyte_protocol_dataclasses.models.airbyte_protocol.SyncMode, cursor_field: Optional[List[str]] = None, stream_slice: Optional[Mapping[str, Any]] = None, stream_state: Optional[Mapping[str, Any]] = None) -> Iterable[Union[Mapping[str, Any], airbyte_cdk.AirbyteMessage]]:
 99    def read_records(
100        self,
101        sync_mode: SyncMode,
102        cursor_field: Optional[List[str]] = None,
103        stream_slice: Optional[StreamSlice] = None,
104        stream_state: Optional[Mapping[str, Any]] = None,
105    ) -> Iterable[Mapping[str, Any] | AirbyteMessage]:
106        """
107        Yield all records from all remote files in `list_files_for_this_sync`.
108        This method acts as an adapter between the generic Stream interface and the file-based's
109        stream since file-based streams manage their own states.
110        """
111        if stream_slice is None:
112            raise ValueError("stream_slice must be set")
113        return self.read_records_from_slice(stream_slice)

Yield all records from all remote files in list_files_for_this_sync. This method acts as an adapter between the generic Stream interface and the file-based stream, since file-based streams manage their own state.
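
A usage sketch of that adapter behavior follows; stream is a hypothetical, fully constructed file-based stream instance, so the snippet is illustrative rather than runnable on its own.

# Illustrative usage: read_records forwards each slice produced by stream_slices
# to read_records_from_slice. `stream` is a hypothetical stream instance built
# with the constructor arguments documented above.
from airbyte_cdk.models import SyncMode

for stream_slice in stream.stream_slices(sync_mode=SyncMode.full_refresh):
    for record_or_message in stream.read_records(
        sync_mode=SyncMode.full_refresh, stream_slice=stream_slice
    ):
        print(record_or_message)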

@abstractmethod
def read_records_from_slice( self, stream_slice: Mapping[str, Any]) -> Iterable[Union[Mapping[str, Any], airbyte_cdk.AirbyteMessage]]:
115    @abstractmethod
116    def read_records_from_slice(
117        self, stream_slice: StreamSlice
118    ) -> Iterable[Mapping[str, Any] | AirbyteMessage]:
119        """
120        Yield all records from all remote files in `list_files_for_this_sync`.
121        """
122        ...

Yield all records from all remote files in list_files_for_this_sync.

def stream_slices( self, *, sync_mode: airbyte_protocol_dataclasses.models.airbyte_protocol.SyncMode, cursor_field: Optional[List[str]] = None, stream_state: Optional[Mapping[str, Any]] = None) -> Iterable[Optional[Mapping[str, Any]]]:
124    def stream_slices(
125        self,
126        *,
127        sync_mode: SyncMode,
128        cursor_field: Optional[List[str]] = None,
129        stream_state: Optional[Mapping[str, Any]] = None,
130    ) -> Iterable[Optional[Mapping[str, Any]]]:
131        """
132        This method acts as an adapter between the generic Stream interface and the file-based's
133        stream since file-based streams manage their own states.
134        """
135        return self.compute_slices()

This method acts as an adapter between the generic Stream interface and the file-based stream, since file-based streams manage their own state.

@abstractmethod
def compute_slices(self) -> Iterable[Optional[Mapping[str, Any]]]:
137    @abstractmethod
138    def compute_slices(self) -> Iterable[Optional[StreamSlice]]:
139        """
140        Return a list of slices that will be used to read files in the current sync.
141        :return: The slices to use for the current sync.
142        """
143        ...

Return a list of slices that will be used to read files in the current sync.

Returns

The slices to use for the current sync.

@abstractmethod
@lru_cache(maxsize=None)
def get_json_schema(self) -> Mapping[str, Any]:
145    @abstractmethod
146    @lru_cache(maxsize=None)
147    def get_json_schema(self) -> Mapping[str, Any]:
148        """
149        Return the JSON Schema for a stream.
150        """
151        ...

Return the JSON Schema for a stream.

@abstractmethod
def infer_schema( self, files: List[airbyte_cdk.sources.file_based.RemoteFile]) -> Mapping[str, Any]:
153    @abstractmethod
154    def infer_schema(self, files: List[RemoteFile]) -> Mapping[str, Any]:
155        """
156        Infer the schema for files in the stream.
157        """
158        ...

Infer the schema for files in the stream.

160    def get_parser(self) -> FileTypeParser:
161        try:
162            return self._parsers[type(self.config.format)]
163        except KeyError:
164            raise UndefinedParserError(
165                FileBasedSourceError.UNDEFINED_PARSER,
166                stream=self.name,
167                format=type(self.config.format),
168            )
def record_passes_validation_policy(self, record: Mapping[str, Any]) -> bool:
170    def record_passes_validation_policy(self, record: Mapping[str, Any]) -> bool:
171        if self.validation_policy:
172            return self.validation_policy.record_passes_validation_policy(
173                record=record, schema=self.catalog_schema
174            )
175        else:
176            raise RecordParseError(
177                FileBasedSourceError.UNDEFINED_VALIDATION_POLICY,
178                stream=self.name,
179                validation_policy=self.config.validation_policy,
180            )
182    @cached_property
183    @deprecated("Deprecated as of CDK version 3.7.0.")
184    def availability_strategy(self) -> AbstractFileBasedAvailabilityStrategy:
185        return self._availability_strategy
name: str
187    @property
188    def name(self) -> str:
189        return self.config.name
Returns

Stream name. By default this is the implementing class name, but it can be overridden as needed.

def get_cursor(self) -> Optional[airbyte_cdk.sources.streams.checkpoint.Cursor]:
191    def get_cursor(self) -> Optional[Cursor]:
192        """
193        This is a temporary hack. Because file-based, declarative, and concurrent have _slightly_ different cursor implementations
194        the file-based cursor isn't compatible with the cursor-based iteration flow in core.py top-level CDK. By setting this to
195        None, we defer to the regular incremental checkpoint flow. Once all cursors are consolidated under a common interface
196        then this override can be removed.
197        """
198        return None

This is a temporary hack. Because file-based, declarative, and concurrent streams have slightly different cursor implementations, the file-based cursor isn't compatible with the cursor-based iteration flow in the top-level CDK's core.py. By setting this to None, we defer to the regular incremental checkpoint flow. Once all cursors are consolidated under a common interface, this override can be removed.

 44class DefaultFileBasedStream(AbstractFileBasedStream, IncrementalMixin):
 45    """
 46    The default file-based stream.
 47    """
 48
 49    FILE_TRANSFER_KW = "use_file_transfer"
 50    PRESERVE_DIRECTORY_STRUCTURE_KW = "preserve_directory_structure"
 51    FILES_KEY = "files"
 52    DATE_TIME_FORMAT = "%Y-%m-%dT%H:%M:%S.%fZ"
 53    ab_last_mod_col = "_ab_source_file_last_modified"
 54    ab_file_name_col = "_ab_source_file_url"
 55    modified = "modified"
 56    source_file_url = "source_file_url"
 57    airbyte_columns = [ab_last_mod_col, ab_file_name_col]
 58    use_file_transfer = False
 59    preserve_directory_structure = True
 60    _file_transfer = FileTransfer()
 61
 62    def __init__(self, **kwargs: Any):
 63        if self.FILE_TRANSFER_KW in kwargs:
 64            self.use_file_transfer = kwargs.pop(self.FILE_TRANSFER_KW, False)
 65        if self.PRESERVE_DIRECTORY_STRUCTURE_KW in kwargs:
 66            self.preserve_directory_structure = kwargs.pop(
 67                self.PRESERVE_DIRECTORY_STRUCTURE_KW, True
 68            )
 69        super().__init__(**kwargs)
 70
 71    @property
 72    def state(self) -> MutableMapping[str, Any]:
 73        return self._cursor.get_state()
 74
 75    @state.setter
 76    def state(self, value: MutableMapping[str, Any]) -> None:
 77        """State setter, accept state serialized by state getter."""
 78        self._cursor.set_initial_state(value)
 79
 80    @property  # type: ignore # mypy complains wrong type, but AbstractFileBasedCursor is parent of file-based cursors
 81    def cursor(self) -> Optional[AbstractFileBasedCursor]:
 82        return self._cursor
 83
 84    @cursor.setter
 85    def cursor(self, value: AbstractFileBasedCursor) -> None:
 86        if self._cursor is not None:
 87            raise RuntimeError(
 88                f"Cursor for stream {self.name} is already set. This is unexpected. Please contact Support."
 89            )
 90        self._cursor = value
 91
 92    @property
 93    def primary_key(self) -> PrimaryKeyType:
 94        return self.config.primary_key or self.get_parser().get_parser_defined_primary_key(
 95            self.config
 96        )
 97
 98    def _duplicated_files_names(
 99        self, slices: List[dict[str, List[RemoteFile]]]
100    ) -> List[dict[str, List[str]]]:
101        seen_file_names: Dict[str, List[str]] = defaultdict(list)
102        for file_slice in slices:
103            for file_found in file_slice[self.FILES_KEY]:
104                file_name = path.basename(file_found.uri)
105                seen_file_names[file_name].append(file_found.uri)
106        return [
107            {file_name: paths} for file_name, paths in seen_file_names.items() if len(paths) > 1
108        ]
109
110    def compute_slices(self) -> Iterable[Optional[Mapping[str, Any]]]:
111        # Sort files by last_modified, uri and return them grouped by last_modified
112        all_files = self.list_files()
113        files_to_read = self._cursor.get_files_to_sync(all_files, self.logger)
114        sorted_files_to_read = sorted(files_to_read, key=lambda f: (f.last_modified, f.uri))
115        slices = [
116            {self.FILES_KEY: list(group[1])}
117            for group in itertools.groupby(sorted_files_to_read, lambda f: f.last_modified)
118        ]
119        if slices and not self.preserve_directory_structure:
120            duplicated_files_names = self._duplicated_files_names(slices)
121            if duplicated_files_names:
122                raise DuplicatedFilesError(
123                    stream=self.name, duplicated_files_names=duplicated_files_names
124                )
125        return slices
126
127    def transform_record(
128        self, record: dict[str, Any], file: RemoteFile, last_updated: str
129    ) -> dict[str, Any]:
130        # adds _ab_source_file_last_modified and _ab_source_file_url to the record
131        record[self.ab_last_mod_col] = last_updated
132        record[self.ab_file_name_col] = file.uri
133        return record
134
135    def read_records_from_slice(self, stream_slice: StreamSlice) -> Iterable[AirbyteMessage]:
136        """
137        Yield all records from all remote files in `list_files_for_this_sync`.
138
139        If an error is encountered reading records from a file, log a message and do not attempt
140        to sync the rest of the file.
141        """
142        schema = self.catalog_schema
143        if schema is None:
144            # On read requests we should always have the catalog available
145            raise MissingSchemaError(FileBasedSourceError.MISSING_SCHEMA, stream=self.name)
146        # The stream only supports a single file type, so we can use the same parser for all files
147        parser = self.get_parser()
148        for file in stream_slice["files"]:
149            # only serialize the datetime once
150            file_datetime_string = file.last_modified.strftime(self.DATE_TIME_FORMAT)
151            n_skipped = line_no = 0
152
153            try:
154                if self.use_file_transfer:
155                    for file_record_data, file_reference in self._file_transfer.upload(
156                        file=file, stream_reader=self.stream_reader, logger=self.logger
157                    ):
158                        yield stream_data_to_airbyte_message(
159                            self.name,
160                            file_record_data.dict(exclude_none=True),
161                            file_reference=file_reference,
162                        )
163                else:
164                    for record in parser.parse_records(
165                        self.config, file, self.stream_reader, self.logger, schema
166                    ):
167                        line_no += 1
168                        if self.config.schemaless:
169                            record = {"data": record}
170                        elif not self.record_passes_validation_policy(record):
171                            n_skipped += 1
172                            continue
173                        record = self.transform_record(record, file, file_datetime_string)
174                        yield stream_data_to_airbyte_message(self.name, record)
175                self._cursor.add_file(file)
176
177            except StopSyncPerValidationPolicy:
178                yield AirbyteMessage(
179                    type=MessageType.LOG,
180                    log=AirbyteLogMessage(
181                        level=Level.WARN,
182                        message=f"Stopping sync in accordance with the configured validation policy. Records in file did not conform to the schema. stream={self.name} file={file.uri} validation_policy={self.config.validation_policy.value} n_skipped={n_skipped}",
183                    ),
184                )
185                break
186
187            except RecordParseError:
188                # Increment line_no because the exception was raised before we could increment it
189                line_no += 1
190                self.errors_collector.collect(
191                    AirbyteMessage(
192                        type=MessageType.LOG,
193                        log=AirbyteLogMessage(
194                            level=Level.ERROR,
195                            message=f"{FileBasedSourceError.ERROR_PARSING_RECORD.value} stream={self.name} file={file.uri} line_no={line_no} n_skipped={n_skipped}",
196                            stack_trace=traceback.format_exc(),
197                        ),
198                    ),
199                )
200
201            except AirbyteTracedException as exc:
202                # Re-raise the exception to stop the whole sync immediately as this is a fatal error
203                raise exc
204
205            except Exception:
206                yield AirbyteMessage(
207                    type=MessageType.LOG,
208                    log=AirbyteLogMessage(
209                        level=Level.ERROR,
210                        message=f"{FileBasedSourceError.ERROR_PARSING_RECORD.value} stream={self.name} file={file.uri} line_no={line_no} n_skipped={n_skipped}",
211                        stack_trace=traceback.format_exc(),
212                    ),
213                )
214
215            finally:
216                if n_skipped:
217                    yield AirbyteMessage(
218                        type=MessageType.LOG,
219                        log=AirbyteLogMessage(
220                            level=Level.WARN,
221                            message=f"Records in file did not pass validation policy. stream={self.name} file={file.uri} n_skipped={n_skipped} validation_policy={self.validation_policy.name}",
222                        ),
223                    )
224
225    @property
226    def cursor_field(self) -> Union[str, List[str]]:
227        """
228        Override to return the default cursor field used by this stream e.g: an API entity might always use created_at as the cursor field.
229        :return: The name of the field used as a cursor. If the cursor is nested, return an array consisting of the path to the cursor.
230        """
231        return self.ab_last_mod_col
232
233    @cache
234    def get_json_schema(self) -> JsonSchema:
235        if self.use_file_transfer:
236            return file_transfer_schema
237        extra_fields = {
238            self.ab_last_mod_col: {"type": "string"},
239            self.ab_file_name_col: {"type": "string"},
240        }
241        try:
242            schema = self._get_raw_json_schema()
243        except InvalidSchemaError as config_exception:
244            raise AirbyteTracedException(
245                internal_message="Please check the logged errors for more information.",
246                message=FileBasedSourceError.SCHEMA_INFERENCE_ERROR.value,
247                exception=AirbyteTracedException(exception=config_exception),
248                failure_type=FailureType.config_error,
249            )
250        except AirbyteTracedException as ate:
251            raise ate
252        except Exception as exc:
253            raise SchemaInferenceError(
254                FileBasedSourceError.SCHEMA_INFERENCE_ERROR, stream=self.name
255            ) from exc
256        else:
257            return {"type": "object", "properties": {**extra_fields, **schema["properties"]}}
258
259    def _get_raw_json_schema(self) -> JsonSchema:
260        if self.config.input_schema:
261            return self.config.get_input_schema()  # type: ignore
262        elif self.config.schemaless:
263            return schemaless_schema
264        else:
265            files = self.list_files()
266            first_n_files = len(files)
267
268            if self.config.recent_n_files_to_read_for_schema_discovery:
269                self.logger.info(
270                    msg=(
271                        f"Only first {self.config.recent_n_files_to_read_for_schema_discovery} files will be used to infer schema "
272                        f"for stream {self.name} due to limitation in config."
273                    )
274                )
275                first_n_files = self.config.recent_n_files_to_read_for_schema_discovery
276
277        if first_n_files == 0:
278            self.logger.warning(
279                msg=f"No files were identified in the stream {self.name}. Setting default schema for the stream."
280            )
281            return schemaless_schema
282
283        max_n_files_for_schema_inference = (
284            self._discovery_policy.get_max_n_files_for_schema_inference(self.get_parser())
285        )
286
287        if first_n_files > max_n_files_for_schema_inference:
288            # Use the most recent files for schema inference, so we pick up schema changes during discovery.
289            self.logger.warning(
290                msg=f"Refusing to infer schema for {first_n_files} files; using {max_n_files_for_schema_inference} files."
291            )
292            first_n_files = max_n_files_for_schema_inference
293
294        files = sorted(files, key=lambda x: x.last_modified, reverse=True)[:first_n_files]
295
296        inferred_schema = self.infer_schema(files)
297
298        if not inferred_schema:
299            raise InvalidSchemaError(
300                FileBasedSourceError.INVALID_SCHEMA_ERROR,
301                details=f"Empty schema. Please check that the files are valid for format {self.config.format}",
302                stream=self.name,
303            )
304
305        schema = {"type": "object", "properties": inferred_schema}
306
307        return schema
308
309    def get_files(self) -> Iterable[RemoteFile]:
310        """
311        Return all files that belong to the stream as defined by the stream's globs.
312        """
313        return self.stream_reader.get_matching_files(
314            self.config.globs or [], self.config.legacy_prefix, self.logger
315        )
316
317    def as_airbyte_stream(self) -> AirbyteStream:
318        file_stream = super().as_airbyte_stream()
319        file_stream.is_file_based = self.use_file_transfer
320        return file_stream
321
322    def infer_schema(self, files: List[RemoteFile]) -> Mapping[str, Any]:
323        loop = asyncio.get_event_loop()
324        schema = loop.run_until_complete(self._infer_schema(files))
325        # as infer schema returns a Mapping that is assumed to be immutable, we need to create a deepcopy to avoid modifying the reference
326        return self._fill_nulls(deepcopy(schema))
327
328    @staticmethod
329    def _fill_nulls(schema: Mapping[str, Any]) -> Mapping[str, Any]:
330        if isinstance(schema, dict):
331            for k, v in schema.items():
332                if k == "type":
333                    if isinstance(v, list):
334                        if "null" not in v:
335                            schema[k] = ["null"] + v
336                    elif v != "null":
337                        if isinstance(v, (str, list)):
338                            schema[k] = ["null", v]
339                        else:
340                            DefaultFileBasedStream._fill_nulls(v)
341                else:
342                    DefaultFileBasedStream._fill_nulls(v)
343        elif isinstance(schema, list):
344            for item in schema:
345                DefaultFileBasedStream._fill_nulls(item)
346        return schema
347
348    async def _infer_schema(self, files: List[RemoteFile]) -> Mapping[str, Any]:
349        """
350        Infer the schema for a stream.
351
352        Each file type has a corresponding `infer_schema` handler.
353        Dispatch on file type.
354        """
355        base_schema: SchemaType = {}
356        pending_tasks: Set[asyncio.tasks.Task[SchemaType]] = set()
357
358        n_started, n_files = 0, len(files)
359        files_iterator = iter(files)
360        while pending_tasks or n_started < n_files:
361            while len(pending_tasks) <= self._discovery_policy.n_concurrent_requests and (
362                file := next(files_iterator, None)
363            ):
364                pending_tasks.add(asyncio.create_task(self._infer_file_schema(file)))
365                n_started += 1
366            # Return when the first task is completed so that we can enqueue a new task as soon as the
367            # number of concurrent tasks drops below the number allowed.
368            done, pending_tasks = await asyncio.wait(
369                pending_tasks, return_when=asyncio.FIRST_COMPLETED
370            )
371            for task in done:
372                try:
373                    base_schema = merge_schemas(base_schema, task.result())
374                except AirbyteTracedException as ate:
375                    raise ate
376                except Exception as exc:
377                    self.logger.error(
378                        f"An error occurred inferring the schema. \n {traceback.format_exc()}",
379                        exc_info=exc,
380                    )
381
382        return base_schema
383
384    async def _infer_file_schema(self, file: RemoteFile) -> SchemaType:
385        try:
386            return await self.get_parser().infer_schema(
387                self.config, file, self.stream_reader, self.logger
388            )
389        except AirbyteTracedException as ate:
390            raise ate
391        except Exception as exc:
392            raise SchemaInferenceError(
393                FileBasedSourceError.SCHEMA_INFERENCE_ERROR,
394                file=file.uri,
395                format=str(self.config.format),
396                stream=self.name,
397            ) from exc

The default file-based stream.

DefaultFileBasedStream(**kwargs: Any)
62    def __init__(self, **kwargs: Any):
63        if self.FILE_TRANSFER_KW in kwargs:
64            self.use_file_transfer = kwargs.pop(self.FILE_TRANSFER_KW, False)
65        if self.PRESERVE_DIRECTORY_STRUCTURE_KW in kwargs:
66            self.preserve_directory_structure = kwargs.pop(
67                self.PRESERVE_DIRECTORY_STRUCTURE_KW, True
68            )
69        super().__init__(**kwargs)
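
Note that use_file_transfer and preserve_directory_structure are plain keyword arguments popped from kwargs before the rest is forwarded to AbstractFileBasedStream. A hedged construction sketch (the config, reader, policy, and cursor objects are assumed to be built elsewhere):

# Illustrative only: the two feature flags are consumed here and never reach the
# parent constructor. All other variables are hypothetical, pre-built dependencies.
from airbyte_cdk.sources.file_based.stream import DefaultFileBasedStream

stream = DefaultFileBasedStream(
    config=config,
    catalog_schema=None,
    stream_reader=stream_reader,
    availability_strategy=availability_strategy,
    discovery_policy=discovery_policy,
    parsers=parsers,
    validation_policy=validation_policy,
    errors_collector=errors_collector,
    cursor=cursor,
    use_file_transfer=True,              # popped by __init__, not passed to the parent
    preserve_directory_structure=False,  # popped by __init__, not passed to the parent
)
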
FILE_TRANSFER_KW = 'use_file_transfer'
PRESERVE_DIRECTORY_STRUCTURE_KW = 'preserve_directory_structure'
FILES_KEY = 'files'
DATE_TIME_FORMAT = '%Y-%m-%dT%H:%M:%S.%fZ'
ab_last_mod_col = '_ab_source_file_last_modified'
ab_file_name_col = '_ab_source_file_url'
modified = 'modified'
source_file_url = 'source_file_url'
airbyte_columns = ['_ab_source_file_last_modified', '_ab_source_file_url']
use_file_transfer = False
preserve_directory_structure = True
state: MutableMapping[str, Any]
71    @property
72    def state(self) -> MutableMapping[str, Any]:
73        return self._cursor.get_state()

State getter; it should return state in a form that can be serialized to a string and sent to the output as a STATE AirbyteMessage.

A good example of a state is a cursor_value: { self.cursor_field: "cursor_value" }

State should try to be as small as possible, but at the same time descriptive enough to restore the syncing process from the point where it stopped.

80    @property  # type: ignore # mypy complains wrong type, but AbstractFileBasedCursor is parent of file-based cursors
81    def cursor(self) -> Optional[AbstractFileBasedCursor]:
82        return self._cursor
primary_key: Union[str, List[str], NoneType]
92    @property
93    def primary_key(self) -> PrimaryKeyType:
94        return self.config.primary_key or self.get_parser().get_parser_defined_primary_key(
95            self.config
96        )
Returns

A string if there is a single primary key, a list of strings if there is a composite primary key, or a list of lists of strings if the composite primary key consists of nested fields. If the stream has no primary key, return None.

def compute_slices(self) -> Iterable[Optional[Mapping[str, Any]]]:
110    def compute_slices(self) -> Iterable[Optional[Mapping[str, Any]]]:
111        # Sort files by last_modified, uri and return them grouped by last_modified
112        all_files = self.list_files()
113        files_to_read = self._cursor.get_files_to_sync(all_files, self.logger)
114        sorted_files_to_read = sorted(files_to_read, key=lambda f: (f.last_modified, f.uri))
115        slices = [
116            {self.FILES_KEY: list(group[1])}
117            for group in itertools.groupby(sorted_files_to_read, lambda f: f.last_modified)
118        ]
119        if slices and not self.preserve_directory_structure:
120            duplicated_files_names = self._duplicated_files_names(slices)
121            if duplicated_files_names:
122                raise DuplicatedFilesError(
123                    stream=self.name, duplicated_files_names=duplicated_files_names
124                )
125        return slices

Return a list of slices that will be used to read files in the current sync.

Returns

The slices to use for the current sync.
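
Concretely, each slice is a mapping with a single "files" key holding the group of files that share a last_modified timestamp. A self-contained illustration of that grouping, using a hypothetical stand-in for RemoteFile so the snippet runs on its own:

# Files are sorted by (last_modified, uri), grouped by last_modified, and each
# group becomes one {"files": [...]} slice. FakeFile is a hypothetical stand-in
# for RemoteFile, used only to keep this example self-contained.
import itertools
from dataclasses import dataclass
from datetime import datetime

@dataclass
class FakeFile:
    uri: str
    last_modified: datetime

files = [
    FakeFile("s3://bucket/b.csv", datetime(2024, 1, 2)),
    FakeFile("s3://bucket/a.csv", datetime(2024, 1, 1)),
    FakeFile("s3://bucket/c.csv", datetime(2024, 1, 2)),
]
sorted_files = sorted(files, key=lambda f: (f.last_modified, f.uri))
slices = [
    {"files": list(group)}
    for _, group in itertools.groupby(sorted_files, lambda f: f.last_modified)
]
# slices == [{"files": [a.csv]}, {"files": [b.csv, c.csv]}]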

def transform_record( self, record: dict[str, typing.Any], file: airbyte_cdk.sources.file_based.RemoteFile, last_updated: str) -> dict[str, typing.Any]:
127    def transform_record(
128        self, record: dict[str, Any], file: RemoteFile, last_updated: str
129    ) -> dict[str, Any]:
130        # adds _ab_source_file_last_modified and _ab_source_file_url to the record
131        record[self.ab_last_mod_col] = last_updated
132        record[self.ab_file_name_col] = file.uri
133        return record
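
An illustration of the record shape after transform_record, with hypothetical values:

# transform_record mutates the record in place, adding the two Airbyte columns.
record = {"id": 1, "name": "alpha"}
record["_ab_source_file_last_modified"] = "2024-01-01T00:00:00.000000Z"  # last_updated
record["_ab_source_file_url"] = "s3://bucket/path/alpha.csv"             # file.uri
# record == {"id": 1, "name": "alpha",
#            "_ab_source_file_last_modified": "2024-01-01T00:00:00.000000Z",
#            "_ab_source_file_url": "s3://bucket/path/alpha.csv"}
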
def read_records_from_slice( self, stream_slice: Mapping[str, Any]) -> Iterable[airbyte_cdk.AirbyteMessage]:
135    def read_records_from_slice(self, stream_slice: StreamSlice) -> Iterable[AirbyteMessage]:
136        """
137        Yield all records from all remote files in `list_files_for_this_sync`.
138
139        If an error is encountered reading records from a file, log a message and do not attempt
140        to sync the rest of the file.
141        """
142        schema = self.catalog_schema
143        if schema is None:
144            # On read requests we should always have the catalog available
145            raise MissingSchemaError(FileBasedSourceError.MISSING_SCHEMA, stream=self.name)
146        # The stream only supports a single file type, so we can use the same parser for all files
147        parser = self.get_parser()
148        for file in stream_slice["files"]:
149            # only serialize the datetime once
150            file_datetime_string = file.last_modified.strftime(self.DATE_TIME_FORMAT)
151            n_skipped = line_no = 0
152
153            try:
154                if self.use_file_transfer:
155                    for file_record_data, file_reference in self._file_transfer.upload(
156                        file=file, stream_reader=self.stream_reader, logger=self.logger
157                    ):
158                        yield stream_data_to_airbyte_message(
159                            self.name,
160                            file_record_data.dict(exclude_none=True),
161                            file_reference=file_reference,
162                        )
163                else:
164                    for record in parser.parse_records(
165                        self.config, file, self.stream_reader, self.logger, schema
166                    ):
167                        line_no += 1
168                        if self.config.schemaless:
169                            record = {"data": record}
170                        elif not self.record_passes_validation_policy(record):
171                            n_skipped += 1
172                            continue
173                        record = self.transform_record(record, file, file_datetime_string)
174                        yield stream_data_to_airbyte_message(self.name, record)
175                self._cursor.add_file(file)
176
177            except StopSyncPerValidationPolicy:
178                yield AirbyteMessage(
179                    type=MessageType.LOG,
180                    log=AirbyteLogMessage(
181                        level=Level.WARN,
182                        message=f"Stopping sync in accordance with the configured validation policy. Records in file did not conform to the schema. stream={self.name} file={file.uri} validation_policy={self.config.validation_policy.value} n_skipped={n_skipped}",
183                    ),
184                )
185                break
186
187            except RecordParseError:
188                # Increment line_no because the exception was raised before we could increment it
189                line_no += 1
190                self.errors_collector.collect(
191                    AirbyteMessage(
192                        type=MessageType.LOG,
193                        log=AirbyteLogMessage(
194                            level=Level.ERROR,
195                            message=f"{FileBasedSourceError.ERROR_PARSING_RECORD.value} stream={self.name} file={file.uri} line_no={line_no} n_skipped={n_skipped}",
196                            stack_trace=traceback.format_exc(),
197                        ),
198                    ),
199                )
200
201            except AirbyteTracedException as exc:
202                # Re-raise the exception to stop the whole sync immediately as this is a fatal error
203                raise exc
204
205            except Exception:
206                yield AirbyteMessage(
207                    type=MessageType.LOG,
208                    log=AirbyteLogMessage(
209                        level=Level.ERROR,
210                        message=f"{FileBasedSourceError.ERROR_PARSING_RECORD.value} stream={self.name} file={file.uri} line_no={line_no} n_skipped={n_skipped}",
211                        stack_trace=traceback.format_exc(),
212                    ),
213                )
214
215            finally:
216                if n_skipped:
217                    yield AirbyteMessage(
218                        type=MessageType.LOG,
219                        log=AirbyteLogMessage(
220                            level=Level.WARN,
221                            message=f"Records in file did not pass validation policy. stream={self.name} file={file.uri} n_skipped={n_skipped} validation_policy={self.validation_policy.name}",
222                        ),
223                    )

Yield all records from all remote files in list_files_for_this_sync.

If an error is encountered reading records from a file, log a message and do not attempt to sync the rest of the file.

cursor_field: Union[str, List[str]]
225    @property
226    def cursor_field(self) -> Union[str, List[str]]:
227        """
228        Override to return the default cursor field used by this stream e.g: an API entity might always use created_at as the cursor field.
229        :return: The name of the field used as a cursor. If the cursor is nested, return an array consisting of the path to the cursor.
230        """
231        return self.ab_last_mod_col

Override to return the default cursor field used by this stream, e.g. an API entity might always use created_at as the cursor field.

Returns

The name of the field used as a cursor. If the cursor is nested, return an array consisting of the path to the cursor.

@cache
def get_json_schema(self) -> Mapping[str, Any]:
233    @cache
234    def get_json_schema(self) -> JsonSchema:
235        if self.use_file_transfer:
236            return file_transfer_schema
237        extra_fields = {
238            self.ab_last_mod_col: {"type": "string"},
239            self.ab_file_name_col: {"type": "string"},
240        }
241        try:
242            schema = self._get_raw_json_schema()
243        except InvalidSchemaError as config_exception:
244            raise AirbyteTracedException(
245                internal_message="Please check the logged errors for more information.",
246                message=FileBasedSourceError.SCHEMA_INFERENCE_ERROR.value,
247                exception=AirbyteTracedException(exception=config_exception),
248                failure_type=FailureType.config_error,
249            )
250        except AirbyteTracedException as ate:
251            raise ate
252        except Exception as exc:
253            raise SchemaInferenceError(
254                FileBasedSourceError.SCHEMA_INFERENCE_ERROR, stream=self.name
255            ) from exc
256        else:
257            return {"type": "object", "properties": {**extra_fields, **schema["properties"]}}

Return the JSON Schema for a stream.
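
Unless use_file_transfer is set (in which case the file-transfer schema is returned), the result is the raw stream schema with the two Airbyte bookkeeping columns merged into its properties. An illustrative result, assuming a raw schema with a single hypothetical "id" property:

# Illustrative shape of get_json_schema() for a stream whose inferred or
# configured schema contains only a hypothetical "id" property.
schema = {
    "type": "object",
    "properties": {
        "_ab_source_file_last_modified": {"type": "string"},
        "_ab_source_file_url": {"type": "string"},
        "id": {"type": ["null", "integer"]},
    },
}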

def get_files(self) -> Iterable[airbyte_cdk.sources.file_based.RemoteFile]:
309    def get_files(self) -> Iterable[RemoteFile]:
310        """
311        Return all files that belong to the stream as defined by the stream's globs.
312        """
313        return self.stream_reader.get_matching_files(
314            self.config.globs or [], self.config.legacy_prefix, self.logger
315        )

Return all files that belong to the stream as defined by the stream's globs.

def as_airbyte_stream( self) -> airbyte_protocol_dataclasses.models.airbyte_protocol.AirbyteStream:
317    def as_airbyte_stream(self) -> AirbyteStream:
318        file_stream = super().as_airbyte_stream()
319        file_stream.is_file_based = self.use_file_transfer
320        return file_stream
def infer_schema( self, files: List[airbyte_cdk.sources.file_based.RemoteFile]) -> Mapping[str, Any]:
322    def infer_schema(self, files: List[RemoteFile]) -> Mapping[str, Any]:
323        loop = asyncio.get_event_loop()
324        schema = loop.run_until_complete(self._infer_schema(files))
325        # as infer schema returns a Mapping that is assumed to be immutable, we need to create a deepcopy to avoid modifying the reference
326        return self._fill_nulls(deepcopy(schema))

Infer the schema for files in the stream.
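
The null-filling step makes every "type" in the inferred schema nullable. A short sketch that calls the private helper directly for illustration (it assumes airbyte-cdk is installed; _fill_nulls is an internal detail and not part of the public API):

# Demonstrates the null-filling applied to inferred schemas; calling the private
# static helper directly is for illustration only.
from airbyte_cdk.sources.file_based.stream import DefaultFileBasedStream

schema = {"type": "object", "properties": {"name": {"type": "string"}}}
print(DefaultFileBasedStream._fill_nulls(schema))
# {'type': ['null', 'object'], 'properties': {'name': {'type': ['null', 'string']}}}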

19class FileIdentitiesStream(IdentitiesStream):
20    """
21    The identities stream. A full refresh stream to sync identities from a certain domain.
22    The stream reader manage the logic to get such data, which is implemented on connector side.
23    """
24
25    is_resumable = False
26
27    def __init__(
28        self,
29        catalog_schema: Optional[Mapping[str, Any]],
30        stream_permissions_reader: AbstractFileBasedStreamPermissionsReader,
31        discovery_policy: AbstractDiscoveryPolicy,
32        errors_collector: FileBasedErrorsCollector,
33    ) -> None:
34        super().__init__()
35        self.catalog_schema = catalog_schema
36        self.stream_permissions_reader = stream_permissions_reader
37        self._discovery_policy = discovery_policy
38        self.errors_collector = errors_collector
39        self._cursor: MutableMapping[str, Any] = {}
40
41    @property
42    def primary_key(self) -> PrimaryKeyType:
43        return None
44
45    def load_identity_groups(self) -> Iterable[Dict[str, Any]]:
46        return self.stream_permissions_reader.load_identity_groups(logger=self.logger)
47
48    @cache
49    def get_json_schema(self) -> JsonSchema:
50        return self.stream_permissions_reader.identities_schema

The identities stream: a full refresh stream that syncs identities from a given domain. The stream reader manages the logic for retrieving this data, which is implemented on the connector side.

27    def __init__(
28        self,
29        catalog_schema: Optional[Mapping[str, Any]],
30        stream_permissions_reader: AbstractFileBasedStreamPermissionsReader,
31        discovery_policy: AbstractDiscoveryPolicy,
32        errors_collector: FileBasedErrorsCollector,
33    ) -> None:
34        super().__init__()
35        self.catalog_schema = catalog_schema
36        self.stream_permissions_reader = stream_permissions_reader
37        self._discovery_policy = discovery_policy
38        self.errors_collector = errors_collector
39        self._cursor: MutableMapping[str, Any] = {}
is_resumable = False
Returns

True if this stream allows checkpointing of sync progress and can resume from it on subsequent attempts. This differs from supports_incremental because certain kinds of streams, like those supporting resumable full refresh, can checkpoint progress in between attempts for improved fault tolerance; however, they will start from the beginning on the next sync job.

catalog_schema
stream_permissions_reader
errors_collector
primary_key: Union[str, List[str], NoneType]
41    @property
42    def primary_key(self) -> PrimaryKeyType:
43        return None
Returns

A string if there is a single primary key, a list of strings if there is a composite primary key, or a list of lists of strings if the composite primary key consists of nested fields. If the stream has no primary key, return None.

def load_identity_groups(self) -> Iterable[Dict[str, Any]]:
45    def load_identity_groups(self) -> Iterable[Dict[str, Any]]:
46        return self.stream_permissions_reader.load_identity_groups(logger=self.logger)
@cache
def get_json_schema(self) -> Mapping[str, Any]:
48    @cache
49    def get_json_schema(self) -> JsonSchema:
50        return self.stream_permissions_reader.identities_schema
Returns

A dict of the JSON schema representing this stream.

The default implementation of this method looks for a JSONSchema file with the same name as this stream's "name" property. Override as needed.

class PermissionsFileBasedStream(airbyte_cdk.sources.file_based.stream.DefaultFileBasedStream):
20class PermissionsFileBasedStream(DefaultFileBasedStream):
21    """
22    A specialized stream for handling file-based ACL permissions.
23
24    This stream works with the stream_reader to:
25    1. Fetch ACL permissions for each file in the source
26    2. Transform permissions into a standardized format
27    3. Generate records containing permission information
28
29    The stream_reader is responsible for the actual implementation of permission retrieval
30    and schema definition, while this class handles the streaming interface.
31    """
32
33    def __init__(
34        self, stream_permissions_reader: AbstractFileBasedStreamPermissionsReader, **kwargs: Any
35    ):
36        super().__init__(**kwargs)
37        self.stream_permissions_reader = stream_permissions_reader
38
39    def _filter_schema_invalid_properties(
40        self, configured_catalog_json_schema: Dict[str, Any]
41    ) -> Dict[str, Any]:
42        return self.stream_permissions_reader.file_permissions_schema
43
44    def read_records_from_slice(self, stream_slice: StreamSlice) -> Iterable[AirbyteMessage]:
45        """
46        Yield permissions records from all remote files
47        """
48
49        for file in stream_slice["files"]:
50            no_permissions = False
51            file_datetime_string = file.last_modified.strftime(self.DATE_TIME_FORMAT)
52            try:
53                permissions_record = self.stream_permissions_reader.get_file_acl_permissions(
54                    file, logger=self.logger
55                )
56                if not permissions_record:
57                    no_permissions = True
58                    self.logger.warning(
59                        f"Unable to fetch permissions. stream={self.name} file={file.uri}"
60                    )
61                    continue
62                permissions_record = self.transform_record(
63                    permissions_record, file, file_datetime_string
64                )
65                yield stream_data_to_airbyte_message(self.name, permissions_record)
66            except Exception as e:
67                self.logger.error(f"Failed to retrieve permissions for file {file.uri}: {str(e)}")
68                yield AirbyteMessage(
69                    type=MessageType.LOG,
70                    log=AirbyteLogMessage(
71                        level=Level.ERROR,
72                        message=f"Error retrieving files permissions: stream={self.name} file={file.uri}",
73                        stack_trace=traceback.format_exc(),
74                    ),
75                )
76            finally:
77                if no_permissions:
78                    yield AirbyteMessage(
79                        type=MessageType.LOG,
80                        log=AirbyteLogMessage(
81                            level=Level.WARN,
82                            message=f"Unable to fetch permissions. stream={self.name} file={file.uri}",
83                        ),
84                    )
85
86    def _get_raw_json_schema(self) -> JsonSchema:
87        """
88        Retrieve the raw JSON schema for file permissions from the stream reader.
89
90        Returns:
91           The file permissions schema that defines the structure of permission records
92        """
93        return self.stream_permissions_reader.file_permissions_schema

A specialized stream for handling file-based ACL permissions.

This stream works with the stream_reader to:

  1. Fetch ACL permissions for each file in the source
  2. Transform permissions into a standardized format
  3. Generate records containing permission information

The stream_reader is responsible for the actual implementation of permission retrieval and schema definition, while this class handles the streaming interface.
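
The code above relies on the reader exposing a file_permissions_schema property and a get_file_acl_permissions method. A hedged sketch of such a reader follows; the abstract base may declare additional members (for example load_identity_groups and identities_schema, used by FileIdentitiesStream), so this is illustrative rather than a complete implementation.

# Illustrative sketch of a permissions reader, based only on the members this
# stream is shown to use. The base class may require further abstract members.
import logging
from typing import Any, Dict

from airbyte_cdk.sources.file_based import RemoteFile
from airbyte_cdk.sources.file_based.file_based_stream_permissions_reader import (
    AbstractFileBasedStreamPermissionsReader,
)

class InMemoryPermissionsReader(AbstractFileBasedStreamPermissionsReader):
    @property
    def file_permissions_schema(self) -> Dict[str, Any]:
        return {
            "type": "object",
            "properties": {"allowed_identities": {"type": ["null", "array"]}},
        }

    def get_file_acl_permissions(self, file: RemoteFile, logger: logging.Logger) -> Dict[str, Any]:
        # A real reader would query the provider's ACL API; the values here are hypothetical.
        return {"allowed_identities": ["user@example.com"]}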

PermissionsFileBasedStream( stream_permissions_reader: airbyte_cdk.sources.file_based.file_based_stream_permissions_reader.AbstractFileBasedStreamPermissionsReader, **kwargs: Any)
33    def __init__(
34        self, stream_permissions_reader: AbstractFileBasedStreamPermissionsReader, **kwargs: Any
35    ):
36        super().__init__(**kwargs)
37        self.stream_permissions_reader = stream_permissions_reader
stream_permissions_reader
def read_records_from_slice( self, stream_slice: Mapping[str, Any]) -> Iterable[airbyte_cdk.AirbyteMessage]:
44    def read_records_from_slice(self, stream_slice: StreamSlice) -> Iterable[AirbyteMessage]:
45        """
46        Yield permissions records from all remote files
47        """
48
49        for file in stream_slice["files"]:
50            no_permissions = False
51            file_datetime_string = file.last_modified.strftime(self.DATE_TIME_FORMAT)
52            try:
53                permissions_record = self.stream_permissions_reader.get_file_acl_permissions(
54                    file, logger=self.logger
55                )
56                if not permissions_record:
57                    no_permissions = True
58                    self.logger.warning(
59                        f"Unable to fetch permissions. stream={self.name} file={file.uri}"
60                    )
61                    continue
62                permissions_record = self.transform_record(
63                    permissions_record, file, file_datetime_string
64                )
65                yield stream_data_to_airbyte_message(self.name, permissions_record)
66            except Exception as e:
67                self.logger.error(f"Failed to retrieve permissions for file {file.uri}: {str(e)}")
68                yield AirbyteMessage(
69                    type=MessageType.LOG,
70                    log=AirbyteLogMessage(
71                        level=Level.ERROR,
72                        message=f"Error retrieving files permissions: stream={self.name} file={file.uri}",
73                        stack_trace=traceback.format_exc(),
74                    ),
75                )
76            finally:
77                if no_permissions:
78                    yield AirbyteMessage(
79                        type=MessageType.LOG,
80                        log=AirbyteLogMessage(
81                            level=Level.WARN,
82                            message=f"Unable to fetch permissions. stream={self.name} file={file.uri}",
83                        ),
84                    )

Yield permissions records from all remote files in the stream slice.