airbyte_cdk.sources.file_based.stream

 1from airbyte_cdk.sources.file_based.stream.abstract_file_based_stream import AbstractFileBasedStream
 2from airbyte_cdk.sources.file_based.stream.default_file_based_stream import DefaultFileBasedStream
 3from airbyte_cdk.sources.file_based.stream.identities_stream import FileIdentitiesStream
 4from airbyte_cdk.sources.file_based.stream.permissions_file_based_stream import (
 5    PermissionsFileBasedStream,
 6)
 7
 8__all__ = [
 9    "AbstractFileBasedStream",
10    "DefaultFileBasedStream",
11    "FileIdentitiesStream",
12    "PermissionsFileBasedStream",
13]
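
All four classes are re-exported at the package root, so connector code can import them directly. A minimal usage sketch (assuming the airbyte_cdk package is installed):

    from airbyte_cdk.sources.file_based.stream import (
        AbstractFileBasedStream,
        DefaultFileBasedStream,
        FileIdentitiesStream,
        PermissionsFileBasedStream,
    )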
class AbstractFileBasedStream(airbyte_cdk.sources.streams.core.Stream):
 38class AbstractFileBasedStream(Stream):
 39    """
 40    A file-based stream in an Airbyte source.
 41
 42    In addition to the base Stream attributes, a file-based stream has
 43    - A config object (derived from the corresponding stream section in source config).
 44      This contains the globs defining the stream's files.
 45    - A StreamReader, which knows how to list and open files in the stream.
 46    - A FileBasedAvailabilityStrategy, which knows how to verify that we can list and open
 47      files in the stream.
 48    - A DiscoveryPolicy that controls the number of concurrent requests sent to the source
 49      during discover, and the number of files used for schema discovery.
 50    - A dictionary of FileType:Parser that holds all the file types that can be handled
 51      by the stream.
 52    """
 53
 54    def __init__(
 55        self,
 56        config: FileBasedStreamConfig,
 57        catalog_schema: Optional[Mapping[str, Any]],
 58        stream_reader: AbstractFileBasedStreamReader,
 59        availability_strategy: AbstractFileBasedAvailabilityStrategy,
 60        discovery_policy: AbstractDiscoveryPolicy,
 61        parsers: Dict[Type[Any], FileTypeParser],
 62        validation_policy: AbstractSchemaValidationPolicy,
 63        errors_collector: FileBasedErrorsCollector,
 64        cursor: AbstractFileBasedCursor,
 65    ):
 66        super().__init__()
 67        self.config = config
 68        self.catalog_schema = catalog_schema
 69        self.validation_policy = validation_policy
 70        self.stream_reader = stream_reader
 71        self._discovery_policy = discovery_policy
 72        self._availability_strategy = availability_strategy
 73        self._parsers = parsers
 74        self.errors_collector = errors_collector
 75        self._cursor = cursor
 76
 77    @property
 78    @abstractmethod
 79    def primary_key(self) -> PrimaryKeyType: ...
 80
 81    @cache
 82    def list_files(self) -> List[RemoteFile]:
 83        """
 84        List all files that belong to the stream.
 85
 86        The output of this method is cached so we don't need to list the files more than once.
 87        This means we won't pick up changes to the files during a sync. This method uses the
 88        get_files method which is implemented by the concrete stream class.
 89        """
 90        return list(self.get_files())
 91
 92    @abstractmethod
 93    def get_files(self) -> Iterable[RemoteFile]:
 94        """
 95        List all files that belong to the stream as defined by the stream's globs.
 96        """
 97        ...
 98
 99    def read_records(
100        self,
101        sync_mode: SyncMode,
102        cursor_field: Optional[List[str]] = None,
103        stream_slice: Optional[StreamSlice] = None,
104        stream_state: Optional[Mapping[str, Any]] = None,
105    ) -> Iterable[Mapping[str, Any] | AirbyteMessage]:
106        """
107        Yield all records from all remote files in `list_files_for_this_sync`.
108        This method acts as an adapter between the generic Stream interface and the file-based
109        stream, since file-based streams manage their own state.
110        """
111        if stream_slice is None:
112            raise ValueError("stream_slice must be set")
113        return self.read_records_from_slice(stream_slice)
114
115    @abstractmethod
116    def read_records_from_slice(
117        self, stream_slice: StreamSlice
118    ) -> Iterable[Mapping[str, Any] | AirbyteMessage]:
119        """
120        Yield all records from all remote files in `list_files_for_this_sync`.
121        """
122        ...
123
124    def stream_slices(
125        self,
126        *,
127        sync_mode: SyncMode,
128        cursor_field: Optional[List[str]] = None,
129        stream_state: Optional[Mapping[str, Any]] = None,
130    ) -> Iterable[Optional[Mapping[str, Any]]]:
131        """
132        This method acts as an adapter between the generic Stream interface and the file-based
133        stream, since file-based streams manage their own state.
134        """
135        return self.compute_slices()
136
137    @abstractmethod
138    def compute_slices(self) -> Iterable[Optional[StreamSlice]]:
139        """
140        Return a list of slices that will be used to read files in the current sync.
141        :return: The slices to use for the current sync.
142        """
143        ...
144
145    @abstractmethod
146    @lru_cache(maxsize=None)
147    def get_json_schema(self) -> Mapping[str, Any]:
148        """
149        Return the JSON Schema for a stream.
150        """
151        ...
152
153    @abstractmethod
154    def infer_schema(self, files: List[RemoteFile]) -> Mapping[str, Any]:
155        """
156        Infer the schema for files in the stream.
157        """
158        ...
159
160    def get_parser(self) -> FileTypeParser:
161        try:
162            return self._parsers[type(self.config.format)]
163        except KeyError:
164            raise UndefinedParserError(
165                FileBasedSourceError.UNDEFINED_PARSER,
166                stream=self.name,
167                format=type(self.config.format),
168            )
169
170    def record_passes_validation_policy(self, record: Mapping[str, Any]) -> bool:
171        if self.validation_policy:
172            return self.validation_policy.record_passes_validation_policy(
173                record=record, schema=self.catalog_schema
174            )
175        else:
176            raise RecordParseError(
177                FileBasedSourceError.UNDEFINED_VALIDATION_POLICY,
178                stream=self.name,
179                validation_policy=self.config.validation_policy,
180            )
181
182    @cached_property
183    @deprecated("Deprecated as of CDK version 3.7.0.")
184    def availability_strategy(self) -> AbstractFileBasedAvailabilityStrategy:
185        return self._availability_strategy
186
187    @property
188    def name(self) -> str:
189        return self.config.name
190
191    def get_cursor(self) -> Optional[Cursor]:
192        """
193        This is a temporary hack. Because file-based, declarative, and concurrent have _slightly_ different cursor implementations,
194        the file-based cursor isn't compatible with the cursor-based iteration flow in core.py in the top-level CDK. By setting this
195        to None, we defer to the regular incremental checkpoint flow. Once all cursors are consolidated under a common interface,
196        this override can be removed.
197        """
198        return None

A file-based stream in an Airbyte source.

In addition to the base Stream attributes, a file-based stream has

  • A config object (derived from the corresponding stream section in source config). This contains the globs defining the stream's files.
  • A StreamReader, which knows how to list and open files in the stream.
  • A FileBasedAvailabilityStrategy, which knows how to verify that we can list and open files in the stream.
  • A DiscoveryPolicy that controls the number of concurrent requests sent to the source during discover, and the number of files used for schema discovery.
  • A dictionary of FileType:Parser that holds all the file types that can be handled by the stream.
config
catalog_schema
validation_policy
stream_reader
errors_collector
primary_key: Union[str, List[str], NoneType]
77    @property
78    @abstractmethod
79    def primary_key(self) -> PrimaryKeyType: ...
Returns

A string if there is a single primary key, a list of strings if there is a composite primary key, or a list of lists of strings if the composite primary key consists of nested fields. If the stream has no primary keys, return None.
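
Illustrative values for each shape described above (these literals are invented examples, not values from a real connector):

    primary_key = "id"                                  # single primary key
    primary_key = ["account_id", "id"]                  # composite primary key
    primary_key = [["user", "id"], ["meta", "region"]]  # composite key of nested field paths
    primary_key = None                                  # stream has no primary key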

@cache
def list_files(self) -> List[airbyte_cdk.sources.file_based.RemoteFile]:
81    @cache
82    def list_files(self) -> List[RemoteFile]:
83        """
84        List all files that belong to the stream.
85
86        The output of this method is cached so we don't need to list the files more than once.
87        This means we won't pick up changes to the files during a sync. This method uses the
88        get_files method which is implemented by the concrete stream class.
89        """
90        return list(self.get_files())

List all files that belong to the stream.

The output of this method is cached so we don't need to list the files more than once. This means we won't pick up changes to the files during a sync. This method uses the get_files method, which is implemented by the concrete stream class.
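
A self-contained sketch of this caching behavior using functools.cache; the stream class and file names below are stand-ins, not CDK code:

    from functools import cache

    class _SketchStream:
        calls = 0

        @cache
        def list_files(self):
            # In the real class this delegates to get_files(), which hits the remote source.
            _SketchStream.calls += 1
            return ["a.csv", "b.csv"]

    stream = _SketchStream()
    stream.list_files()
    stream.list_files()
    assert _SketchStream.calls == 1  # second call served from the cache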

@abstractmethod
def get_files(self) -> Iterable[airbyte_cdk.sources.file_based.RemoteFile]:
92    @abstractmethod
93    def get_files(self) -> Iterable[RemoteFile]:
94        """
95        List all files that belong to the stream as defined by the stream's globs.
96        """
97        ...

List all files that belong to the stream as defined by the stream's globs.

def read_records(self, sync_mode: airbyte_protocol_dataclasses.models.airbyte_protocol.SyncMode, cursor_field: Optional[List[str]] = None, stream_slice: Optional[Mapping[str, Any]] = None, stream_state: Optional[Mapping[str, Any]] = None) -> Iterable[Union[Mapping[str, Any], airbyte_cdk.AirbyteMessage]]:
 99    def read_records(
100        self,
101        sync_mode: SyncMode,
102        cursor_field: Optional[List[str]] = None,
103        stream_slice: Optional[StreamSlice] = None,
104        stream_state: Optional[Mapping[str, Any]] = None,
105    ) -> Iterable[Mapping[str, Any] | AirbyteMessage]:
106        """
107        Yield all records from all remote files in `list_files_for_this_sync`.
108        This method acts as an adapter between the generic Stream interface and the file-based
109        stream, since file-based streams manage their own state.
110        """
111        if stream_slice is None:
112            raise ValueError("stream_slice must be set")
113        return self.read_records_from_slice(stream_slice)

Yield all records from all remote files in list_files_for_this_sync. This method acts as an adapter between the generic Stream interface and the file-based stream, since file-based streams manage their own state.

@abstractmethod
def read_records_from_slice(self, stream_slice: Mapping[str, Any]) -> Iterable[Union[Mapping[str, Any], airbyte_cdk.AirbyteMessage]]:
115    @abstractmethod
116    def read_records_from_slice(
117        self, stream_slice: StreamSlice
118    ) -> Iterable[Mapping[str, Any] | AirbyteMessage]:
119        """
120        Yield all records from all remote files in `list_files_for_this_sync`.
121        """
122        ...

Yield all records from all remote files in list_files_for_this_sync.

def stream_slices(self, *, sync_mode: airbyte_protocol_dataclasses.models.airbyte_protocol.SyncMode, cursor_field: Optional[List[str]] = None, stream_state: Optional[Mapping[str, Any]] = None) -> Iterable[Optional[Mapping[str, Any]]]:
124    def stream_slices(
125        self,
126        *,
127        sync_mode: SyncMode,
128        cursor_field: Optional[List[str]] = None,
129        stream_state: Optional[Mapping[str, Any]] = None,
130    ) -> Iterable[Optional[Mapping[str, Any]]]:
131        """
132        This method acts as an adapter between the generic Stream interface and the file-based
133        stream, since file-based streams manage their own state.
134        """
135        return self.compute_slices()

This method acts as an adapter between the generic Stream interface and the file-based stream, since file-based streams manage their own state.

@abstractmethod
def compute_slices(self) -> Iterable[Optional[Mapping[str, Any]]]:
137    @abstractmethod
138    def compute_slices(self) -> Iterable[Optional[StreamSlice]]:
139        """
140        Return a list of slices that will be used to read files in the current sync.
141        :return: The slices to use for the current sync.
142        """
143        ...

Return a list of slices that will be used to read files in the current sync.

Returns

The slices to use for the current sync.

@abstractmethod
@lru_cache(maxsize=None)
def get_json_schema(self) -> Mapping[str, Any]:
145    @abstractmethod
146    @lru_cache(maxsize=None)
147    def get_json_schema(self) -> Mapping[str, Any]:
148        """
149        Return the JSON Schema for a stream.
150        """
151        ...

Return the JSON Schema for a stream.

@abstractmethod
def infer_schema(self, files: List[airbyte_cdk.sources.file_based.RemoteFile]) -> Mapping[str, Any]:
153    @abstractmethod
154    def infer_schema(self, files: List[RemoteFile]) -> Mapping[str, Any]:
155        """
156        Infer the schema for files in the stream.
157        """
158        ...

Infer the schema for files in the stream.

160    def get_parser(self) -> FileTypeParser:
161        try:
162            return self._parsers[type(self.config.format)]
163        except KeyError:
164            raise UndefinedParserError(
165                FileBasedSourceError.UNDEFINED_PARSER,
166                stream=self.name,
167                format=type(self.config.format),
168            )
def record_passes_validation_policy(self, record: Mapping[str, Any]) -> bool:
170    def record_passes_validation_policy(self, record: Mapping[str, Any]) -> bool:
171        if self.validation_policy:
172            return self.validation_policy.record_passes_validation_policy(
173                record=record, schema=self.catalog_schema
174            )
175        else:
176            raise RecordParseError(
177                FileBasedSourceError.UNDEFINED_VALIDATION_POLICY,
178                stream=self.name,
179                validation_policy=self.config.validation_policy,
180            )
182    @cached_property
183    @deprecated("Deprecated as of CDK version 3.7.0.")
184    def availability_strategy(self) -> AbstractFileBasedAvailabilityStrategy:
185        return self._availability_strategy
name: str
187    @property
188    def name(self) -> str:
189        return self.config.name
Returns

Stream name. By default this is the implementing class name, but it can be overridden as needed.

def get_cursor(self) -> Optional[airbyte_cdk.sources.streams.checkpoint.Cursor]:
191    def get_cursor(self) -> Optional[Cursor]:
192        """
193        This is a temporary hack. Because file-based, declarative, and concurrent have _slightly_ different cursor implementations,
194        the file-based cursor isn't compatible with the cursor-based iteration flow in core.py in the top-level CDK. By setting this
195        to None, we defer to the regular incremental checkpoint flow. Once all cursors are consolidated under a common interface,
196        this override can be removed.
197        """
198        return None

This is a temporary hack. Because file-based, declarative, and concurrent have slightly different cursor implementations, the file-based cursor isn't compatible with the cursor-based iteration flow in core.py in the top-level CDK. By setting this to None, we defer to the regular incremental checkpoint flow. Once all cursors are consolidated under a common interface, this override can be removed.
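
Putting the abstract interface together, here is a minimal sketch of a concrete subclass. It assumes the constructor collaborators (config, stream reader, parsers, etc.) are built elsewhere; the method bodies are illustrative placeholders, not the DefaultFileBasedStream implementation:

    from typing import Any, Iterable, List, Mapping, Optional

    from airbyte_cdk.sources.file_based import RemoteFile
    from airbyte_cdk.sources.file_based.stream import AbstractFileBasedStream


    class MinimalFileBasedStream(AbstractFileBasedStream):
        @property
        def primary_key(self) -> Optional[str]:
            return None  # this hypothetical stream has no primary key

        def get_files(self) -> Iterable[RemoteFile]:
            # Delegate to the stream reader, matching the stream's configured globs.
            return self.stream_reader.get_matching_files(
                self.config.globs or [], self.config.legacy_prefix, self.logger
            )

        def compute_slices(self) -> Iterable[Optional[Mapping[str, Any]]]:
            # Simplest possible slicing: one slice holding every file.
            return [{"files": self.list_files()}]

        def read_records_from_slice(
            self, stream_slice: Mapping[str, Any]
        ) -> Iterable[Mapping[str, Any]]:
            parser = self.get_parser()  # one parser per stream, keyed by the config's format
            for file in stream_slice["files"]:
                yield from parser.parse_records(
                    self.config, file, self.stream_reader, self.logger, self.catalog_schema
                )

        def infer_schema(self, files: List[RemoteFile]) -> Mapping[str, Any]:
            return {}  # placeholder; see DefaultFileBasedStream for real inference

        def get_json_schema(self) -> Mapping[str, Any]:
            return {"type": "object", "properties": {}}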

class DefaultFileBasedStream(airbyte_cdk.sources.file_based.stream.AbstractFileBasedStream, airbyte_cdk.sources.streams.core.IncrementalMixin):
 44class DefaultFileBasedStream(AbstractFileBasedStream, IncrementalMixin):
 45    """
 46    The default file-based stream.
 47    """
 48
 49    FILE_TRANSFER_KW = "use_file_transfer"
 50    PRESERVE_DIRECTORY_STRUCTURE_KW = "preserve_directory_structure"
 51    FILES_KEY = "files"
 52    DATE_TIME_FORMAT = "%Y-%m-%dT%H:%M:%S.%fZ"
 53    ab_last_mod_col = "_ab_source_file_last_modified"
 54    ab_file_name_col = "_ab_source_file_url"
 55    modified = "modified"
 56    source_file_url = "source_file_url"
 57    airbyte_columns = [ab_last_mod_col, ab_file_name_col]
 58    use_file_transfer = False
 59    preserve_directory_structure = True
 60
 61    def __init__(self, **kwargs: Any):
 62        if self.FILE_TRANSFER_KW in kwargs:
 63            self.use_file_transfer = kwargs.pop(self.FILE_TRANSFER_KW, False)
 64        if self.PRESERVE_DIRECTORY_STRUCTURE_KW in kwargs:
 65            self.preserve_directory_structure = kwargs.pop(
 66                self.PRESERVE_DIRECTORY_STRUCTURE_KW, True
 67            )
 68        super().__init__(**kwargs)
 69
 70    @property
 71    def state(self) -> MutableMapping[str, Any]:
 72        return self._cursor.get_state()
 73
 74    @state.setter
 75    def state(self, value: MutableMapping[str, Any]) -> None:
 76        """State setter, accept state serialized by state getter."""
 77        self._cursor.set_initial_state(value)
 78
 79    @property  # type: ignore # mypy complains wrong type, but AbstractFileBasedCursor is parent of file-based cursors
 80    def cursor(self) -> Optional[AbstractFileBasedCursor]:
 81        return self._cursor
 82
 83    @cursor.setter
 84    def cursor(self, value: AbstractFileBasedCursor) -> None:
 85        if self._cursor is not None:
 86            raise RuntimeError(
 87                f"Cursor for stream {self.name} is already set. This is unexpected. Please contact Support."
 88            )
 89        self._cursor = value
 90
 91    @property
 92    def primary_key(self) -> PrimaryKeyType:
 93        return self.config.primary_key or self.get_parser().get_parser_defined_primary_key(
 94            self.config
 95        )
 96
 97    def _filter_schema_invalid_properties(
 98        self, configured_catalog_json_schema: Dict[str, Any]
 99    ) -> Dict[str, Any]:
100        if self.use_file_transfer:
101            return {
102                "type": "object",
103                "properties": {
104                    "file_path": {"type": "string"},
105                    "file_size": {"type": "string"},
106                    self.ab_file_name_col: {"type": "string"},
107                },
108            }
109        else:
110            return super()._filter_schema_invalid_properties(configured_catalog_json_schema)
111
112    def _duplicated_files_names(
113        self, slices: List[dict[str, List[RemoteFile]]]
114    ) -> List[dict[str, List[str]]]:
115        seen_file_names: Dict[str, List[str]] = defaultdict(list)
116        for file_slice in slices:
117            for file_found in file_slice[self.FILES_KEY]:
118                file_name = path.basename(file_found.uri)
119                seen_file_names[file_name].append(file_found.uri)
120        return [
121            {file_name: paths} for file_name, paths in seen_file_names.items() if len(paths) > 1
122        ]
123
124    def compute_slices(self) -> Iterable[Optional[Mapping[str, Any]]]:
125        # Sort files by last_modified, uri and return them grouped by last_modified
126        all_files = self.list_files()
127        files_to_read = self._cursor.get_files_to_sync(all_files, self.logger)
128        sorted_files_to_read = sorted(files_to_read, key=lambda f: (f.last_modified, f.uri))
129        slices = [
130            {self.FILES_KEY: list(group[1])}
131            for group in itertools.groupby(sorted_files_to_read, lambda f: f.last_modified)
132        ]
133        if slices and not self.preserve_directory_structure:
134            duplicated_files_names = self._duplicated_files_names(slices)
135            if duplicated_files_names:
136                raise DuplicatedFilesError(
137                    stream=self.name, duplicated_files_names=duplicated_files_names
138                )
139        return slices
140
141    def transform_record(
142        self, record: dict[str, Any], file: RemoteFile, last_updated: str
143    ) -> dict[str, Any]:
144        # adds _ab_source_file_last_modified and _ab_source_file_url to the record
145        record[self.ab_last_mod_col] = last_updated
146        record[self.ab_file_name_col] = file.uri
147        return record
148
149    def transform_record_for_file_transfer(
150        self, record: dict[str, Any], file: RemoteFile
151    ) -> dict[str, Any]:
152        # timestamp() returns a float of seconds since the Unix epoch; convert to integer milliseconds
153        record[self.modified] = int(file.last_modified.timestamp()) * 1000
154        record[self.source_file_url] = file.uri
155        return record
156
157    def read_records_from_slice(self, stream_slice: StreamSlice) -> Iterable[AirbyteMessage]:
158        """
159        Yield all records from all remote files in `list_files_for_this_sync`.
160
161        If an error is encountered reading records from a file, log a message and do not attempt
162        to sync the rest of the file.
163        """
164        schema = self.catalog_schema
165        if schema is None:
166            # On read requests we should always have the catalog available
167            raise MissingSchemaError(FileBasedSourceError.MISSING_SCHEMA, stream=self.name)
168        # The stream only supports a single file type, so we can use the same parser for all files
169        parser = self.get_parser()
170        for file in stream_slice["files"]:
171            # only serialize the datetime once
172            file_datetime_string = file.last_modified.strftime(self.DATE_TIME_FORMAT)
173            n_skipped = line_no = 0
174
175            try:
176                if self.use_file_transfer:
177                    self.logger.info(f"{self.name}: {file} file-based syncing")
178                    # TODO: complete this code path so it does not rely on the local parser
179                    file_transfer = FileTransfer()
180                    for record in file_transfer.get_file(
181                        self.config, file, self.stream_reader, self.logger
182                    ):
183                        line_no += 1
184                        if not self.record_passes_validation_policy(record):
185                            n_skipped += 1
186                            continue
187                        record = self.transform_record_for_file_transfer(record, file)
188                        yield stream_data_to_airbyte_message(
189                            self.name, record, is_file_transfer_message=True
190                        )
191                else:
192                    for record in parser.parse_records(
193                        self.config, file, self.stream_reader, self.logger, schema
194                    ):
195                        line_no += 1
196                        if self.config.schemaless:
197                            record = {"data": record}
198                        elif not self.record_passes_validation_policy(record):
199                            n_skipped += 1
200                            continue
201                        record = self.transform_record(record, file, file_datetime_string)
202                        yield stream_data_to_airbyte_message(self.name, record)
203                self._cursor.add_file(file)
204
205            except StopSyncPerValidationPolicy:
206                yield AirbyteMessage(
207                    type=MessageType.LOG,
208                    log=AirbyteLogMessage(
209                        level=Level.WARN,
210                        message=f"Stopping sync in accordance with the configured validation policy. Records in file did not conform to the schema. stream={self.name} file={file.uri} validation_policy={self.config.validation_policy.value} n_skipped={n_skipped}",
211                    ),
212                )
213                break
214
215            except RecordParseError:
216                # Increment line_no because the exception was raised before we could increment it
217                line_no += 1
218                self.errors_collector.collect(
219                    AirbyteMessage(
220                        type=MessageType.LOG,
221                        log=AirbyteLogMessage(
222                            level=Level.ERROR,
223                            message=f"{FileBasedSourceError.ERROR_PARSING_RECORD.value} stream={self.name} file={file.uri} line_no={line_no} n_skipped={n_skipped}",
224                            stack_trace=traceback.format_exc(),
225                        ),
226                    ),
227                )
228
229            except AirbyteTracedException as exc:
230                # Re-raise the exception to stop the whole sync immediately as this is a fatal error
231                raise exc
232
233            except Exception:
234                yield AirbyteMessage(
235                    type=MessageType.LOG,
236                    log=AirbyteLogMessage(
237                        level=Level.ERROR,
238                        message=f"{FileBasedSourceError.ERROR_PARSING_RECORD.value} stream={self.name} file={file.uri} line_no={line_no} n_skipped={n_skipped}",
239                        stack_trace=traceback.format_exc(),
240                    ),
241                )
242
243            finally:
244                if n_skipped:
245                    yield AirbyteMessage(
246                        type=MessageType.LOG,
247                        log=AirbyteLogMessage(
248                            level=Level.WARN,
249                            message=f"Records in file did not pass validation policy. stream={self.name} file={file.uri} n_skipped={n_skipped} validation_policy={self.validation_policy.name}",
250                        ),
251                    )
252
253    @property
254    def cursor_field(self) -> Union[str, List[str]]:
255        """
256        Override to return the default cursor field used by this stream, e.g. an API entity might always use created_at as the cursor field.
257        :return: The name of the field used as a cursor. If the cursor is nested, return an array consisting of the path to the cursor.
258        """
259        return self.ab_last_mod_col
260
261    @cache
262    def get_json_schema(self) -> JsonSchema:
263        extra_fields = {
264            self.ab_last_mod_col: {"type": "string"},
265            self.ab_file_name_col: {"type": "string"},
266        }
267        try:
268            schema = self._get_raw_json_schema()
269        except InvalidSchemaError as config_exception:
270            raise AirbyteTracedException(
271                internal_message="Please check the logged errors for more information.",
272                message=FileBasedSourceError.SCHEMA_INFERENCE_ERROR.value,
273                exception=AirbyteTracedException(exception=config_exception),
274                failure_type=FailureType.config_error,
275            )
276        except AirbyteTracedException as ate:
277            raise ate
278        except Exception as exc:
279            raise SchemaInferenceError(
280                FileBasedSourceError.SCHEMA_INFERENCE_ERROR, stream=self.name
281            ) from exc
282        else:
283            return {"type": "object", "properties": {**extra_fields, **schema["properties"]}}
284
285    def _get_raw_json_schema(self) -> JsonSchema:
286        if self.use_file_transfer:
287            return file_transfer_schema
288        elif self.config.input_schema:
289            return self.config.get_input_schema()  # type: ignore
290        elif self.config.schemaless:
291            return schemaless_schema
292        else:
293            files = self.list_files()
294            first_n_files = len(files)
295
296            if self.config.recent_n_files_to_read_for_schema_discovery:
297                self.logger.info(
298                    msg=(
299                        f"Only first {self.config.recent_n_files_to_read_for_schema_discovery} files will be used to infer schema "
300                        f"for stream {self.name} due to limitation in config."
301                    )
302                )
303                first_n_files = self.config.recent_n_files_to_read_for_schema_discovery
304
305        if first_n_files == 0:
306            self.logger.warning(
307                msg=f"No files were identified in the stream {self.name}. Setting default schema for the stream."
308            )
309            return schemaless_schema
310
311        max_n_files_for_schema_inference = (
312            self._discovery_policy.get_max_n_files_for_schema_inference(self.get_parser())
313        )
314
315        if first_n_files > max_n_files_for_schema_inference:
316            # Use the most recent files for schema inference, so we pick up schema changes during discovery.
317            self.logger.warning(
318                msg=f"Refusing to infer schema for {first_n_files} files; using {max_n_files_for_schema_inference} files."
319            )
320            first_n_files = max_n_files_for_schema_inference
321
322        files = sorted(files, key=lambda x: x.last_modified, reverse=True)[:first_n_files]
323
324        inferred_schema = self.infer_schema(files)
325
326        if not inferred_schema:
327            raise InvalidSchemaError(
328                FileBasedSourceError.INVALID_SCHEMA_ERROR,
329                details=f"Empty schema. Please check that the files are valid for format {self.config.format}",
330                stream=self.name,
331            )
332
333        schema = {"type": "object", "properties": inferred_schema}
334
335        return schema
336
337    def get_files(self) -> Iterable[RemoteFile]:
338        """
339        Return all files that belong to the stream as defined by the stream's globs.
340        """
341        return self.stream_reader.get_matching_files(
342            self.config.globs or [], self.config.legacy_prefix, self.logger
343        )
344
345    def infer_schema(self, files: List[RemoteFile]) -> Mapping[str, Any]:
346        loop = asyncio.get_event_loop()
347        schema = loop.run_until_complete(self._infer_schema(files))
348        # infer_schema returns a Mapping that is assumed to be immutable, so we deepcopy to avoid mutating the shared reference
349        return self._fill_nulls(deepcopy(schema))
350
351    @staticmethod
352    def _fill_nulls(schema: Mapping[str, Any]) -> Mapping[str, Any]:
353        if isinstance(schema, dict):
354            for k, v in schema.items():
355                if k == "type":
356                    if isinstance(v, list):
357                        if "null" not in v:
358                            schema[k] = ["null"] + v
359                    elif v != "null":
360                        if isinstance(v, (str, list)):
361                            schema[k] = ["null", v]
362                        else:
363                            DefaultFileBasedStream._fill_nulls(v)
364                else:
365                    DefaultFileBasedStream._fill_nulls(v)
366        elif isinstance(schema, list):
367            for item in schema:
368                DefaultFileBasedStream._fill_nulls(item)
369        return schema
370
371    async def _infer_schema(self, files: List[RemoteFile]) -> Mapping[str, Any]:
372        """
373        Infer the schema for a stream.
374
375        Each file type has a corresponding `infer_schema` handler.
376        Dispatch on file type.
377        """
378        base_schema: SchemaType = {}
379        pending_tasks: Set[asyncio.tasks.Task[SchemaType]] = set()
380
381        n_started, n_files = 0, len(files)
382        files_iterator = iter(files)
383        while pending_tasks or n_started < n_files:
384            while len(pending_tasks) <= self._discovery_policy.n_concurrent_requests and (
385                file := next(files_iterator, None)
386            ):
387                pending_tasks.add(asyncio.create_task(self._infer_file_schema(file)))
388                n_started += 1
389            # Return when the first task is completed so that we can enqueue a new task as soon as the
390            # number of concurrent tasks drops below the number allowed.
391            done, pending_tasks = await asyncio.wait(
392                pending_tasks, return_when=asyncio.FIRST_COMPLETED
393            )
394            for task in done:
395                try:
396                    base_schema = merge_schemas(base_schema, task.result())
397                except AirbyteTracedException as ate:
398                    raise ate
399                except Exception as exc:
400                    self.logger.error(
401                        f"An error occurred inferring the schema. \n {traceback.format_exc()}",
402                        exc_info=exc,
403                    )
404
405        return base_schema
406
407    async def _infer_file_schema(self, file: RemoteFile) -> SchemaType:
408        try:
409            return await self.get_parser().infer_schema(
410                self.config, file, self.stream_reader, self.logger
411            )
412        except AirbyteTracedException as ate:
413            raise ate
414        except Exception as exc:
415            raise SchemaInferenceError(
416                FileBasedSourceError.SCHEMA_INFERENCE_ERROR,
417                file=file.uri,
418                format=str(self.config.format),
419                stream=self.name,
420            ) from exc

The default file-based stream.

DefaultFileBasedStream(**kwargs: Any)
61    def __init__(self, **kwargs: Any):
62        if self.FILE_TRANSFER_KW in kwargs:
63            self.use_file_transfer = kwargs.pop(self.FILE_TRANSFER_KW, False)
64        if self.PRESERVE_DIRECTORY_STRUCTURE_KW in kwargs:
65            self.preserve_directory_structure = kwargs.pop(
66                self.PRESERVE_DIRECTORY_STRUCTURE_KW, True
67            )
68        super().__init__(**kwargs)
FILE_TRANSFER_KW = 'use_file_transfer'
PRESERVE_DIRECTORY_STRUCTURE_KW = 'preserve_directory_structure'
FILES_KEY = 'files'
DATE_TIME_FORMAT = '%Y-%m-%dT%H:%M:%S.%fZ'
ab_last_mod_col = '_ab_source_file_last_modified'
ab_file_name_col = '_ab_source_file_url'
modified = 'modified'
source_file_url = 'source_file_url'
airbyte_columns = ['_ab_source_file_last_modified', '_ab_source_file_url']
use_file_transfer = False
preserve_directory_structure = True
state: MutableMapping[str, Any]
70    @property
71    def state(self) -> MutableMapping[str, Any]:
72        return self._cursor.get_state()

State getter; should return state in a form that can be serialized to a string and sent to the output as a STATE AirbyteMessage.

A good example of a state is a cursor_value: { self.cursor_field: "cursor_value" }

State should be as small as possible while remaining descriptive enough to restore the syncing process from the point where it stopped.

79    @property  # type: ignore # mypy complains wrong type, but AbstractFileBasedCursor is parent of file-based cursors
80    def cursor(self) -> Optional[AbstractFileBasedCursor]:
81        return self._cursor
primary_key: Union[str, List[str], NoneType]
91    @property
92    def primary_key(self) -> PrimaryKeyType:
93        return self.config.primary_key or self.get_parser().get_parser_defined_primary_key(
94            self.config
95        )
Returns

A string if there is a single primary key, a list of strings if there is a composite primary key, or a list of lists of strings if the composite primary key consists of nested fields. If the stream has no primary keys, return None.

def compute_slices(self) -> Iterable[Optional[Mapping[str, Any]]]:
124    def compute_slices(self) -> Iterable[Optional[Mapping[str, Any]]]:
125        # Sort files by last_modified, uri and return them grouped by last_modified
126        all_files = self.list_files()
127        files_to_read = self._cursor.get_files_to_sync(all_files, self.logger)
128        sorted_files_to_read = sorted(files_to_read, key=lambda f: (f.last_modified, f.uri))
129        slices = [
130            {self.FILES_KEY: list(group[1])}
131            for group in itertools.groupby(sorted_files_to_read, lambda f: f.last_modified)
132        ]
133        if slices and not self.preserve_directory_structure:
134            duplicated_files_names = self._duplicated_files_names(slices)
135            if duplicated_files_names:
136                raise DuplicatedFilesError(
137                    stream=self.name, duplicated_files_names=duplicated_files_names
138                )
139        return slices

Return a list of slices that will be used to read files in the current sync.

Returns

The slices to use for the current sync.
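
The sort-then-group step can be shown in isolation; the FakeFile class and dates below are stand-ins for RemoteFile:

    import itertools
    from dataclasses import dataclass
    from datetime import datetime


    @dataclass(frozen=True)
    class FakeFile:  # stand-in for RemoteFile
        uri: str
        last_modified: datetime


    files = [
        FakeFile("b.csv", datetime(2024, 1, 2)),
        FakeFile("a.csv", datetime(2024, 1, 2)),
        FakeFile("c.csv", datetime(2024, 1, 1)),
    ]

    # Sort by (last_modified, uri), then emit one slice per distinct last_modified.
    sorted_files = sorted(files, key=lambda f: (f.last_modified, f.uri))
    slices = [
        {"files": list(group)}
        for _, group in itertools.groupby(sorted_files, key=lambda f: f.last_modified)
    ]
    assert [f.uri for f in slices[0]["files"]] == ["c.csv"]
    assert [f.uri for f in slices[1]["files"]] == ["a.csv", "b.csv"]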

def transform_record(self, record: dict[str, typing.Any], file: airbyte_cdk.sources.file_based.RemoteFile, last_updated: str) -> dict[str, typing.Any]:
141    def transform_record(
142        self, record: dict[str, Any], file: RemoteFile, last_updated: str
143    ) -> dict[str, Any]:
144        # adds _ab_source_file_last_modified and _ab_source_file_url to the record
145        record[self.ab_last_mod_col] = last_updated
146        record[self.ab_file_name_col] = file.uri
147        return record
def transform_record_for_file_transfer(self, record: dict[str, typing.Any], file: airbyte_cdk.sources.file_based.RemoteFile) -> dict[str, typing.Any]:
149    def transform_record_for_file_transfer(
150        self, record: dict[str, Any], file: RemoteFile
151    ) -> dict[str, Any]:
152        # timestamp() returns a float of seconds since the Unix epoch; convert to integer milliseconds
153        record[self.modified] = int(file.last_modified.timestamp()) * 1000
154        record[self.source_file_url] = file.uri
155        return record
def read_records_from_slice(self, stream_slice: Mapping[str, Any]) -> Iterable[airbyte_cdk.AirbyteMessage]:
157    def read_records_from_slice(self, stream_slice: StreamSlice) -> Iterable[AirbyteMessage]:
158        """
159        Yield all records from all remote files in `list_files_for_this_sync`.
160
161        If an error is encountered reading records from a file, log a message and do not attempt
162        to sync the rest of the file.
163        """
164        schema = self.catalog_schema
165        if schema is None:
166            # On read requests we should always have the catalog available
167            raise MissingSchemaError(FileBasedSourceError.MISSING_SCHEMA, stream=self.name)
168        # The stream only supports a single file type, so we can use the same parser for all files
169        parser = self.get_parser()
170        for file in stream_slice["files"]:
171            # only serialize the datetime once
172            file_datetime_string = file.last_modified.strftime(self.DATE_TIME_FORMAT)
173            n_skipped = line_no = 0
174
175            try:
176                if self.use_file_transfer:
177                    self.logger.info(f"{self.name}: {file} file-based syncing")
178                    # TODO: complete this code path so it does not rely on the local parser
179                    file_transfer = FileTransfer()
180                    for record in file_transfer.get_file(
181                        self.config, file, self.stream_reader, self.logger
182                    ):
183                        line_no += 1
184                        if not self.record_passes_validation_policy(record):
185                            n_skipped += 1
186                            continue
187                        record = self.transform_record_for_file_transfer(record, file)
188                        yield stream_data_to_airbyte_message(
189                            self.name, record, is_file_transfer_message=True
190                        )
191                else:
192                    for record in parser.parse_records(
193                        self.config, file, self.stream_reader, self.logger, schema
194                    ):
195                        line_no += 1
196                        if self.config.schemaless:
197                            record = {"data": record}
198                        elif not self.record_passes_validation_policy(record):
199                            n_skipped += 1
200                            continue
201                        record = self.transform_record(record, file, file_datetime_string)
202                        yield stream_data_to_airbyte_message(self.name, record)
203                self._cursor.add_file(file)
204
205            except StopSyncPerValidationPolicy:
206                yield AirbyteMessage(
207                    type=MessageType.LOG,
208                    log=AirbyteLogMessage(
209                        level=Level.WARN,
210                        message=f"Stopping sync in accordance with the configured validation policy. Records in file did not conform to the schema. stream={self.name} file={file.uri} validation_policy={self.config.validation_policy.value} n_skipped={n_skipped}",
211                    ),
212                )
213                break
214
215            except RecordParseError:
216                # Increment line_no because the exception was raised before we could increment it
217                line_no += 1
218                self.errors_collector.collect(
219                    AirbyteMessage(
220                        type=MessageType.LOG,
221                        log=AirbyteLogMessage(
222                            level=Level.ERROR,
223                            message=f"{FileBasedSourceError.ERROR_PARSING_RECORD.value} stream={self.name} file={file.uri} line_no={line_no} n_skipped={n_skipped}",
224                            stack_trace=traceback.format_exc(),
225                        ),
226                    ),
227                )
228
229            except AirbyteTracedException as exc:
230                # Re-raise the exception to stop the whole sync immediately as this is a fatal error
231                raise exc
232
233            except Exception:
234                yield AirbyteMessage(
235                    type=MessageType.LOG,
236                    log=AirbyteLogMessage(
237                        level=Level.ERROR,
238                        message=f"{FileBasedSourceError.ERROR_PARSING_RECORD.value} stream={self.name} file={file.uri} line_no={line_no} n_skipped={n_skipped}",
239                        stack_trace=traceback.format_exc(),
240                    ),
241                )
242
243            finally:
244                if n_skipped:
245                    yield AirbyteMessage(
246                        type=MessageType.LOG,
247                        log=AirbyteLogMessage(
248                            level=Level.WARN,
249                            message=f"Records in file did not pass validation policy. stream={self.name} file={file.uri} n_skipped={n_skipped} validation_policy={self.validation_policy.name}",
250                        ),
251                    )

Yield all records from all remote files in list_files_for_this_sync.

If an error is encountered reading records from a file, log a message and do not attempt to sync the rest of the file.

cursor_field: Union[str, List[str]]
253    @property
254    def cursor_field(self) -> Union[str, List[str]]:
255        """
256        Override to return the default cursor field used by this stream, e.g. an API entity might always use created_at as the cursor field.
257        :return: The name of the field used as a cursor. If the cursor is nested, return an array consisting of the path to the cursor.
258        """
259        return self.ab_last_mod_col

Override to return the default cursor field used by this stream, e.g. an API entity might always use created_at as the cursor field.

Returns

The name of the field used as a cursor. If the cursor is nested, return an array consisting of the path to the cursor.

@cache
def get_json_schema(self) -> Mapping[str, Any]:
261    @cache
262    def get_json_schema(self) -> JsonSchema:
263        extra_fields = {
264            self.ab_last_mod_col: {"type": "string"},
265            self.ab_file_name_col: {"type": "string"},
266        }
267        try:
268            schema = self._get_raw_json_schema()
269        except InvalidSchemaError as config_exception:
270            raise AirbyteTracedException(
271                internal_message="Please check the logged errors for more information.",
272                message=FileBasedSourceError.SCHEMA_INFERENCE_ERROR.value,
273                exception=AirbyteTracedException(exception=config_exception),
274                failure_type=FailureType.config_error,
275            )
276        except AirbyteTracedException as ate:
277            raise ate
278        except Exception as exc:
279            raise SchemaInferenceError(
280                FileBasedSourceError.SCHEMA_INFERENCE_ERROR, stream=self.name
281            ) from exc
282        else:
283            return {"type": "object", "properties": {**extra_fields, **schema["properties"]}}

Return the JSON Schema for a stream.
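
The dict merge on the method's last line can be illustrated with plain dicts; the id property below is invented:

    extra_fields = {
        "_ab_source_file_last_modified": {"type": "string"},
        "_ab_source_file_url": {"type": "string"},
    }
    raw_schema = {"type": "object", "properties": {"id": {"type": "integer"}}}

    merged = {"type": "object", "properties": {**extra_fields, **raw_schema["properties"]}}
    # The stream's schema is the user/inferred properties plus the two Airbyte bookkeeping columns.
    assert set(merged["properties"]) == {
        "id",
        "_ab_source_file_last_modified",
        "_ab_source_file_url",
    }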

def get_files(self) -> Iterable[airbyte_cdk.sources.file_based.RemoteFile]:
337    def get_files(self) -> Iterable[RemoteFile]:
338        """
339        Return all files that belong to the stream as defined by the stream's globs.
340        """
341        return self.stream_reader.get_matching_files(
342            self.config.globs or [], self.config.legacy_prefix, self.logger
343        )

Return all files that belong to the stream as defined by the stream's globs.

def infer_schema(self, files: List[airbyte_cdk.sources.file_based.RemoteFile]) -> Mapping[str, Any]:
345    def infer_schema(self, files: List[RemoteFile]) -> Mapping[str, Any]:
346        loop = asyncio.get_event_loop()
347        schema = loop.run_until_complete(self._infer_schema(files))
348        # infer_schema returns a Mapping that is assumed to be immutable, so we deepcopy to avoid mutating the shared reference
349        return self._fill_nulls(deepcopy(schema))

Infer the schema for files in the stream.
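
Going by the _fill_nulls source shown above, the null-filling pass makes every declared type nullable. A small before/after sketch (the property names are invented):

    from copy import deepcopy

    from airbyte_cdk.sources.file_based.stream import DefaultFileBasedStream

    schema = {
        "id": {"type": "integer"},
        "tags": {"type": ["string", "integer"]},
    }
    filled = DefaultFileBasedStream._fill_nulls(deepcopy(schema))
    assert filled == {
        "id": {"type": ["null", "integer"]},
        "tags": {"type": ["null", "string", "integer"]},
    }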

19class FileIdentitiesStream(IdentitiesStream):
20    """
21    The identities stream. A full refresh stream to sync identities from a certain domain.
22    The stream reader manages the logic to fetch this data, which is implemented on the connector side.
23    """
24
25    is_resumable = False
26
27    def __init__(
28        self,
29        catalog_schema: Optional[Mapping[str, Any]],
30        stream_permissions_reader: AbstractFileBasedStreamPermissionsReader,
31        discovery_policy: AbstractDiscoveryPolicy,
32        errors_collector: FileBasedErrorsCollector,
33    ) -> None:
34        super().__init__()
35        self.catalog_schema = catalog_schema
36        self.stream_permissions_reader = stream_permissions_reader
37        self._discovery_policy = discovery_policy
38        self.errors_collector = errors_collector
39        self._cursor: MutableMapping[str, Any] = {}
40
41    @property
42    def primary_key(self) -> PrimaryKeyType:
43        return None
44
45    def load_identity_groups(self) -> Iterable[Dict[str, Any]]:
46        return self.stream_permissions_reader.load_identity_groups(logger=self.logger)
47
48    @cache
49    def get_json_schema(self) -> JsonSchema:
50        return self.stream_permissions_reader.identities_schema

The identities stream. A full refresh stream to sync identities from a certain domain. The stream reader manages the logic to fetch this data, which is implemented on the connector side.

27    def __init__(
28        self,
29        catalog_schema: Optional[Mapping[str, Any]],
30        stream_permissions_reader: AbstractFileBasedStreamPermissionsReader,
31        discovery_policy: AbstractDiscoveryPolicy,
32        errors_collector: FileBasedErrorsCollector,
33    ) -> None:
34        super().__init__()
35        self.catalog_schema = catalog_schema
36        self.stream_permissions_reader = stream_permissions_reader
37        self._discovery_policy = discovery_policy
38        self.errors_collector = errors_collector
39        self._cursor: MutableMapping[str, Any] = {}
is_resumable = False
Returns

True if this stream allows checkpointing of sync progress and can resume from it on subsequent attempts. This differs from supports_incremental because certain kinds of streams, like those supporting resumable full refresh, can checkpoint progress between attempts for improved fault tolerance; however, they will start from the beginning on the next sync job.

catalog_schema
stream_permissions_reader
errors_collector
primary_key: Union[str, List[str], NoneType]
41    @property
42    def primary_key(self) -> PrimaryKeyType:
43        return None
Returns

A string if there is a single primary key, a list of strings if there is a composite primary key, or a list of lists of strings if the composite primary key consists of nested fields. If the stream has no primary keys, return None.

def load_identity_groups(self) -> Iterable[Dict[str, Any]]:
45    def load_identity_groups(self) -> Iterable[Dict[str, Any]]:
46        return self.stream_permissions_reader.load_identity_groups(logger=self.logger)
@cache
def get_json_schema(self) -> Mapping[str, Any]:
48    @cache
49    def get_json_schema(self) -> JsonSchema:
50        return self.stream_permissions_reader.identities_schema
Returns

A dict of the JSON schema representing this stream.

The default implementation of this method looks for a JSONSchema file with the same name as this stream's "name" property. Override as needed.

class PermissionsFileBasedStream(airbyte_cdk.sources.file_based.stream.DefaultFileBasedStream):
20class PermissionsFileBasedStream(DefaultFileBasedStream):
21    """
22    A specialized stream for handling file-based ACL permissions.
23
24    This stream works with the stream_reader to:
25    1. Fetch ACL permissions for each file in the source
26    2. Transform permissions into a standardized format
27    3. Generate records containing permission information
28
29    The stream_reader is responsible for the actual implementation of permission retrieval
30    and schema definition, while this class handles the streaming interface.
31    """
32
33    def __init__(
34        self, stream_permissions_reader: AbstractFileBasedStreamPermissionsReader, **kwargs: Any
35    ):
36        super().__init__(**kwargs)
37        self.stream_permissions_reader = stream_permissions_reader
38
39    def _filter_schema_invalid_properties(
40        self, configured_catalog_json_schema: Dict[str, Any]
41    ) -> Dict[str, Any]:
42        return self.stream_permissions_reader.file_permissions_schema
43
44    def read_records_from_slice(self, stream_slice: StreamSlice) -> Iterable[AirbyteMessage]:
45        """
46        Yield permissions records from all remote files
47        """
48
49        for file in stream_slice["files"]:
50            no_permissions = False
51            file_datetime_string = file.last_modified.strftime(self.DATE_TIME_FORMAT)
52            try:
53                permissions_record = self.stream_permissions_reader.get_file_acl_permissions(
54                    file, logger=self.logger
55                )
56                if not permissions_record:
57                    no_permissions = True
58                    self.logger.warning(
59                        f"Unable to fetch permissions. stream={self.name} file={file.uri}"
60                    )
61                    continue
62                permissions_record = self.transform_record(
63                    permissions_record, file, file_datetime_string
64                )
65                yield stream_data_to_airbyte_message(
66                    self.name, permissions_record, is_file_transfer_message=False
67                )
68            except Exception as e:
69                self.logger.error(f"Failed to retrieve permissions for file {file.uri}: {str(e)}")
70                yield AirbyteMessage(
71                    type=MessageType.LOG,
72                    log=AirbyteLogMessage(
73                        level=Level.ERROR,
74                        message=f"Error retrieving files permissions: stream={self.name} file={file.uri}",
75                        stack_trace=traceback.format_exc(),
76                    ),
77                )
78            finally:
79                if no_permissions:
80                    yield AirbyteMessage(
81                        type=MessageType.LOG,
82                        log=AirbyteLogMessage(
83                            level=Level.WARN,
84                            message=f"Unable to fetch permissions. stream={self.name} file={file.uri}",
85                        ),
86                    )
87
88    def _get_raw_json_schema(self) -> JsonSchema:
89        """
90        Retrieve the raw JSON schema for file permissions from the stream reader.
91
92        Returns:
93           The file permissions schema that defines the structure of permission records
94        """
95        return self.stream_permissions_reader.file_permissions_schema

A specialized stream for handling file-based ACL permissions.

This stream works with the stream_reader to:

  1. Fetch ACL permissions for each file in the source
  2. Transform permissions into a standardized format
  3. Generate records containing permission information

The stream_reader is responsible for the actual implementation of permission retrieval and schema definition, while this class handles the streaming interface.
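
A rough sketch of the reader side of this contract, with invented ACL data. The four members below are the ones this module calls; it is written as a duck-typed stand-in because the real AbstractFileBasedStreamPermissionsReader may require additional members:

    import logging
    from typing import Any, Dict, Iterable


    class InMemoryPermissionsReader:
        # Invented ACL data keyed by file URI.
        _acls = {"reports/q1.csv": {"readers": ["alice"], "writers": ["bob"]}}

        @property
        def file_permissions_schema(self) -> Dict[str, Any]:
            return {
                "type": "object",
                "properties": {
                    "readers": {"type": "array", "items": {"type": "string"}},
                    "writers": {"type": "array", "items": {"type": "string"}},
                },
            }

        @property
        def identities_schema(self) -> Dict[str, Any]:
            return {"type": "object", "properties": {"id": {"type": "string"}}}

        def get_file_acl_permissions(self, file: Any, logger: logging.Logger) -> Dict[str, Any]:
            # An empty mapping triggers the stream's "Unable to fetch permissions" warning.
            return self._acls.get(file.uri, {})

        def load_identity_groups(self, logger: logging.Logger) -> Iterable[Dict[str, Any]]:
            yield {"id": "alice", "type": "user"}  # invented identity record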

PermissionsFileBasedStream(stream_permissions_reader: airbyte_cdk.sources.file_based.file_based_stream_permissions_reader.AbstractFileBasedStreamPermissionsReader, **kwargs: Any)
33    def __init__(
34        self, stream_permissions_reader: AbstractFileBasedStreamPermissionsReader, **kwargs: Any
35    ):
36        super().__init__(**kwargs)
37        self.stream_permissions_reader = stream_permissions_reader
stream_permissions_reader
def read_records_from_slice(self, stream_slice: Mapping[str, Any]) -> Iterable[airbyte_cdk.AirbyteMessage]:
44    def read_records_from_slice(self, stream_slice: StreamSlice) -> Iterable[AirbyteMessage]:
45        """
46        Yield permissions records from all remote files
47        """
48
49        for file in stream_slice["files"]:
50            no_permissions = False
51            file_datetime_string = file.last_modified.strftime(self.DATE_TIME_FORMAT)
52            try:
53                permissions_record = self.stream_permissions_reader.get_file_acl_permissions(
54                    file, logger=self.logger
55                )
56                if not permissions_record:
57                    no_permissions = True
58                    self.logger.warning(
59                        f"Unable to fetch permissions. stream={self.name} file={file.uri}"
60                    )
61                    continue
62                permissions_record = self.transform_record(
63                    permissions_record, file, file_datetime_string
64                )
65                yield stream_data_to_airbyte_message(
66                    self.name, permissions_record, is_file_transfer_message=False
67                )
68            except Exception as e:
69                self.logger.error(f"Failed to retrieve permissions for file {file.uri}: {str(e)}")
70                yield AirbyteMessage(
71                    type=MessageType.LOG,
72                    log=AirbyteLogMessage(
73                        level=Level.ERROR,
74                        message=f"Error retrieving files permissions: stream={self.name} file={file.uri}",
75                        stack_trace=traceback.format_exc(),
76                    ),
77                )
78            finally:
79                if no_permissions:
80                    yield AirbyteMessage(
81                        type=MessageType.LOG,
82                        log=AirbyteLogMessage(
83                            level=Level.WARN,
84                            message=f"Unable to fetch permissions. stream={self.name} file={file.uri}",
85                        ),
86                    )

Yield permissions records from all remote files.