# Copyright (c) 2023 Airbyte, Inc., all rights reserved.

from abc import abstractmethod
from functools import cache, cached_property, lru_cache
from typing import Any, Dict, Iterable, List, Mapping, Optional, Type

from typing_extensions import deprecated

from airbyte_cdk import AirbyteMessage
from airbyte_cdk.models import SyncMode
from airbyte_cdk.sources.file_based.availability_strategy import (
    AbstractFileBasedAvailabilityStrategy,
)
from airbyte_cdk.sources.file_based.config.file_based_stream_config import (
    FileBasedStreamConfig,
    PrimaryKeyType,
)
from airbyte_cdk.sources.file_based.discovery_policy import AbstractDiscoveryPolicy
from airbyte_cdk.sources.file_based.exceptions import (
    FileBasedErrorsCollector,
    FileBasedSourceError,
    RecordParseError,
    UndefinedParserError,
)
from airbyte_cdk.sources.file_based.file_based_stream_reader import AbstractFileBasedStreamReader
from airbyte_cdk.sources.file_based.file_types.file_type_parser import FileTypeParser
from airbyte_cdk.sources.file_based.remote_file import RemoteFile
from airbyte_cdk.sources.file_based.schema_validation_policies import AbstractSchemaValidationPolicy
from airbyte_cdk.sources.file_based.stream.cursor import AbstractFileBasedCursor
from airbyte_cdk.sources.file_based.types import StreamSlice
from airbyte_cdk.sources.streams import Stream
from airbyte_cdk.sources.streams.checkpoint import Cursor
 37class AbstractFileBasedStream(Stream):
 38    """
 39    A file-based stream in an Airbyte source.
 41    In addition to the base Stream attributes, a file-based stream has
 42    - A config object (derived from the corresponding stream section in source config).
 43      This contains the globs defining the stream's files.
 44    - A StreamReader, which knows how to list and open files in the stream.
 45    - A FileBasedAvailabilityStrategy, which knows how to verify that we can list and open
 46      files in the stream.
 47    - A DiscoveryPolicy that controls the number of concurrent requests sent to the source
 48      during discover, and the number of files used for schema discovery.
 49    - A dictionary of FileType:Parser that holds all the file types that can be handled
 50      by the stream.
 51    """
 53    def __init__(
 54        self,
 55        config: FileBasedStreamConfig,
 56        catalog_schema: Optional[Mapping[str, Any]],
 57        stream_reader: AbstractFileBasedStreamReader,
 58        availability_strategy: AbstractFileBasedAvailabilityStrategy,
 59        discovery_policy: AbstractDiscoveryPolicy,
 60        parsers: Dict[Type[Any], FileTypeParser],
 61        validation_policy: AbstractSchemaValidationPolicy,
 62        errors_collector: FileBasedErrorsCollector,
 63        cursor: AbstractFileBasedCursor,
 64    ):
 65        super().__init__()
 66        self.config = config
 67        self.catalog_schema = catalog_schema
 68        self.validation_policy = validation_policy
 69        self.stream_reader = stream_reader
 70        self._discovery_policy = discovery_policy
 71        self._availability_strategy = availability_strategy
 72        self._parsers = parsers
 73        self.errors_collector = errors_collector
 74        self._cursor = cursor
 76    @property
 77    @abstractmethod
 78    def primary_key(self) -> PrimaryKeyType: ...
 80    @cache
 81    def list_files(self) -> List[RemoteFile]:
 82        """
 83        List all files that belong to the stream.
 85        The output of this method is cached so we don't need to list the files more than once.
 86        This means we won't pick up changes to the files during a sync. This method uses the
 87        get_files method which is implemented by the concrete stream class.
 88        """
 89        return list(self.get_files())
 91    @abstractmethod
 92    def get_files(self) -> Iterable[RemoteFile]:
 93        """
 94        List all files that belong to the stream as defined by the stream's globs.
 95        """
 96        ...
 98    def read_records(
 99        self,
100        sync_mode: SyncMode,
101        cursor_field: Optional[List[str]] = None,
102        stream_slice: Optional[StreamSlice] = None,
103        stream_state: Optional[Mapping[str, Any]] = None,
104    ) -> Iterable[Mapping[str, Any] | AirbyteMessage]:
105        """
106        Yield all records from all remote files in `list_files_for_this_sync`.
107        This method acts as an adapter between the generic Stream interface and the file-based's
108        stream since file-based streams manage their own states.
109        """
110        if stream_slice is None:
111            raise ValueError("stream_slice must be set")
112        return self.read_records_from_slice(stream_slice)
114    @abstractmethod
115    def read_records_from_slice(
116        self, stream_slice: StreamSlice
117    ) -> Iterable[Mapping[str, Any] | AirbyteMessage]:
118        """
119        Yield all records from all remote files in `list_files_for_this_sync`.
120        """
121        ...
123    def stream_slices(
124        self,
125        *,
126        sync_mode: SyncMode,
127        cursor_field: Optional[List[str]] = None,
128        stream_state: Optional[Mapping[str, Any]] = None,
129    ) -> Iterable[Optional[Mapping[str, Any]]]:
130        """
131        This method acts as an adapter between the generic Stream interface and the file-based's
132        stream since file-based streams manage their own states.
133        """
134        return self.compute_slices()
136    @abstractmethod
137    def compute_slices(self) -> Iterable[Optional[StreamSlice]]:
138        """
139        Return a list of slices that will be used to read files in the current sync.
140        :return: The slices to use for the current sync.
141        """
142        ...
144    @abstractmethod
145    @lru_cache(maxsize=None)
146    def get_json_schema(self) -> Mapping[str, Any]:
147        """
148        Return the JSON Schema for a stream.
149        """
150        ...
152    @abstractmethod
153    def infer_schema(self, files: List[RemoteFile]) -> Mapping[str, Any]:
154        """
155        Infer the schema for files in the stream.
156        """
157        ...
159    def get_parser(self) -> FileTypeParser:
160        try:
161            return self._parsers[type(self.config.format)]
162        except KeyError:
163            raise UndefinedParserError(
164                FileBasedSourceError.UNDEFINED_PARSER,
165      ,
166                format=type(self.config.format),
167            )
169    def record_passes_validation_policy(self, record: Mapping[str, Any]) -> bool:
170        if self.validation_policy:
171            return self.validation_policy.record_passes_validation_policy(
172                record=record, schema=self.catalog_schema
173            )
174        else:
175            raise RecordParseError(
176                FileBasedSourceError.UNDEFINED_VALIDATION_POLICY,
177      ,
178                validation_policy=self.config.validation_policy,
179            )
181    @cached_property
182    @deprecated("Deprecated as of CDK version 3.7.0.")
183    def availability_strategy(self) -> AbstractFileBasedAvailabilityStrategy:
184        return self._availability_strategy
186    @property
187    def name(self) -> str:
188        return
190    def get_cursor(self) -> Optional[Cursor]:
191        """
192        This is a temporary hack. Because file-based, declarative, and concurrent have _slightly_ different cursor implementations
193        the file-based cursor isn't compatible with the cursor-based iteration flow in top-level CDK. By setting this to
194        None, we defer to the regular incremental checkpoint flow. Once all cursors are consolidated under a common interface
195        then this override can be removed.
196        """
197        return None
class AbstractFileBasedStream(airbyte_cdk.sources.streams.core.Stream):
 38class AbstractFileBasedStream(Stream):
 39    """
 40    A file-based stream in an Airbyte source.
 42    In addition to the base Stream attributes, a file-based stream has
 43    - A config object (derived from the corresponding stream section in source config).
 44      This contains the globs defining the stream's files.
 45    - A StreamReader, which knows how to list and open files in the stream.
 46    - A FileBasedAvailabilityStrategy, which knows how to verify that we can list and open
 47      files in the stream.
 48    - A DiscoveryPolicy that controls the number of concurrent requests sent to the source
 49      during discover, and the number of files used for schema discovery.
 50    - A dictionary of FileType:Parser that holds all the file types that can be handled
 51      by the stream.
 52    """
 54    def __init__(
 55        self,
 56        config: FileBasedStreamConfig,
 57        catalog_schema: Optional[Mapping[str, Any]],
 58        stream_reader: AbstractFileBasedStreamReader,
 59        availability_strategy: AbstractFileBasedAvailabilityStrategy,
 60        discovery_policy: AbstractDiscoveryPolicy,
 61        parsers: Dict[Type[Any], FileTypeParser],
 62        validation_policy: AbstractSchemaValidationPolicy,
 63        errors_collector: FileBasedErrorsCollector,
 64        cursor: AbstractFileBasedCursor,
 65    ):
 66        super().__init__()
 67        self.config = config
 68        self.catalog_schema = catalog_schema
 69        self.validation_policy = validation_policy
 70        self.stream_reader = stream_reader
 71        self._discovery_policy = discovery_policy
 72        self._availability_strategy = availability_strategy
 73        self._parsers = parsers
 74        self.errors_collector = errors_collector
 75        self._cursor = cursor
 77    @property
 78    @abstractmethod
 79    def primary_key(self) -> PrimaryKeyType: ...
 81    @cache
 82    def list_files(self) -> List[RemoteFile]:
 83        """
 84        List all files that belong to the stream.
 86        The output of this method is cached so we don't need to list the files more than once.
 87        This means we won't pick up changes to the files during a sync. This method uses the
 88        get_files method which is implemented by the concrete stream class.
 89        """
 90        return list(self.get_files())
 92    @abstractmethod
 93    def get_files(self) -> Iterable[RemoteFile]:
 94        """
 95        List all files that belong to the stream as defined by the stream's globs.
 96        """
 97        ...
 99    def read_records(
100        self,
101        sync_mode: SyncMode,
102        cursor_field: Optional[List[str]] = None,
103        stream_slice: Optional[StreamSlice] = None,
104        stream_state: Optional[Mapping[str, Any]] = None,
105    ) -> Iterable[Mapping[str, Any] | AirbyteMessage]:
106        """
107        Yield all records from all remote files in `list_files_for_this_sync`.
108        This method acts as an adapter between the generic Stream interface and the file-based's
109        stream since file-based streams manage their own states.
110        """
111        if stream_slice is None:
112            raise ValueError("stream_slice must be set")
113        return self.read_records_from_slice(stream_slice)
115    @abstractmethod
116    def read_records_from_slice(
117        self, stream_slice: StreamSlice
118    ) -> Iterable[Mapping[str, Any] | AirbyteMessage]:
119        """
120        Yield all records from all remote files in `list_files_for_this_sync`.
121        """
122        ...
124    def stream_slices(
125        self,
126        *,
127        sync_mode: SyncMode,
128        cursor_field: Optional[List[str]] = None,
129        stream_state: Optional[Mapping[str, Any]] = None,
130    ) -> Iterable[Optional[Mapping[str, Any]]]:
131        """
132        This method acts as an adapter between the generic Stream interface and the file-based's
133        stream since file-based streams manage their own states.
134        """
135        return self.compute_slices()
137    @abstractmethod
138    def compute_slices(self) -> Iterable[Optional[StreamSlice]]:
139        """
140        Return a list of slices that will be used to read files in the current sync.
141        :return: The slices to use for the current sync.
142        """
143        ...
145    @abstractmethod
146    @lru_cache(maxsize=None)
147    def get_json_schema(self) -> Mapping[str, Any]:
148        """
149        Return the JSON Schema for a stream.
150        """
151        ...
153    @abstractmethod
154    def infer_schema(self, files: List[RemoteFile]) -> Mapping[str, Any]:
155        """
156        Infer the schema for files in the stream.
157        """
158        ...
160    def get_parser(self) -> FileTypeParser:
161        try:
162            return self._parsers[type(self.config.format)]
163        except KeyError:
164            raise UndefinedParserError(
165                FileBasedSourceError.UNDEFINED_PARSER,
166      ,
167                format=type(self.config.format),
168            )
170    def record_passes_validation_policy(self, record: Mapping[str, Any]) -> bool:
171        if self.validation_policy:
172            return self.validation_policy.record_passes_validation_policy(
173                record=record, schema=self.catalog_schema
174            )
175        else:
176            raise RecordParseError(
177                FileBasedSourceError.UNDEFINED_VALIDATION_POLICY,
178      ,
179                validation_policy=self.config.validation_policy,
180            )
182    @cached_property
183    @deprecated("Deprecated as of CDK version 3.7.0.")
184    def availability_strategy(self) -> AbstractFileBasedAvailabilityStrategy:
185        return self._availability_strategy
187    @property
188    def name(self) -> str:
189        return
191    def get_cursor(self) -> Optional[Cursor]:
192        """
193        This is a temporary hack. Because file-based, declarative, and concurrent have _slightly_ different cursor implementations
194        the file-based cursor isn't compatible with the cursor-based iteration flow in top-level CDK. By setting this to
195        None, we defer to the regular incremental checkpoint flow. Once all cursors are consolidated under a common interface
196        then this override can be removed.
197        """
198        return None

A file-based stream in an Airbyte source.

In addition to the base Stream attributes, a file-based stream has

  • A config object (derived from the corresponding stream section in source config). This contains the globs defining the stream's files.
  • A StreamReader, which knows how to list and open files in the stream.
  • A FileBasedAvailabilityStrategy, which knows how to verify that we can list and open files in the stream.
  • A DiscoveryPolicy that controls the number of concurrent requests sent to the source during discover, and the number of files used for schema discovery.
  • A dictionary of FileType:Parser that holds all the file types that can be handled by the stream.
primary_key: Union[str, List[str], NoneType]
@property
@abstractmethod
def primary_key(self) -> PrimaryKeyType:
    """
    The stream's primary key: a string, a list of strings, or None when the
    stream has no primary key. Concrete streams must implement this.
    """
    ...

A string if the stream has a single primary key, a list of strings if it has a composite primary key, or a list of lists of strings if the composite primary key consists of nested fields. If the stream has no primary key, return None.

def list_files(self) -> List[airbyte_cdk.sources.file_based.RemoteFile]:
@cache
def list_files(self) -> List[RemoteFile]:
    """
    Return every file belonging to the stream, as a list.

    The result is memoized with ``functools.cache``, so files are listed only once
    and changes made to them during a sync are not picked up. The actual listing
    comes from ``get_files``, which concrete stream classes implement.
    """
    return [remote_file for remote_file in self.get_files()]

List all files that belong to the stream.

The output of this method is cached so we don't need to list the files more than once. This means we won't pick up changes to the files during a sync. This method uses the get_files method which is implemented by the concrete stream class.

def get_files(self) -> Iterable[airbyte_cdk.sources.file_based.RemoteFile]:
@abstractmethod
def get_files(self) -> Iterable[RemoteFile]:
    """Return the stream's files, i.e. those matched by the globs in its config."""
    ...

List all files that belong to the stream as defined by the stream's globs.

def read_records( self, sync_mode: airbyte_protocol_dataclasses.models.airbyte_protocol.SyncMode, cursor_field: Optional[List[str]] = None, stream_slice: Optional[Mapping[str, Any]] = None, stream_state: Optional[Mapping[str, Any]] = None) -> Iterable[Union[Mapping[str, Any], airbyte_cdk.AirbyteMessage]]:
 99    def read_records(
100        self,
101        sync_mode: SyncMode,
102        cursor_field: Optional[List[str]] = None,
103        stream_slice: Optional[StreamSlice] = None,
104        stream_state: Optional[Mapping[str, Any]] = None,
105    ) -> Iterable[Mapping[str, Any] | AirbyteMessage]:
106        """
107        Yield all records from all remote files in `list_files_for_this_sync`.
108        This method acts as an adapter between the generic Stream interface and the file-based's
109        stream since file-based streams manage their own states.
110        """
111        if stream_slice is None:
112            raise ValueError("stream_slice must be set")
113        return self.read_records_from_slice(stream_slice)

Yield all records from all remote files in list_files_for_this_sync. This method acts as an adapter between the generic Stream interface and the file-based stream, since file-based streams manage their own state.

def read_records_from_slice( self, stream_slice: Mapping[str, Any]) -> Iterable[Union[Mapping[str, Any], airbyte_cdk.AirbyteMessage]]:
@abstractmethod
def read_records_from_slice(self, stream_slice: StreamSlice) -> Iterable[Mapping[str, Any] | AirbyteMessage]:
    """
    Produce the records (and messages) for a single ``stream_slice``.

    Concrete streams implement this; per the original contract it yields all
    records from all remote files in ``list_files_for_this_sync``.
    """
    ...

Yield all records from all remote files in list_files_for_this_sync.

def stream_slices( self, *, sync_mode: airbyte_protocol_dataclasses.models.airbyte_protocol.SyncMode, cursor_field: Optional[List[str]] = None, stream_state: Optional[Mapping[str, Any]] = None) -> Iterable[Optional[Mapping[str, Any]]]:
def stream_slices(
    self,
    *,
    sync_mode: SyncMode,
    cursor_field: Optional[List[str]] = None,
    stream_state: Optional[Mapping[str, Any]] = None,
) -> Iterable[Optional[Mapping[str, Any]]]:
    """
    Adapt the generic ``Stream.stream_slices`` call to the file-based API.

    Every keyword argument is ignored because file-based streams manage their own
    state; slicing is delegated to ``compute_slices``.
    """
    slices = self.compute_slices()
    return slices

This method acts as an adapter between the generic Stream interface and the file-based stream, since file-based streams manage their own state.

def compute_slices(self) -> Iterable[Optional[Mapping[str, Any]]]:
@abstractmethod
def compute_slices(self) -> Iterable[Optional[StreamSlice]]:
    """
    Compute the slices used to read files during the current sync.

    :return: The slices for the current sync.
    """
    ...

Return a list of slices that will be used to read files in the current sync.


Returns: The slices to use for the current sync.

def get_json_schema(self) -> Mapping[str, Any]:
@abstractmethod
@lru_cache(maxsize=None)
def get_json_schema(self) -> Mapping[str, Any]:
    """Return the stream's JSON Schema; concrete streams supply the implementation."""
    ...

Return the JSON Schema for a stream.

def infer_schema( self, files: List[airbyte_cdk.sources.file_based.RemoteFile]) -> Mapping[str, Any]:
@abstractmethod
def infer_schema(self, files: List[RemoteFile]) -> Mapping[str, Any]:
    """Infer a schema for the stream from the given ``files``."""
    ...

Infer the schema for files in the stream.

def get_parser(self) -> FileTypeParser:
    """
    Return the parser registered for the type of the stream's configured format.

    :raises UndefinedParserError: if no parser is registered for
        ``type(self.config.format)``.
    """
    try:
        return self._parsers[type(self.config.format)]
    except KeyError:
        raise UndefinedParserError(
            FileBasedSourceError.UNDEFINED_PARSER,
            stream=self.name,
            format=type(self.config.format),
        )
def record_passes_validation_policy(self, record: Mapping[str, Any]) -> bool:
def record_passes_validation_policy(self, record: Mapping[str, Any]) -> bool:
    """
    Return whether ``record`` passes the validation policy against ``catalog_schema``.

    :raises RecordParseError: if no (truthy) validation policy is set.
    """
    if self.validation_policy:
        return self.validation_policy.record_passes_validation_policy(
            record=record, schema=self.catalog_schema
        )
    else:
        raise RecordParseError(
            FileBasedSourceError.UNDEFINED_VALIDATION_POLICY,
            stream=self.name,
            validation_policy=self.config.validation_policy,
        )
@cached_property
@deprecated("Deprecated as of CDK version 3.7.0.")
def availability_strategy(self) -> AbstractFileBasedAvailabilityStrategy:
    """Deprecated accessor for the stream's stored availability strategy."""
    strategy = self._availability_strategy
    return strategy
name: str
@property
def name(self) -> str:
    """The stream name, taken from the stream's config."""
    return self.config.name

Stream name. By default this is the implementing class name, but it can be overridden as needed.

def get_cursor(self) -> Optional[airbyte_cdk.sources.streams.checkpoint.Cursor]:
def get_cursor(self) -> Optional[Cursor]:
    """
    Always return None (temporary workaround).

    File-based, declarative, and concurrent have slightly different cursor
    implementations, so the file-based cursor is not compatible with the top-level
    CDK's cursor-based iteration flow. Returning None defers to the regular
    incremental checkpoint flow. Remove this override once all cursors share a
    common interface.
    """
    no_cursor: Optional[Cursor] = None
    return no_cursor

This is a temporary hack. Because file-based, declarative, and concurrent have slightly different cursor implementations, the file-based cursor isn't compatible with the cursor-based iteration flow in the top-level CDK. By setting this to None, we defer to the regular incremental checkpoint flow. Once all cursors are consolidated under a common interface, this override can be removed.