airbyte
PyAirbyte brings Airbyte ELT to every Python developer.
PyAirbyte
PyAirbyte brings the power of Airbyte to every Python developer. PyAirbyte provides a set of utilities to use Airbyte connectors in Python.
Getting Started
Watch this Getting Started Loom video or run one of our Quickstart tutorials below to see how you can use PyAirbyte in your Python code.
Secrets Management
PyAirbyte can auto-import secrets from the following sources:
- Environment variables.
- Variables defined in a local `.env` ("Dotenv") file.
- Google Colab secrets.
- Manual entry via `getpass`.
_Note: You can also build your own secret manager by subclassing the `CustomSecretManager` implementation. For more information, see the `airbyte.secrets.CustomSecretManager` class definition._
Retrieving Secrets
import airbyte as ab

source = ab.get_source("source-github")
source.set_config(
    {
        "credentials": {
            "personal_access_token": ab.get_secret("GITHUB_PERSONAL_ACCESS_TOKEN"),
        }
    }
)
By default, PyAirbyte will search all available secret sources. The `get_secret()` function also accepts an optional `sources` argument of specific source names (`SecretSourceEnum`) and/or secret manager objects to check.
By default, PyAirbyte will prompt the user for any requested secrets that are not provided via other secret managers. You can disable this prompt by passing `allow_prompt=False` to `get_secret()`.
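For example, to restrict the lookup to environment variables and fail instead of prompting when the secret is missing:

import airbyte as ab

token = ab.get_secret(
    "GITHUB_PERSONAL_ACCESS_TOKEN",
    sources=[ab.SecretSourceEnum.ENV],  # only check environment variables
    allow_prompt=False,                 # raise instead of prompting if not found
)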
For more information, see the `airbyte.secrets` module.
Secrets Auto-Discovery
If you have a secret matching an expected name, PyAirbyte will automatically use it. For example, if you have a secret named `GITHUB_PERSONAL_ACCESS_TOKEN`, PyAirbyte will automatically use it when configuring the GitHub source.
The naming convention for secrets is `{CONNECTOR_NAME}_{PROPERTY_NAME}`, for instance `SNOWFLAKE_PASSWORD` and `BIGQUERY_CREDENTIALS_PATH`.
PyAirbyte will also auto-discover secrets for interop with hosted Airbyte: `AIRBYTE_CLOUD_API_URL`, `AIRBYTE_CLOUD_API_KEY`, etc.
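For instance, a local `.env` file that follows this convention might look like the sketch below (values are placeholders):

# .env - picked up automatically by PyAirbyte's dotenv secret source
SNOWFLAKE_PASSWORD=<your-password>
BIGQUERY_CREDENTIALS_PATH=/path/to/credentials.json
GITHUB_PERSONAL_ACCESS_TOKEN=<your-token>
AIRBYTE_CLOUD_API_KEY=<your-api-key>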
Contributing
To learn how you can contribute to PyAirbyte, please see our PyAirbyte Contributors Guide.
Frequently Asked Questions
1. Does PyAirbyte replace Airbyte? No.
2. What is the PyAirbyte cache? Is it a destination? Yes, you can think of it as a built-in destination implementation, but we avoid the word "destination" in our docs to prevent confusion with our certified destinations list.
3. Does PyAirbyte work with data orchestration frameworks like Airflow, Dagster, and Snowpark? Yes, it should. Please give it a try and report any problems you see. Also, drop us a note if it works for you!
4. Can I use PyAirbyte to develop or test when developing Airbyte sources? Yes, you can, but only for Python-based sources.
5. Can I develop traditional ETL pipelines with PyAirbyte? Yes. Just pick the cache type matching the destination - like SnowflakeCache for landing data in Snowflake. A sketch of this pattern follows this list.
6. Can PyAirbyte import a connector from a local directory that has Python project files, or does it have to be installed via pip? Yes, PyAirbyte can use any local install that has a CLI - and will automatically find connectors by name if they are on PATH.
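As a sketch of the ETL pattern from question 5 - the SnowflakeCache field names below are assumptions based on typical Snowflake connection settings, so check `airbyte.caches` for the exact signature:

import airbyte as ab
from airbyte.caches import SnowflakeCache  # assumed import path

cache = SnowflakeCache(
    account="my_account",        # hypothetical connection settings
    username="my_user",
    password=ab.get_secret("SNOWFLAKE_PASSWORD"),
    database="my_db",
    warehouse="my_wh",
    role="my_role",
)

source = ab.get_source("source-faker", config={"count": 100}, streams="*")
source.read(cache=cache)  # records land in Snowflake tables managed by the cache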
Changelog and Release Notes
For a version history and list of all changes, please see our GitHub Releases page.
API Reference
1# Copyright (c) 2024 Airbyte, Inc., all rights reserved. 2"""PyAirbyte brings Airbyte ELT to every Python developer. 3 4.. include:: ../README.md 5 6## API Reference 7 8""" 9 10from __future__ import annotations 11 12from airbyte import ( 13 caches, 14 cloud, 15 datasets, 16 documents, 17 exceptions, # noqa: ICN001 # No 'exc' alias for top-level module 18 experimental, 19 results, 20 secrets, 21 sources, 22) 23from airbyte.caches.bigquery import BigQueryCache 24from airbyte.caches.duckdb import DuckDBCache 25from airbyte.caches.util import get_default_cache, new_local_cache 26from airbyte.datasets import CachedDataset 27from airbyte.records import StreamRecord 28from airbyte.results import ReadResult 29from airbyte.secrets import SecretSourceEnum, get_secret 30from airbyte.sources import registry 31from airbyte.sources.base import Source 32from airbyte.sources.registry import get_available_connectors 33from airbyte.sources.util import get_source 34 35 36__all__ = [ 37 # Modules 38 "cloud", 39 "caches", 40 "datasets", 41 "documents", 42 "exceptions", 43 "experimental", 44 "records", 45 "registry", 46 "results", 47 "secrets", 48 "sources", 49 # Factories 50 "get_available_connectors", 51 "get_default_cache", 52 "get_secret", 53 "get_source", 54 "new_local_cache", 55 # Classes 56 "BigQueryCache", 57 "CachedDataset", 58 "DuckDBCache", 59 "ReadResult", 60 "SecretSourceEnum", 61 "Source", 62 "StreamRecord", 63] 64 65__docformat__ = "google"
118def get_available_connectors() -> list[str]: 119 """Return a list of all available connectors. 120 121 Connectors will be returned in alphabetical order, with the standard prefix "source-". 122 """ 123 return sorted( 124 conn.name for conn in _get_registry_cache().values() if conn.pypi_package_name is not None 125 )
Return a list of all available connectors.
Connectors will be returned in alphabetical order, with the standard prefix "source-".
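For example:

import airbyte as ab

connectors = ab.get_available_connectors()
print(len(connectors), "connectors available")
print("source-github" in connectors)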
15def get_default_cache() -> DuckDBCache: 16 """Get a local cache for storing data, using the default database path. 17 18 Cache files are stored in the `.cache` directory, relative to the current 19 working directory. 20 """ 21 cache_dir = Path("./.cache/default_cache") 22 return DuckDBCache( 23 db_path=cache_dir / "default_cache.duckdb", 24 cache_dir=cache_dir, 25 )
Get a local cache for storing data, using the default database path.
Cache files are stored in the `.cache` directory, relative to the current working directory.
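For example:

import airbyte as ab

cache = ab.get_default_cache()  # DuckDB file under ./.cache/default_cache/
print(cache.db_path)
# The same cache object can be passed to Source.read(cache=cache).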
15def get_secret( 16 secret_name: str, 17 /, 18 *, 19 sources: list[SecretManager | SecretSourceEnum] | None = None, 20 allow_prompt: bool = True, 21 **kwargs: dict[str, Any], 22) -> SecretString: 23 """Get a secret from the environment. 24 25 The optional `sources` argument of enum type `SecretSourceEnum` or list of `SecretSourceEnum` 26 options. If left blank, all available sources will be checked. If a list of `SecretSourceEnum` 27 entries is passed, then the sources will be checked using the provided ordering. 28 29 If `allow_prompt` is `True` or if SecretSourceEnum.PROMPT is declared in the `source` arg, then 30 the user will be prompted to enter the secret if it is not found in any of the other sources. 31 """ 32 if "source" in kwargs: 33 warnings.warn( 34 message="The `source` argument is deprecated. Use the `sources` argument instead.", 35 category=DeprecationWarning, 36 stacklevel=2, 37 ) 38 sources = kwargs.pop("source") # type: ignore [assignment] 39 40 available_sources: dict[str, SecretManager] = {} 41 for available_source in _get_secret_sources(): 42 # Add available sources to the dict. Order matters. 43 available_sources[available_source.name] = available_source 44 45 if sources is None: 46 # If ANY is in the list, then we don't need to check any other sources. 47 # This is the default behavior. 48 sources = list(available_sources.values()) 49 50 elif not isinstance(sources, list): 51 sources = [sources] # type: ignore [unreachable] # This is a 'just in case' catch. 52 53 # Replace any SecretSourceEnum strings with the matching SecretManager object 54 for source in list(sources): 55 if isinstance(source, SecretSourceEnum): 56 if source not in available_sources: 57 raise exc.PyAirbyteInputError( 58 guidance="Invalid secret source name.", 59 input_value=source, 60 context={ 61 "Available Sources": list(available_sources.keys()), 62 }, 63 ) 64 65 sources[sources.index(source)] = available_sources[source] 66 67 secret_managers = cast(list[SecretManager], sources) 68 69 if SecretSourceEnum.PROMPT in secret_managers: 70 prompt_source = secret_managers.pop( 71 # Mis-typed, but okay here since we have equality logic for the enum comparison: 72 secret_managers.index(SecretSourceEnum.PROMPT), # type: ignore [arg-type] 73 ) 74 75 if allow_prompt: 76 # Always check prompt last. Add it to the end of the list. 77 secret_managers.append(prompt_source) 78 79 for secret_mgr in secret_managers: 80 val = secret_mgr.get_secret(secret_name) 81 if val: 82 return SecretString(val) 83 84 raise exc.PyAirbyteSecretNotFoundError( 85 secret_name=secret_name, 86 sources=[str(s) for s in available_sources], 87 )
Get a secret from the environment.
The optional `sources` argument accepts a single `SecretSourceEnum` entry or a list of `SecretSourceEnum` options. If left blank, all available sources will be checked. If a list of `SecretSourceEnum` entries is passed, then the sources will be checked using the provided ordering.
If `allow_prompt` is `True` or if `SecretSourceEnum.PROMPT` is declared in the `sources` arg, then the user will be prompted to enter the secret if it is not found in any of the other sources.
208def get_source( 209 name: str, 210 config: dict[str, Any] | None = None, 211 *, 212 streams: str | list[str] | None = None, 213 version: str | None = None, 214 pip_url: str | None = None, 215 local_executable: Path | str | None = None, 216 install_if_missing: bool = True, 217) -> Source: 218 """Get a connector by name and version. 219 220 Args: 221 name: connector name 222 config: connector config - if not provided, you need to set it later via the set_config 223 method. 224 streams: list of stream names to select for reading. If set to "*", all streams will be 225 selected. If not provided, you can set it later via the `select_streams()` or 226 `select_all_streams()` method. 227 version: connector version - if not provided, the currently installed version will be used. 228 If no version is installed, the latest available version will be used. The version can 229 also be set to "latest" to force the use of the latest available version. 230 pip_url: connector pip URL - if not provided, the pip url will be inferred from the 231 connector name. 232 local_executable: If set, the connector will be assumed to already be installed and will be 233 executed using this path or executable name. Otherwise, the connector will be installed 234 automatically in a virtual environment. 235 install_if_missing: Whether to install the connector if it is not available locally. This 236 parameter is ignored when local_executable is set. 237 """ 238 return _get_source( 239 name=name, 240 config=config, 241 streams=streams, 242 version=version, 243 pip_url=pip_url, 244 local_executable=local_executable, 245 install_if_missing=install_if_missing, 246 )
Get a connector by name and version.
Arguments:
- name: connector name
- config: connector config - if not provided, you need to set it later via the set_config method.
- streams: list of stream names to select for reading. If set to "*", all streams will be selected. If not provided, you can set it later via the `select_streams()` or `select_all_streams()` method.
- version: connector version - if not provided, the currently installed version will be used. If no version is installed, the latest available version will be used. The version can also be set to "latest" to force the use of the latest available version.
- pip_url: connector pip URL - if not provided, the pip url will be inferred from the connector name.
- local_executable: If set, the connector will be assumed to already be installed and will be executed using this path or executable name. Otherwise, the connector will be installed automatically in a virtual environment.
- install_if_missing: Whether to install the connector if it is not available locally. This parameter is ignored when local_executable is set.
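For example - the GitHub config keys shown are illustrative; use `source.print_config_spec()` to see the exact spec for your connector version:

import airbyte as ab

source = ab.get_source(
    "source-github",
    config={
        "repositories": ["airbytehq/PyAirbyte"],  # illustrative config key
        "credentials": {
            "personal_access_token": ab.get_secret("GITHUB_PERSONAL_ACCESS_TOKEN"),
        },
    },
    streams=["issues", "pull_requests"],  # or "*" to select all streams
    version="latest",                     # force the newest published version
)
source.check()  # verify the connection before reading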
28def new_local_cache( 29 cache_name: str | None = None, 30 cache_dir: str | Path | None = None, 31 *, 32 cleanup: bool = True, 33) -> DuckDBCache: 34 """Get a local cache for storing data, using a name string to seed the path. 35 36 Args: 37 cache_name: Name to use for the cache. Defaults to None. 38 cache_dir: Root directory to store the cache in. Defaults to None. 39 cleanup: Whether to clean up temporary files. Defaults to True. 40 41 Cache files are stored in the `.cache` directory, relative to the current 42 working directory. 43 """ 44 if cache_name: 45 if " " in cache_name: 46 raise exc.PyAirbyteInputError( 47 message="Cache name cannot contain spaces.", 48 input_value=cache_name, 49 ) 50 51 if not cache_name.replace("_", "").isalnum(): 52 raise exc.PyAirbyteInputError( 53 message="Cache name can only contain alphanumeric characters and underscores.", 54 input_value=cache_name, 55 ) 56 57 cache_name = cache_name or str(ulid.ULID()) 58 cache_dir = cache_dir or Path(f"./.cache/{cache_name}") 59 if not isinstance(cache_dir, Path): 60 cache_dir = Path(cache_dir) 61 62 return DuckDBCache( 63 db_path=cache_dir / f"db_{cache_name}.duckdb", 64 cache_dir=cache_dir, 65 cleanup=cleanup, 66 )
Get a local cache for storing data, using a name string to seed the path.
Arguments:
- cache_name: Name to use for the cache. Defaults to None.
- cache_dir: Root directory to store the cache in. Defaults to None.
- cleanup: Whether to clean up temporary files. Defaults to True.
Cache files are stored in the `.cache` directory, relative to the current working directory.
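For example (cache names may only contain alphanumeric characters and underscores):

import airbyte as ab

cache = ab.new_local_cache(cache_name="my_pipeline", cleanup=False)
print(cache.db_path)  # ./.cache/my_pipeline/db_my_pipeline.duckdb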
29class BigQueryCache(BigQueryConfig, CacheBase): 30 """The BigQuery cache implementation.""" 31 32 _sql_processor_class: type[BigQuerySqlProcessor] = PrivateAttr(default=BigQuerySqlProcessor)
The BigQuery cache implementation.
Inherited Members
- airbyte.caches.base.CacheBase
- CacheBase
- cache_dir
- cleanup
- processor
- get_record_processor
- get_records
- get_pandas_dataframe
- streams
- get_state_provider
- get_state_writer
- register_source
- airbyte._processors.sql.bigquery.BigQueryConfig
- database_name
- schema_name
- credentials_path
- project_name
- dataset_name
- get_sql_alchemy_url
- get_database_name
- get_vendor_client
- airbyte._future_cdk.sql_processor.SqlConfig
- table_prefix
- get_sql_engine
- pydantic.main.BaseModel
- Config
- dict
- json
- parse_obj
- parse_raw
- parse_file
- from_orm
- construct
- copy
- schema
- schema_json
- validate
- update_forward_refs
132class CachedDataset(SQLDataset): 133 """A dataset backed by a SQL table cache. 134 135 Because this dataset includes all records from the underlying table, we also expose the 136 underlying table as a SQLAlchemy Table object. 137 """ 138 139 def __init__( 140 self, 141 cache: CacheBase, 142 stream_name: str, 143 ) -> None: 144 """We construct the query statement by selecting all columns from the table. 145 146 This prevents the need to scan the table schema to construct the query statement. 147 """ 148 table_name = cache.processor.get_sql_table_name(stream_name) 149 schema_name = cache.schema_name 150 query = select("*").select_from(text(f"{schema_name}.{table_name}")) 151 super().__init__( 152 cache=cache, 153 stream_name=stream_name, 154 query_statement=query, 155 ) 156 157 @overrides 158 def to_pandas(self) -> DataFrame: 159 """Return the underlying dataset data as a pandas DataFrame.""" 160 return self._cache.get_pandas_dataframe(self._stream_name) 161 162 def to_sql_table(self) -> Table: 163 """Return the underlying SQL table as a SQLAlchemy Table object.""" 164 return self._cache.processor.get_sql_table(self.stream_name) 165 166 def __eq__(self, value: object) -> bool: 167 """Return True if the value is a CachedDataset with the same cache and stream name. 168 169 In the case of CachedDataset objects, we can simply compare the cache and stream name. 170 171 Note that this equality check is only supported on CachedDataset objects and not for 172 the base SQLDataset implementation. This is because of the complexity and computational 173 cost of comparing two arbitrary SQL queries that could be bound to different variables, 174 as well as the chance that two queries can be syntactically equivalent without being 175 text-wise equivalent. 176 """ 177 if not isinstance(value, SQLDataset): 178 return False 179 180 if self._cache is not value._cache: 181 return False 182 183 return not self._stream_name != value._stream_name 184 185 def __hash__(self) -> int: 186 return hash(self._stream_name)
A dataset backed by a SQL table cache.
Because this dataset includes all records from the underlying table, we also expose the underlying table as a SQLAlchemy Table object.
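A short usage sketch, assuming a prior read into a cache (the "users" stream name is specific to the example connector):

import airbyte as ab

source = ab.get_source("source-faker", config={"count": 100}, streams="*")
result = source.read()

dataset = result["users"]        # CachedDataset for the "users" stream
df = dataset.to_pandas()         # materialize as a pandas DataFrame
table = dataset.to_sql_table()   # or use the underlying SQLAlchemy Table
print(len(df), table.name)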
139 def __init__( 140 self, 141 cache: CacheBase, 142 stream_name: str, 143 ) -> None: 144 """We construct the query statement by selecting all columns from the table. 145 146 This prevents the need to scan the table schema to construct the query statement. 147 """ 148 table_name = cache.processor.get_sql_table_name(stream_name) 149 schema_name = cache.schema_name 150 query = select("*").select_from(text(f"{schema_name}.{table_name}")) 151 super().__init__( 152 cache=cache, 153 stream_name=stream_name, 154 query_statement=query, 155 )
We construct the query statement by selecting all columns from the table.
This prevents the need to scan the table schema to construct the query statement.
157 @overrides 158 def to_pandas(self) -> DataFrame: 159 """Return the underlying dataset data as a pandas DataFrame.""" 160 return self._cache.get_pandas_dataframe(self._stream_name)
Return the underlying dataset data as a pandas DataFrame.
162 def to_sql_table(self) -> Table: 163 """Return the underlying SQL table as a SQLAlchemy Table object.""" 164 return self._cache.processor.get_sql_table(self.stream_name)
Return the underlying SQL table as a SQLAlchemy Table object.
Inherited Members
- airbyte.datasets._sql.SQLDataset
- stream_name
- with_filter
- airbyte.datasets._base.DatasetBase
- to_documents
37class DuckDBCache(DuckDBConfig, CacheBase): 38 """A DuckDB cache.""" 39 40 _sql_processor_class: type[DuckDBSqlProcessor] = PrivateAttr(default=DuckDBSqlProcessor)
A DuckDB cache.
Inherited Members
- airbyte.caches.base.CacheBase
- CacheBase
- cache_dir
- cleanup
- processor
- get_record_processor
- get_records
- get_pandas_dataframe
- streams
- get_state_provider
- get_state_writer
- register_source
- airbyte._processors.sql.duckdb.DuckDBConfig
- db_path
- schema_name
- get_sql_alchemy_url
- get_database_name
- get_sql_engine
- airbyte._future_cdk.sql_processor.SqlConfig
- table_prefix
- get_vendor_client
- pydantic.main.BaseModel
- Config
- dict
- json
- parse_obj
- parse_raw
- parse_file
- from_orm
- construct
- copy
- schema
- schema_json
- validate
- update_forward_refs
19class ReadResult(Mapping[str, CachedDataset]): 20 def __init__( 21 self, 22 processed_records: int, 23 cache: CacheBase, 24 processed_streams: list[str], 25 ) -> None: 26 self.processed_records = processed_records 27 self._cache = cache 28 self._processed_streams = processed_streams 29 30 def __getitem__(self, stream: str) -> CachedDataset: 31 if stream not in self._processed_streams: 32 raise KeyError(stream) 33 34 return CachedDataset(self._cache, stream) 35 36 def __contains__(self, stream: object) -> bool: 37 if not isinstance(stream, str): 38 return False 39 40 return stream in self._processed_streams 41 42 def __iter__(self) -> Iterator[str]: 43 return self._processed_streams.__iter__() 44 45 def __len__(self) -> int: 46 return len(self._processed_streams) 47 48 def get_sql_engine(self) -> Engine: 49 return self._cache.get_sql_engine() 50 51 @property 52 def streams(self) -> Mapping[str, CachedDataset]: 53 return { 54 stream_name: CachedDataset(self._cache, stream_name) 55 for stream_name in self._processed_streams 56 } 57 58 @property 59 def cache(self) -> CacheBase: 60 return self._cache
A Mapping is a generic container for associating key/value pairs.
This class provides concrete generic implementations of all methods except for __getitem__, __iter__, and __len__.
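In practice this means a read result can be treated like a read-only mapping of stream names to CachedDataset objects, with the cache and SQL engine available for follow-up queries. A minimal sketch:

import airbyte as ab

source = ab.get_source("source-faker", config={"count": 100}, streams="*")
result = source.read(cache=ab.get_default_cache())

print(result.processed_records)      # total records written during the read
print(list(result.streams.keys()))   # each stream maps to a CachedDataset
engine = result.get_sql_engine()     # SQLAlchemy engine for ad-hoc SQL against the cache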
Inherited Members
- collections.abc.Mapping
- get
- keys
- items
- values
15class SecretSourceEnum(str, Enum): 16 ENV = "env" 17 DOTENV = "dotenv" 18 GOOGLE_COLAB = "google_colab" 19 GOOGLE_GSM = "google_gsm" # Not enabled by default 20 21 PROMPT = "prompt"
An enumeration of the available secret source types.
Inherited Members
- enum.Enum
- name
- value
- builtins.str
- encode
- replace
- split
- rsplit
- join
- capitalize
- casefold
- title
- center
- count
- expandtabs
- find
- partition
- index
- ljust
- lower
- lstrip
- rfind
- rindex
- rjust
- rstrip
- rpartition
- splitlines
- strip
- swapcase
- translate
- upper
- startswith
- endswith
- removeprefix
- removesuffix
- isascii
- islower
- isupper
- istitle
- isspace
- isdecimal
- isdigit
- isnumeric
- isalpha
- isalnum
- isidentifier
- isprintable
- zfill
- format
- format_map
- maketrans
60class Source: # noqa: PLR0904 # Ignore max publish methods 61 """A class representing a source that can be called.""" 62 63 def __init__( 64 self, 65 executor: Executor, 66 name: str, 67 config: dict[str, Any] | None = None, 68 streams: str | list[str] | None = None, 69 *, 70 validate: bool = False, 71 ) -> None: 72 """Initialize the source. 73 74 If config is provided, it will be validated against the spec if validate is True. 75 """ 76 self.executor = executor 77 self.name = name 78 self._processed_records = 0 79 self._config_dict: dict[str, Any] | None = None 80 self._last_log_messages: list[str] = [] 81 self._discovered_catalog: AirbyteCatalog | None = None 82 self._spec: ConnectorSpecification | None = None 83 self._selected_stream_names: list[str] = [] 84 if config is not None: 85 self.set_config(config, validate=validate) 86 if streams is not None: 87 self.select_streams(streams) 88 89 self._deployed_api_root: str | None = None 90 self._deployed_workspace_id: str | None = None 91 self._deployed_source_id: str | None = None 92 93 def set_streams(self, streams: list[str]) -> None: 94 """Deprecated. See select_streams().""" 95 warnings.warn( 96 "The 'set_streams' method is deprecated and will be removed in a future version. " 97 "Please use the 'select_streams' method instead.", 98 DeprecationWarning, 99 stacklevel=2, 100 ) 101 self.select_streams(streams) 102 103 def select_all_streams(self) -> None: 104 """Select all streams. 105 106 This is a more streamlined equivalent to: 107 > source.select_streams(source.get_available_streams()). 108 """ 109 self._selected_stream_names = self.get_available_streams() 110 111 def select_streams(self, streams: str | list[str]) -> None: 112 """Select the stream names that should be read from the connector. 113 114 Args: 115 - streams: A list of stream names to select. If set to "*", all streams will be selected. 116 117 Currently, if this is not set, all streams will be read. 118 """ 119 if streams == "*": 120 self.select_all_streams() 121 return 122 123 if isinstance(streams, str): 124 # If a single stream is provided, convert it to a one-item list 125 streams = [streams] 126 127 available_streams = self.get_available_streams() 128 for stream in streams: 129 if stream not in available_streams: 130 raise exc.AirbyteStreamNotFoundError( 131 stream_name=stream, 132 connector_name=self.name, 133 available_streams=available_streams, 134 ) 135 self._selected_stream_names = streams 136 137 def get_selected_streams(self) -> list[str]: 138 """Get the selected streams. 139 140 If no streams are selected, return an empty list. 141 """ 142 return self._selected_stream_names 143 144 def set_config( 145 self, 146 config: dict[str, Any], 147 *, 148 validate: bool = True, 149 ) -> None: 150 """Set the config for the connector. 151 152 If validate is True, raise an exception if the config fails validation. 153 154 If validate is False, validation will be deferred until check() or validate_config() 155 is called. 156 """ 157 if validate: 158 self.validate_config(config) 159 160 self._config_dict = config 161 162 def get_config(self) -> dict[str, Any]: 163 """Get the config for the connector.""" 164 return self._config 165 166 @property 167 def _config(self) -> dict[str, Any]: 168 if self._config_dict is None: 169 raise exc.AirbyteConnectorConfigurationMissingError( 170 guidance="Provide via get_source() or set_config()" 171 ) 172 return self._config_dict 173 174 def _discover(self) -> AirbyteCatalog: 175 """Call discover on the connector. 
176 177 This involves the following steps: 178 * Write the config to a temporary file 179 * execute the connector with discover --config <config_file> 180 * Listen to the messages and return the first AirbyteCatalog that comes along. 181 * Make sure the subprocess is killed when the function returns. 182 """ 183 with as_temp_files([self._config]) as [config_file]: 184 for msg in self._execute(["discover", "--config", config_file]): 185 if msg.type == Type.CATALOG and msg.catalog: 186 return msg.catalog 187 raise exc.AirbyteConnectorMissingCatalogError( 188 log_text=self._last_log_messages, 189 ) 190 191 def validate_config(self, config: dict[str, Any] | None = None) -> None: 192 """Validate the config against the spec. 193 194 If config is not provided, the already-set config will be validated. 195 """ 196 spec = self._get_spec(force_refresh=False) 197 config = self._config if config is None else config 198 try: 199 jsonschema.validate(config, spec.connectionSpecification) 200 log_config_validation_result( 201 name=self.name, 202 state=EventState.SUCCEEDED, 203 ) 204 except jsonschema.ValidationError as ex: 205 validation_ex = exc.AirbyteConnectorValidationFailedError( 206 message="The provided config is not valid.", 207 context={ 208 "error_message": ex.message, 209 "error_path": ex.path, 210 "error_instance": ex.instance, 211 "error_schema": ex.schema, 212 }, 213 ) 214 log_config_validation_result( 215 name=self.name, 216 state=EventState.FAILED, 217 exception=validation_ex, 218 ) 219 raise validation_ex from ex 220 221 def get_available_streams(self) -> list[str]: 222 """Get the available streams from the spec.""" 223 return [s.name for s in self.discovered_catalog.streams] 224 225 def _get_spec(self, *, force_refresh: bool = False) -> ConnectorSpecification: 226 """Call spec on the connector. 227 228 This involves the following steps: 229 * execute the connector with spec 230 * Listen to the messages and return the first AirbyteCatalog that comes along. 231 * Make sure the subprocess is killed when the function returns. 232 """ 233 if force_refresh or self._spec is None: 234 for msg in self._execute(["spec"]): 235 if msg.type == Type.SPEC and msg.spec: 236 self._spec = msg.spec 237 break 238 239 if self._spec: 240 return self._spec 241 242 raise exc.AirbyteConnectorMissingSpecError( 243 log_text=self._last_log_messages, 244 ) 245 246 @property 247 def config_spec(self) -> dict[str, Any]: 248 """Generate a configuration spec for this connector, as a JSON Schema definition. 249 250 This function generates a JSON Schema dictionary with configuration specs for the 251 current connector, as a dictionary. 252 253 Returns: 254 dict: The JSON Schema configuration spec as a dictionary. 255 """ 256 return self._get_spec(force_refresh=True).connectionSpecification 257 258 def print_config_spec( 259 self, 260 format: Literal["yaml", "json"] = "yaml", # noqa: A002 261 *, 262 output_file: Path | str | None = None, 263 ) -> None: 264 """Print the configuration spec for this connector. 265 266 Args: 267 - format: The format to print the spec in. Must be "yaml" or "json". 268 - output_file: Optional. If set, the spec will be written to the given file path. Otherwise, 269 it will be printed to the console. 270 """ 271 if format not in {"yaml", "json"}: 272 raise exc.PyAirbyteInputError( 273 message="Invalid format. 
Expected 'yaml' or 'json'", 274 input_value=format, 275 ) 276 if isinstance(output_file, str): 277 output_file = Path(output_file) 278 279 if format == "yaml": 280 content = yaml.dump(self.config_spec, indent=2) 281 elif format == "json": 282 content = json.dumps(self.config_spec, indent=2) 283 284 if output_file: 285 output_file.write_text(content) 286 return 287 288 syntax_highlighted = Syntax(content, format) 289 print(syntax_highlighted) 290 291 @property 292 def _yaml_spec(self) -> str: 293 """Get the spec as a yaml string. 294 295 For now, the primary use case is for writing and debugging a valid config for a source. 296 297 This is private for now because we probably want better polish before exposing this 298 as a stable interface. This will also get easier when we have docs links with this info 299 for each connector. 300 """ 301 spec_obj: ConnectorSpecification = self._get_spec() 302 spec_dict = spec_obj.dict(exclude_unset=True) 303 # convert to a yaml string 304 return yaml.dump(spec_dict) 305 306 @property 307 def docs_url(self) -> str: 308 """Get the URL to the connector's documentation.""" 309 # TODO: Replace with docs URL from metadata when available 310 return "https://docs.airbyte.com/integrations/sources/" + self.name.lower().replace( 311 "source-", "" 312 ) 313 314 @property 315 def discovered_catalog(self) -> AirbyteCatalog: 316 """Get the raw catalog for the given streams. 317 318 If the catalog is not yet known, we call discover to get it. 319 """ 320 if self._discovered_catalog is None: 321 self._discovered_catalog = self._discover() 322 323 return self._discovered_catalog 324 325 @property 326 def configured_catalog(self) -> ConfiguredAirbyteCatalog: 327 """Get the configured catalog for the given streams. 328 329 If the raw catalog is not yet known, we call discover to get it. 330 331 If no specific streams are selected, we return a catalog that syncs all available streams. 332 333 TODO: We should consider disabling by default the streams that the connector would 334 disable by default. (For instance, streams that require a premium license are sometimes 335 disabled by default within the connector.) 336 """ 337 # Ensure discovered catalog is cached before we start 338 _ = self.discovered_catalog 339 340 # Filter for selected streams if set, otherwise use all available streams: 341 streams_filter: list[str] = self._selected_stream_names or self.get_available_streams() 342 343 return ConfiguredAirbyteCatalog( 344 streams=[ 345 ConfiguredAirbyteStream( 346 stream=stream, 347 destination_sync_mode=DestinationSyncMode.overwrite, 348 primary_key=stream.source_defined_primary_key, 349 # TODO: The below assumes all sources can coalesce from incremental sync to 350 # full_table as needed. 
CDK supports this, so it might be safe: 351 sync_mode=SyncMode.incremental, 352 ) 353 for stream in self.discovered_catalog.streams 354 if stream.name in streams_filter 355 ], 356 ) 357 358 def get_stream_json_schema(self, stream_name: str) -> dict[str, Any]: 359 """Return the JSON Schema spec for the specified stream name.""" 360 catalog: AirbyteCatalog = self.discovered_catalog 361 found: list[AirbyteStream] = [ 362 stream for stream in catalog.streams if stream.name == stream_name 363 ] 364 365 if len(found) == 0: 366 raise exc.PyAirbyteInputError( 367 message="Stream name does not exist in catalog.", 368 input_value=stream_name, 369 ) 370 371 if len(found) > 1: 372 raise exc.PyAirbyteInternalError( 373 message="Duplicate streams found with the same name.", 374 context={ 375 "found_streams": found, 376 }, 377 ) 378 379 return found[0].json_schema 380 381 def get_records(self, stream: str) -> LazyDataset: 382 """Read a stream from the connector. 383 384 This involves the following steps: 385 * Call discover to get the catalog 386 * Generate a configured catalog that syncs the given stream in full_refresh mode 387 * Write the configured catalog and the config to a temporary file 388 * execute the connector with read --config <config_file> --catalog <catalog_file> 389 * Listen to the messages and return the first AirbyteRecordMessages that come along. 390 * Make sure the subprocess is killed when the function returns. 391 """ 392 discovered_catalog: AirbyteCatalog = self.discovered_catalog 393 configured_catalog = ConfiguredAirbyteCatalog( 394 streams=[ 395 ConfiguredAirbyteStream( 396 stream=s, 397 sync_mode=SyncMode.full_refresh, 398 destination_sync_mode=DestinationSyncMode.overwrite, 399 ) 400 for s in discovered_catalog.streams 401 if s.name == stream 402 ], 403 ) 404 if len(configured_catalog.streams) == 0: 405 raise exc.PyAirbyteInputError( 406 message="Requested stream does not exist.", 407 context={ 408 "stream": stream, 409 "available_streams": self.get_available_streams(), 410 "connector_name": self.name, 411 }, 412 ) from KeyError(stream) 413 414 configured_stream = configured_catalog.streams[0] 415 all_properties = cast( 416 list[str], list(configured_stream.stream.json_schema["properties"].keys()) 417 ) 418 419 def _with_logging(records: Iterable[dict[str, Any]]) -> Iterator[dict[str, Any]]: 420 self._log_sync_start(cache=None) 421 yield from records 422 self._log_sync_success(cache=None) 423 424 iterator: Iterator[dict[str, Any]] = _with_logging( 425 records=( # Generator comprehension yields StreamRecord objects for each record 426 StreamRecord.from_record_message( 427 record_message=record.record, 428 expected_keys=all_properties, 429 prune_extra_fields=True, 430 ) 431 for record in self._read_with_catalog(configured_catalog) 432 if record.record 433 ) 434 ) 435 return LazyDataset( 436 iterator, 437 stream_metadata=configured_stream, 438 ) 439 440 @property 441 def connector_version(self) -> str | None: 442 """Return the version of the connector as reported by the executor. 443 444 Returns None if the version cannot be determined. 445 """ 446 return self.executor.get_installed_version() 447 448 def get_documents( 449 self, 450 stream: str, 451 title_property: str | None = None, 452 content_properties: list[str] | None = None, 453 metadata_properties: list[str] | None = None, 454 *, 455 render_metadata: bool = False, 456 ) -> Iterable[Document]: 457 """Read a stream from the connector and return the records as documents. 
458 459 If metadata_properties is not set, all properties that are not content will be added to 460 the metadata. 461 462 If render_metadata is True, metadata will be rendered in the document, as well as the 463 the main content. 464 """ 465 return self.get_records(stream).to_documents( 466 title_property=title_property, 467 content_properties=content_properties, 468 metadata_properties=metadata_properties, 469 render_metadata=render_metadata, 470 ) 471 472 def check(self) -> None: 473 """Call check on the connector. 474 475 This involves the following steps: 476 * Write the config to a temporary file 477 * execute the connector with check --config <config_file> 478 * Listen to the messages and return the first AirbyteCatalog that comes along. 479 * Make sure the subprocess is killed when the function returns. 480 """ 481 with as_temp_files([self._config]) as [config_file]: 482 try: 483 for msg in self._execute(["check", "--config", config_file]): 484 if msg.type == Type.CONNECTION_STATUS and msg.connectionStatus: 485 if msg.connectionStatus.status != Status.FAILED: 486 print(f"Connection check succeeded for `{self.name}`.") 487 log_source_check_result( 488 name=self.name, 489 state=EventState.SUCCEEDED, 490 ) 491 return 492 493 log_source_check_result( 494 name=self.name, 495 state=EventState.FAILED, 496 ) 497 raise exc.AirbyteConnectorCheckFailedError( 498 help_url=self.docs_url, 499 context={ 500 "failure_reason": msg.connectionStatus.message, 501 }, 502 ) 503 raise exc.AirbyteConnectorCheckFailedError(log_text=self._last_log_messages) 504 except exc.AirbyteConnectorReadError as ex: 505 raise exc.AirbyteConnectorCheckFailedError( 506 message="The connector failed to check the connection.", 507 log_text=ex.log_text, 508 ) from ex 509 510 def install(self) -> None: 511 """Install the connector if it is not yet installed.""" 512 self.executor.install() 513 print("For configuration instructions, see: \n" f"{self.docs_url}#reference\n") 514 515 def uninstall(self) -> None: 516 """Uninstall the connector if it is installed. 517 518 This only works if the use_local_install flag wasn't used and installation is managed by 519 PyAirbyte. 520 """ 521 self.executor.uninstall() 522 523 def _read_with_catalog( 524 self, 525 catalog: ConfiguredAirbyteCatalog, 526 state: StateProviderBase | None = None, 527 ) -> Iterator[AirbyteMessage]: 528 """Call read on the connector. 529 530 This involves the following steps: 531 * Write the config to a temporary file 532 * execute the connector with read --config <config_file> --catalog <catalog_file> 533 * Listen to the messages and return the AirbyteRecordMessages that come along. 534 * Send out telemetry on the performed sync (with information about which source was used and 535 the type of the cache) 536 """ 537 self._processed_records = 0 # Reset the counter before we start 538 with as_temp_files( 539 [ 540 self._config, 541 catalog.json(), 542 state.to_state_input_file_text() if state else "[]", 543 ] 544 ) as [ 545 config_file, 546 catalog_file, 547 state_file, 548 ]: 549 yield from self._tally_records( 550 self._execute( 551 [ 552 "read", 553 "--config", 554 config_file, 555 "--catalog", 556 catalog_file, 557 "--state", 558 state_file, 559 ], 560 ) 561 ) 562 563 def _add_to_logs(self, message: str) -> None: 564 self._last_log_messages.append(message) 565 self._last_log_messages = self._last_log_messages[-10:] 566 567 def _execute(self, args: list[str]) -> Iterator[AirbyteMessage]: 568 """Execute the connector with the given arguments. 
569 570 This involves the following steps: 571 * Locate the right venv. It is called ".venv-<connector_name>" 572 * Spawn a subprocess with .venv-<connector_name>/bin/<connector-name> <args> 573 * Read the output line by line of the subprocess and serialize them AirbyteMessage objects. 574 Drop if not valid. 575 """ 576 # Fail early if the connector is not installed. 577 self.executor.ensure_installation(auto_fix=False) 578 579 try: 580 self._last_log_messages = [] 581 for line in self.executor.execute(args): 582 try: 583 message = AirbyteMessage.parse_raw(line) 584 if message.type is Type.RECORD: 585 self._processed_records += 1 586 if message.type == Type.LOG: 587 self._add_to_logs(message.log.message) 588 if message.type == Type.TRACE and message.trace.type == TraceType.ERROR: 589 self._add_to_logs(message.trace.error.message) 590 yield message 591 except Exception: 592 self._add_to_logs(line) 593 except Exception as e: 594 raise exc.AirbyteConnectorReadError( 595 log_text=self._last_log_messages, 596 ) from e 597 598 def _tally_records( 599 self, 600 messages: Iterable[AirbyteMessage], 601 ) -> Generator[AirbyteMessage, Any, None]: 602 """This method simply tallies the number of records processed and yields the messages.""" 603 self._processed_records = 0 # Reset the counter before we start 604 progress.reset(len(self._selected_stream_names or [])) 605 606 for message in messages: 607 yield message 608 progress.log_records_read(new_total_count=self._processed_records) 609 610 def _log_sync_start( 611 self, 612 *, 613 cache: CacheBase | None, 614 ) -> None: 615 """Log the start of a sync operation.""" 616 print(f"Started `{self.name}` read operation at {pendulum.now().format('HH:mm:ss')}...") 617 send_telemetry( 618 source=self, 619 cache=cache, 620 state=EventState.STARTED, 621 event_type=EventType.SYNC, 622 ) 623 624 def _log_sync_success( 625 self, 626 *, 627 cache: CacheBase | None, 628 ) -> None: 629 """Log the success of a sync operation.""" 630 print(f"Completed `{self.name}` read operation at {pendulum.now().format('HH:mm:ss')}.") 631 send_telemetry( 632 source=self, 633 cache=cache, 634 state=EventState.SUCCEEDED, 635 number_of_records=self._processed_records, 636 event_type=EventType.SYNC, 637 ) 638 639 def _log_sync_failure( 640 self, 641 *, 642 cache: CacheBase | None, 643 exception: Exception, 644 ) -> None: 645 """Log the failure of a sync operation.""" 646 print(f"Failed `{self.name}` read operation at {pendulum.now().format('HH:mm:ss')}.") 647 send_telemetry( 648 state=EventState.FAILED, 649 source=self, 650 cache=cache, 651 number_of_records=self._processed_records, 652 exception=exception, 653 event_type=EventType.SYNC, 654 ) 655 656 def read( 657 self, 658 cache: CacheBase | None = None, 659 *, 660 streams: str | list[str] | None = None, 661 write_strategy: str | WriteStrategy = WriteStrategy.AUTO, 662 force_full_refresh: bool = False, 663 skip_validation: bool = False, 664 ) -> ReadResult: 665 """Read from the connector and write to the cache. 666 667 Args: 668 cache: The cache to write to. If None, a default cache will be used. 669 write_strategy: The strategy to use when writing to the cache. If a string, it must be 670 one of "append", "upsert", "replace", or "auto". If a WriteStrategy, it must be one 671 of WriteStrategy.APPEND, WriteStrategy.UPSERT, WriteStrategy.REPLACE, or 672 WriteStrategy.AUTO. 673 streams: Optional if already set. A list of stream names to select for reading. If set 674 to "*", all streams will be selected. 
675 force_full_refresh: If True, the source will operate in full refresh mode. Otherwise, 676 streams will be read in incremental mode if supported by the connector. This option 677 must be True when using the "replace" strategy. 678 """ 679 if write_strategy == WriteStrategy.REPLACE and not force_full_refresh: 680 warnings.warn( 681 message=( 682 "Using `REPLACE` strategy without also setting `full_refresh_mode=True` " 683 "could result in data loss. " 684 "To silence this warning, use the following: " 685 'warnings.filterwarnings("ignore", ' 686 'category="airbyte.warnings.PyAirbyteDataLossWarning")`' 687 ), 688 category=PyAirbyteDataLossWarning, 689 stacklevel=1, 690 ) 691 if isinstance(write_strategy, str): 692 try: 693 write_strategy = WriteStrategy(write_strategy) 694 except ValueError: 695 raise exc.PyAirbyteInputError( 696 message="Invalid strategy", 697 context={ 698 "write_strategy": write_strategy, 699 "available_strategies": [s.value for s in WriteStrategy], 700 }, 701 ) from None 702 703 if streams: 704 self.select_streams(streams) 705 706 if not self._selected_stream_names: 707 raise exc.PyAirbyteNoStreamsSelectedError( 708 connector_name=self.name, 709 available_streams=self.get_available_streams(), 710 ) 711 712 # Run optional validation step 713 if not skip_validation: 714 self.validate_config() 715 716 # Set up cache and related resources 717 if cache is None: 718 cache = get_default_cache() 719 720 # Set up state provider if not in full refresh mode 721 if force_full_refresh: 722 state_provider: StateProviderBase | None = None 723 else: 724 state_provider = cache.get_state_provider( 725 source_name=self.name, 726 ) 727 728 self._log_sync_start(cache=cache) 729 730 cache_processor = cache.get_record_processor( 731 source_name=self.name, 732 catalog_provider=CatalogProvider(self.configured_catalog), 733 ) 734 try: 735 cache_processor.process_airbyte_messages( 736 self._read_with_catalog( 737 catalog=self.configured_catalog, 738 state=state_provider, 739 ), 740 write_strategy=write_strategy, 741 ) 742 743 # TODO: We should catch more specific exceptions here 744 except Exception as ex: 745 self._log_sync_failure(cache=cache, exception=ex) 746 raise exc.AirbyteConnectorFailedError( 747 log_text=self._last_log_messages, 748 ) from ex 749 750 self._log_sync_success(cache=cache) 751 return ReadResult( 752 processed_records=self._processed_records, 753 cache=cache, 754 processed_streams=[stream.stream.name for stream in self.configured_catalog.streams], 755 )
A class representing a source that can be called.
63 def __init__( 64 self, 65 executor: Executor, 66 name: str, 67 config: dict[str, Any] | None = None, 68 streams: str | list[str] | None = None, 69 *, 70 validate: bool = False, 71 ) -> None: 72 """Initialize the source. 73 74 If config is provided, it will be validated against the spec if validate is True. 75 """ 76 self.executor = executor 77 self.name = name 78 self._processed_records = 0 79 self._config_dict: dict[str, Any] | None = None 80 self._last_log_messages: list[str] = [] 81 self._discovered_catalog: AirbyteCatalog | None = None 82 self._spec: ConnectorSpecification | None = None 83 self._selected_stream_names: list[str] = [] 84 if config is not None: 85 self.set_config(config, validate=validate) 86 if streams is not None: 87 self.select_streams(streams) 88 89 self._deployed_api_root: str | None = None 90 self._deployed_workspace_id: str | None = None 91 self._deployed_source_id: str | None = None
Initialize the source.
If config is provided, it will be validated against the spec if validate is True.
93 def set_streams(self, streams: list[str]) -> None: 94 """Deprecated. See select_streams().""" 95 warnings.warn( 96 "The 'set_streams' method is deprecated and will be removed in a future version. " 97 "Please use the 'select_streams' method instead.", 98 DeprecationWarning, 99 stacklevel=2, 100 ) 101 self.select_streams(streams)
Deprecated. See select_streams().
103 def select_all_streams(self) -> None: 104 """Select all streams. 105 106 This is a more streamlined equivalent to: 107 > source.select_streams(source.get_available_streams()). 108 """ 109 self._selected_stream_names = self.get_available_streams()
Select all streams.
This is a more streamlined equivalent to: `source.select_streams(source.get_available_streams())`.
111 def select_streams(self, streams: str | list[str]) -> None: 112 """Select the stream names that should be read from the connector. 113 114 Args: 115 - streams: A list of stream names to select. If set to "*", all streams will be selected. 116 117 Currently, if this is not set, all streams will be read. 118 """ 119 if streams == "*": 120 self.select_all_streams() 121 return 122 123 if isinstance(streams, str): 124 # If a single stream is provided, convert it to a one-item list 125 streams = [streams] 126 127 available_streams = self.get_available_streams() 128 for stream in streams: 129 if stream not in available_streams: 130 raise exc.AirbyteStreamNotFoundError( 131 stream_name=stream, 132 connector_name=self.name, 133 available_streams=available_streams, 134 ) 135 self._selected_stream_names = streams
Select the stream names that should be read from the connector.
Args:
- streams: A list of stream names to select. If set to "*", all streams will be selected.
Currently, if this is not set, all streams will be read.
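For example (stream names shown are from the source-faker connector and are illustrative):

import airbyte as ab

source = ab.get_source("source-faker", config={"count": 100})
source.select_streams(["users", "products"])  # names must exist in the discovered catalog
source.select_streams("*")                    # or select every available stream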
137 def get_selected_streams(self) -> list[str]: 138 """Get the selected streams. 139 140 If no streams are selected, return an empty list. 141 """ 142 return self._selected_stream_names
Get the selected streams.
If no streams are selected, return an empty list.
144 def set_config( 145 self, 146 config: dict[str, Any], 147 *, 148 validate: bool = True, 149 ) -> None: 150 """Set the config for the connector. 151 152 If validate is True, raise an exception if the config fails validation. 153 154 If validate is False, validation will be deferred until check() or validate_config() 155 is called. 156 """ 157 if validate: 158 self.validate_config(config) 159 160 self._config_dict = config
Set the config for the connector.
If validate is True, raise an exception if the config fails validation.
If validate is False, validation will be deferred until check() or validate_config() is called.
162 def get_config(self) -> dict[str, Any]: 163 """Get the config for the connector.""" 164 return self._config
Get the config for the connector.
191 def validate_config(self, config: dict[str, Any] | None = None) -> None: 192 """Validate the config against the spec. 193 194 If config is not provided, the already-set config will be validated. 195 """ 196 spec = self._get_spec(force_refresh=False) 197 config = self._config if config is None else config 198 try: 199 jsonschema.validate(config, spec.connectionSpecification) 200 log_config_validation_result( 201 name=self.name, 202 state=EventState.SUCCEEDED, 203 ) 204 except jsonschema.ValidationError as ex: 205 validation_ex = exc.AirbyteConnectorValidationFailedError( 206 message="The provided config is not valid.", 207 context={ 208 "error_message": ex.message, 209 "error_path": ex.path, 210 "error_instance": ex.instance, 211 "error_schema": ex.schema, 212 }, 213 ) 214 log_config_validation_result( 215 name=self.name, 216 state=EventState.FAILED, 217 exception=validation_ex, 218 ) 219 raise validation_ex from ex
Validate the config against the spec.
If config is not provided, the already-set config will be validated.
221 def get_available_streams(self) -> list[str]: 222 """Get the available streams from the spec.""" 223 return [s.name for s in self.discovered_catalog.streams]
Get the available streams from the spec.
246 @property 247 def config_spec(self) -> dict[str, Any]: 248 """Generate a configuration spec for this connector, as a JSON Schema definition. 249 250 This function generates a JSON Schema dictionary with configuration specs for the 251 current connector, as a dictionary. 252 253 Returns: 254 dict: The JSON Schema configuration spec as a dictionary. 255 """ 256 return self._get_spec(force_refresh=True).connectionSpecification
Generate a configuration spec for this connector, as a JSON Schema definition.
This function generates the configuration spec for the current connector as a JSON Schema dictionary.
Returns:
dict: The JSON Schema configuration spec as a dictionary.
258 def print_config_spec( 259 self, 260 format: Literal["yaml", "json"] = "yaml", # noqa: A002 261 *, 262 output_file: Path | str | None = None, 263 ) -> None: 264 """Print the configuration spec for this connector. 265 266 Args: 267 - format: The format to print the spec in. Must be "yaml" or "json". 268 - output_file: Optional. If set, the spec will be written to the given file path. Otherwise, 269 it will be printed to the console. 270 """ 271 if format not in {"yaml", "json"}: 272 raise exc.PyAirbyteInputError( 273 message="Invalid format. Expected 'yaml' or 'json'", 274 input_value=format, 275 ) 276 if isinstance(output_file, str): 277 output_file = Path(output_file) 278 279 if format == "yaml": 280 content = yaml.dump(self.config_spec, indent=2) 281 elif format == "json": 282 content = json.dumps(self.config_spec, indent=2) 283 284 if output_file: 285 output_file.write_text(content) 286 return 287 288 syntax_highlighted = Syntax(content, format) 289 print(syntax_highlighted)
Print the configuration spec for this connector.
Args:
- format: The format to print the spec in. Must be "yaml" or "json".
- output_file: Optional. If set, the spec will be written to the given file path. Otherwise, it will be printed to the console.
306 @property 307 def docs_url(self) -> str: 308 """Get the URL to the connector's documentation.""" 309 # TODO: Replace with docs URL from metadata when available 310 return "https://docs.airbyte.com/integrations/sources/" + self.name.lower().replace( 311 "source-", "" 312 )
Get the URL to the connector's documentation.
314 @property 315 def discovered_catalog(self) -> AirbyteCatalog: 316 """Get the raw catalog for the given streams. 317 318 If the catalog is not yet known, we call discover to get it. 319 """ 320 if self._discovered_catalog is None: 321 self._discovered_catalog = self._discover() 322 323 return self._discovered_catalog
Get the raw catalog for the given streams.
If the catalog is not yet known, we call discover to get it.
325 @property 326 def configured_catalog(self) -> ConfiguredAirbyteCatalog: 327 """Get the configured catalog for the given streams. 328 329 If the raw catalog is not yet known, we call discover to get it. 330 331 If no specific streams are selected, we return a catalog that syncs all available streams. 332 333 TODO: We should consider disabling by default the streams that the connector would 334 disable by default. (For instance, streams that require a premium license are sometimes 335 disabled by default within the connector.) 336 """ 337 # Ensure discovered catalog is cached before we start 338 _ = self.discovered_catalog 339 340 # Filter for selected streams if set, otherwise use all available streams: 341 streams_filter: list[str] = self._selected_stream_names or self.get_available_streams() 342 343 return ConfiguredAirbyteCatalog( 344 streams=[ 345 ConfiguredAirbyteStream( 346 stream=stream, 347 destination_sync_mode=DestinationSyncMode.overwrite, 348 primary_key=stream.source_defined_primary_key, 349 # TODO: The below assumes all sources can coalesce from incremental sync to 350 # full_table as needed. CDK supports this, so it might be safe: 351 sync_mode=SyncMode.incremental, 352 ) 353 for stream in self.discovered_catalog.streams 354 if stream.name in streams_filter 355 ], 356 )
Get the configured catalog for the given streams.
If the raw catalog is not yet known, we call discover to get it.
If no specific streams are selected, we return a catalog that syncs all available streams.
TODO: We should consider disabling by default the streams that the connector would disable by default. (For instance, streams that require a premium license are sometimes disabled by default within the connector.)
358 def get_stream_json_schema(self, stream_name: str) -> dict[str, Any]: 359 """Return the JSON Schema spec for the specified stream name.""" 360 catalog: AirbyteCatalog = self.discovered_catalog 361 found: list[AirbyteStream] = [ 362 stream for stream in catalog.streams if stream.name == stream_name 363 ] 364 365 if len(found) == 0: 366 raise exc.PyAirbyteInputError( 367 message="Stream name does not exist in catalog.", 368 input_value=stream_name, 369 ) 370 371 if len(found) > 1: 372 raise exc.PyAirbyteInternalError( 373 message="Duplicate streams found with the same name.", 374 context={ 375 "found_streams": found, 376 }, 377 ) 378 379 return found[0].json_schema
Return the JSON Schema spec for the specified stream name.
381 def get_records(self, stream: str) -> LazyDataset: 382 """Read a stream from the connector. 383 384 This involves the following steps: 385 * Call discover to get the catalog 386 * Generate a configured catalog that syncs the given stream in full_refresh mode 387 * Write the configured catalog and the config to a temporary file 388 * execute the connector with read --config <config_file> --catalog <catalog_file> 389 * Listen to the messages and return the first AirbyteRecordMessages that come along. 390 * Make sure the subprocess is killed when the function returns. 391 """ 392 discovered_catalog: AirbyteCatalog = self.discovered_catalog 393 configured_catalog = ConfiguredAirbyteCatalog( 394 streams=[ 395 ConfiguredAirbyteStream( 396 stream=s, 397 sync_mode=SyncMode.full_refresh, 398 destination_sync_mode=DestinationSyncMode.overwrite, 399 ) 400 for s in discovered_catalog.streams 401 if s.name == stream 402 ], 403 ) 404 if len(configured_catalog.streams) == 0: 405 raise exc.PyAirbyteInputError( 406 message="Requested stream does not exist.", 407 context={ 408 "stream": stream, 409 "available_streams": self.get_available_streams(), 410 "connector_name": self.name, 411 }, 412 ) from KeyError(stream) 413 414 configured_stream = configured_catalog.streams[0] 415 all_properties = cast( 416 list[str], list(configured_stream.stream.json_schema["properties"].keys()) 417 ) 418 419 def _with_logging(records: Iterable[dict[str, Any]]) -> Iterator[dict[str, Any]]: 420 self._log_sync_start(cache=None) 421 yield from records 422 self._log_sync_success(cache=None) 423 424 iterator: Iterator[dict[str, Any]] = _with_logging( 425 records=( # Generator comprehension yields StreamRecord objects for each record 426 StreamRecord.from_record_message( 427 record_message=record.record, 428 expected_keys=all_properties, 429 prune_extra_fields=True, 430 ) 431 for record in self._read_with_catalog(configured_catalog) 432 if record.record 433 ) 434 ) 435 return LazyDataset( 436 iterator, 437 stream_metadata=configured_stream, 438 )
Read a stream from the connector.
This involves the following steps:
- Call discover to get the catalog
- Generate a configured catalog that syncs the given stream in full_refresh mode
- Write the configured catalog and the config to a temporary file
- execute the connector with read --config <config_file> --catalog <catalog_file>
- Listen to the messages and return the first AirbyteRecordMessages that come along.
- Make sure the subprocess is killed when the function returns.
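A brief usage sketch; records are yielded lazily as the connector emits them:

import airbyte as ab

source = ab.get_source("source-faker", config={"count": 100})
for record in source.get_records("users"):  # LazyDataset of dict-like records
    print(record)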
440 @property 441 def connector_version(self) -> str | None: 442 """Return the version of the connector as reported by the executor. 443 444 Returns None if the version cannot be determined. 445 """ 446 return self.executor.get_installed_version()
Return the version of the connector as reported by the executor.
Returns None if the version cannot be determined.
448 def get_documents( 449 self, 450 stream: str, 451 title_property: str | None = None, 452 content_properties: list[str] | None = None, 453 metadata_properties: list[str] | None = None, 454 *, 455 render_metadata: bool = False, 456 ) -> Iterable[Document]: 457 """Read a stream from the connector and return the records as documents. 458 459 If metadata_properties is not set, all properties that are not content will be added to 460 the metadata. 461 462 If render_metadata is True, metadata will be rendered in the document, as well as the 463 the main content. 464 """ 465 return self.get_records(stream).to_documents( 466 title_property=title_property, 467 content_properties=content_properties, 468 metadata_properties=metadata_properties, 469 render_metadata=render_metadata, 470 )
Read a stream from the connector and return the records as documents.
If metadata_properties is not set, all properties that are not content will be added to the metadata.
If render_metadata is True, metadata will be rendered in the document alongside the main content.
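A hedged sketch of the document conversion, reusing the source object from the earlier example; the stream and property names ("users", "name", "email", "id") are illustrative assumptions.

```python
# Convert records of an assumed "users" stream into Document objects,
# e.g. for feeding a vector store or LLM pipeline.
docs = source.get_documents(
    "users",
    title_property="name",
    content_properties=["email"],
    metadata_properties=["id"],
    render_metadata=True,
)
for doc in docs:
    print(doc)
    break
```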
472 def check(self) -> None: 473 """Call check on the connector. 474 475 This involves the following steps: 476 * Write the config to a temporary file 477 * execute the connector with check --config <config_file> 478 * Listen to the messages and return the first AirbyteCatalog that comes along. 479 * Make sure the subprocess is killed when the function returns. 480 """ 481 with as_temp_files([self._config]) as [config_file]: 482 try: 483 for msg in self._execute(["check", "--config", config_file]): 484 if msg.type == Type.CONNECTION_STATUS and msg.connectionStatus: 485 if msg.connectionStatus.status != Status.FAILED: 486 print(f"Connection check succeeded for `{self.name}`.") 487 log_source_check_result( 488 name=self.name, 489 state=EventState.SUCCEEDED, 490 ) 491 return 492 493 log_source_check_result( 494 name=self.name, 495 state=EventState.FAILED, 496 ) 497 raise exc.AirbyteConnectorCheckFailedError( 498 help_url=self.docs_url, 499 context={ 500 "failure_reason": msg.connectionStatus.message, 501 }, 502 ) 503 raise exc.AirbyteConnectorCheckFailedError(log_text=self._last_log_messages) 504 except exc.AirbyteConnectorReadError as ex: 505 raise exc.AirbyteConnectorCheckFailedError( 506 message="The connector failed to check the connection.", 507 log_text=ex.log_text, 508 ) from ex
Call check on the connector.
This involves the following steps:
- Write the config to a temporary file
- Execute the connector with check --config <config_file>
- Listen to the messages and inspect the first connection status message that comes along.
- Make sure the subprocess is killed when the function returns.
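For example, the connection check can be wrapped to handle failures explicitly; a minimal sketch, reusing the source object from the earlier example:

```python
# check() prints a success message and returns None on success;
# on failure it raises AirbyteConnectorCheckFailedError.
try:
    source.check()
except ab.exceptions.AirbyteConnectorCheckFailedError as ex:
    print(f"Connection check failed: {ex}")
```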
510 def install(self) -> None: 511 """Install the connector if it is not yet installed.""" 512 self.executor.install() 513 print("For configuration instructions, see: \n" f"{self.docs_url}#reference\n")
Install the connector if it is not yet installed.
515 def uninstall(self) -> None: 516 """Uninstall the connector if it is installed. 517 518 This only works if the use_local_install flag wasn't used and installation is managed by 519 PyAirbyte. 520 """ 521 self.executor.uninstall()
Uninstall the connector if it is installed.
This only works if the use_local_install flag wasn't used and installation is managed by PyAirbyte.
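Both methods are usually unnecessary, since get_source(..., install_if_missing=True) handles installation, but they can be called directly; a minimal sketch:

```python
source.install()    # no-op if the connector is already installed
source.uninstall()  # only removes installs that PyAirbyte itself manages
```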
656 def read( 657 self, 658 cache: CacheBase | None = None, 659 *, 660 streams: str | list[str] | None = None, 661 write_strategy: str | WriteStrategy = WriteStrategy.AUTO, 662 force_full_refresh: bool = False, 663 skip_validation: bool = False, 664 ) -> ReadResult: 665 """Read from the connector and write to the cache. 666 667 Args: 668 cache: The cache to write to. If None, a default cache will be used. 669 write_strategy: The strategy to use when writing to the cache. If a string, it must be 670 one of "append", "upsert", "replace", or "auto". If a WriteStrategy, it must be one 671 of WriteStrategy.APPEND, WriteStrategy.UPSERT, WriteStrategy.REPLACE, or 672 WriteStrategy.AUTO. 673 streams: Optional if already set. A list of stream names to select for reading. If set 674 to "*", all streams will be selected. 675 force_full_refresh: If True, the source will operate in full refresh mode. Otherwise, 676 streams will be read in incremental mode if supported by the connector. This option 677 must be True when using the "replace" strategy. 678 """ 679 if write_strategy == WriteStrategy.REPLACE and not force_full_refresh: 680 warnings.warn( 681 message=( 682 "Using `REPLACE` strategy without also setting `full_refresh_mode=True` " 683 "could result in data loss. " 684 "To silence this warning, use the following: " 685 'warnings.filterwarnings("ignore", ' 686 'category="airbyte.warnings.PyAirbyteDataLossWarning")`' 687 ), 688 category=PyAirbyteDataLossWarning, 689 stacklevel=1, 690 ) 691 if isinstance(write_strategy, str): 692 try: 693 write_strategy = WriteStrategy(write_strategy) 694 except ValueError: 695 raise exc.PyAirbyteInputError( 696 message="Invalid strategy", 697 context={ 698 "write_strategy": write_strategy, 699 "available_strategies": [s.value for s in WriteStrategy], 700 }, 701 ) from None 702 703 if streams: 704 self.select_streams(streams) 705 706 if not self._selected_stream_names: 707 raise exc.PyAirbyteNoStreamsSelectedError( 708 connector_name=self.name, 709 available_streams=self.get_available_streams(), 710 ) 711 712 # Run optional validation step 713 if not skip_validation: 714 self.validate_config() 715 716 # Set up cache and related resources 717 if cache is None: 718 cache = get_default_cache() 719 720 # Set up state provider if not in full refresh mode 721 if force_full_refresh: 722 state_provider: StateProviderBase | None = None 723 else: 724 state_provider = cache.get_state_provider( 725 source_name=self.name, 726 ) 727 728 self._log_sync_start(cache=cache) 729 730 cache_processor = cache.get_record_processor( 731 source_name=self.name, 732 catalog_provider=CatalogProvider(self.configured_catalog), 733 ) 734 try: 735 cache_processor.process_airbyte_messages( 736 self._read_with_catalog( 737 catalog=self.configured_catalog, 738 state=state_provider, 739 ), 740 write_strategy=write_strategy, 741 ) 742 743 # TODO: We should catch more specific exceptions here 744 except Exception as ex: 745 self._log_sync_failure(cache=cache, exception=ex) 746 raise exc.AirbyteConnectorFailedError( 747 log_text=self._last_log_messages, 748 ) from ex 749 750 self._log_sync_success(cache=cache) 751 return ReadResult( 752 processed_records=self._processed_records, 753 cache=cache, 754 processed_streams=[stream.stream.name for stream in self.configured_catalog.streams], 755 )
Read from the connector and write to the cache.
Arguments:
- cache: The cache to write to. If None, a default cache will be used.
- write_strategy: The strategy to use when writing to the cache. If a string, it must be one of "append", "upsert", "replace", or "auto". If a WriteStrategy, it must be one of WriteStrategy.APPEND, WriteStrategy.UPSERT, WriteStrategy.REPLACE, or WriteStrategy.AUTO.
- streams: Optional if already set. A list of stream names to select for reading. If set to "*", all streams will be selected.
- force_full_refresh: If True, the source will operate in full refresh mode. Otherwise, streams will be read in incremental mode if supported by the connector. This option must be True when using the "replace" strategy.
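A minimal sketch of a read into the default local cache; the stream name is an assumption carried over from the earlier examples, and the mapping-style access on the result is assumed for illustration.

```python
# Read the assumed "users" stream into the default DuckDB cache.
result = source.read(
    cache=ab.get_default_cache(),
    streams=["users"],
    write_strategy="auto",     # or "append", "upsert", "replace"
    force_full_refresh=False,
)
print(f"Processed {result.processed_records} records.")
print(result["users"].to_pandas().head())  # requires pandas to be installed
```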
93class StreamRecord(dict[str, Any]): 94 """The StreamRecord class is a case-aware, case-insensitive dictionary implementation. 95 96 It has these behaviors: 97 - When a key is retrieved, deleted, or checked for existence, it is always checked in a 98 case-insensitive manner. 99 - The original case is stored in a separate dictionary, so that the original case can be 100 retrieved when needed. 101 - Because it is subclassed from `dict`, the `StreamRecord` class can be passed as a normal 102 Python dictionary. 103 - In addition to the properties of the stream's records, the dictionary also stores the Airbyte 104 metadata columns: `_airbyte_raw_id`, `_airbyte_extracted_at`, and `_airbyte_meta`. 105 106 This behavior mirrors how a case-aware, case-insensitive SQL database would handle column 107 references. 108 109 There are two ways this class can store keys internally: 110 - If normalize_keys is True, the keys are normalized using the given normalizer. 111 - If normalize_keys is False, the original case of the keys is stored. 112 113 In regards to missing values, the dictionary accepts an 'expected_keys' input. When set, the 114 dictionary will be initialized with the given keys. If a key is not found in the input data, it 115 will be initialized with a value of None. When provided, the 'expected_keys' input will also 116 determine the original case of the keys. 117 """ 118 119 def _display_case(self, key: str) -> str: 120 """Return the original case of the key.""" 121 return self._pretty_case_keys[self._normalizer.normalize(key)] 122 123 def _index_case(self, key: str) -> str: 124 """Return the internal case of the key. 125 126 If normalize_keys is True, return the normalized key. 127 Otherwise, return the original case of the key. 128 """ 129 if self._normalize_keys: 130 return self._normalizer.normalize(key) 131 132 return self._display_case(key) 133 134 @classmethod 135 def from_record_message( 136 cls, 137 record_message: AirbyteRecordMessage, 138 *, 139 prune_extra_fields: bool, 140 normalize_keys: bool = True, 141 normalizer: type[NameNormalizerBase] | None = None, 142 expected_keys: list[str] | None = None, 143 ) -> StreamRecord: 144 """Return a StreamRecord from a RecordMessage.""" 145 data_dict: dict[str, Any] = record_message.data.copy() 146 data_dict[AB_RAW_ID_COLUMN] = str(ulid.ULID()) 147 data_dict[AB_EXTRACTED_AT_COLUMN] = datetime.fromtimestamp( 148 record_message.emitted_at / 1000, tz=pytz.utc 149 ) 150 data_dict[AB_META_COLUMN] = {} 151 152 return cls( 153 from_dict=data_dict, 154 prune_extra_fields=prune_extra_fields, 155 normalize_keys=normalize_keys, 156 normalizer=normalizer, 157 expected_keys=expected_keys, 158 ) 159 160 def __init__( 161 self, 162 from_dict: dict, 163 *, 164 prune_extra_fields: bool, 165 normalize_keys: bool = True, 166 normalizer: type[NameNormalizerBase] | None = None, 167 expected_keys: list[str] | None = None, 168 ) -> None: 169 """Initialize the dictionary with the given data. 170 171 Args: 172 from_dict: The dictionary to initialize the StreamRecord with. 173 prune_extra_fields: If `True`, unexpected fields will be removed. 174 normalize_keys: If `True`, the keys will be normalized using the given normalizer. 175 normalizer: The normalizer to use when normalizing keys. If not provided, the 176 LowerCaseNormalizer will be used. 177 expected_keys: If provided and `prune_extra_fields` is True, then unexpected fields 178 will be removed. This option is ignored if `expected_keys` is not provided. 
179 """ 180 # If no normalizer is provided, use LowerCaseNormalizer. 181 self._normalize_keys = normalize_keys 182 self._normalizer: type[NameNormalizerBase] = normalizer or LowerCaseNormalizer 183 184 # If no expected keys are provided, use all keys from the input dictionary. 185 if not expected_keys: 186 expected_keys = list(from_dict.keys()) 187 prune_extra_fields = False # No expected keys provided. 188 else: 189 expected_keys = list(expected_keys) 190 191 for internal_col in AB_INTERNAL_COLUMNS: 192 if internal_col not in expected_keys: 193 expected_keys.append(internal_col) 194 195 # Store a lookup from normalized keys to pretty cased (originally cased) keys. 196 self._pretty_case_keys: dict[str, str] = { 197 self._normalizer.normalize(pretty_case.lower()): pretty_case 198 for pretty_case in expected_keys 199 } 200 201 if normalize_keys: 202 index_keys = [self._normalizer.normalize(key) for key in expected_keys] 203 else: 204 index_keys = expected_keys 205 206 self.update(dict.fromkeys(index_keys)) # Start by initializing all values to None 207 for k, v in from_dict.items(): 208 index_cased_key = self._index_case(k) 209 if prune_extra_fields and index_cased_key not in index_keys: 210 # Dropping undeclared field 211 continue 212 213 self[index_cased_key] = v 214 215 def __getitem__(self, key: str) -> Any: # noqa: ANN401 216 if super().__contains__(key): 217 return super().__getitem__(key) 218 219 if super().__contains__(self._index_case(key)): 220 return super().__getitem__(self._index_case(key)) 221 222 raise KeyError(key) 223 224 def __setitem__(self, key: str, value: Any) -> None: # noqa: ANN401 225 if super().__contains__(key): 226 super().__setitem__(key, value) 227 return 228 229 if super().__contains__(self._index_case(key)): 230 super().__setitem__(self._index_case(key), value) 231 return 232 233 # Store the pretty cased (originally cased) key: 234 self._pretty_case_keys[self._normalizer.normalize(key)] = key 235 236 # Store the data with the normalized key: 237 super().__setitem__(self._index_case(key), value) 238 239 def __delitem__(self, key: str) -> None: 240 if super().__contains__(key): 241 super().__delitem__(key) 242 return 243 244 if super().__contains__(self._index_case(key)): 245 super().__delitem__(self._index_case(key)) 246 return 247 248 raise KeyError(key) 249 250 def __contains__(self, key: object) -> bool: 251 assert isinstance(key, str), "Key must be a string." 252 return super().__contains__(key) or super().__contains__(self._index_case(key)) 253 254 def __iter__(self) -> Any: # noqa: ANN401 255 return iter(super().__iter__()) 256 257 def __len__(self) -> int: 258 return super().__len__() 259 260 def __eq__(self, other: object) -> bool: 261 if isinstance(other, StreamRecord): 262 return dict(self) == dict(other) 263 264 if isinstance(other, dict): 265 return {k.lower(): v for k, v in self.items()} == { 266 k.lower(): v for k, v in other.items() 267 } 268 return False 269 270 def __hash__(self) -> int: # type: ignore [override] # Doesn't match superclass (dict) 271 """Return the hash of the dictionary with keys sorted.""" 272 items = [(k, v) for k, v in self.items() if not isinstance(v, dict)] 273 return hash(tuple(sorted(items)))
The StreamRecord class is a case-aware, case-insensitive dictionary implementation.
It has these behaviors:
- When a key is retrieved, deleted, or checked for existence, it is always checked in a case-insensitive manner.
- The original case is stored in a separate dictionary, so that the original case can be retrieved when needed.
- Because it is subclassed from dict, the StreamRecord class can be passed as a normal Python dictionary.
- In addition to the properties of the stream's records, the dictionary also stores the Airbyte metadata columns: _airbyte_raw_id, _airbyte_extracted_at, and _airbyte_meta.
This behavior mirrors how a case-aware, case-insensitive SQL database would handle column references.
There are two ways this class can store keys internally:
- If normalize_keys is True, the keys are normalized using the given normalizer.
- If normalize_keys is False, the original case of the keys is stored.
In regards to missing values, the dictionary accepts an 'expected_keys' input. When set, the dictionary will be initialized with the given keys. If a key is not found in the input data, it will be initialized with a value of None. When provided, the 'expected_keys' input will also determine the original case of the keys.
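A minimal sketch of the case-insensitive behavior described above. Constructing a StreamRecord directly is unusual in normal use, where records come from get_records(); the keys and values below are illustrative assumptions.

```python
from airbyte.records import StreamRecord

rec = StreamRecord(
    from_dict={"UserName": "ada", "Id": 1},
    prune_extra_fields=False,
)
assert rec["username"] == rec["USERNAME"] == "ada"  # case-insensitive lookups
assert "id" in rec and "ID" in rec                  # membership checks too
print(dict(rec))  # usable anywhere a plain dict is expected
```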
134 @classmethod 135 def from_record_message( 136 cls, 137 record_message: AirbyteRecordMessage, 138 *, 139 prune_extra_fields: bool, 140 normalize_keys: bool = True, 141 normalizer: type[NameNormalizerBase] | None = None, 142 expected_keys: list[str] | None = None, 143 ) -> StreamRecord: 144 """Return a StreamRecord from a RecordMessage.""" 145 data_dict: dict[str, Any] = record_message.data.copy() 146 data_dict[AB_RAW_ID_COLUMN] = str(ulid.ULID()) 147 data_dict[AB_EXTRACTED_AT_COLUMN] = datetime.fromtimestamp( 148 record_message.emitted_at / 1000, tz=pytz.utc 149 ) 150 data_dict[AB_META_COLUMN] = {} 151 152 return cls( 153 from_dict=data_dict, 154 prune_extra_fields=prune_extra_fields, 155 normalize_keys=normalize_keys, 156 normalizer=normalizer, 157 expected_keys=expected_keys, 158 )
Return a StreamRecord from a RecordMessage.
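For example, a hedged sketch, assuming the airbyte-protocol-models package provides AirbyteRecordMessage at the import path below; the stream name and fields are made up for illustration:

```python
from airbyte.records import StreamRecord
from airbyte_protocol.models import AirbyteRecordMessage

msg = AirbyteRecordMessage(
    stream="users",                     # assumed stream name
    data={"Id": 1, "Email": "a@b.co"},  # assumed fields
    emitted_at=1700000000000,           # epoch milliseconds
)
rec = StreamRecord.from_record_message(
    record_message=msg,
    prune_extra_fields=False,
)
print(rec["email"], rec["_airbyte_raw_id"])  # record data plus generated Airbyte metadata
```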