airbyte.sources
Sources connectors module for PyAirbyte.
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
"""Sources connectors module for PyAirbyte."""

from __future__ import annotations

from typing import TYPE_CHECKING

from airbyte.sources.base import Source
from airbyte.sources.registry import (
    ConnectorMetadata,
    get_available_connectors,
    get_connector_metadata,
)
from airbyte.sources.util import (
    get_benchmark_source,
    get_source,
)


# Submodules imported here for documentation reasons: https://github.com/mitmproxy/pdoc/issues/757
if TYPE_CHECKING:
    # ruff: noqa: TC004  # imports used for more than type checking
    from airbyte.sources import (
        base,
        registry,
        util,
    )

__all__ = [
    # Submodules
    "base",
    "registry",
    "util",
    # Factories
    "get_source",
    "get_benchmark_source",
    # Helper Functions
    "get_available_connectors",
    "get_connector_metadata",
    # Classes
    "Source",
    "ConnectorMetadata",
]
def get_source(  # noqa: PLR0913  # Too many arguments
    name: str,
    config: dict[str, Any] | None = None,
    *,
    config_change_callback: ConfigChangeCallback | None = None,
    streams: str | list[str] | None = None,
    version: str | None = None,
    pip_url: str | None = None,
    local_executable: Path | str | None = None,
    docker_image: bool | str | None = None,
    use_host_network: bool = False,
    source_manifest: bool | dict | Path | str | None = None,
    install_if_missing: bool = True,
    install_root: Path | None = None,
) -> Source:
    """Get a connector by name and version.

    If an explicit install or execution method is requested (e.g. `local_executable`,
    `docker_image`, `pip_url`, `source_manifest`), the connector will be executed using this
    method.

    Otherwise, an appropriate method will be selected based on the available connector metadata:
    1. If the connector is registered and a YAML source manifest is available, the YAML manifest
       will be downloaded and used to execute the connector.
    2. Else, if the connector is registered and has a PyPI package, it will be installed via pip.
    3. Else, if the connector is registered and has a Docker image, and if Docker is available, it
       will be executed using Docker.

    Args:
        name: connector name
        config: connector config - if not provided, you need to set it later via the set_config
            method.
        config_change_callback: callback function to be called when the connector config changes.
        streams: list of stream names to select for reading. If set to "*", all streams will be
            selected. If not provided, you can set it later via the `select_streams()` or
            `select_all_streams()` method.
        version: connector version - if not provided, the currently installed version will be
            used. If no version is installed, the latest available version will be used. The
            version can also be set to "latest" to force the use of the latest available version.
        pip_url: connector pip URL - if not provided, the pip url will be inferred from the
            connector name.
        local_executable: If set, the connector will be assumed to already be installed and will
            be executed using this path or executable name. Otherwise, the connector will be
            installed automatically in a virtual environment.
        docker_image: If set, the connector will be executed using Docker. You can specify `True`
            to use the default image for the connector, or you can specify a custom image name.
            If `version` is specified and your image name does not already contain a tag
            (e.g. `my-image:latest`), the version will be appended as a tag
            (e.g. `my-image:0.1.0`).
        use_host_network: If set, along with docker_image, the connector will be executed using
            the host network. This is useful for connectors that need to access resources on
            the host machine, such as a local database. This parameter is ignored when
            `docker_image` is not set.
        source_manifest: If set, the connector will be executed based on a declarative YAML
            source definition. This input can be `True` to attempt to auto-download a YAML spec,
            `dict` to accept a Python dictionary as the manifest, `Path` to pull a manifest from
            the local file system, or `str` to pull the definition from a web URL.
        install_if_missing: Whether to install the connector if it is not available locally. This
            parameter is ignored when `local_executable` or `source_manifest` are set.
        install_root: (Optional.) The root directory where the virtual environment will be
            created. If not provided, the current working directory will be used.
    """
    return Source(
        name=name,
        config=config,
        config_change_callback=config_change_callback,
        streams=streams,
        executor=get_connector_executor(
            name=name,
            version=version,
            pip_url=pip_url,
            local_executable=local_executable,
            docker_image=docker_image,
            use_host_network=use_host_network,
            source_manifest=source_manifest,
            install_if_missing=install_if_missing,
            install_root=install_root,
        ),
    )
Get a connector by name and version.

If an explicit install or execution method is requested (e.g. `local_executable`, `docker_image`, `pip_url`, `source_manifest`), the connector will be executed using this method.

Otherwise, an appropriate method will be selected based on the available connector metadata:
1. If the connector is registered and a YAML source manifest is available, the YAML manifest will be downloaded and used to execute the connector.
2. Else, if the connector is registered and has a PyPI package, it will be installed via pip.
3. Else, if the connector is registered and has a Docker image, and if Docker is available, it will be executed using Docker.

Arguments:
- name: connector name.
- config: connector config - if not provided, you need to set it later via the `set_config` method.
- config_change_callback: callback function to be called when the connector config changes.
- streams: list of stream names to select for reading. If set to "*", all streams will be selected. If not provided, you can set it later via the `select_streams()` or `select_all_streams()` method.
- version: connector version - if not provided, the currently installed version will be used. If no version is installed, the latest available version will be used. The version can also be set to "latest" to force the use of the latest available version.
- pip_url: connector pip URL - if not provided, the pip url will be inferred from the connector name.
- local_executable: If set, the connector will be assumed to already be installed and will be executed using this path or executable name. Otherwise, the connector will be installed automatically in a virtual environment.
- docker_image: If set, the connector will be executed using Docker. You can specify `True` to use the default image for the connector, or you can specify a custom image name. If `version` is specified and your image name does not already contain a tag (e.g. `my-image:latest`), the version will be appended as a tag (e.g. `my-image:0.1.0`).
- use_host_network: If set, along with `docker_image`, the connector will be executed using the host network. This is useful for connectors that need to access resources on the host machine, such as a local database. This parameter is ignored when `docker_image` is not set.
- source_manifest: If set, the connector will be executed based on a declarative YAML source definition. This input can be `True` to attempt to auto-download a YAML spec, `dict` to accept a Python dictionary as the manifest, `Path` to pull a manifest from the local file system, or `str` to pull the definition from a web URL.
- install_if_missing: Whether to install the connector if it is not available locally. This parameter is ignored when `local_executable` or `source_manifest` are set.
- install_root: (Optional.) The root directory where the virtual environment will be created. If not provided, the current working directory will be used.
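
A minimal usage sketch follows. The connector name `source-faker` and its `count` config key are illustrative assumptions; substitute any registered connector and the config keys from its own spec:

from airbyte.sources import get_source

source = get_source(
    "source-faker",           # assumed connector name, for illustration
    config={"count": 1_000},  # config keys depend on the connector's spec
    streams="*",              # select all streams up front
)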
def get_benchmark_source(
    num_records: int | str = "5e5",
    *,
    install_if_missing: bool = True,
) -> Source:
    """Get a source for benchmarking.

    This source will generate dummy records for performance benchmarking purposes.
    You can specify the number of records to generate using the `num_records` parameter.
    The `num_records` parameter can be an integer or a string in scientific notation.
    For example, `"5e6"` will generate 5 million records. If underscores are provided
    within a numeric string, they will be ignored.

    Args:
        num_records: The number of records to generate. Defaults to "5e5", or
            500,000 records.
            Can be an integer (`1000`) or a string in scientific notation.
            For example, `"5e6"` will generate 5 million records.
        install_if_missing: Whether to install the source if it is not available locally.

    Returns:
        Source: The source object for benchmarking.
    """
    if isinstance(num_records, str):
        try:
            num_records = int(Decimal(num_records.replace("_", "")))
        except InvalidOperation as ex:
            raise PyAirbyteInputError(
                message="Invalid number format.",
                original_exception=ex,
                input_value=str(num_records),
            ) from None

    return get_source(
        name="source-e2e-test",
        docker_image=True,
        # docker_image="airbyte/source-e2e-test:latest",
        config={
            "type": "BENCHMARK",
            "schema": "FIVE_STRING_COLUMNS",
            "terminationCondition": {
                "type": "MAX_RECORDS",
                "max": num_records,
            },
        },
        streams="*",
        install_if_missing=install_if_missing,
    )
Get a source for benchmarking.

This source will generate dummy records for performance benchmarking purposes. You can specify the number of records to generate using the `num_records` parameter. The `num_records` parameter can be an integer or a string in scientific notation. For example, `"5e6"` will generate 5 million records. If underscores are provided within a numeric string, they will be ignored.

Arguments:
- num_records: The number of records to generate. Defaults to "5e5", or 500,000 records. Can be an integer (`1000`) or a string in scientific notation. For example, `"5e6"` will generate 5 million records.
- install_if_missing: Whether to install the source if it is not available locally.

Returns:
Source: The source object for benchmarking.
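
For example, the call below requests 10,000 records. Note that, per the implementation above, this runs the `source-e2e-test` connector with `docker_image=True`, so Docker is expected to be available:

from airbyte.sources import get_benchmark_source

source = get_benchmark_source(num_records="1e4")  # 10,000 records; an int also works
# Underscores within numeric strings are ignored, e.g. "500_000".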
def get_available_connectors(install_type: InstallType | str | None = None) -> list[str]:
    """Return a list of all available connectors.

    Connectors will be returned in alphabetical order, with the standard prefix "source-".
    """
    if install_type is None:
        # No install type specified. Filter for whatever is runnable.
        if is_docker_installed():
            logger.info("Docker is detected. Returning all connectors.")
            # If Docker is available, return all connectors.
            return sorted(conn.name for conn in _get_registry_cache().values())

        logger.info("Docker was not detected. Returning only Python and Manifest-only connectors.")

        # If Docker is not available, return only Python and Manifest-based connectors.
        return sorted(
            conn.name
            for conn in _get_registry_cache().values()
            if conn.language in {Language.PYTHON, Language.MANIFEST_ONLY}
        )

    if not isinstance(install_type, InstallType):
        install_type = InstallType(install_type)

    if install_type == InstallType.PYTHON:
        return sorted(
            conn.name
            for conn in _get_registry_cache().values()
            if conn.pypi_package_name is not None
        )

    if install_type == InstallType.JAVA:
        warnings.warn(
            message="Java connectors are not yet supported.",
            stacklevel=2,
        )
        return sorted(
            conn.name for conn in _get_registry_cache().values() if conn.language == Language.JAVA
        )

    if install_type == InstallType.DOCKER:
        return sorted(conn.name for conn in _get_registry_cache().values())

    if install_type == InstallType.YAML:
        return sorted(
            conn.name
            for conn in _get_registry_cache().values()
            if InstallType.YAML in conn.install_types
            and conn.name not in _LOWCODE_CONNECTORS_EXCLUDED
        )

    # pragma: no cover  # Should never be reached.
    raise exc.PyAirbyteInputError(
        message="Invalid install type.",
        context={
            "install_type": install_type,
        },
    )
Return a list of all available connectors.
Connectors will be returned in alphabetical order, with the standard prefix "source-".
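
A short sketch of typical calls. The string value "yaml" is assumed to match the `InstallType.YAML` enum member used in the code above:

from airbyte.sources import get_available_connectors

runnable = get_available_connectors()         # filtered by what can run locally (with or without Docker)
yaml_only = get_available_connectors("yaml")  # assumed string form of InstallType.YAML
print(runnable[:3])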
def get_connector_metadata(name: str) -> ConnectorMetadata | None:
    """Check the cache for the connector.

    If the cache is empty, populate it by calling update_cache.
    """
    registry_url = _get_registry_url()

    if _is_registry_disabled(registry_url):
        return None

    cache = copy(_get_registry_cache())

    if not cache:
        raise exc.PyAirbyteInternalError(
            message="Connector registry could not be loaded.",
            context={
                "registry_url": _get_registry_url(),
            },
        )
    if name not in cache:
        raise exc.AirbyteConnectorNotRegisteredError(
            connector_name=name,
            context={
                "registry_url": _get_registry_url(),
                "available_connectors": get_available_connectors(),
            },
        )
    return cache[name]
Check the cache for the connector.
If the cache is empty, populate it by calling `update_cache`.
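
For instance, with a hypothetical connector name (an unregistered name raises `AirbyteConnectorNotRegisteredError`):

from airbyte.sources import get_connector_metadata

metadata = get_connector_metadata("source-faker")  # hypothetical name
if metadata is not None:  # None is returned when the registry is disabled
    print(metadata.name)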
class Source(ConnectorBase):  # noqa: PLR0904
    """A class representing a source that can be called."""

    connector_type = "source"

    def __init__(
        self,
        executor: Executor,
        name: str,
        config: dict[str, Any] | None = None,
        *,
        config_change_callback: ConfigChangeCallback | None = None,
        streams: str | list[str] | None = None,
        validate: bool = False,
        cursor_key_overrides: dict[str, str] | None = None,
        primary_key_overrides: dict[str, str | list[str]] | None = None,
    ) -> None:
        """Initialize the source.

        If config is provided, it will be validated against the spec if validate is True.
        """
        self._to_be_selected_streams: list[str] | str = []
        """Used to hold selection criteria before catalog is known."""

        super().__init__(
            executor=executor,
            name=name,
            config=config,
            config_change_callback=config_change_callback,
            validate=validate,
        )
        self._config_dict: dict[str, Any] | None = None
        self._last_log_messages: list[str] = []
        self._discovered_catalog: AirbyteCatalog | None = None
        self._selected_stream_names: list[str] = []

        self._cursor_key_overrides: dict[str, str] = {}
        """A mapping of lower-cased stream names to cursor key overrides."""

        self._primary_key_overrides: dict[str, list[str]] = {}
        """A mapping of lower-cased stream names to primary key overrides."""

        if config is not None:
            self.set_config(config, validate=validate)
        if streams is not None:
            self.select_streams(streams)
        if cursor_key_overrides is not None:
            self.set_cursor_keys(**cursor_key_overrides)
        if primary_key_overrides is not None:
            self.set_primary_keys(**primary_key_overrides)

    def set_streams(self, streams: list[str]) -> None:
        """Deprecated. See select_streams()."""
        warnings.warn(
            "The 'set_streams' method is deprecated and will be removed in a future version. "
            "Please use the 'select_streams' method instead.",
            DeprecationWarning,
            stacklevel=2,
        )
        self.select_streams(streams)

    def set_cursor_key(
        self,
        stream_name: str,
        cursor_key: str,
    ) -> None:
        """Set the cursor for a single stream.

        Note:
        - This does not unset previously set cursors.
        - The cursor key must be a single field name.
        - Not all streams support custom cursors. If a stream does not support custom cursors,
          the override may be ignored.
        - Stream names are case insensitive, while field names are case sensitive.
        - Stream names are not validated by PyAirbyte. If the stream name
          does not exist in the catalog, the override may be ignored.
        """
        self._cursor_key_overrides[stream_name.lower()] = cursor_key

    def set_cursor_keys(
        self,
        **kwargs: str,
    ) -> None:
        """Override the cursor key for one or more streams.

        Usage:
            source.set_cursor_keys(
                stream1="cursor1",
                stream2="cursor2",
            )

        Note:
        - This does not unset previously set cursors.
        - The cursor key must be a single field name.
        - Not all streams support custom cursors. If a stream does not support custom cursors,
          the override may be ignored.
        - Stream names are case insensitive, while field names are case sensitive.
        - Stream names are not validated by PyAirbyte. If the stream name
          does not exist in the catalog, the override may be ignored.
        """
        self._cursor_key_overrides.update({k.lower(): v for k, v in kwargs.items()})

    def set_primary_key(
        self,
        stream_name: str,
        primary_key: str | list[str],
    ) -> None:
        """Set the primary key for a single stream.

        Note:
        - This does not unset previously set primary keys.
        - The primary key must be a single field name or a list of field names.
        - Not all streams support overriding primary keys. If a stream does not support
          overriding primary keys, the override may be ignored.
        - Stream names are case insensitive, while field names are case sensitive.
        - Stream names are not validated by PyAirbyte. If the stream name
          does not exist in the catalog, the override may be ignored.
        """
        self._primary_key_overrides[stream_name.lower()] = (
            primary_key if isinstance(primary_key, list) else [primary_key]
        )

    def set_primary_keys(
        self,
        **kwargs: str | list[str],
    ) -> None:
        """Override the primary keys for one or more streams.

        This does not unset previously set primary keys.

        Usage:
            source.set_primary_keys(
                stream1="pk1",
                stream2=["pk1", "pk2"],
            )

        Note:
        - This does not unset previously set primary keys.
        - The primary key must be a single field name or a list of field names.
        - Not all streams support overriding primary keys. If a stream does not support
          overriding primary keys, the override may be ignored.
        - Stream names are case insensitive, while field names are case sensitive.
        - Stream names are not validated by PyAirbyte. If the stream name
          does not exist in the catalog, the override may be ignored.
        """
        self._primary_key_overrides.update(
            {k.lower(): v if isinstance(v, list) else [v] for k, v in kwargs.items()}
        )

    def _log_warning_preselected_stream(self, streams: str | list[str]) -> None:
        """Log a warning message for stream selections that are not yet applied."""
        if streams == "*":
            print(
                "Warning: Config is not set yet. All streams will be selected after config is set."
            )
        else:
            print(
                "Warning: Config is not set yet. "
                f"Streams to be selected after config is set: {streams}"
            )

    def select_all_streams(self) -> None:
        """Select all streams.

        This is a more streamlined equivalent to:
        > source.select_streams(source.get_available_streams()).
        """
        if self._config_dict is None:
            self._to_be_selected_streams = "*"
            self._log_warning_preselected_stream(self._to_be_selected_streams)
            return

        self._selected_stream_names = self.get_available_streams()

    def select_streams(self, streams: str | list[str]) -> None:
        """Select the stream names that should be read from the connector.

        Args:
            streams: A list of stream names to select. If set to "*", all streams will be
                selected.

        Currently, if this is not set, all streams will be read.
        """
        if self._config_dict is None:
            self._to_be_selected_streams = streams
            self._log_warning_preselected_stream(streams)
            return

        if streams == "*":
            self.select_all_streams()
            return

        if isinstance(streams, str):
            # If a single stream is provided, convert it to a one-item list
            streams = [streams]

        available_streams = self.get_available_streams()
        for stream in streams:
            if stream not in available_streams:
                raise exc.AirbyteStreamNotFoundError(
                    stream_name=stream,
                    connector_name=self.name,
                    available_streams=available_streams,
                )
        self._selected_stream_names = streams

    def get_selected_streams(self) -> list[str]:
        """Get the selected streams.

        If no streams are selected, return an empty list.
        """
        return self._selected_stream_names

    def set_config(
        self,
        config: dict[str, Any],
        *,
        validate: bool = True,
    ) -> None:
        """Set the config for the connector.

        If validate is True, raise an exception if the config fails validation.

        If validate is False, validation will be deferred until check() or validate_config()
        is called.
        """
        if validate:
            self.validate_config(config)

        self._config_dict = config

        if self._to_be_selected_streams:
            self.select_streams(self._to_be_selected_streams)
            self._to_be_selected_streams = []

    def _discover(self) -> AirbyteCatalog:
        """Call discover on the connector.

        This involves the following steps:
        - Write the config to a temporary file
        - Execute the connector with discover --config <config_file>
        - Listen to the messages and return the first AirbyteCatalog that comes along
        - Make sure the subprocess is killed when the function returns
        """
        with as_temp_files([self._hydrated_config]) as [config_file]:
            for msg in self._execute(["discover", "--config", config_file]):
                if msg.type == Type.CATALOG and msg.catalog:
                    return msg.catalog
            raise exc.AirbyteConnectorMissingCatalogError(
                connector_name=self.name,
                log_text=self._last_log_messages,
            )

    def get_available_streams(self) -> list[str]:
        """Get the available streams from the discovered catalog."""
        return [s.name for s in self.discovered_catalog.streams]

    def _get_incremental_stream_names(self) -> list[str]:
        """Get the names of streams that support incremental sync."""
        return [
            stream.name
            for stream in self.discovered_catalog.streams
            if SyncMode.incremental in stream.supported_sync_modes
        ]

    def _get_spec(self, *, force_refresh: bool = False) -> ConnectorSpecification:
        """Call spec on the connector.

        This involves the following steps:
        * Execute the connector with spec
        * Listen to the messages and return the first ConnectorSpecification that comes along
        * Make sure the subprocess is killed when the function returns
        """
        if force_refresh or self._spec is None:
            for msg in self._execute(["spec"]):
                if msg.type == Type.SPEC and msg.spec:
                    self._spec = msg.spec
                    break

        if self._spec:
            return self._spec

        raise exc.AirbyteConnectorMissingSpecError(
            connector_name=self.name,
            log_text=self._last_log_messages,
        )

    @property
    def config_spec(self) -> dict[str, Any]:
        """Generate a configuration spec for this connector, as a JSON Schema definition.

        This property generates a JSON Schema dictionary describing the configuration spec
        for the current connector.

        Returns:
            dict: The JSON Schema configuration spec as a dictionary.
        """
        return self._get_spec(force_refresh=True).connectionSpecification

    def print_config_spec(
        self,
        format: Literal["yaml", "json"] = "yaml",  # noqa: A002
        *,
        output_file: Path | str | None = None,
    ) -> None:
        """Print the configuration spec for this connector.

        Args:
            format: The format to print the spec in. Must be "yaml" or "json".
            output_file: Optional. If set, the spec will be written to the given file path.
                Otherwise, it will be printed to the console.
        """
        if format not in {"yaml", "json"}:
            raise exc.PyAirbyteInputError(
                message="Invalid format. Expected 'yaml' or 'json'",
                input_value=format,
            )
        if isinstance(output_file, str):
            output_file = Path(output_file)

        if format == "yaml":
            content = yaml.dump(self.config_spec, indent=2)
        elif format == "json":
            content = json.dumps(self.config_spec, indent=2)

        if output_file:
            output_file.write_text(content)
            return

        syntax_highlighted = Syntax(content, format)
        print(syntax_highlighted)

    @property
    def _yaml_spec(self) -> str:
        """Get the spec as a yaml string.

        For now, the primary use case is for writing and debugging a valid config for a source.

        This is private for now because we probably want better polish before exposing this
        as a stable interface. This will also get easier when we have docs links with this info
        for each connector.
        """
        spec_obj: ConnectorSpecification = self._get_spec()
        spec_dict: dict[str, Any] = spec_obj.model_dump(exclude_unset=True)
        # convert to a yaml string
        return yaml.dump(spec_dict)

    @property
    def docs_url(self) -> str:
        """Get the URL to the connector's documentation."""
        return "https://docs.airbyte.com/integrations/sources/" + self.name.lower().replace(
            "source-", ""
        )

    @property
    def discovered_catalog(self) -> AirbyteCatalog:
        """Get the raw catalog for the given streams.

        If the catalog is not yet known, we call discover to get it.
        """
        if self._discovered_catalog is None:
            self._discovered_catalog = self._discover()

        return self._discovered_catalog

    @property
    def configured_catalog(self) -> ConfiguredAirbyteCatalog:
        """Get the configured catalog for the given streams.

        If the raw catalog is not yet known, we call discover to get it.

        If no specific streams are selected, we return a catalog that syncs all available
        streams.

        TODO: We should consider disabling by default the streams that the connector would
        disable by default. (For instance, streams that require a premium license are sometimes
        disabled by default within the connector.)
        """
        # Ensure discovered catalog is cached before we start
        _ = self.discovered_catalog

        # Filter for selected streams if set, otherwise use all available streams:
        streams_filter: list[str] = self._selected_stream_names or self.get_available_streams()
        return self.get_configured_catalog(streams=streams_filter)

    def get_configured_catalog(
        self,
        streams: Literal["*"] | list[str] | None = None,
    ) -> ConfiguredAirbyteCatalog:
        """Get a configured catalog for the given streams.

        If no streams are provided, the selected streams will be used. If no streams are
        selected, all available streams will be used.

        If '*' is provided, all available streams will be used.
        """
        selected_streams: list[str] = []
        if streams is None:
            selected_streams = self._selected_stream_names or self.get_available_streams()
        elif streams == "*":
            selected_streams = self.get_available_streams()
        elif isinstance(streams, list):
            selected_streams = streams
        else:
            raise exc.PyAirbyteInputError(
                message="Invalid streams argument.",
                input_value=streams,
            )

        return ConfiguredAirbyteCatalog(
            streams=[
                ConfiguredAirbyteStream(
                    stream=stream,
                    destination_sync_mode=DestinationSyncMode.overwrite,
                    sync_mode=SyncMode.incremental,
                    primary_key=(
                        [self._primary_key_overrides[stream.name.lower()]]
                        if stream.name.lower() in self._primary_key_overrides
                        else stream.source_defined_primary_key
                    ),
                    cursor_field=(
                        [self._cursor_key_overrides[stream.name.lower()]]
                        if stream.name.lower() in self._cursor_key_overrides
                        else stream.default_cursor_field
                    ),
                    # These are unused in the current implementation:
                    generation_id=None,
                    minimum_generation_id=None,
                    sync_id=None,
                )
                for stream in self.discovered_catalog.streams
                if stream.name in selected_streams
            ],
        )

    def get_stream_json_schema(self, stream_name: str) -> dict[str, Any]:
        """Return the JSON Schema spec for the specified stream name."""
        catalog: AirbyteCatalog = self.discovered_catalog
        found: list[AirbyteStream] = [
            stream for stream in catalog.streams if stream.name == stream_name
        ]

        if len(found) == 0:
            raise exc.PyAirbyteInputError(
                message="Stream name does not exist in catalog.",
                input_value=stream_name,
            )

        if len(found) > 1:
            raise exc.PyAirbyteInternalError(
                message="Duplicate streams found with the same name.",
                context={
                    "found_streams": found,
                },
            )

        return found[0].json_schema

    def get_records(
        self,
        stream: str,
        *,
        normalize_field_names: bool = False,
        prune_undeclared_fields: bool = True,
    ) -> LazyDataset:
        """Read a stream from the connector.

        Args:
            stream: The name of the stream to read.
            normalize_field_names: When `True`, field names will be normalized to lower case,
                with special characters removed. This matches the behavior of PyAirbyte caches
                and most Airbyte destinations.
            prune_undeclared_fields: When `True`, undeclared fields will be pruned from the
                records, which generally matches the behavior of PyAirbyte caches and most
                Airbyte destinations, specifically when you expect the catalog may be stale.
                You can disable this to keep all fields in the records.

        This involves the following steps:
        * Call discover to get the catalog
        * Generate a configured catalog that syncs the given stream in full_refresh mode
        * Write the configured catalog and the config to a temporary file
        * Execute the connector with read --config <config_file> --catalog <catalog_file>
        * Listen to the messages and return the first AirbyteRecordMessages that come along
        * Make sure the subprocess is killed when the function returns
        """
        configured_catalog = self.get_configured_catalog(streams=[stream])
        if len(configured_catalog.streams) == 0:
            raise exc.PyAirbyteInputError(
                message="Requested stream does not exist.",
                context={
                    "stream": stream,
                    "available_streams": self.get_available_streams(),
                    "connector_name": self.name,
                },
            ) from KeyError(stream)

        configured_stream = configured_catalog.streams[0]

        def _with_logging(records: Iterable[dict[str, Any]]) -> Iterator[dict[str, Any]]:
            yield from records

        stream_record_handler = StreamRecordHandler(
            json_schema=self.get_stream_json_schema(stream),
            prune_extra_fields=prune_undeclared_fields,
            normalize_keys=normalize_field_names,
        )

        # This method is non-blocking, so we use "PLAIN" to avoid a live progress display
        progress_tracker = ProgressTracker(
            ProgressStyle.PLAIN,
            source=self,
            cache=None,
            destination=None,
            expected_streams=[stream],
        )

        iterator: Iterator[dict[str, Any]] = (
            StreamRecord.from_record_message(
                record_message=record.record,
                stream_record_handler=stream_record_handler,
            )
            for record in self._read_with_catalog(
                catalog=configured_catalog,
                progress_tracker=progress_tracker,
            )
            if record.record
        )
        progress_tracker.log_success()
        return LazyDataset(
            iterator,
            stream_metadata=configured_stream,
        )

    def get_documents(
        self,
        stream: str,
        title_property: str | None = None,
        content_properties: list[str] | None = None,
        metadata_properties: list[str] | None = None,
        *,
        render_metadata: bool = False,
    ) -> Iterable[Document]:
        """Read a stream from the connector and return the records as documents.

        If metadata_properties is not set, all properties that are not content will be added to
        the metadata.

        If render_metadata is True, metadata will be rendered in the document, as well as
        the main content.
        """
        return self.get_records(stream).to_documents(
            title_property=title_property,
            content_properties=content_properties,
            metadata_properties=metadata_properties,
            render_metadata=render_metadata,
        )

    def _get_airbyte_message_iterator(
        self,
        *,
        streams: Literal["*"] | list[str] | None = None,
        state_provider: StateProviderBase | None = None,
        progress_tracker: ProgressTracker,
        force_full_refresh: bool = False,
    ) -> AirbyteMessageIterator:
        """Get an AirbyteMessageIterator for this source."""
        return AirbyteMessageIterator(
            self._read_with_catalog(
                catalog=self.get_configured_catalog(streams=streams),
                state=state_provider if not force_full_refresh else None,
                progress_tracker=progress_tracker,
            )
        )

    def _read_with_catalog(
        self,
        catalog: ConfiguredAirbyteCatalog,
        progress_tracker: ProgressTracker,
        state: StateProviderBase | None = None,
    ) -> Generator[AirbyteMessage, None, None]:
        """Call read on the connector.

        This involves the following steps:
        * Write the config to a temporary file
        * Execute the connector with read --config <config_file> --catalog <catalog_file>
        * Listen to the messages and return the AirbyteRecordMessages that come along
        * Send out telemetry on the performed sync (with information about which source was
          used and the type of the cache)
        """
        with as_temp_files(
            [
                self._hydrated_config,
                catalog.model_dump_json(),
                state.to_state_input_file_text() if state else "[]",
            ]
        ) as [
            config_file,
            catalog_file,
            state_file,
        ]:
            message_generator = self._execute(
                [
                    "read",
                    "--config",
                    config_file,
                    "--catalog",
                    catalog_file,
                    "--state",
                    state_file,
                ],
                progress_tracker=progress_tracker,
            )
            yield from progress_tracker.tally_records_read(message_generator)
        progress_tracker.log_read_complete()

    def _peek_airbyte_message(
        self,
        message: AirbyteMessage,
        *,
        raise_on_error: bool = True,
    ) -> None:
        """Process an Airbyte message.

        This method handles reading Airbyte messages and taking action, if needed, based on the
        message type. For instance, log messages are logged, records are tallied, and errors are
        raised as exceptions if `raise_on_error` is True.

        Raises:
            AirbyteConnectorFailedError: If a TRACE message of type ERROR is emitted.
        """
        super()._peek_airbyte_message(message, raise_on_error=raise_on_error)

    def _log_incremental_streams(
        self,
        *,
        incremental_streams: set[str] | None = None,
    ) -> None:
        """Log the streams which are using incremental sync mode."""
        log_message = (
            "The following streams are currently using incremental sync:\n"
            f"{incremental_streams}\n"
            "To perform a full refresh, set 'force_full_refresh=True' in 'airbyte.read()' method."
        )
        print(log_message)

    def read(
        self,
        cache: CacheBase | None = None,
        *,
        streams: str | list[str] | None = None,
        write_strategy: str | WriteStrategy = WriteStrategy.AUTO,
        force_full_refresh: bool = False,
        skip_validation: bool = False,
    ) -> ReadResult:
        """Read from the connector and write to the cache.

        Args:
            cache: The cache to write to. If not set, a default cache will be used.
            streams: Optional if already set. A list of stream names to select for reading. If
                set to "*", all streams will be selected.
            write_strategy: The strategy to use when writing to the cache. If a string, it must
                be one of "append", "merge", "replace", or "auto". If a WriteStrategy, it must
                be one of WriteStrategy.APPEND, WriteStrategy.MERGE, WriteStrategy.REPLACE, or
                WriteStrategy.AUTO.
            force_full_refresh: If True, the source will operate in full refresh mode. Otherwise,
                streams will be read in incremental mode if supported by the connector. This
                option must be True when using the "replace" strategy.
            skip_validation: If True, PyAirbyte will not pre-validate the input configuration
                before running the connector. This can be helpful in debugging, when you want to
                send configurations to the connector that otherwise might be rejected by JSON
                Schema validation rules.
        """
        cache = cache or get_default_cache()
        progress_tracker = ProgressTracker(
            source=self,
            cache=cache,
            destination=None,
            expected_streams=None,  # Will be set later
        )

        # Set up state provider if not in full refresh mode
        if force_full_refresh:
            state_provider: StateProviderBase | None = None
        else:
            state_provider = cache.get_state_provider(
                source_name=self._name,
            )
        state_writer = cache.get_state_writer(source_name=self._name)

        if streams:
            self.select_streams(streams)

        if not self._selected_stream_names:
            raise exc.PyAirbyteNoStreamsSelectedError(
                connector_name=self.name,
                available_streams=self.get_available_streams(),
            )

        try:
            result = self._read_to_cache(
                cache=cache,
                catalog_provider=CatalogProvider(self.configured_catalog),
                stream_names=self._selected_stream_names,
                state_provider=state_provider,
                state_writer=state_writer,
                write_strategy=write_strategy,
                force_full_refresh=force_full_refresh,
                skip_validation=skip_validation,
                progress_tracker=progress_tracker,
            )
        except exc.PyAirbyteInternalError as ex:
            progress_tracker.log_failure(exception=ex)
            raise exc.AirbyteConnectorFailedError(
                connector_name=self.name,
                log_text=self._last_log_messages,
            ) from ex
        except Exception as ex:
            progress_tracker.log_failure(exception=ex)
            raise

        progress_tracker.log_success()
        return result

    def _read_to_cache(  # noqa: PLR0913  # Too many arguments
        self,
        cache: CacheBase,
        *,
        catalog_provider: CatalogProvider,
        stream_names: list[str],
        state_provider: StateProviderBase | None,
        state_writer: StateWriterBase | None,
        write_strategy: str | WriteStrategy = WriteStrategy.AUTO,
        force_full_refresh: bool = False,
        skip_validation: bool = False,
        progress_tracker: ProgressTracker,
    ) -> ReadResult:
        """Internal read method."""
        if write_strategy == WriteStrategy.REPLACE and not force_full_refresh:
            warnings.warn(
                message=(
                    "Using `REPLACE` strategy without also setting `force_full_refresh=True` "
                    "could result in data loss. "
                    "To silence this warning, use the following: "
                    'warnings.filterwarnings("ignore", '
                    'category="airbyte.warnings.PyAirbyteDataLossWarning")'
                ),
                category=exc.PyAirbyteDataLossWarning,
                stacklevel=1,
            )
        if isinstance(write_strategy, str):
            try:
                write_strategy = WriteStrategy(write_strategy)
            except ValueError:
                raise exc.PyAirbyteInputError(
                    message="Invalid strategy",
                    context={
                        "write_strategy": write_strategy,
                        "available_strategies": [s.value for s in WriteStrategy],
                    },
                ) from None

        # Run optional validation step
        if not skip_validation:
            self.validate_config()

        # Log incremental streams if incremental streams are known
        if state_provider and state_provider.known_stream_names:
            # Retrieve the set of known streams which support incremental sync
            incremental_streams = (
                set(self._get_incremental_stream_names())
                & state_provider.known_stream_names
                & set(self.get_selected_streams())
            )
            if incremental_streams:
                self._log_incremental_streams(incremental_streams=incremental_streams)

        airbyte_message_iterator = AirbyteMessageIterator(
            self._read_with_catalog(
                catalog=catalog_provider.configured_catalog,
                state=state_provider,
                progress_tracker=progress_tracker,
            )
        )
        cache._write_airbyte_message_stream(  # noqa: SLF001  # Non-public API
            stdin=airbyte_message_iterator,
            catalog_provider=catalog_provider,
            write_strategy=write_strategy,
            state_writer=state_writer,
            progress_tracker=progress_tracker,
        )

        # Flush the WAL, if applicable
        cache.processor._do_checkpoint()  # noqa: SLF001  # Non-public API

        return ReadResult(
            source_name=self.name,
            progress_tracker=progress_tracker,
            processed_streams=stream_names,
            cache=cache,
        )
A class representing a source that can be called.
def __init__(
    self,
    executor: Executor,
    name: str,
    config: dict[str, Any] | None = None,
    *,
    config_change_callback: ConfigChangeCallback | None = None,
    streams: str | list[str] | None = None,
    validate: bool = False,
    cursor_key_overrides: dict[str, str] | None = None,
    primary_key_overrides: dict[str, str | list[str]] | None = None,
) -> None:
Initialize the source.
If config is provided, it will be validated against the spec if validate is True.
def set_streams(self, streams: list[str]) -> None:
Deprecated. See select_streams().
def set_cursor_key(self, stream_name: str, cursor_key: str) -> None:
Set the cursor for a single stream.
Note:
- This does not unset previously set cursors.
- The cursor key must be a single field name.
- Not all streams support custom cursors. If a stream does not support custom cursors, the override may be ignored.
- Stream names are case insensitive, while field names are case sensitive.
- Stream names are not validated by PyAirbyte. If the stream name does not exist in the catalog, the override may be ignored.
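
A brief sketch, using hypothetical stream and field names:

# Override the cursor for one stream; field-name casing must match the schema.
source.set_cursor_key("users", "updated_at")  # "users" and "updated_at" are hypothetical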
def set_cursor_keys(self, **kwargs: str) -> None:
Override the cursor key for one or more streams.
Usage:
source.set_cursor_keys(
    stream1="cursor1",
    stream2="cursor2",
)
Note:
- This does not unset previously set cursors.
- The cursor key must be a single field name.
- Not all streams support custom cursors. If a stream does not support custom cursors, the override may be ignored.
- Stream names are case insensitive, while field names are case sensitive.
- Stream names are not validated by PyAirbyte. If the stream name does not exist in the catalog, the override may be ignored.
def set_primary_key(self, stream_name: str, primary_key: str | list[str]) -> None:
Set the primary key for a single stream.
Note:
- This does not unset previously set primary keys.
- The primary key must be a single field name or a list of field names.
- Not all streams support overriding primary keys. If a stream does not support overriding primary keys, the override may be ignored.
- Stream names are case insensitive, while field names are case sensitive.
- Stream names are not validated by PyAirbyte. If the stream name does not exist in the catalog, the override may be ignored.
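
A brief sketch with hypothetical names; either a single field name or a list of field names is accepted:

source.set_primary_key("users", "id")                            # single-field key
source.set_primary_key("order_items", ["order_id", "line_no"])   # composite key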
def set_primary_keys(self, **kwargs: str | list[str]) -> None:
Override the primary keys for one or more streams.
This does not unset previously set primary keys.
Usage:
source.set_primary_keys(
    stream1="pk1",
    stream2=["pk1", "pk2"],
)
Note:
- This does not unset previously set primary keys.
- The primary key must be a single field name or a list of field names.
- Not all streams support overriding primary keys. If a stream does not support overriding primary keys, the override may be ignored.
- Stream names are case insensitive, while field names are case sensitive.
- Stream names are not validated by PyAirbyte. If the stream name does not exist in the catalog, the override may be ignored.
def select_all_streams(self) -> None:
Select all streams.
This is a more streamlined equivalent to:
source.select_streams(source.get_available_streams()).
def select_streams(self, streams: str | list[str]) -> None:
Select the stream names that should be read from the connector.
Arguments:
- streams: A list of stream names to select. If set to "*", all streams will be selected.
Currently, if this is not set, all streams will be read.
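
For example (stream names are hypothetical; once the config is set, unknown names raise `AirbyteStreamNotFoundError`):

source.select_streams(["users", "orders"])  # an explicit subset
source.select_streams("*")                  # equivalent to select_all_streams()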
def get_selected_streams(self) -> list[str]:
Get the selected streams.
If no streams are selected, return an empty list.
def set_config(self, config: dict[str, Any], *, validate: bool = True) -> None:
Set the config for the connector.
If validate is True, raise an exception if the config fails validation.
If validate is False, validation will be deferred until check() or validate_config() is called.
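
A short sketch; the config keys are hypothetical and depend on the connector's spec:

source.set_config(
    {"api_key": "..."},  # hypothetical config, for illustration
    validate=False,      # defer validation until check() or validate_config()
)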
def get_available_streams(self) -> list[str]:
Get the available streams from the discovered catalog.
@property
def config_spec(self) -> dict[str, Any]:
Generate a configuration spec for this connector, as a JSON Schema definition.
This property generates a JSON Schema dictionary describing the configuration spec for the current connector.
Returns:
dict: The JSON Schema configuration spec as a dictionary.
def print_config_spec(self, format: Literal["yaml", "json"] = "yaml", *, output_file: Path | str | None = None) -> None:
Print the configuration spec for this connector.
Arguments:
- format: The format to print the spec in. Must be "yaml" or "json".
- output_file: Optional. If set, the spec will be written to the given file path. Otherwise, it will be printed to the console.
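
For example:

source.print_config_spec()                                         # YAML, printed to the console
source.print_config_spec(format="json", output_file="spec.json")   # written to a file instead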
@property
def docs_url(self) -> str:
Get the URL to the connector's documentation.
@property
def discovered_catalog(self) -> AirbyteCatalog:
Get the raw catalog for the given streams.
If the catalog is not yet known, we call discover to get it.
@property
def configured_catalog(self) -> ConfiguredAirbyteCatalog:
Get the configured catalog for the given streams.
If the raw catalog is not yet known, we call discover to get it.
If no specific streams are selected, we return a catalog that syncs all available streams.
TODO: We should consider disabling by default the streams that the connector would disable by default. (For instance, streams that require a premium license are sometimes disabled by default within the connector.)
def get_configured_catalog(self, streams: Literal["*"] | list[str] | None = None) -> ConfiguredAirbyteCatalog:
Get a configured catalog for the given streams.
If no streams are provided, the selected streams will be used. If no streams are selected, all available streams will be used.
If '*' is provided, all available streams will be used.
def get_stream_json_schema(self, stream_name: str) -> dict[str, Any]:
Return the JSON Schema spec for the specified stream name.
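As a quick sketch, assuming the same `source-faker` source as above with its `users` stream:

# Inspect the declared top-level fields of a stream:
schema = source.get_stream_json_schema("users")
print(sorted(schema.get("properties", {}).keys()))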
def get_records(
    self,
    stream: str,
    *,
    normalize_field_names: bool = False,
    prune_undeclared_fields: bool = True,
) -> LazyDataset:
    """Read a stream from the connector.

    Args:
        stream: The name of the stream to read.
        normalize_field_names: When `True`, field names will be normalized to lower case, with
            special characters removed. This matches the behavior of PyAirbyte caches and most
            Airbyte destinations.
        prune_undeclared_fields: When `True`, undeclared fields will be pruned from the records,
            which generally matches the behavior of PyAirbyte caches and most Airbyte
            destinations, specifically when you expect the catalog may be stale. You can disable
            this to keep all fields in the records.

    This involves the following steps:
    * Call discover to get the catalog
    * Generate a configured catalog that syncs the given stream in full_refresh mode
    * Write the configured catalog and the config to a temporary file
    * Execute the connector with read --config <config_file> --catalog <catalog_file>
    * Listen to the messages and return the first AirbyteRecordMessages that come along.
    * Make sure the subprocess is killed when the function returns.
    """
    configured_catalog = self.get_configured_catalog(streams=[stream])
    if len(configured_catalog.streams) == 0:
        raise exc.PyAirbyteInputError(
            message="Requested stream does not exist.",
            context={
                "stream": stream,
                "available_streams": self.get_available_streams(),
                "connector_name": self.name,
            },
        ) from KeyError(stream)

    configured_stream = configured_catalog.streams[0]

    def _with_logging(records: Iterable[dict[str, Any]]) -> Iterator[dict[str, Any]]:
        yield from records

    stream_record_handler = StreamRecordHandler(
        json_schema=self.get_stream_json_schema(stream),
        prune_extra_fields=prune_undeclared_fields,
        normalize_keys=normalize_field_names,
    )

    # This method is non-blocking, so we use "PLAIN" to avoid a live progress display
    progress_tracker = ProgressTracker(
        ProgressStyle.PLAIN,
        source=self,
        cache=None,
        destination=None,
        expected_streams=[stream],
    )

    iterator: Iterator[dict[str, Any]] = (
        StreamRecord.from_record_message(
            record_message=record.record,
            stream_record_handler=stream_record_handler,
        )
        for record in self._read_with_catalog(
            catalog=configured_catalog,
            progress_tracker=progress_tracker,
        )
        if record.record
    )
    progress_tracker.log_success()
    return LazyDataset(
        iterator,
        stream_metadata=configured_stream,
    )
Read a stream from the connector.
Arguments:
- stream: The name of the stream to read.
- normalize_field_names: When `True`, field names will be normalized to lower case, with special characters removed. This matches the behavior of PyAirbyte caches and most Airbyte destinations.
- prune_undeclared_fields: When `True`, undeclared fields will be pruned from the records, which generally matches the behavior of PyAirbyte caches and most Airbyte destinations, specifically when you expect the catalog may be stale. You can disable this to keep all fields in the records.

This involves the following steps:
- Call discover to get the catalog.
- Generate a configured catalog that syncs the given stream in full_refresh mode.
- Write the configured catalog and the config to a temporary file.
- Execute the connector with `read --config <config_file> --catalog <catalog_file>`.
- Listen to the messages and return the first AirbyteRecordMessages that come along.
- Make sure the subprocess is killed when the function returns.
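A minimal usage sketch, again assuming a `source-faker` source with a `users` stream (the `id` field name is illustrative):

dataset = source.get_records("users")  # LazyDataset: no data has been read yet
for record in dataset:  # records arrive lazily as the connector subprocess emits them
    print(record["id"])
    break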
def get_documents(
    self,
    stream: str,
    title_property: str | None = None,
    content_properties: list[str] | None = None,
    metadata_properties: list[str] | None = None,
    *,
    render_metadata: bool = False,
) -> Iterable[Document]:
    """Read a stream from the connector and return the records as documents.

    If metadata_properties is not set, all properties that are not content will be added to
    the metadata.

    If render_metadata is True, metadata will be rendered in the document, as well as the
    main content.
    """
    return self.get_records(stream).to_documents(
        title_property=title_property,
        content_properties=content_properties,
        metadata_properties=metadata_properties,
        render_metadata=render_metadata,
    )
Read a stream from the connector and return the records as documents.
If metadata_properties is not set, all properties that are not content will be added to the metadata.
If render_metadata is True, metadata will be rendered in the document, as well as the main content.
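A short sketch, assuming the `users` stream exposes `name` and `email` properties (both property names are illustrative):

docs = source.get_documents(
    "users",
    title_property="name",
    content_properties=["email"],
    render_metadata=True,  # also render metadata into the document body
)
for doc in docs:
    print(doc)
    break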
def read(
    self,
    cache: CacheBase | None = None,
    *,
    streams: str | list[str] | None = None,
    write_strategy: str | WriteStrategy = WriteStrategy.AUTO,
    force_full_refresh: bool = False,
    skip_validation: bool = False,
) -> ReadResult:
    """Read from the connector and write to the cache.

    Args:
        cache: The cache to write to. If not set, a default cache will be used.
        streams: Optional if already set. A list of stream names to select for reading. If set
            to "*", all streams will be selected.
        write_strategy: The strategy to use when writing to the cache. If a string, it must be
            one of "append", "merge", "replace", or "auto". If a WriteStrategy, it must be one
            of WriteStrategy.APPEND, WriteStrategy.MERGE, WriteStrategy.REPLACE, or
            WriteStrategy.AUTO.
        force_full_refresh: If True, the source will operate in full refresh mode. Otherwise,
            streams will be read in incremental mode if supported by the connector. This option
            must be True when using the "replace" strategy.
        skip_validation: If True, PyAirbyte will not pre-validate the input configuration before
            running the connector. This can be helpful in debugging, when you want to send
            configurations to the connector that otherwise might be rejected by JSON Schema
            validation rules.
    """
    cache = cache or get_default_cache()
    progress_tracker = ProgressTracker(
        source=self,
        cache=cache,
        destination=None,
        expected_streams=None,  # Will be set later
    )

    # Set up state provider if not in full refresh mode
    if force_full_refresh:
        state_provider: StateProviderBase | None = None
    else:
        state_provider = cache.get_state_provider(
            source_name=self._name,
        )
    state_writer = cache.get_state_writer(source_name=self._name)

    if streams:
        self.select_streams(streams)

    if not self._selected_stream_names:
        raise exc.PyAirbyteNoStreamsSelectedError(
            connector_name=self.name,
            available_streams=self.get_available_streams(),
        )

    try:
        result = self._read_to_cache(
            cache=cache,
            catalog_provider=CatalogProvider(self.configured_catalog),
            stream_names=self._selected_stream_names,
            state_provider=state_provider,
            state_writer=state_writer,
            write_strategy=write_strategy,
            force_full_refresh=force_full_refresh,
            skip_validation=skip_validation,
            progress_tracker=progress_tracker,
        )
    except exc.PyAirbyteInternalError as ex:
        progress_tracker.log_failure(exception=ex)
        raise exc.AirbyteConnectorFailedError(
            connector_name=self.name,
            log_text=self._last_log_messages,
        ) from ex
    except Exception as ex:
        progress_tracker.log_failure(exception=ex)
        raise

    progress_tracker.log_success()
    return result
Read from the connector and write to the cache.
Arguments:
- cache: The cache to write to. If not set, a default cache will be used.
- streams: Optional if already set. A list of stream names to select for reading. If set to "*", all streams will be selected.
- write_strategy: The strategy to use when writing to the cache. If a string, it must be one of "append", "merge", "replace", or "auto". If a WriteStrategy, it must be one of WriteStrategy.APPEND, WriteStrategy.MERGE, WriteStrategy.REPLACE, or WriteStrategy.AUTO.
- force_full_refresh: If True, the source will operate in full refresh mode. Otherwise, streams will be read in incremental mode if supported by the connector. This option must be True when using the "replace" strategy.
- skip_validation: If True, PyAirbyte will not pre-validate the input configuration before running the connector. This can be helpful in debugging, when you want to send configurations to the connector that otherwise might be rejected by JSON Schema validation rules.
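A minimal end-to-end sketch (the `users` stream name is illustrative; ReadResult behaves like a mapping of stream name to cached dataset, and the default cache is a local DuckDB store):

cache = ab.get_default_cache()
source.select_streams("*")
result = source.read(cache=cache, write_strategy="auto")
print(result["users"].to_pandas().head())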
Inherited Members
- airbyte._connector_base.ConnectorBase
- config_change_callback
- executor
- name
- get_config
- config_hash
- validate_config
- connector_version
- check
- install
- uninstall
class ConnectorMetadata(BaseModel):
    """Metadata for a connector."""

    name: str
    """Connector name. For example, "source-google-sheets"."""

    latest_available_version: str | None
    """The latest available version of the connector."""

    pypi_package_name: str | None
    """The name of the PyPI package for the connector, if it exists."""

    language: Language | None
    """The language of the connector."""

    install_types: set[InstallType]
    """The supported install types for the connector."""

    suggested_streams: list[str] | None = None
    """A list of suggested streams for the connector, if available."""

    @property
    def default_install_type(self) -> InstallType:
        """Return the default install type for the connector."""
        if self.language == Language.MANIFEST_ONLY and InstallType.YAML in self.install_types:
            return InstallType.YAML

        if InstallType.PYTHON in self.install_types:
            return InstallType.PYTHON

        # Else: Java or Docker
        return InstallType.DOCKER
Metadata for a connector.
Fields:
- name: Connector name. For example, "source-google-sheets".
- latest_available_version: The latest available version of the connector.
- pypi_package_name: The name of the PyPI package for the connector, if it exists.
- language: The language of the connector.
- install_types: The supported install types for the connector.
- suggested_streams: A list of suggested streams for the connector, if available.
@property
def default_install_type(self) -> InstallType:
    """Return the default install type for the connector."""
    if self.language == Language.MANIFEST_ONLY and InstallType.YAML in self.install_types:
        return InstallType.YAML

    if InstallType.PYTHON in self.install_types:
        return InstallType.PYTHON

    # Else: Java or Docker
    return InstallType.DOCKER
Return the default install type for the connector.
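For instance, a quick sketch of inspecting registry metadata (using `source-faker` as an example connector name):

from airbyte.sources import get_connector_metadata

meta = get_connector_metadata("source-faker")
print(meta.name, meta.latest_available_version)
print(meta.default_install_type)  # e.g. InstallType.PYTHON, depending on registry data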
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].