airbyte.sources.base
Base class implementation for sources.
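Example (a minimal usage sketch, not part of this module's source; assumes the hypothetical `source-faker` connector and its `count` option, and that `Source` instances are normally obtained via the package-level `airbyte.get_source()` factory rather than constructed directly, since direct construction requires an `Executor`):

```python
import airbyte as ab

# Obtain a Source instance via the package-level factory:
source = ab.get_source(
    "source-faker",         # hypothetical example connector
    config={"count": 100},  # assumed config shape for source-faker
    install_if_missing=True,
)

source.check()               # validate config and connectivity
source.select_all_streams()  # equivalent to select_streams("*")
result = source.read()       # sync into the default cache (DuckDB-based)
```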
```python
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
"""Base class implementation for sources."""

from __future__ import annotations

import json
import warnings
from pathlib import Path
from typing import TYPE_CHECKING, Any, Literal

import yaml
from rich import print  # noqa: A004  # Allow shadowing the built-in
from rich.syntax import Syntax

from airbyte_protocol.models import (
    AirbyteCatalog,
    AirbyteMessage,
    ConfiguredAirbyteCatalog,
    ConfiguredAirbyteStream,
    DestinationSyncMode,
    SyncMode,
    Type,
)

from airbyte import exceptions as exc
from airbyte._connector_base import ConnectorBase
from airbyte._message_iterators import AirbyteMessageIterator
from airbyte._util.temp_files import as_temp_files
from airbyte.caches.util import get_default_cache
from airbyte.datasets._lazy import LazyDataset
from airbyte.progress import ProgressStyle, ProgressTracker
from airbyte.records import StreamRecord, StreamRecordHandler
from airbyte.results import ReadResult
from airbyte.shared.catalog_providers import CatalogProvider
from airbyte.strategies import WriteStrategy


if TYPE_CHECKING:
    from collections.abc import Generator, Iterable, Iterator

    from airbyte_cdk import ConnectorSpecification
    from airbyte_protocol.models import AirbyteStream

    from airbyte._executors.base import Executor
    from airbyte.caches import CacheBase
    from airbyte.callbacks import ConfigChangeCallback
    from airbyte.documents import Document
    from airbyte.shared.state_providers import StateProviderBase
    from airbyte.shared.state_writers import StateWriterBase


class Source(ConnectorBase):
    """A class representing a source that can be called."""

    connector_type: Literal["source"] = "source"

    def __init__(
        self,
        executor: Executor,
        name: str,
        config: dict[str, Any] | None = None,
        *,
        config_change_callback: ConfigChangeCallback | None = None,
        streams: str | list[str] | None = None,
        validate: bool = False,
    ) -> None:
        """Initialize the source.

        If config is provided, it will be validated against the spec if validate is True.
        """
        self._to_be_selected_streams: list[str] | str = []
        """Used to hold selection criteria before catalog is known."""

        super().__init__(
            executor=executor,
            name=name,
            config=config,
            config_change_callback=config_change_callback,
            validate=validate,
        )
        self._config_dict: dict[str, Any] | None = None
        self._last_log_messages: list[str] = []
        self._discovered_catalog: AirbyteCatalog | None = None
        self._selected_stream_names: list[str] = []
        if config is not None:
            self.set_config(config, validate=validate)
        if streams is not None:
            self.select_streams(streams)

        self._deployed_api_root: str | None = None
        self._deployed_workspace_id: str | None = None
        self._deployed_source_id: str | None = None

    def set_streams(self, streams: list[str]) -> None:
        """Deprecated. See select_streams()."""
        warnings.warn(
            "The 'set_streams' method is deprecated and will be removed in a future version. "
            "Please use the 'select_streams' method instead.",
            DeprecationWarning,
            stacklevel=2,
        )
        self.select_streams(streams)

    def _log_warning_preselected_stream(self, streams: str | list[str]) -> None:
        """Log a warning listing streams that will be selected once the config is set."""
        if streams == "*":
            print(
                "Warning: Config is not set yet. All streams will be selected after config is set."
            )
        else:
            print(
                "Warning: Config is not set yet. "
                f"Streams to be selected after config is set: {streams}"
            )

    def select_all_streams(self) -> None:
        """Select all streams.

        This is a more streamlined equivalent to:
        > source.select_streams(source.get_available_streams()).
        """
        if self._config_dict is None:
            self._to_be_selected_streams = "*"
            self._log_warning_preselected_stream(self._to_be_selected_streams)
            return

        self._selected_stream_names = self.get_available_streams()

    def select_streams(self, streams: str | list[str]) -> None:
        """Select the stream names that should be read from the connector.

        Args:
            streams: A list of stream names to select. If set to "*", all streams will be
                selected.

        Currently, if this is not set, all streams will be read.
        """
        if self._config_dict is None:
            self._to_be_selected_streams = streams
            self._log_warning_preselected_stream(streams)
            return

        if streams == "*":
            self.select_all_streams()
            return

        if isinstance(streams, str):
            # If a single stream is provided, convert it to a one-item list
            streams = [streams]

        available_streams = self.get_available_streams()
        for stream in streams:
            if stream not in available_streams:
                raise exc.AirbyteStreamNotFoundError(
                    stream_name=stream,
                    connector_name=self.name,
                    available_streams=available_streams,
                )
        self._selected_stream_names = streams

    def get_selected_streams(self) -> list[str]:
        """Get the selected streams.

        If no streams are selected, return an empty list.
        """
        return self._selected_stream_names

    def set_config(
        self,
        config: dict[str, Any],
        *,
        validate: bool = True,
    ) -> None:
        """Set the config for the connector.

        If validate is True, raise an exception if the config fails validation.

        If validate is False, validation will be deferred until check() or validate_config()
        is called.
        """
        if validate:
            self.validate_config(config)

        self._config_dict = config

        if self._to_be_selected_streams:
            self.select_streams(self._to_be_selected_streams)
            self._to_be_selected_streams = []

    def get_config(self) -> dict[str, Any]:
        """Get the config for the connector."""
        return self._config

    @property
    def _config(self) -> dict[str, Any]:
        if self._config_dict is None:
            raise exc.AirbyteConnectorConfigurationMissingError(
                connector_name=self.name,
                guidance="Provide via get_source() or set_config()",
            )
        return self._config_dict

    def _discover(self) -> AirbyteCatalog:
        """Call discover on the connector.

        This involves the following steps:
        - Write the config to a temporary file
        - execute the connector with discover --config <config_file>
        - Listen to the messages and return the first AirbyteCatalog that comes along.
        - Make sure the subprocess is killed when the function returns.
        """
        with as_temp_files([self._config]) as [config_file]:
            for msg in self._execute(["discover", "--config", config_file]):
                if msg.type == Type.CATALOG and msg.catalog:
                    return msg.catalog
            raise exc.AirbyteConnectorMissingCatalogError(
                connector_name=self.name,
                log_text=self._last_log_messages,
            )

    def get_available_streams(self) -> list[str]:
        """Get the available streams from the discovered catalog."""
        return [s.name for s in self.discovered_catalog.streams]

    def _get_incremental_stream_names(self) -> list[str]:
        """Get the names of streams that support incremental sync."""
        return [
            stream.name
            for stream in self.discovered_catalog.streams
            if SyncMode.incremental in stream.supported_sync_modes
        ]

    def _get_spec(self, *, force_refresh: bool = False) -> ConnectorSpecification:
        """Call spec on the connector.

        This involves the following steps:
        * execute the connector with spec
        * Listen to the messages and return the first spec message that comes along.
        * Make sure the subprocess is killed when the function returns.
        """
        if force_refresh or self._spec is None:
            for msg in self._execute(["spec"]):
                if msg.type == Type.SPEC and msg.spec:
                    self._spec = msg.spec
                    break

        if self._spec:
            return self._spec

        raise exc.AirbyteConnectorMissingSpecError(
            connector_name=self.name,
            log_text=self._last_log_messages,
        )

    @property
    def config_spec(self) -> dict[str, Any]:
        """Generate a configuration spec for this connector, as a JSON Schema definition.

        This property runs the connector's `spec` command and returns the
        `connectionSpecification` portion of the result as a dictionary.

        Returns:
            dict: The JSON Schema configuration spec as a dictionary.
        """
        return self._get_spec(force_refresh=True).connectionSpecification

    def print_config_spec(
        self,
        format: Literal["yaml", "json"] = "yaml",  # noqa: A002
        *,
        output_file: Path | str | None = None,
    ) -> None:
        """Print the configuration spec for this connector.

        Args:
            format: The format to print the spec in. Must be "yaml" or "json".
            output_file: Optional. If set, the spec will be written to the given file path.
                Otherwise, it will be printed to the console.
        """
        if format not in {"yaml", "json"}:
            raise exc.PyAirbyteInputError(
                message="Invalid format. Expected 'yaml' or 'json'",
                input_value=format,
            )
        if isinstance(output_file, str):
            output_file = Path(output_file)

        if format == "yaml":
            content = yaml.dump(self.config_spec, indent=2)
        elif format == "json":
            content = json.dumps(self.config_spec, indent=2)

        if output_file:
            output_file.write_text(content)
            return

        syntax_highlighted = Syntax(content, format)
        print(syntax_highlighted)

    @property
    def _yaml_spec(self) -> str:
        """Get the spec as a yaml string.

        For now, the primary use case is for writing and debugging a valid config for a source.

        This is private for now because we probably want better polish before exposing this
        as a stable interface. This will also get easier when we have docs links with this info
        for each connector.
        """
        spec_obj: ConnectorSpecification = self._get_spec()
        spec_dict: dict[str, Any] = spec_obj.model_dump(exclude_unset=True)
        # convert to a yaml string
        return yaml.dump(spec_dict)

    @property
    def docs_url(self) -> str:
        """Get the URL to the connector's documentation."""
        return "https://docs.airbyte.com/integrations/sources/" + self.name.lower().replace(
            "source-", ""
        )

    @property
    def discovered_catalog(self) -> AirbyteCatalog:
        """Get the raw catalog for the given streams.

        If the catalog is not yet known, we call discover to get it.
        """
        if self._discovered_catalog is None:
            self._discovered_catalog = self._discover()

        return self._discovered_catalog

    @property
    def configured_catalog(self) -> ConfiguredAirbyteCatalog:
        """Get the configured catalog for the given streams.

        If the raw catalog is not yet known, we call discover to get it.

        If no specific streams are selected, we return a catalog that syncs all available streams.

        TODO: We should consider disabling by default the streams that the connector would
        disable by default. (For instance, streams that require a premium license are sometimes
        disabled by default within the connector.)
        """
        # Ensure discovered catalog is cached before we start
        _ = self.discovered_catalog

        # Filter for selected streams if set, otherwise use all available streams:
        streams_filter: list[str] = self._selected_stream_names or self.get_available_streams()
        return self.get_configured_catalog(streams=streams_filter)

    def get_configured_catalog(
        self,
        streams: Literal["*"] | list[str] | None = None,
    ) -> ConfiguredAirbyteCatalog:
        """Get a configured catalog for the given streams.

        If no streams are provided, the selected streams will be used. If no streams are
        selected, all available streams will be used.

        If '*' is provided, all available streams will be used.
        """
        selected_streams: list[str] = []
        if streams is None:
            selected_streams = self._selected_stream_names or self.get_available_streams()
        elif streams == "*":
            selected_streams = self.get_available_streams()
        elif isinstance(streams, list):
            selected_streams = streams
        else:
            raise exc.PyAirbyteInputError(
                message="Invalid streams argument.",
                input_value=streams,
            )

        return ConfiguredAirbyteCatalog(
            streams=[
                ConfiguredAirbyteStream(
                    stream=stream,
                    destination_sync_mode=DestinationSyncMode.overwrite,
                    primary_key=stream.source_defined_primary_key,
                    sync_mode=SyncMode.incremental,
                )
                for stream in self.discovered_catalog.streams
                if stream.name in selected_streams
            ],
        )

    def get_stream_json_schema(self, stream_name: str) -> dict[str, Any]:
        """Return the JSON Schema spec for the specified stream name."""
        catalog: AirbyteCatalog = self.discovered_catalog
        found: list[AirbyteStream] = [
            stream for stream in catalog.streams if stream.name == stream_name
        ]

        if len(found) == 0:
            raise exc.PyAirbyteInputError(
                message="Stream name does not exist in catalog.",
                input_value=stream_name,
            )

        if len(found) > 1:
            raise exc.PyAirbyteInternalError(
                message="Duplicate streams found with the same name.",
                context={
                    "found_streams": found,
                },
            )

        return found[0].json_schema

    def get_records(
        self,
        stream: str,
        *,
        normalize_field_names: bool = False,
        prune_undeclared_fields: bool = True,
    ) -> LazyDataset:
        """Read a stream from the connector.

        Args:
            stream: The name of the stream to read.
            normalize_field_names: When `True`, field names will be normalized to lower case,
                with special characters removed. This matches the behavior of PyAirbyte caches
                and most Airbyte destinations.
            prune_undeclared_fields: When `True`, undeclared fields will be pruned from the
                records, which generally matches the behavior of PyAirbyte caches and most
                Airbyte destinations, specifically when you expect the catalog may be stale.
                You can disable this to keep all fields in the records.

        This involves the following steps:
        * Call discover to get the catalog
        * Generate a configured catalog that syncs the given stream in full_refresh mode
        * Write the configured catalog and the config to a temporary file
        * execute the connector with read --config <config_file> --catalog <catalog_file>
        * Listen to the messages and return the first AirbyteRecordMessages that come along.
        * Make sure the subprocess is killed when the function returns.
        """
        discovered_catalog: AirbyteCatalog = self.discovered_catalog
        configured_catalog = ConfiguredAirbyteCatalog(
            streams=[
                ConfiguredAirbyteStream(
                    stream=s,
                    sync_mode=SyncMode.full_refresh,
                    destination_sync_mode=DestinationSyncMode.overwrite,
                )
                for s in discovered_catalog.streams
                if s.name == stream
            ],
        )
        if len(configured_catalog.streams) == 0:
            raise exc.PyAirbyteInputError(
                message="Requested stream does not exist.",
                context={
                    "stream": stream,
                    "available_streams": self.get_available_streams(),
                    "connector_name": self.name,
                },
            ) from KeyError(stream)

        configured_stream = configured_catalog.streams[0]

        def _with_logging(records: Iterable[dict[str, Any]]) -> Iterator[dict[str, Any]]:
            yield from records

        stream_record_handler = StreamRecordHandler(
            json_schema=self.get_stream_json_schema(stream),
            prune_extra_fields=prune_undeclared_fields,
            normalize_keys=normalize_field_names,
        )

        # This method is non-blocking, so we use "PLAIN" to avoid a live progress display
        progress_tracker = ProgressTracker(
            ProgressStyle.PLAIN,
            source=self,
            cache=None,
            destination=None,
            expected_streams=[stream],
        )

        iterator: Iterator[dict[str, Any]] = (
            StreamRecord.from_record_message(
                record_message=record.record,
                stream_record_handler=stream_record_handler,
            )
            for record in self._read_with_catalog(
                catalog=configured_catalog,
                progress_tracker=progress_tracker,
            )
            if record.record
        )
        progress_tracker.log_success()
        return LazyDataset(
            iterator,
            stream_metadata=configured_stream,
        )

    def get_documents(
        self,
        stream: str,
        title_property: str | None = None,
        content_properties: list[str] | None = None,
        metadata_properties: list[str] | None = None,
        *,
        render_metadata: bool = False,
    ) -> Iterable[Document]:
        """Read a stream from the connector and return the records as documents.

        If metadata_properties is not set, all properties that are not content will be added to
        the metadata.

        If render_metadata is True, metadata will be rendered in the document, as well as the
        main content.
        """
        return self.get_records(stream).to_documents(
            title_property=title_property,
            content_properties=content_properties,
            metadata_properties=metadata_properties,
            render_metadata=render_metadata,
        )

    def _get_airbyte_message_iterator(
        self,
        *,
        streams: Literal["*"] | list[str] | None = None,
        state_provider: StateProviderBase | None = None,
        progress_tracker: ProgressTracker,
        force_full_refresh: bool = False,
    ) -> AirbyteMessageIterator:
        """Get an AirbyteMessageIterator for this source."""
        return AirbyteMessageIterator(
            self._read_with_catalog(
                catalog=self.get_configured_catalog(streams=streams),
                state=state_provider if not force_full_refresh else None,
                progress_tracker=progress_tracker,
            )
        )

    def _read_with_catalog(
        self,
        catalog: ConfiguredAirbyteCatalog,
        progress_tracker: ProgressTracker,
        state: StateProviderBase | None = None,
    ) -> Generator[AirbyteMessage, None, None]:
        """Call read on the connector.

        This involves the following steps:
        * Write the config to a temporary file
        * execute the connector with read --config <config_file> --catalog <catalog_file>
        * Listen to the messages and return the AirbyteRecordMessages that come along.
        * Send out telemetry on the performed sync (with information about which source was
          used and the type of the cache)
        """
        with as_temp_files(
            [
                self._config,
                catalog.model_dump_json(),
                state.to_state_input_file_text() if state else "[]",
            ]
        ) as [
            config_file,
            catalog_file,
            state_file,
        ]:
            message_generator = self._execute(
                [
                    "read",
                    "--config",
                    config_file,
                    "--catalog",
                    catalog_file,
                    "--state",
                    state_file,
                ],
                progress_tracker=progress_tracker,
            )
            yield from progress_tracker.tally_records_read(message_generator)
        progress_tracker.log_read_complete()

    def _peek_airbyte_message(
        self,
        message: AirbyteMessage,
        *,
        raise_on_error: bool = True,
    ) -> None:
        """Process an Airbyte message.

        This method handles reading Airbyte messages and taking action, if needed, based on the
        message type. For instance, log messages are logged, records are tallied, and errors are
        raised as exceptions if `raise_on_error` is True.

        Raises:
            AirbyteConnectorFailedError: If a TRACE message of type ERROR is emitted.
        """
        super()._peek_airbyte_message(message, raise_on_error=raise_on_error)

    def _log_incremental_streams(
        self,
        *,
        incremental_streams: set[str] | None = None,
    ) -> None:
        """Log the streams which are using incremental sync mode."""
        log_message = (
            "The following streams are currently using incremental sync:\n"
            f"{incremental_streams}\n"
            "To perform a full refresh, set 'force_full_refresh=True' in 'airbyte.read()' method."
        )
        print(log_message)

    def read(
        self,
        cache: CacheBase | None = None,
        *,
        streams: str | list[str] | None = None,
        write_strategy: str | WriteStrategy = WriteStrategy.AUTO,
        force_full_refresh: bool = False,
        skip_validation: bool = False,
    ) -> ReadResult:
        """Read from the connector and write to the cache.

        Args:
            cache: The cache to write to. If not set, a default cache will be used.
            streams: Optional if already set. A list of stream names to select for reading. If
                set to "*", all streams will be selected.
            write_strategy: The strategy to use when writing to the cache. If a string, it must
                be one of "append", "upsert", "replace", or "auto". If a WriteStrategy, it must
                be one of WriteStrategy.APPEND, WriteStrategy.UPSERT, WriteStrategy.REPLACE, or
                WriteStrategy.AUTO.
            force_full_refresh: If True, the source will operate in full refresh mode.
                Otherwise, streams will be read in incremental mode if supported by the
                connector. This option must be True when using the "replace" strategy.
            skip_validation: If True, PyAirbyte will not pre-validate the input configuration
                before running the connector. This can be helpful in debugging, when you want
                to send configurations to the connector that otherwise might be rejected by
                JSON Schema validation rules.
        """
        cache = cache or get_default_cache()
        progress_tracker = ProgressTracker(
            source=self,
            cache=cache,
            destination=None,
            expected_streams=None,  # Will be set later
        )

        # Set up state provider if not in full refresh mode
        if force_full_refresh:
            state_provider: StateProviderBase | None = None
        else:
            state_provider = cache.get_state_provider(
                source_name=self._name,
            )
        state_writer = cache.get_state_writer(source_name=self._name)

        if streams:
            self.select_streams(streams)

        if not self._selected_stream_names:
            raise exc.PyAirbyteNoStreamsSelectedError(
                connector_name=self.name,
                available_streams=self.get_available_streams(),
            )

        try:
            result = self._read_to_cache(
                cache=cache,
                catalog_provider=CatalogProvider(self.configured_catalog),
                stream_names=self._selected_stream_names,
                state_provider=state_provider,
                state_writer=state_writer,
                write_strategy=write_strategy,
                force_full_refresh=force_full_refresh,
                skip_validation=skip_validation,
                progress_tracker=progress_tracker,
            )
        except exc.PyAirbyteInternalError as ex:
            progress_tracker.log_failure(exception=ex)
            raise exc.AirbyteConnectorFailedError(
                connector_name=self.name,
                log_text=self._last_log_messages,
            ) from ex
        except Exception as ex:
            progress_tracker.log_failure(exception=ex)
            raise

        progress_tracker.log_success()
        return result

    def _read_to_cache(  # noqa: PLR0913  # Too many arguments
        self,
        cache: CacheBase,
        *,
        catalog_provider: CatalogProvider,
        stream_names: list[str],
        state_provider: StateProviderBase | None,
        state_writer: StateWriterBase | None,
        write_strategy: str | WriteStrategy = WriteStrategy.AUTO,
        force_full_refresh: bool = False,
        skip_validation: bool = False,
        progress_tracker: ProgressTracker,
    ) -> ReadResult:
        """Internal read method."""
        if write_strategy == WriteStrategy.REPLACE and not force_full_refresh:
            warnings.warn(
                message=(
                    "Using `REPLACE` strategy without also setting `force_full_refresh=True` "
                    "could result in data loss. "
                    "To silence this warning, use the following: "
                    'warnings.filterwarnings("ignore", '
                    'category="airbyte.warnings.PyAirbyteDataLossWarning")'
                ),
                category=exc.PyAirbyteDataLossWarning,
                stacklevel=1,
            )
        if isinstance(write_strategy, str):
            try:
                write_strategy = WriteStrategy(write_strategy)
            except ValueError:
                raise exc.PyAirbyteInputError(
                    message="Invalid strategy",
                    context={
                        "write_strategy": write_strategy,
                        "available_strategies": [s.value for s in WriteStrategy],
                    },
                ) from None

        # Run optional validation step
        if not skip_validation:
            self.validate_config()

        # Log incremental streams if incremental streams are known
        if state_provider and state_provider.known_stream_names:
            # Retrieve the set of known streams that support incremental sync
            incremental_streams = (
                set(self._get_incremental_stream_names())
                & state_provider.known_stream_names
                & set(self.get_selected_streams())
            )
            if incremental_streams:
                self._log_incremental_streams(incremental_streams=incremental_streams)

        airbyte_message_iterator = AirbyteMessageIterator(
            self._read_with_catalog(
                catalog=catalog_provider.configured_catalog,
                state=state_provider,
                progress_tracker=progress_tracker,
            )
        )
        cache._write_airbyte_message_stream(  # noqa: SLF001  # Non-public API
            stdin=airbyte_message_iterator,
            catalog_provider=catalog_provider,
            write_strategy=write_strategy,
            state_writer=state_writer,
            progress_tracker=progress_tracker,
        )

        # Flush the WAL, if applicable
        cache.processor._do_checkpoint()  # noqa: SLF001  # Non-public API

        return ReadResult(
            source_name=self.name,
            progress_tracker=progress_tracker,
            processed_streams=stream_names,
            cache=cache,
        )


__all__ = [
    "Source",
]
```
```python
class Source(ConnectorBase):
```
A class representing a source that can be called.
```python
def __init__(
    self,
    executor: Executor,
    name: str,
    config: dict[str, Any] | None = None,
    *,
    config_change_callback: ConfigChangeCallback | None = None,
    streams: str | list[str] | None = None,
    validate: bool = False,
) -> None
```
Initialize the source.
If config is provided, it will be validated against the spec if validate is True.
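As a sketch (assuming, as is common with PyAirbyte, that `airbyte.get_source()` forwards `config`, `streams`, and `validate` through to this initializer; the connector name, config shape, and stream names below are hypothetical):

```python
import airbyte as ab

# Roughly equivalent to passing `config=`, `streams=`, and `validate=`
# directly to Source.__init__:
source = ab.get_source(
    "source-faker",                 # hypothetical connector name
    config={"count": 100},          # assumed config shape
    streams=["users", "products"],  # hypothetical streams, selected at init
)
```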
```python
def set_streams(self, streams: list[str]) -> None
```
Deprecated. See select_streams().
```python
def select_all_streams(self) -> None
```
Select all streams.
This is a more streamlined equivalent to:
source.select_streams(source.get_available_streams()).
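A short sketch of the three equivalent forms (per the selection logic in this class):

```python
source.select_all_streams()
# ...is equivalent to:
source.select_streams("*")
# ...and to the more verbose:
source.select_streams(source.get_available_streams())
```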
```python
def select_streams(self, streams: str | list[str]) -> None
```
Select the stream names that should be read from the connector.
Arguments:
- streams: A list of stream names to select. If set to "*", all streams will be selected.
Currently, if this is not set, all streams will be read.
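Example (stream names are hypothetical):

```python
# Select a subset of streams by name:
source.select_streams(["users", "purchases"])

# A single stream name is also accepted and converted to a one-item list:
source.select_streams("users")

# Unknown names raise AirbyteStreamNotFoundError, which reports the
# available stream names for the connector.
```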
```python
def get_selected_streams(self) -> list[str]
```
Get the selected streams.
If no streams are selected, return an empty list.
```python
def set_config(
    self,
    config: dict[str, Any],
    *,
    validate: bool = True,
) -> None
```
Set the config for the connector.
If validate is True, raise an exception if the config fails validation.
If validate is False, validation will be deferred until check() or validate_config() is called.
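A sketch of deferred validation (assuming `config` is a dict matching the connector's spec):

```python
# Defer JSON Schema validation until later:
source.set_config(config, validate=False)

# Validation then happens explicitly...
source.validate_config()
# ...or as part of a connection check:
source.check()
```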
```python
def get_config(self) -> dict[str, Any]
```
Get the config for the connector.
```python
def get_available_streams(self) -> list[str]
```
Get the available streams from the discovered catalog.
```python
@property
def config_spec(self) -> dict[str, Any]
```
Generate a configuration spec for this connector, as a JSON Schema definition.
Runs the connector's `spec` command and returns the `connectionSpecification` portion of the result as a dictionary.
Returns:
dict: The JSON Schema configuration spec as a dictionary.
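Example of inspecting the returned schema (assuming the connector declares the standard `required` and `properties` JSON Schema keys):

```python
spec = source.config_spec  # runs the connector's `spec` command

print(spec.get("required", []))          # names of required config fields
print(sorted(spec.get("properties", {})))  # all declared config fields
```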
```python
def print_config_spec(
    self,
    format: Literal["yaml", "json"] = "yaml",
    *,
    output_file: Path | str | None = None,
) -> None
```
Print the configuration spec for this connector.
Arguments:
- format: The format to print the spec in. Must be "yaml" or "json".
- output_file: Optional. If set, the spec will be written to the given file path. Otherwise, it will be printed to the console.
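Example:

```python
# Print a YAML rendering of the spec to the console:
source.print_config_spec()

# Or write a JSON rendering to a file (string paths are coerced to Path):
source.print_config_spec(format="json", output_file="spec.json")
```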
```python
@property
def docs_url(self) -> str
```
Get the URL to the connector's documentation.
```python
@property
def discovered_catalog(self) -> AirbyteCatalog
```
Get the raw catalog for the given streams.
If the catalog is not yet known, we call discover to get it.
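A sketch of inspecting the discovered catalog (fields shown are from the `AirbyteStream` protocol model):

```python
catalog = source.discovered_catalog  # triggers `discover` on first access

for stream in catalog.streams:
    # supported_sync_modes lists SyncMode values, e.g. full_refresh/incremental
    print(stream.name, stream.supported_sync_modes)
```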
```python
@property
def configured_catalog(self) -> ConfiguredAirbyteCatalog
```
Get the configured catalog for the given streams.
If the raw catalog is not yet known, we call discover to get it.
If no specific streams are selected, we return a catalog that syncs all available streams.
TODO: We should consider disabling by default the streams that the connector would disable by default. (For instance, streams that require a premium license are sometimes disabled by default within the connector.)
def get_configured_catalog(
    self,
    streams: Literal["*"] | list[str] | None = None,
) -> ConfiguredAirbyteCatalog:
    """Get a configured catalog for the given streams.

    If no streams are provided, the selected streams will be used. If no streams are selected,
    all available streams will be used.

    If '*' is provided, all available streams will be used.
    """
    selected_streams: list[str] = []
    if streams is None:
        selected_streams = self._selected_stream_names or self.get_available_streams()
    elif streams == "*":
        selected_streams = self.get_available_streams()
    elif isinstance(streams, list):
        selected_streams = streams
    else:
        raise exc.PyAirbyteInputError(
            message="Invalid streams argument.",
            input_value=streams,
        )

    return ConfiguredAirbyteCatalog(
        streams=[
            ConfiguredAirbyteStream(
                stream=stream,
                destination_sync_mode=DestinationSyncMode.overwrite,
                primary_key=stream.source_defined_primary_key,
                sync_mode=SyncMode.incremental,
            )
            for stream in self.discovered_catalog.streams
            if stream.name in selected_streams
        ],
    )
Get a configured catalog for the given streams.
If no streams are provided, the selected streams will be used. If no streams are selected, all available streams will be used.
If '*' is provided, all available streams will be used.
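A sketch of the three accepted forms of the streams argument, with source as above (the "users" stream name is hypothetical):

catalog_all = source.get_configured_catalog(streams="*")         # every available stream
catalog_some = source.get_configured_catalog(streams=["users"])  # explicit subset
catalog_default = source.get_configured_catalog()                # selected streams, or all if none selected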
def get_stream_json_schema(self, stream_name: str) -> dict[str, Any]:
    """Return the JSON Schema spec for the specified stream name."""
    catalog: AirbyteCatalog = self.discovered_catalog
    found: list[AirbyteStream] = [
        stream for stream in catalog.streams if stream.name == stream_name
    ]

    if len(found) == 0:
        raise exc.PyAirbyteInputError(
            message="Stream name does not exist in catalog.",
            input_value=stream_name,
        )

    if len(found) > 1:
        raise exc.PyAirbyteInternalError(
            message="Duplicate streams found with the same name.",
            context={
                "found_streams": found,
            },
        )

    return found[0].json_schema
Return the JSON Schema spec for the specified stream name.
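For example, to list a stream's declared top-level fields (the users stream name is hypothetical):

schema = source.get_stream_json_schema("users")
print(list(schema.get("properties", {})))  # declared field names for the stream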
def get_records(
    self,
    stream: str,
    *,
    normalize_field_names: bool = False,
    prune_undeclared_fields: bool = True,
) -> LazyDataset:
    """Read a stream from the connector.

    Args:
        stream: The name of the stream to read.
        normalize_field_names: When `True`, field names will be normalized to lower case, with
            special characters removed. This matches the behavior of PyAirbyte caches and most
            Airbyte destinations.
        prune_undeclared_fields: When `True`, undeclared fields will be pruned from the records,
            which generally matches the behavior of PyAirbyte caches and most Airbyte
            destinations, specifically when you expect the catalog may be stale. You can disable
            this to keep all fields in the records.

    This involves the following steps:
    * Call discover to get the catalog
    * Generate a configured catalog that syncs the given stream in full_refresh mode
    * Write the configured catalog and the config to a temporary file
    * execute the connector with read --config <config_file> --catalog <catalog_file>
    * Listen to the messages and return the first AirbyteRecordMessages that come along.
    * Make sure the subprocess is killed when the function returns.
    """
    discovered_catalog: AirbyteCatalog = self.discovered_catalog
    configured_catalog = ConfiguredAirbyteCatalog(
        streams=[
            ConfiguredAirbyteStream(
                stream=s,
                sync_mode=SyncMode.full_refresh,
                destination_sync_mode=DestinationSyncMode.overwrite,
            )
            for s in discovered_catalog.streams
            if s.name == stream
        ],
    )
    if len(configured_catalog.streams) == 0:
        raise exc.PyAirbyteInputError(
            message="Requested stream does not exist.",
            context={
                "stream": stream,
                "available_streams": self.get_available_streams(),
                "connector_name": self.name,
            },
        ) from KeyError(stream)

    configured_stream = configured_catalog.streams[0]

    def _with_logging(records: Iterable[dict[str, Any]]) -> Iterator[dict[str, Any]]:
        yield from records

    stream_record_handler = StreamRecordHandler(
        json_schema=self.get_stream_json_schema(stream),
        prune_extra_fields=prune_undeclared_fields,
        normalize_keys=normalize_field_names,
    )

    # This method is non-blocking, so we use "PLAIN" to avoid a live progress display
    progress_tracker = ProgressTracker(
        ProgressStyle.PLAIN,
        source=self,
        cache=None,
        destination=None,
        expected_streams=[stream],
    )

    iterator: Iterator[dict[str, Any]] = (
        StreamRecord.from_record_message(
            record_message=record.record,
            stream_record_handler=stream_record_handler,
        )
        for record in self._read_with_catalog(
            catalog=configured_catalog,
            progress_tracker=progress_tracker,
        )
        if record.record
    )
    progress_tracker.log_success()
    return LazyDataset(
        iterator,
        stream_metadata=configured_stream,
    )
Read a stream from the connector.
Arguments:
- stream: The name of the stream to read.
- normalize_field_names: When True, field names will be normalized to lower case, with special characters removed. This matches the behavior of PyAirbyte caches and most Airbyte destinations.
- prune_undeclared_fields: When True, undeclared fields will be pruned from the records, which generally matches the behavior of PyAirbyte caches and most Airbyte destinations, specifically when you expect the catalog may be stale. You can disable this to keep all fields in the records.
This involves the following steps:
- Call discover to get the catalog
- Generate a configured catalog that syncs the given stream in full_refresh mode
- Write the configured catalog and the config to a temporary file
- Execute the connector with read --config <config_file> --catalog <catalog_file>
- Listen to the messages and return the first AirbyteRecordMessages that come along.
- Make sure the subprocess is killed when the function returns.
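Because the return value is a LazyDataset, records are only pulled from the connector as you iterate. A sketch, with source as above (the users stream and id field are hypothetical):

records = source.get_records("users")  # non-blocking; nothing is read yet
for record in records:
    print(record["id"])  # each record behaves like a dict
    break                # stopping early is fine; the subprocess is cleaned up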
def get_documents(
    self,
    stream: str,
    title_property: str | None = None,
    content_properties: list[str] | None = None,
    metadata_properties: list[str] | None = None,
    *,
    render_metadata: bool = False,
) -> Iterable[Document]:
    """Read a stream from the connector and return the records as documents.

    If metadata_properties is not set, all properties that are not content will be added to
    the metadata.

    If render_metadata is True, metadata will be rendered in the document, as well as the
    main content.
    """
    return self.get_records(stream).to_documents(
        title_property=title_property,
        content_properties=content_properties,
        metadata_properties=metadata_properties,
        render_metadata=render_metadata,
    )
Read a stream from the connector and return the records as documents.
If metadata_properties is not set, all properties that are not content will be added to the metadata.
If render_metadata is True, metadata will be rendered in the document, as well as the main content.
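A sketch of feeding a stream into a document pipeline, e.g. for LLM ingestion, with source as above; the stream and property names are hypothetical:

docs = source.get_documents(
    "users",
    title_property="name",
    content_properties=["bio"],
    metadata_properties=["id", "created_at"],
)
for doc in docs:
    print(doc)  # a Document combining title, content, and metadata
    break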
def read(
    self,
    cache: CacheBase | None = None,
    *,
    streams: str | list[str] | None = None,
    write_strategy: str | WriteStrategy = WriteStrategy.AUTO,
    force_full_refresh: bool = False,
    skip_validation: bool = False,
) -> ReadResult:
    """Read from the connector and write to the cache.

    Args:
        cache: The cache to write to. If not set, a default cache will be used.
        streams: Optional if already set. A list of stream names to select for reading. If set
            to "*", all streams will be selected.
        write_strategy: The strategy to use when writing to the cache. If a string, it must be
            one of "append", "upsert", "replace", or "auto". If a WriteStrategy, it must be one
            of WriteStrategy.APPEND, WriteStrategy.UPSERT, WriteStrategy.REPLACE, or
            WriteStrategy.AUTO.
        force_full_refresh: If True, the source will operate in full refresh mode. Otherwise,
            streams will be read in incremental mode if supported by the connector. This option
            must be True when using the "replace" strategy.
        skip_validation: If True, PyAirbyte will not pre-validate the input configuration before
            running the connector. This can be helpful in debugging, when you want to send
            configurations to the connector that otherwise might be rejected by JSON Schema
            validation rules.
    """
    cache = cache or get_default_cache()
    progress_tracker = ProgressTracker(
        source=self,
        cache=cache,
        destination=None,
        expected_streams=None,  # Will be set later
    )

    # Set up state provider if not in full refresh mode
    if force_full_refresh:
        state_provider: StateProviderBase | None = None
    else:
        state_provider = cache.get_state_provider(
            source_name=self._name,
        )
    state_writer = cache.get_state_writer(source_name=self._name)

    if streams:
        self.select_streams(streams)

    if not self._selected_stream_names:
        raise exc.PyAirbyteNoStreamsSelectedError(
            connector_name=self.name,
            available_streams=self.get_available_streams(),
        )

    try:
        result = self._read_to_cache(
            cache=cache,
            catalog_provider=CatalogProvider(self.configured_catalog),
            stream_names=self._selected_stream_names,
            state_provider=state_provider,
            state_writer=state_writer,
            write_strategy=write_strategy,
            force_full_refresh=force_full_refresh,
            skip_validation=skip_validation,
            progress_tracker=progress_tracker,
        )
    except exc.PyAirbyteInternalError as ex:
        progress_tracker.log_failure(exception=ex)
        raise exc.AirbyteConnectorFailedError(
            connector_name=self.name,
            log_text=self._last_log_messages,
        ) from ex
    except Exception as ex:
        progress_tracker.log_failure(exception=ex)
        raise

    progress_tracker.log_success()
    return result
Read from the connector and write to the cache.
Arguments:
- cache: The cache to write to. If not set, a default cache will be used.
- streams: Optional if already set. A list of stream names to select for reading. If set to "*", all streams will be selected.
- write_strategy: The strategy to use when writing to the cache. If a string, it must be one of "append", "upsert", "replace", or "auto". If a WriteStrategy, it must be one of WriteStrategy.APPEND, WriteStrategy.UPSERT, WriteStrategy.REPLACE, or WriteStrategy.AUTO.
- force_full_refresh: If True, the source will operate in full refresh mode. Otherwise, streams will be read in incremental mode if supported by the connector. This option must be True when using the "replace" strategy.
- skip_validation: If True, PyAirbyte will not pre-validate the input configuration before running the connector. This can be helpful in debugging, when you want to send configurations to the connector that otherwise might be rejected by JSON Schema validation rules.
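Putting it together, a typical end-to-end sketch, assuming the default DuckDB cache and a hypothetical users stream on the source-faker test connector:

import airbyte as ab

source = ab.get_source("source-faker", config={"count": 100})
source.select_streams(["users"])           # or source.select_all_streams()
result = source.read()                     # writes to the default cache
print(result["users"].to_pandas().head())  # read back from the cache as a DataFrame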
Inherited Members
- airbyte._connector_base.ConnectorBase
- config_change_callback
- executor
- name
- config_hash
- validate_config
- connector_version
- check
- install
- uninstall