airbyte.sources.base
Base class implementation for sources.
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
"""Base class implementation for sources."""

from __future__ import annotations

import json
import warnings
from pathlib import Path
from typing import TYPE_CHECKING, Any, Literal

import yaml
from rich import print  # noqa: A004  # Allow shadowing the built-in
from rich.syntax import Syntax

from airbyte_protocol.models import (
    AirbyteCatalog,
    AirbyteMessage,
    ConfiguredAirbyteCatalog,
    ConfiguredAirbyteStream,
    DestinationSyncMode,
    SyncMode,
    Type,
)

from airbyte import exceptions as exc
from airbyte._connector_base import ConnectorBase
from airbyte._message_iterators import AirbyteMessageIterator
from airbyte._util.temp_files import as_temp_files
from airbyte.caches.util import get_default_cache
from airbyte.datasets._lazy import LazyDataset
from airbyte.progress import ProgressStyle, ProgressTracker
from airbyte.records import StreamRecord, StreamRecordHandler
from airbyte.results import ReadResult
from airbyte.shared.catalog_providers import CatalogProvider
from airbyte.strategies import WriteStrategy


if TYPE_CHECKING:
    from collections.abc import Generator, Iterable, Iterator

    from airbyte_cdk import ConnectorSpecification
    from airbyte_protocol.models import AirbyteStream

    from airbyte._executors.base import Executor
    from airbyte.caches import CacheBase
    from airbyte.callbacks import ConfigChangeCallback
    from airbyte.documents import Document
    from airbyte.shared.state_providers import StateProviderBase
    from airbyte.shared.state_writers import StateWriterBase


class Source(ConnectorBase):
    """A class representing a source that can be called."""

    connector_type = "source"

    def __init__(
        self,
        executor: Executor,
        name: str,
        config: dict[str, Any] | None = None,
        *,
        config_change_callback: ConfigChangeCallback | None = None,
        streams: str | list[str] | None = None,
        validate: bool = False,
    ) -> None:
        """Initialize the source.

        If config is provided, it will be validated against the spec if validate is True.

        Args:
            executor: The executor passed through to the connector base class.
            name: The connector name.
            config: Optional connector config. When given, it is applied via `set_config()`.
            config_change_callback: Passed through to the connector base class.
            streams: Optional stream selection, applied via `select_streams()`. If no config
                has been set at that point, the selection is deferred until `set_config()`.
            validate: Whether to validate `config` when it is set.
        """
        self._to_be_selected_streams: list[str] | str = []
        """Used to hold selection criteria before catalog is known."""

        super().__init__(
            executor=executor,
            name=name,
            config=config,
            config_change_callback=config_change_callback,
            validate=validate,
        )
        self._config_dict: dict[str, Any] | None = None
        self._last_log_messages: list[str] = []
        self._discovered_catalog: AirbyteCatalog | None = None
        self._selected_stream_names: list[str] = []
        if config is not None:
            self.set_config(config, validate=validate)
        if streams is not None:
            self.select_streams(streams)

    def set_streams(self, streams: list[str]) -> None:
        """Deprecated. See select_streams()."""
        warnings.warn(
            "The 'set_streams' method is deprecated and will be removed in a future version. "
            "Please use the 'select_streams' method instead.",
            DeprecationWarning,
            stacklevel=2,
        )
        self.select_streams(streams)

    def _log_warning_preselected_stream(self, streams: str | list[str]) -> None:
        """Print a notice that the given selection is deferred until the config is set."""
        if streams == "*":
            print(
                "Warning: Config is not set yet. All streams will be selected after config is set."
            )
        else:
            print(
                "Warning: Config is not set yet. "
                f"Streams to be selected after config is set: {streams}"
            )

    def select_all_streams(self) -> None:
        """Select all streams.

        This is a more streamlined equivalent to:
        > source.select_streams(source.get_available_streams()).

        If no config has been set yet, the selection is remembered and applied by
        `set_config()`.
        """
        if self._config_dict is None:
            self._to_be_selected_streams = "*"
            self._log_warning_preselected_stream(self._to_be_selected_streams)
            return

        self._selected_stream_names = self.get_available_streams()

    def select_streams(self, streams: str | list[str]) -> None:
        """Select the stream names that should be read from the connector.

        Args:
            streams: A list of stream names to select, or a single stream name. If set to
                "*", all streams will be selected.

        If no config has been set yet, the selection is remembered (unvalidated) and applied
        by `set_config()`.

        Note that `read()` raises `PyAirbyteNoStreamsSelectedError` when no streams are
        selected, whereas the `configured_catalog` property falls back to all available
        streams.

        Raises:
            AirbyteStreamNotFoundError: If a requested stream is not in the discovered catalog.
        """
        if self._config_dict is None:
            self._to_be_selected_streams = streams
            self._log_warning_preselected_stream(streams)
            return

        if streams == "*":
            self.select_all_streams()
            return

        if isinstance(streams, str):
            # If a single stream is provided, convert it to a one-item list
            streams = [streams]

        available_streams = self.get_available_streams()
        for stream in streams:
            if stream not in available_streams:
                raise exc.AirbyteStreamNotFoundError(
                    stream_name=stream,
                    connector_name=self.name,
                    available_streams=available_streams,
                )

        # Store a copy so that later mutation of the caller's list cannot change the
        # selection without going through the validation above.
        self._selected_stream_names = list(streams)

    def get_selected_streams(self) -> list[str]:
        """Get the selected streams.

        If no streams are selected, return an empty list.
        """
        return self._selected_stream_names

    def set_config(
        self,
        config: dict[str, Any],
        *,
        validate: bool = True,
    ) -> None:
        """Set the config for the connector.

        If validate is True, raise an exception if the config fails validation.

        If validate is False, validation will be deferred until check() or validate_config()
        is called.

        Any stream selection that was requested before a config existed is applied here.
        """
        if validate:
            self.validate_config(config)

        self._config_dict = config

        if self._to_be_selected_streams:
            # Apply the deferred selection now that discovery is possible, then clear it.
            self.select_streams(self._to_be_selected_streams)
            self._to_be_selected_streams = []

    def get_config(self) -> dict[str, Any]:
        """Get the config for the connector.

        Raises:
            AirbyteConnectorConfigurationMissingError: If no config has been set.
        """
        return self._config

    @property
    def _config(self) -> dict[str, Any]:
        """Return the config dict, raising if it has not been set."""
        if self._config_dict is None:
            raise exc.AirbyteConnectorConfigurationMissingError(
                connector_name=self.name,
                guidance="Provide via get_source() or set_config()",
            )
        return self._config_dict

    def _discover(self) -> AirbyteCatalog:
        """Call discover on the connector.

        This involves the following steps:
        - Write the config to a temporary file
        - Execute the connector with `discover --config CONFIG_FILE`
        - Listen to the messages and return the first AirbyteCatalog that comes along.

        Raises:
            AirbyteConnectorMissingCatalogError: If no catalog message was received.
        """
        with as_temp_files([self._config]) as [config_file]:
            for msg in self._execute(["discover", "--config", config_file]):
                if msg.type == Type.CATALOG and msg.catalog:
                    return msg.catalog
            raise exc.AirbyteConnectorMissingCatalogError(
                connector_name=self.name,
                log_text=self._last_log_messages,
            )

    def get_available_streams(self) -> list[str]:
        """Get the names of the streams listed in the discovered catalog."""
        return [s.name for s in self.discovered_catalog.streams]

    def _get_incremental_stream_names(self) -> list[str]:
        """Get the name of streams that support incremental sync."""
        return [
            stream.name
            for stream in self.discovered_catalog.streams
            if SyncMode.incremental in stream.supported_sync_modes
        ]

    def _get_spec(self, *, force_refresh: bool = False) -> ConnectorSpecification:
        """Call spec on the connector.

        This involves the following steps:
        * Execute the connector with `spec`
        * Listen to the messages and keep the first ConnectorSpecification that comes along.

        Args:
            force_refresh: If True, re-run `spec` even when a spec is already cached.

        Raises:
            AirbyteConnectorMissingSpecError: If no spec is available after the call.
        """
        if force_refresh or self._spec is None:
            for msg in self._execute(["spec"]):
                if msg.type == Type.SPEC and msg.spec:
                    self._spec = msg.spec
                    break

        if self._spec:
            return self._spec

        raise exc.AirbyteConnectorMissingSpecError(
            connector_name=self.name,
            log_text=self._last_log_messages,
        )

    @property
    def config_spec(self) -> dict[str, Any]:
        """Generate a configuration spec for this connector, as a JSON Schema definition.

        This function generates a JSON Schema dictionary with configuration specs for the
        current connector, as a dictionary. The spec is re-requested from the connector on
        every access.

        Returns:
            dict: The JSON Schema configuration spec as a dictionary.
        """
        return self._get_spec(force_refresh=True).connectionSpecification

    def print_config_spec(
        self,
        format: Literal["yaml", "json"] = "yaml",  # noqa: A002
        *,
        output_file: Path | str | None = None,
    ) -> None:
        """Print the configuration spec for this connector.

        Args:
            format: The format to print the spec in. Must be "yaml" or "json".
            output_file: Optional. If set, the spec will be written to the given file path.
                Otherwise, it will be printed to the console.

        Raises:
            PyAirbyteInputError: If `format` is neither "yaml" nor "json".
        """
        if format not in {"yaml", "json"}:
            raise exc.PyAirbyteInputError(
                message="Invalid format. Expected 'yaml' or 'json'",
                input_value=format,
            )
        if isinstance(output_file, str):
            output_file = Path(output_file)

        if format == "yaml":
            content = yaml.dump(self.config_spec, indent=2)
        else:
            # Only "json" can reach this branch, because `format` was validated above.
            content = json.dumps(self.config_spec, indent=2)

        if output_file:
            output_file.write_text(content)
            return

        syntax_highlighted = Syntax(content, format)
        print(syntax_highlighted)

    @property
    def _yaml_spec(self) -> str:
        """Get the spec as a yaml string.

        For now, the primary use case is for writing and debugging a valid config for a source.

        This is private for now because we probably want better polish before exposing this
        as a stable interface. This will also get easier when we have docs links with this info
        for each connector.
        """
        spec_obj: ConnectorSpecification = self._get_spec()
        spec_dict: dict[str, Any] = spec_obj.model_dump(exclude_unset=True)
        # convert to a yaml string
        return yaml.dump(spec_dict)

    @property
    def docs_url(self) -> str:
        """Get the URL to the connector's documentation."""
        return "https://docs.airbyte.com/integrations/sources/" + self.name.lower().replace(
            "source-", ""
        )

    @property
    def discovered_catalog(self) -> AirbyteCatalog:
        """Get the raw catalog for the given streams.

        If the catalog is not yet known, we call discover to get it. The result is cached
        on the instance.
        """
        if self._discovered_catalog is None:
            self._discovered_catalog = self._discover()

        return self._discovered_catalog

    @property
    def configured_catalog(self) -> ConfiguredAirbyteCatalog:
        """Get the configured catalog for the given streams.

        If the raw catalog is not yet known, we call discover to get it.

        If no specific streams are selected, we return a catalog that syncs all available streams.

        TODO: We should consider disabling by default the streams that the connector would
        disable by default. (For instance, streams that require a premium license are sometimes
        disabled by default within the connector.)
        """
        # Ensure discovered catalog is cached before we start
        _ = self.discovered_catalog

        # Filter for selected streams if set, otherwise use all available streams:
        streams_filter: list[str] = self._selected_stream_names or self.get_available_streams()
        return self.get_configured_catalog(streams=streams_filter)

    def get_configured_catalog(
        self,
        streams: Literal["*"] | list[str] | None = None,
    ) -> ConfiguredAirbyteCatalog:
        """Get a configured catalog for the given streams.

        If no streams are provided, the selected streams will be used. If no streams are selected,
        all available streams will be used.

        If '*' is provided, all available streams will be used.

        Every returned stream is configured with `SyncMode.incremental` and
        `DestinationSyncMode.overwrite`. Names that are not in the discovered catalog are
        silently omitted.

        Raises:
            PyAirbyteInputError: If `streams` is not None, "*", or a list.
        """
        selected_streams: list[str] = []
        if streams is None:
            selected_streams = self._selected_stream_names or self.get_available_streams()
        elif streams == "*":
            selected_streams = self.get_available_streams()
        elif isinstance(streams, list):
            selected_streams = streams
        else:
            raise exc.PyAirbyteInputError(
                message="Invalid streams argument.",
                input_value=streams,
            )

        return ConfiguredAirbyteCatalog(
            streams=[
                ConfiguredAirbyteStream(
                    stream=stream,
                    destination_sync_mode=DestinationSyncMode.overwrite,
                    primary_key=stream.source_defined_primary_key,
                    sync_mode=SyncMode.incremental,
                )
                for stream in self.discovered_catalog.streams
                if stream.name in selected_streams
            ],
        )

    def get_stream_json_schema(self, stream_name: str) -> dict[str, Any]:
        """Return the JSON Schema spec for the specified stream name.

        Raises:
            PyAirbyteInputError: If the stream name is not in the discovered catalog.
            PyAirbyteInternalError: If more than one stream has the given name.
        """
        catalog: AirbyteCatalog = self.discovered_catalog
        found: list[AirbyteStream] = [
            stream for stream in catalog.streams if stream.name == stream_name
        ]

        if len(found) == 0:
            raise exc.PyAirbyteInputError(
                message="Stream name does not exist in catalog.",
                input_value=stream_name,
            )

        if len(found) > 1:
            raise exc.PyAirbyteInternalError(
                message="Duplicate streams found with the same name.",
                context={
                    "found_streams": found,
                },
            )

        return found[0].json_schema

    def get_records(
        self,
        stream: str,
        *,
        normalize_field_names: bool = False,
        prune_undeclared_fields: bool = True,
    ) -> LazyDataset:
        """Read a stream from the connector.

        Args:
            stream: The name of the stream to read.
            normalize_field_names: When `True`, field names will be normalized to lower case, with
                special characters removed. This matches the behavior of PyAirbyte caches and most
                Airbyte destinations.
            prune_undeclared_fields: When `True`, undeclared fields will be pruned from the records,
                which generally matches the behavior of PyAirbyte caches and most Airbyte
                destinations, specifically when you expect the catalog may be stale. You can disable
                this to keep all fields in the records.

        This involves the following steps:
        * Call discover to get the catalog
        * Generate a configured catalog that syncs the given stream in full_refresh mode
        * Build a lazy iterator over the record messages emitted by the connector's `read`
        * Wrap that iterator in a `LazyDataset`

        Raises:
            PyAirbyteInputError: If the requested stream is not in the discovered catalog.
        """
        discovered_catalog: AirbyteCatalog = self.discovered_catalog
        configured_catalog = ConfiguredAirbyteCatalog(
            streams=[
                ConfiguredAirbyteStream(
                    stream=s,
                    sync_mode=SyncMode.full_refresh,
                    destination_sync_mode=DestinationSyncMode.overwrite,
                )
                for s in discovered_catalog.streams
                if s.name == stream
            ],
        )
        if len(configured_catalog.streams) == 0:
            raise exc.PyAirbyteInputError(
                message="Requested stream does not exist.",
                context={
                    "stream": stream,
                    "available_streams": self.get_available_streams(),
                    "connector_name": self.name,
                },
            ) from KeyError(stream)

        configured_stream = configured_catalog.streams[0]

        stream_record_handler = StreamRecordHandler(
            json_schema=self.get_stream_json_schema(stream),
            prune_extra_fields=prune_undeclared_fields,
            normalize_keys=normalize_field_names,
        )

        # This method is non-blocking, so we use "PLAIN" to avoid a live progress display
        progress_tracker = ProgressTracker(
            ProgressStyle.PLAIN,
            source=self,
            cache=None,
            destination=None,
            expected_streams=[stream],
        )

        # Generator expression: nothing is read from the connector until the caller
        # starts iterating the returned dataset.
        iterator: Iterator[dict[str, Any]] = (
            StreamRecord.from_record_message(
                record_message=record.record,
                stream_record_handler=stream_record_handler,
            )
            for record in self._read_with_catalog(
                catalog=configured_catalog,
                progress_tracker=progress_tracker,
            )
            if record.record
        )
        # NOTE(review): success is logged here, before the lazy iterator above has been
        # consumed. Kept as-is to preserve behavior; confirm whether this is intended.
        progress_tracker.log_success()
        return LazyDataset(
            iterator,
            stream_metadata=configured_stream,
        )

    def get_documents(
        self,
        stream: str,
        title_property: str | None = None,
        content_properties: list[str] | None = None,
        metadata_properties: list[str] | None = None,
        *,
        render_metadata: bool = False,
    ) -> Iterable[Document]:
        """Read a stream from the connector and return the records as documents.

        If metadata_properties is not set, all properties that are not content will be added to
        the metadata.

        If render_metadata is True, metadata will be rendered in the document, as well as the
        main content.
        """
        return self.get_records(stream).to_documents(
            title_property=title_property,
            content_properties=content_properties,
            metadata_properties=metadata_properties,
            render_metadata=render_metadata,
        )

    def _get_airbyte_message_iterator(
        self,
        *,
        streams: Literal["*"] | list[str] | None = None,
        state_provider: StateProviderBase | None = None,
        progress_tracker: ProgressTracker,
        force_full_refresh: bool = False,
    ) -> AirbyteMessageIterator:
        """Get an AirbyteMessageIterator for this source.

        When `force_full_refresh` is True, the state provider is ignored and no prior state
        is passed to the connector.
        """
        return AirbyteMessageIterator(
            self._read_with_catalog(
                catalog=self.get_configured_catalog(streams=streams),
                state=state_provider if not force_full_refresh else None,
                progress_tracker=progress_tracker,
            )
        )

    def _read_with_catalog(
        self,
        catalog: ConfiguredAirbyteCatalog,
        progress_tracker: ProgressTracker,
        state: StateProviderBase | None = None,
    ) -> Generator[AirbyteMessage, None, None]:
        """Call read on the connector.

        This involves the following steps:
        * Write the config, the catalog and the state to temporary files. An empty state
          list ("[]") is written when no state provider is given.
        * Execute the connector with
          `read --config CONFIG_FILE --catalog CATALOG_FILE --state STATE_FILE`
        * Yield the connector's messages, tallying records through the progress tracker.
        * Notify the progress tracker that the read is complete.
        """
        with as_temp_files(
            [
                self._config,
                catalog.model_dump_json(),
                state.to_state_input_file_text() if state else "[]",
            ]
        ) as [
            config_file,
            catalog_file,
            state_file,
        ]:
            message_generator = self._execute(
                [
                    "read",
                    "--config",
                    config_file,
                    "--catalog",
                    catalog_file,
                    "--state",
                    state_file,
                ],
                progress_tracker=progress_tracker,
            )
            yield from progress_tracker.tally_records_read(message_generator)
        progress_tracker.log_read_complete()

    def _peek_airbyte_message(
        self,
        message: AirbyteMessage,
        *,
        raise_on_error: bool = True,
    ) -> None:
        """Process an Airbyte message.

        This method handles reading Airbyte messages and taking action, if needed, based on the
        message type. For instance, log messages are logged, records are tallied, and errors are
        raised as exceptions if `raise_on_error` is True.

        This override only delegates to the base class implementation.

        Raises:
            AirbyteConnectorFailedError: If a TRACE message of type ERROR is emitted.
        """
        super()._peek_airbyte_message(message, raise_on_error=raise_on_error)

    def _log_incremental_streams(
        self,
        *,
        incremental_streams: set[str] | None = None,
    ) -> None:
        """Log the streams which are using incremental sync mode."""
        log_message = (
            "The following streams are currently using incremental sync:\n"
            f"{incremental_streams}\n"
            "To perform a full refresh, set 'force_full_refresh=True' in 'airbyte.read()' method."
        )
        print(log_message)

    def read(
        self,
        cache: CacheBase | None = None,
        *,
        streams: str | list[str] | None = None,
        write_strategy: str | WriteStrategy = WriteStrategy.AUTO,
        force_full_refresh: bool = False,
        skip_validation: bool = False,
    ) -> ReadResult:
        """Read from the connector and write to the cache.

        Args:
            cache: The cache to write to. If not set, a default cache will be used.
            streams: Optional if already set. A list of stream names to select for reading. If set
                to "*", all streams will be selected.
            write_strategy: The strategy to use when writing to the cache. If a string, it must be
                one of "append", "upsert", "replace", or "auto". If a WriteStrategy, it must be one
                of WriteStrategy.APPEND, WriteStrategy.UPSERT, WriteStrategy.REPLACE, or
                WriteStrategy.AUTO.
            force_full_refresh: If True, the source will operate in full refresh mode. Otherwise,
                streams will be read in incremental mode if supported by the connector. This option
                must be True when using the "replace" strategy.
            skip_validation: If True, PyAirbyte will not pre-validate the input configuration before
                running the connector. This can be helpful in debugging, when you want to send
                configurations to the connector that otherwise might be rejected by JSON Schema
                validation rules.

        Raises:
            PyAirbyteNoStreamsSelectedError: If no streams are selected.
            AirbyteConnectorFailedError: If the read fails with a `PyAirbyteInternalError`.
        """
        cache = cache or get_default_cache()
        progress_tracker = ProgressTracker(
            source=self,
            cache=cache,
            destination=None,
            expected_streams=None,  # Will be set later
        )

        # Set up state provider if not in full refresh mode
        if force_full_refresh:
            state_provider: StateProviderBase | None = None
        else:
            state_provider = cache.get_state_provider(
                source_name=self._name,
            )
        state_writer = cache.get_state_writer(source_name=self._name)

        if streams:
            self.select_streams(streams)

        if not self._selected_stream_names:
            raise exc.PyAirbyteNoStreamsSelectedError(
                connector_name=self.name,
                available_streams=self.get_available_streams(),
            )

        try:
            result = self._read_to_cache(
                cache=cache,
                catalog_provider=CatalogProvider(self.configured_catalog),
                stream_names=self._selected_stream_names,
                state_provider=state_provider,
                state_writer=state_writer,
                write_strategy=write_strategy,
                force_full_refresh=force_full_refresh,
                skip_validation=skip_validation,
                progress_tracker=progress_tracker,
            )
        except exc.PyAirbyteInternalError as ex:
            progress_tracker.log_failure(exception=ex)
            raise exc.AirbyteConnectorFailedError(
                connector_name=self.name,
                log_text=self._last_log_messages,
            ) from ex
        except Exception as ex:
            # Record the failure on the tracker, then let the original exception propagate.
            progress_tracker.log_failure(exception=ex)
            raise

        progress_tracker.log_success()
        return result

    def _read_to_cache(  # noqa: PLR0913  # Too many arguments
        self,
        cache: CacheBase,
        *,
        catalog_provider: CatalogProvider,
        stream_names: list[str],
        state_provider: StateProviderBase | None,
        state_writer: StateWriterBase | None,
        write_strategy: str | WriteStrategy = WriteStrategy.AUTO,
        force_full_refresh: bool = False,
        skip_validation: bool = False,
        progress_tracker: ProgressTracker,
    ) -> ReadResult:
        """Internal read method.

        Normalizes the write strategy, optionally validates the config, streams the
        connector's messages into the cache, and returns a `ReadResult`.

        Raises:
            PyAirbyteInputError: If `write_strategy` is a string that is not a valid
                `WriteStrategy` value.
        """
        # Normalize to the enum first so the data-loss check below always compares
        # enum to enum, whether the caller passed a string or a WriteStrategy.
        if isinstance(write_strategy, str):
            try:
                write_strategy = WriteStrategy(write_strategy)
            except ValueError:
                raise exc.PyAirbyteInputError(
                    message="Invalid strategy",
                    context={
                        "write_strategy": write_strategy,
                        "available_strategies": [s.value for s in WriteStrategy],
                    },
                ) from None

        if write_strategy == WriteStrategy.REPLACE and not force_full_refresh:
            warnings.warn(
                message=(
                    "Using `REPLACE` strategy without also setting `force_full_refresh=True` "
                    "could result in data loss. "
                    "To silence this warning, use the following: "
                    '`warnings.filterwarnings("ignore", '
                    "category=airbyte.exceptions.PyAirbyteDataLossWarning)`"
                ),
                category=exc.PyAirbyteDataLossWarning,
                stacklevel=1,
            )

        # Run optional validation step
        if not skip_validation:
            self.validate_config()

        # Log incremental stream if incremental streams are known
        if state_provider and state_provider.known_stream_names:
            # Retrieve the set of known streams which support incremental sync
            incremental_streams = (
                set(self._get_incremental_stream_names())
                & state_provider.known_stream_names
                & set(self.get_selected_streams())
            )
            if incremental_streams:
                self._log_incremental_streams(incremental_streams=incremental_streams)

        airbyte_message_iterator = AirbyteMessageIterator(
            self._read_with_catalog(
                catalog=catalog_provider.configured_catalog,
                state=state_provider,
                progress_tracker=progress_tracker,
            )
        )
        cache._write_airbyte_message_stream(  # noqa: SLF001  # Non-public API
            stdin=airbyte_message_iterator,
            catalog_provider=catalog_provider,
            write_strategy=write_strategy,
            state_writer=state_writer,
            progress_tracker=progress_tracker,
        )

        # Flush the WAL, if applicable
        cache.processor._do_checkpoint()  # noqa: SLF001  # Non-public API

        return ReadResult(
            source_name=self.name,
            progress_tracker=progress_tracker,
            processed_streams=stream_names,
            cache=cache,
        )


__all__ = [
    "Source",
]
53class Source(ConnectorBase): 54 """A class representing a source that can be called.""" 55 56 connector_type = "source" 57 58 def __init__( 59 self, 60 executor: Executor, 61 name: str, 62 config: dict[str, Any] | None = None, 63 *, 64 config_change_callback: ConfigChangeCallback | None = None, 65 streams: str | list[str] | None = None, 66 validate: bool = False, 67 ) -> None: 68 """Initialize the source. 69 70 If config is provided, it will be validated against the spec if validate is True. 71 """ 72 self._to_be_selected_streams: list[str] | str = [] 73 """Used to hold selection criteria before catalog is known.""" 74 75 super().__init__( 76 executor=executor, 77 name=name, 78 config=config, 79 config_change_callback=config_change_callback, 80 validate=validate, 81 ) 82 self._config_dict: dict[str, Any] | None = None 83 self._last_log_messages: list[str] = [] 84 self._discovered_catalog: AirbyteCatalog | None = None 85 self._selected_stream_names: list[str] = [] 86 if config is not None: 87 self.set_config(config, validate=validate) 88 if streams is not None: 89 self.select_streams(streams) 90 91 def set_streams(self, streams: list[str]) -> None: 92 """Deprecated. See select_streams().""" 93 warnings.warn( 94 "The 'set_streams' method is deprecated and will be removed in a future version. " 95 "Please use the 'select_streams' method instead.", 96 DeprecationWarning, 97 stacklevel=2, 98 ) 99 self.select_streams(streams) 100 101 def _log_warning_preselected_stream(self, streams: str | list[str]) -> None: 102 """Logs a warning message indicating stream selection which are not selected yet.""" 103 if streams == "*": 104 print( 105 "Warning: Config is not set yet. All streams will be selected after config is set." 106 ) 107 else: 108 print( 109 "Warning: Config is not set yet. " 110 f"Streams to be selected after config is set: {streams}" 111 ) 112 113 def select_all_streams(self) -> None: 114 """Select all streams. 
115 116 This is a more streamlined equivalent to: 117 > source.select_streams(source.get_available_streams()). 118 """ 119 if self._config_dict is None: 120 self._to_be_selected_streams = "*" 121 self._log_warning_preselected_stream(self._to_be_selected_streams) 122 return 123 124 self._selected_stream_names = self.get_available_streams() 125 126 def select_streams(self, streams: str | list[str]) -> None: 127 """Select the stream names that should be read from the connector. 128 129 Args: 130 streams: A list of stream names to select. If set to "*", all streams will be selected. 131 132 Currently, if this is not set, all streams will be read. 133 """ 134 if self._config_dict is None: 135 self._to_be_selected_streams = streams 136 self._log_warning_preselected_stream(streams) 137 return 138 139 if streams == "*": 140 self.select_all_streams() 141 return 142 143 if isinstance(streams, str): 144 # If a single stream is provided, convert it to a one-item list 145 streams = [streams] 146 147 available_streams = self.get_available_streams() 148 for stream in streams: 149 if stream not in available_streams: 150 raise exc.AirbyteStreamNotFoundError( 151 stream_name=stream, 152 connector_name=self.name, 153 available_streams=available_streams, 154 ) 155 self._selected_stream_names = streams 156 157 def get_selected_streams(self) -> list[str]: 158 """Get the selected streams. 159 160 If no streams are selected, return an empty list. 161 """ 162 return self._selected_stream_names 163 164 def set_config( 165 self, 166 config: dict[str, Any], 167 *, 168 validate: bool = True, 169 ) -> None: 170 """Set the config for the connector. 171 172 If validate is True, raise an exception if the config fails validation. 173 174 If validate is False, validation will be deferred until check() or validate_config() 175 is called. 
176 """ 177 if validate: 178 self.validate_config(config) 179 180 self._config_dict = config 181 182 if self._to_be_selected_streams: 183 self.select_streams(self._to_be_selected_streams) 184 self._to_be_selected_streams = [] 185 186 def get_config(self) -> dict[str, Any]: 187 """Get the config for the connector.""" 188 return self._config 189 190 @property 191 def _config(self) -> dict[str, Any]: 192 if self._config_dict is None: 193 raise exc.AirbyteConnectorConfigurationMissingError( 194 connector_name=self.name, 195 guidance="Provide via get_source() or set_config()", 196 ) 197 return self._config_dict 198 199 def _discover(self) -> AirbyteCatalog: 200 """Call discover on the connector. 201 202 This involves the following steps: 203 - Write the config to a temporary file 204 - execute the connector with discover --config <config_file> 205 - Listen to the messages and return the first AirbyteCatalog that comes along. 206 - Make sure the subprocess is killed when the function returns. 207 """ 208 with as_temp_files([self._config]) as [config_file]: 209 for msg in self._execute(["discover", "--config", config_file]): 210 if msg.type == Type.CATALOG and msg.catalog: 211 return msg.catalog 212 raise exc.AirbyteConnectorMissingCatalogError( 213 connector_name=self.name, 214 log_text=self._last_log_messages, 215 ) 216 217 def get_available_streams(self) -> list[str]: 218 """Get the available streams from the spec.""" 219 return [s.name for s in self.discovered_catalog.streams] 220 221 def _get_incremental_stream_names(self) -> list[str]: 222 """Get the name of streams that support incremental sync.""" 223 return [ 224 stream.name 225 for stream in self.discovered_catalog.streams 226 if SyncMode.incremental in stream.supported_sync_modes 227 ] 228 229 def _get_spec(self, *, force_refresh: bool = False) -> ConnectorSpecification: 230 """Call spec on the connector. 
231 232 This involves the following steps: 233 * execute the connector with spec 234 * Listen to the messages and return the first AirbyteCatalog that comes along. 235 * Make sure the subprocess is killed when the function returns. 236 """ 237 if force_refresh or self._spec is None: 238 for msg in self._execute(["spec"]): 239 if msg.type == Type.SPEC and msg.spec: 240 self._spec = msg.spec 241 break 242 243 if self._spec: 244 return self._spec 245 246 raise exc.AirbyteConnectorMissingSpecError( 247 connector_name=self.name, 248 log_text=self._last_log_messages, 249 ) 250 251 @property 252 def config_spec(self) -> dict[str, Any]: 253 """Generate a configuration spec for this connector, as a JSON Schema definition. 254 255 This function generates a JSON Schema dictionary with configuration specs for the 256 current connector, as a dictionary. 257 258 Returns: 259 dict: The JSON Schema configuration spec as a dictionary. 260 """ 261 return self._get_spec(force_refresh=True).connectionSpecification 262 263 def print_config_spec( 264 self, 265 format: Literal["yaml", "json"] = "yaml", # noqa: A002 266 *, 267 output_file: Path | str | None = None, 268 ) -> None: 269 """Print the configuration spec for this connector. 270 271 Args: 272 format: The format to print the spec in. Must be "yaml" or "json". 273 output_file: Optional. If set, the spec will be written to the given file path. 274 Otherwise, it will be printed to the console. 275 """ 276 if format not in {"yaml", "json"}: 277 raise exc.PyAirbyteInputError( 278 message="Invalid format. 
Expected 'yaml' or 'json'", 279 input_value=format, 280 ) 281 if isinstance(output_file, str): 282 output_file = Path(output_file) 283 284 if format == "yaml": 285 content = yaml.dump(self.config_spec, indent=2) 286 elif format == "json": 287 content = json.dumps(self.config_spec, indent=2) 288 289 if output_file: 290 output_file.write_text(content) 291 return 292 293 syntax_highlighted = Syntax(content, format) 294 print(syntax_highlighted) 295 296 @property 297 def _yaml_spec(self) -> str: 298 """Get the spec as a yaml string. 299 300 For now, the primary use case is for writing and debugging a valid config for a source. 301 302 This is private for now because we probably want better polish before exposing this 303 as a stable interface. This will also get easier when we have docs links with this info 304 for each connector. 305 """ 306 spec_obj: ConnectorSpecification = self._get_spec() 307 spec_dict: dict[str, Any] = spec_obj.model_dump(exclude_unset=True) 308 # convert to a yaml string 309 return yaml.dump(spec_dict) 310 311 @property 312 def docs_url(self) -> str: 313 """Get the URL to the connector's documentation.""" 314 return "https://docs.airbyte.com/integrations/sources/" + self.name.lower().replace( 315 "source-", "" 316 ) 317 318 @property 319 def discovered_catalog(self) -> AirbyteCatalog: 320 """Get the raw catalog for the given streams. 321 322 If the catalog is not yet known, we call discover to get it. 323 """ 324 if self._discovered_catalog is None: 325 self._discovered_catalog = self._discover() 326 327 return self._discovered_catalog 328 329 @property 330 def configured_catalog(self) -> ConfiguredAirbyteCatalog: 331 """Get the configured catalog for the given streams. 332 333 If the raw catalog is not yet known, we call discover to get it. 334 335 If no specific streams are selected, we return a catalog that syncs all available streams. 
336 337 TODO: We should consider disabling by default the streams that the connector would 338 disable by default. (For instance, streams that require a premium license are sometimes 339 disabled by default within the connector.) 340 """ 341 # Ensure discovered catalog is cached before we start 342 _ = self.discovered_catalog 343 344 # Filter for selected streams if set, otherwise use all available streams: 345 streams_filter: list[str] = self._selected_stream_names or self.get_available_streams() 346 return self.get_configured_catalog(streams=streams_filter) 347 348 def get_configured_catalog( 349 self, 350 streams: Literal["*"] | list[str] | None = None, 351 ) -> ConfiguredAirbyteCatalog: 352 """Get a configured catalog for the given streams. 353 354 If no streams are provided, the selected streams will be used. If no streams are selected, 355 all available streams will be used. 356 357 If '*' is provided, all available streams will be used. 358 """ 359 selected_streams: list[str] = [] 360 if streams is None: 361 selected_streams = self._selected_stream_names or self.get_available_streams() 362 elif streams == "*": 363 selected_streams = self.get_available_streams() 364 elif isinstance(streams, list): 365 selected_streams = streams 366 else: 367 raise exc.PyAirbyteInputError( 368 message="Invalid streams argument.", 369 input_value=streams, 370 ) 371 372 return ConfiguredAirbyteCatalog( 373 streams=[ 374 ConfiguredAirbyteStream( 375 stream=stream, 376 destination_sync_mode=DestinationSyncMode.overwrite, 377 primary_key=stream.source_defined_primary_key, 378 sync_mode=SyncMode.incremental, 379 ) 380 for stream in self.discovered_catalog.streams 381 if stream.name in selected_streams 382 ], 383 ) 384 385 def get_stream_json_schema(self, stream_name: str) -> dict[str, Any]: 386 """Return the JSON Schema spec for the specified stream name.""" 387 catalog: AirbyteCatalog = self.discovered_catalog 388 found: list[AirbyteStream] = [ 389 stream for stream in 
catalog.streams if stream.name == stream_name 390 ] 391 392 if len(found) == 0: 393 raise exc.PyAirbyteInputError( 394 message="Stream name does not exist in catalog.", 395 input_value=stream_name, 396 ) 397 398 if len(found) > 1: 399 raise exc.PyAirbyteInternalError( 400 message="Duplicate streams found with the same name.", 401 context={ 402 "found_streams": found, 403 }, 404 ) 405 406 return found[0].json_schema 407 408 def get_records( 409 self, 410 stream: str, 411 *, 412 normalize_field_names: bool = False, 413 prune_undeclared_fields: bool = True, 414 ) -> LazyDataset: 415 """Read a stream from the connector. 416 417 Args: 418 stream: The name of the stream to read. 419 normalize_field_names: When `True`, field names will be normalized to lower case, with 420 special characters removed. This matches the behavior of PyAirbyte caches and most 421 Airbyte destinations. 422 prune_undeclared_fields: When `True`, undeclared fields will be pruned from the records, 423 which generally matches the behavior of PyAirbyte caches and most Airbyte 424 destinations, specifically when you expect the catalog may be stale. You can disable 425 this to keep all fields in the records. 426 427 This involves the following steps: 428 * Call discover to get the catalog 429 * Generate a configured catalog that syncs the given stream in full_refresh mode 430 * Write the configured catalog and the config to a temporary file 431 * execute the connector with read --config <config_file> --catalog <catalog_file> 432 * Listen to the messages and return the first AirbyteRecordMessages that come along. 433 * Make sure the subprocess is killed when the function returns. 
434 """ 435 discovered_catalog: AirbyteCatalog = self.discovered_catalog 436 configured_catalog = ConfiguredAirbyteCatalog( 437 streams=[ 438 ConfiguredAirbyteStream( 439 stream=s, 440 sync_mode=SyncMode.full_refresh, 441 destination_sync_mode=DestinationSyncMode.overwrite, 442 ) 443 for s in discovered_catalog.streams 444 if s.name == stream 445 ], 446 ) 447 if len(configured_catalog.streams) == 0: 448 raise exc.PyAirbyteInputError( 449 message="Requested stream does not exist.", 450 context={ 451 "stream": stream, 452 "available_streams": self.get_available_streams(), 453 "connector_name": self.name, 454 }, 455 ) from KeyError(stream) 456 457 configured_stream = configured_catalog.streams[0] 458 459 def _with_logging(records: Iterable[dict[str, Any]]) -> Iterator[dict[str, Any]]: 460 yield from records 461 462 stream_record_handler = StreamRecordHandler( 463 json_schema=self.get_stream_json_schema(stream), 464 prune_extra_fields=prune_undeclared_fields, 465 normalize_keys=normalize_field_names, 466 ) 467 468 # This method is non-blocking, so we use "PLAIN" to avoid a live progress display 469 progress_tracker = ProgressTracker( 470 ProgressStyle.PLAIN, 471 source=self, 472 cache=None, 473 destination=None, 474 expected_streams=[stream], 475 ) 476 477 iterator: Iterator[dict[str, Any]] = ( 478 StreamRecord.from_record_message( 479 record_message=record.record, 480 stream_record_handler=stream_record_handler, 481 ) 482 for record in self._read_with_catalog( 483 catalog=configured_catalog, 484 progress_tracker=progress_tracker, 485 ) 486 if record.record 487 ) 488 progress_tracker.log_success() 489 return LazyDataset( 490 iterator, 491 stream_metadata=configured_stream, 492 ) 493 494 def get_documents( 495 self, 496 stream: str, 497 title_property: str | None = None, 498 content_properties: list[str] | None = None, 499 metadata_properties: list[str] | None = None, 500 *, 501 render_metadata: bool = False, 502 ) -> Iterable[Document]: 503 """Read a stream from the 
connector and return the records as documents. 504 505 If metadata_properties is not set, all properties that are not content will be added to 506 the metadata. 507 508 If render_metadata is True, metadata will be rendered in the document, as well as the 509 the main content. 510 """ 511 return self.get_records(stream).to_documents( 512 title_property=title_property, 513 content_properties=content_properties, 514 metadata_properties=metadata_properties, 515 render_metadata=render_metadata, 516 ) 517 518 def _get_airbyte_message_iterator( 519 self, 520 *, 521 streams: Literal["*"] | list[str] | None = None, 522 state_provider: StateProviderBase | None = None, 523 progress_tracker: ProgressTracker, 524 force_full_refresh: bool = False, 525 ) -> AirbyteMessageIterator: 526 """Get an AirbyteMessageIterator for this source.""" 527 return AirbyteMessageIterator( 528 self._read_with_catalog( 529 catalog=self.get_configured_catalog(streams=streams), 530 state=state_provider if not force_full_refresh else None, 531 progress_tracker=progress_tracker, 532 ) 533 ) 534 535 def _read_with_catalog( 536 self, 537 catalog: ConfiguredAirbyteCatalog, 538 progress_tracker: ProgressTracker, 539 state: StateProviderBase | None = None, 540 ) -> Generator[AirbyteMessage, None, None]: 541 """Call read on the connector. 542 543 This involves the following steps: 544 * Write the config to a temporary file 545 * execute the connector with read --config <config_file> --catalog <catalog_file> 546 * Listen to the messages and return the AirbyteRecordMessages that come along. 
547 * Send out telemetry on the performed sync (with information about which source was used and 548 the type of the cache) 549 """ 550 with as_temp_files( 551 [ 552 self._config, 553 catalog.model_dump_json(), 554 state.to_state_input_file_text() if state else "[]", 555 ] 556 ) as [ 557 config_file, 558 catalog_file, 559 state_file, 560 ]: 561 message_generator = self._execute( 562 [ 563 "read", 564 "--config", 565 config_file, 566 "--catalog", 567 catalog_file, 568 "--state", 569 state_file, 570 ], 571 progress_tracker=progress_tracker, 572 ) 573 yield from progress_tracker.tally_records_read(message_generator) 574 progress_tracker.log_read_complete() 575 576 def _peek_airbyte_message( 577 self, 578 message: AirbyteMessage, 579 *, 580 raise_on_error: bool = True, 581 ) -> None: 582 """Process an Airbyte message. 583 584 This method handles reading Airbyte messages and taking action, if needed, based on the 585 message type. For instance, log messages are logged, records are tallied, and errors are 586 raised as exceptions if `raise_on_error` is True. 587 588 Raises: 589 AirbyteConnectorFailedError: If a TRACE message of type ERROR is emitted. 590 """ 591 super()._peek_airbyte_message(message, raise_on_error=raise_on_error) 592 593 def _log_incremental_streams( 594 self, 595 *, 596 incremental_streams: set[str] | None = None, 597 ) -> None: 598 """Log the streams which are using incremental sync mode.""" 599 log_message = ( 600 "The following streams are currently using incremental sync:\n" 601 f"{incremental_streams}\n" 602 "To perform a full refresh, set 'force_full_refresh=True' in 'airbyte.read()' method." 603 ) 604 print(log_message) 605 606 def read( 607 self, 608 cache: CacheBase | None = None, 609 *, 610 streams: str | list[str] | None = None, 611 write_strategy: str | WriteStrategy = WriteStrategy.AUTO, 612 force_full_refresh: bool = False, 613 skip_validation: bool = False, 614 ) -> ReadResult: 615 """Read from the connector and write to the cache. 
616 617 Args: 618 cache: The cache to write to. If not set, a default cache will be used. 619 streams: Optional if already set. A list of stream names to select for reading. If set 620 to "*", all streams will be selected. 621 write_strategy: The strategy to use when writing to the cache. If a string, it must be 622 one of "append", "upsert", "replace", or "auto". If a WriteStrategy, it must be one 623 of WriteStrategy.APPEND, WriteStrategy.UPSERT, WriteStrategy.REPLACE, or 624 WriteStrategy.AUTO. 625 force_full_refresh: If True, the source will operate in full refresh mode. Otherwise, 626 streams will be read in incremental mode if supported by the connector. This option 627 must be True when using the "replace" strategy. 628 skip_validation: If True, PyAirbyte will not pre-validate the input configuration before 629 running the connector. This can be helpful in debugging, when you want to send 630 configurations to the connector that otherwise might be rejected by JSON Schema 631 validation rules. 
632 """ 633 cache = cache or get_default_cache() 634 progress_tracker = ProgressTracker( 635 source=self, 636 cache=cache, 637 destination=None, 638 expected_streams=None, # Will be set later 639 ) 640 641 # Set up state provider if not in full refresh mode 642 if force_full_refresh: 643 state_provider: StateProviderBase | None = None 644 else: 645 state_provider = cache.get_state_provider( 646 source_name=self._name, 647 ) 648 state_writer = cache.get_state_writer(source_name=self._name) 649 650 if streams: 651 self.select_streams(streams) 652 653 if not self._selected_stream_names: 654 raise exc.PyAirbyteNoStreamsSelectedError( 655 connector_name=self.name, 656 available_streams=self.get_available_streams(), 657 ) 658 659 try: 660 result = self._read_to_cache( 661 cache=cache, 662 catalog_provider=CatalogProvider(self.configured_catalog), 663 stream_names=self._selected_stream_names, 664 state_provider=state_provider, 665 state_writer=state_writer, 666 write_strategy=write_strategy, 667 force_full_refresh=force_full_refresh, 668 skip_validation=skip_validation, 669 progress_tracker=progress_tracker, 670 ) 671 except exc.PyAirbyteInternalError as ex: 672 progress_tracker.log_failure(exception=ex) 673 raise exc.AirbyteConnectorFailedError( 674 connector_name=self.name, 675 log_text=self._last_log_messages, 676 ) from ex 677 except Exception as ex: 678 progress_tracker.log_failure(exception=ex) 679 raise 680 681 progress_tracker.log_success() 682 return result 683 684 def _read_to_cache( # noqa: PLR0913 # Too many arguments 685 self, 686 cache: CacheBase, 687 *, 688 catalog_provider: CatalogProvider, 689 stream_names: list[str], 690 state_provider: StateProviderBase | None, 691 state_writer: StateWriterBase | None, 692 write_strategy: str | WriteStrategy = WriteStrategy.AUTO, 693 force_full_refresh: bool = False, 694 skip_validation: bool = False, 695 progress_tracker: ProgressTracker, 696 ) -> ReadResult: 697 """Internal read method.""" 698 if write_strategy == 
WriteStrategy.REPLACE and not force_full_refresh: 699 warnings.warn( 700 message=( 701 "Using `REPLACE` strategy without also setting `full_refresh_mode=True` " 702 "could result in data loss. " 703 "To silence this warning, use the following: " 704 'warnings.filterwarnings("ignore", ' 705 'category="airbyte.warnings.PyAirbyteDataLossWarning")`' 706 ), 707 category=exc.PyAirbyteDataLossWarning, 708 stacklevel=1, 709 ) 710 if isinstance(write_strategy, str): 711 try: 712 write_strategy = WriteStrategy(write_strategy) 713 except ValueError: 714 raise exc.PyAirbyteInputError( 715 message="Invalid strategy", 716 context={ 717 "write_strategy": write_strategy, 718 "available_strategies": [s.value for s in WriteStrategy], 719 }, 720 ) from None 721 722 # Run optional validation step 723 if not skip_validation: 724 self.validate_config() 725 726 # Log incremental stream if incremental streams are known 727 if state_provider and state_provider.known_stream_names: 728 # Retrieve set of the known streams support which support incremental sync 729 incremental_streams = ( 730 set(self._get_incremental_stream_names()) 731 & state_provider.known_stream_names 732 & set(self.get_selected_streams()) 733 ) 734 if incremental_streams: 735 self._log_incremental_streams(incremental_streams=incremental_streams) 736 737 airbyte_message_iterator = AirbyteMessageIterator( 738 self._read_with_catalog( 739 catalog=catalog_provider.configured_catalog, 740 state=state_provider, 741 progress_tracker=progress_tracker, 742 ) 743 ) 744 cache._write_airbyte_message_stream( # noqa: SLF001 # Non-public API 745 stdin=airbyte_message_iterator, 746 catalog_provider=catalog_provider, 747 write_strategy=write_strategy, 748 state_writer=state_writer, 749 progress_tracker=progress_tracker, 750 ) 751 752 # Flush the WAL, if applicable 753 cache.processor._do_checkpoint() # noqa: SLF001 # Non-public API 754 755 return ReadResult( 756 source_name=self.name, 757 progress_tracker=progress_tracker, 758 
processed_streams=stream_names, 759 cache=cache, 760 )
A class representing a source that can be called.
def __init__(
    self,
    executor: Executor,
    name: str,
    config: dict[str, Any] | None = None,
    *,
    config_change_callback: ConfigChangeCallback | None = None,
    streams: str | list[str] | None = None,
    validate: bool = False,
) -> None:
    """Initialize the source.

    Args:
        executor: Forwarded to the base-class initializer.
        name: Forwarded to the base-class initializer.
        config: Optional connector config. When given, it is applied via `set_config()`.
        config_change_callback: Forwarded to the base-class initializer.
        streams: Optional stream selection. When given, it is applied via `select_streams()`.
        validate: Forwarded to the base-class initializer and to `set_config()`; the config
            is validated against the spec only when this is True.
    """
    # Holds selection criteria requested before the catalog is known. It must exist before
    # the base initializer runs and is replayed by set_config().
    self._to_be_selected_streams: list[str] | str = []

    super().__init__(
        executor=executor,
        name=name,
        config=config,
        config_change_callback=config_change_callback,
        validate=validate,
    )

    # Source-specific state, reset after the base initializer has run.
    self._selected_stream_names: list[str] = []
    self._discovered_catalog: AirbyteCatalog | None = None
    self._last_log_messages: list[str] = []
    self._config_dict: dict[str, Any] | None = None

    if config is not None:
        self.set_config(config, validate=validate)
    if streams is not None:
        self.select_streams(streams)
Initialize the source.
If config is provided, it will be validated against the spec if validate is True.
def set_streams(self, streams: list[str]) -> None:
    """Deprecated alias of `select_streams()`.

    Emits a `DeprecationWarning` and then forwards `streams` unchanged to `select_streams()`.
    """
    deprecation_notice = (
        "The 'set_streams' method is deprecated and will be removed in a future version. "
        "Please use the 'select_streams' method instead."
    )
    # stacklevel=2 attributes the warning to the caller rather than to this shim.
    warnings.warn(deprecation_notice, DeprecationWarning, stacklevel=2)
    self.select_streams(streams)
Deprecated. See select_streams().
def select_all_streams(self) -> None:
    """Select every stream the connector exposes.

    This is a more streamlined equivalent to:
    > source.select_streams(source.get_available_streams()).

    When no config has been set yet, the wildcard "*" is stored as a pending selection and
    reported through `_log_warning_preselected_stream()` instead of being resolved now.
    """
    if self._config_dict is not None:
        self._selected_stream_names = self.get_available_streams()
        return

    # No config yet, so the stream names cannot be resolved: park the wildcard instead.
    self._to_be_selected_streams = "*"
    self._log_warning_preselected_stream(self._to_be_selected_streams)
Select all streams.
This is a more streamlined equivalent to:
source.select_streams(source.get_available_streams()).
def select_streams(self, streams: str | list[str]) -> None:
    """Select the stream names that should be read from the connector.

    Args:
        streams: A stream name or a list of stream names to select. If set to "*", all
            streams will be selected.

    Raises:
        AirbyteStreamNotFoundError: If a requested name is not among the available streams.

    When no config has been set yet, the request is stored as a pending selection (and
    reported through `_log_warning_preselected_stream()`); `set_config()` replays it later.

    If nothing is ever selected, `configured_catalog` falls back to all available streams,
    while `read()` raises `PyAirbyteNoStreamsSelectedError`.
    """
    if self._config_dict is None:
        # Names cannot be validated without a config, so defer the selection.
        self._to_be_selected_streams = streams
        self._log_warning_preselected_stream(streams)
        return

    if streams == "*":
        self.select_all_streams()
        return

    # Normalize a single name to a one-item list, and copy list input so that later mutation
    # of the caller's list cannot add unvalidated names to the stored selection.
    requested: list[str] = [streams] if isinstance(streams, str) else list(streams)

    available_streams = self.get_available_streams()
    known_names = set(available_streams)
    for stream in requested:
        if stream not in known_names:
            raise exc.AirbyteStreamNotFoundError(
                stream_name=stream,
                connector_name=self.name,
                available_streams=available_streams,
            )
    self._selected_stream_names = requested
Select the stream names that should be read from the connector.
Arguments:
- streams: A list of stream names to select. If set to "*", all streams will be selected.
Currently, if this is not set, all streams will be read.
def get_selected_streams(self) -> list[str]:
    """Return the names of the currently selected streams.

    The list is empty when no streams are selected.
    """
    selected_names = self._selected_stream_names
    return selected_names
Get the selected streams.
If no streams are selected, return an empty list.
def set_config(
    self,
    config: dict[str, Any],
    *,
    validate: bool = True,
) -> None:
    """Store `config` as the connector's configuration.

    If validate is True, `validate_config(config)` runs first, so a validation failure is
    raised before anything is stored.

    If validate is False, validation will be deferred until check() or validate_config()
    is called.

    A stream selection that was requested before a config existed is applied afterwards,
    and the pending selection is then cleared.
    """
    if validate:
        self.validate_config(config)

    self._config_dict = config

    pending_selection = self._to_be_selected_streams
    if pending_selection:
        # The catalog can be discovered now that a config is in place.
        self.select_streams(pending_selection)
        self._to_be_selected_streams = []
Set the config for the connector.
If validate is True, raise an exception if the config fails validation.
If validate is False, validation will be deferred until check() or validate_config() is called.
def get_config(self) -> dict[str, Any]:
    """Return the connector's config, as exposed by the `_config` property."""
    current_config = self._config
    return current_config
Get the config for the connector.
def get_available_streams(self) -> list[str]:
    """Return the name of every stream in the discovered catalog."""
    catalog = self.discovered_catalog
    return [entry.name for entry in catalog.streams]
Get the names of the available streams from the discovered catalog.
@property
def config_spec(self) -> dict[str, Any]:
    """Return this connector's configuration spec as a JSON Schema definition.

    The spec is re-requested on every access (`force_refresh=True`) rather than served from
    the cached value.

    Returns:
        dict: The JSON Schema configuration spec as a dictionary.
    """
    refreshed_spec = self._get_spec(force_refresh=True)
    return refreshed_spec.connectionSpecification
Generate a configuration spec for this connector, as a JSON Schema definition.
This property returns the configuration spec for the current connector as a JSON Schema dictionary.
Returns:
dict: The JSON Schema configuration spec as a dictionary.
def print_config_spec(
    self,
    format: Literal["yaml", "json"] = "yaml",  # noqa: A002
    *,
    output_file: Path | str | None = None,
) -> None:
    """Print the configuration spec for this connector, or write it to a file.

    Args:
        format: The format to render the spec in. Must be "yaml" or "json".
        output_file: Optional. If set, the spec will be written to the given file path.
            Otherwise, it will be printed to the console with syntax highlighting.

    Raises:
        PyAirbyteInputError: If `format` is neither "yaml" nor "json".
    """
    if format not in {"yaml", "json"}:
        raise exc.PyAirbyteInputError(
            message="Invalid format. Expected 'yaml' or 'json'",
            input_value=format,
        )

    destination = Path(output_file) if isinstance(output_file, str) else output_file

    # `format` has been validated above, so anything other than "yaml" is "json".
    spec = self.config_spec
    rendered = yaml.dump(spec, indent=2) if format == "yaml" else json.dumps(spec, indent=2)

    if destination:
        destination.write_text(rendered)
        return

    print(Syntax(rendered, format))
Print the configuration spec for this connector.
Arguments:
- format: The format to print the spec in. Must be "yaml" or "json".
- output_file: Optional. If set, the spec will be written to the given file path. Otherwise, it will be printed to the console.
@property
def docs_url(self) -> str:
    """Get the URL to the connector's documentation.

    The URL is the Airbyte sources docs base path followed by the lower-cased connector
    name with its leading "source-" prefix removed.
    """
    # Strip only the leading prefix: `str.replace` would also remove later occurrences,
    # turning e.g. "source-open-source-db" into "open-db".
    connector_slug = self.name.lower().removeprefix("source-")
    return "https://docs.airbyte.com/integrations/sources/" + connector_slug
Get the URL to the connector's documentation.
@property
def discovered_catalog(self) -> AirbyteCatalog:
    """Return the connector's raw catalog, discovering it on first access.

    The result of `_discover()` is cached on the instance, so discovery runs only while no
    catalog is known yet.
    """
    catalog = self._discovered_catalog
    if catalog is None:
        catalog = self._discover()
        self._discovered_catalog = catalog
    return catalog
Get the raw catalog for the given streams.
If the catalog is not yet known, we call discover to get it.
@property
def configured_catalog(self) -> ConfiguredAirbyteCatalog:
    """Return the configured catalog for the current stream selection.

    If the raw catalog is not yet known, discover is called to get it.

    If no specific streams are selected, the catalog covers all available streams.

    TODO: We should consider disabling by default the streams that the connector would
    disable by default. (For instance, streams that require a premium license are sometimes
    disabled by default within the connector.)
    """
    # Touch the property so the discovered catalog is cached before filtering.
    _ = self.discovered_catalog

    chosen_streams: list[str] = self._selected_stream_names
    if not chosen_streams:
        # Nothing selected: fall back to every available stream.
        chosen_streams = self.get_available_streams()
    return self.get_configured_catalog(streams=chosen_streams)
Get the configured catalog for the given streams.
If the raw catalog is not yet known, we call discover to get it.
If no specific streams are selected, we return a catalog that syncs all available streams.
TODO: We should consider disabling by default the streams that the connector would disable by default. (For instance, streams that require a premium license are sometimes disabled by default within the connector.)
def get_configured_catalog(
    self,
    streams: Literal["*"] | list[str] | None = None,
) -> ConfiguredAirbyteCatalog:
    """Build a configured catalog for the given streams.

    Args:
        streams: `None` uses the currently selected streams, or all available streams when
            nothing is selected. "*" uses all available streams. A list uses exactly the
            named streams; names that are not in the discovered catalog are ignored.

    Raises:
        PyAirbyteInputError: If `streams` is not `None`, "*" or a list.
    """
    if streams is None:
        wanted_names = self._selected_stream_names or self.get_available_streams()
    elif streams == "*":
        wanted_names = self.get_available_streams()
    elif isinstance(streams, list):
        wanted_names = streams
    else:
        raise exc.PyAirbyteInputError(
            message="Invalid streams argument.",
            input_value=streams,
        )

    configured_streams = []
    for catalog_stream in self.discovered_catalog.streams:
        if catalog_stream.name not in wanted_names:
            continue
        # Every stream is configured as incremental + overwrite here, without consulting
        # the stream's supported_sync_modes.
        configured_streams.append(
            ConfiguredAirbyteStream(
                stream=catalog_stream,
                destination_sync_mode=DestinationSyncMode.overwrite,
                primary_key=catalog_stream.source_defined_primary_key,
                sync_mode=SyncMode.incremental,
            )
        )

    return ConfiguredAirbyteCatalog(streams=configured_streams)
Get a configured catalog for the given streams.
If no streams are provided, the selected streams will be used. If no streams are selected, all available streams will be used.
If '*' is provided, all available streams will be used.
def get_stream_json_schema(self, stream_name: str) -> dict[str, Any]:
    """Return the JSON Schema spec for the specified stream name.

    Raises:
        PyAirbyteInputError: If no stream in the discovered catalog has that name.
        PyAirbyteInternalError: If more than one stream in the catalog has that name.
    """
    matches: list[AirbyteStream] = [
        candidate
        for candidate in self.discovered_catalog.streams
        if candidate.name == stream_name
    ]

    if not matches:
        raise exc.PyAirbyteInputError(
            message="Stream name does not exist in catalog.",
            input_value=stream_name,
        )

    if len(matches) > 1:
        raise exc.PyAirbyteInternalError(
            message="Duplicate streams found with the same name.",
            context={
                "found_streams": matches,
            },
        )

    (only_match,) = matches
    return only_match.json_schema
Return the JSON Schema spec for the specified stream name.
def get_records(
    self,
    stream: str,
    *,
    normalize_field_names: bool = False,
    prune_undeclared_fields: bool = True,
) -> LazyDataset:
    """Read a single stream from the connector as a lazily evaluated dataset.

    Args:
        stream: The name of the stream to read.
        normalize_field_names: When `True`, field names will be normalized to lower case, with
            special characters removed. This matches the behavior of PyAirbyte caches and most
            Airbyte destinations.
        prune_undeclared_fields: When `True`, undeclared fields will be pruned from the records,
            which generally matches the behavior of PyAirbyte caches and most Airbyte
            destinations, specifically when you expect the catalog may be stale. You can disable
            this to keep all fields in the records.

    Returns:
        A `LazyDataset` wrapping a generator of records. Nothing is read from the connector
        until the dataset is iterated.

    Raises:
        PyAirbyteInputError: If `stream` is not present in the discovered catalog.

    This involves the following steps:
    * Call discover to get the catalog
    * Generate a configured catalog that syncs the given stream in full_refresh mode
    * Read with that catalog via `_read_with_catalog()`
    * Convert each record message that comes along into a `StreamRecord`
    """
    discovered_catalog: AirbyteCatalog = self.discovered_catalog
    configured_catalog = ConfiguredAirbyteCatalog(
        streams=[
            ConfiguredAirbyteStream(
                stream=s,
                sync_mode=SyncMode.full_refresh,
                destination_sync_mode=DestinationSyncMode.overwrite,
            )
            for s in discovered_catalog.streams
            if s.name == stream
        ],
    )
    if not configured_catalog.streams:
        raise exc.PyAirbyteInputError(
            message="Requested stream does not exist.",
            context={
                "stream": stream,
                "available_streams": self.get_available_streams(),
                "connector_name": self.name,
            },
        ) from KeyError(stream)

    configured_stream = configured_catalog.streams[0]

    stream_record_handler = StreamRecordHandler(
        json_schema=self.get_stream_json_schema(stream),
        prune_extra_fields=prune_undeclared_fields,
        normalize_keys=normalize_field_names,
    )

    # This method is non-blocking, so we use "PLAIN" to avoid a live progress display
    progress_tracker = ProgressTracker(
        ProgressStyle.PLAIN,
        source=self,
        cache=None,
        destination=None,
        expected_streams=[stream],
    )

    # Lazy pipeline: messages without a record payload are skipped.
    iterator: Iterator[dict[str, Any]] = (
        StreamRecord.from_record_message(
            record_message=record.record,
            stream_record_handler=stream_record_handler,
        )
        for record in self._read_with_catalog(
            catalog=configured_catalog,
            progress_tracker=progress_tracker,
        )
        if record.record
    )
    # NOTE(review): `iterator` has not been consumed at this point, so success is logged
    # before any record is read. Confirm whether this should move to the end of iteration.
    progress_tracker.log_success()
    return LazyDataset(
        iterator,
        stream_metadata=configured_stream,
    )
Read a stream from the connector.
Arguments:
- stream: The name of the stream to read.
- normalize_field_names: When `True`, field names will be normalized to lower case, with special characters removed. This matches the behavior of PyAirbyte caches and most Airbyte destinations.
- prune_undeclared_fields: When `True`, undeclared fields will be pruned from the records, which generally matches the behavior of PyAirbyte caches and most Airbyte destinations, specifically when you expect the catalog may be stale. You can disable this to keep all fields in the records.
This involves the following steps:
- Call discover to get the catalog
- Generate a configured catalog that syncs the given stream in full_refresh mode
- Write the configured catalog and the config to a temporary file
- execute the connector with read --config <config_file> --catalog <catalog_file>
- Listen to the messages and return the first AirbyteRecordMessages that come along.
- Make sure the subprocess is killed when the function returns.
def get_documents(
    self,
    stream: str,
    title_property: str | None = None,
    content_properties: list[str] | None = None,
    metadata_properties: list[str] | None = None,
    *,
    render_metadata: bool = False,
) -> Iterable[Document]:
    """Read a stream from the connector and return its records as documents.

    The stream is read with `get_records(stream)` and the result is converted with its
    `to_documents()` method; all other arguments are forwarded to that call unchanged.

    If metadata_properties is not set, all properties that are not content will be added to
    the metadata.

    If render_metadata is True, metadata will be rendered in the document, as well as the
    main content.
    """
    dataset = self.get_records(stream)
    return dataset.to_documents(
        title_property=title_property,
        content_properties=content_properties,
        metadata_properties=metadata_properties,
        render_metadata=render_metadata,
    )
Read a stream from the connector and return the records as documents.
If metadata_properties is not set, all properties that are not content will be added to the metadata.
If render_metadata is True, metadata will be rendered in the document, as well as the main content.
def read(
    self,
    cache: CacheBase | None = None,
    *,
    streams: str | list[str] | None = None,
    write_strategy: str | WriteStrategy = WriteStrategy.AUTO,
    force_full_refresh: bool = False,
    skip_validation: bool = False,
) -> ReadResult:
    """Read from the connector and write the results to a cache.

    Args:
        cache: The cache to write to. If not set, a default cache will be used.
        streams: Optional if already set. A list of stream names to select for reading. If set
            to "*", all streams will be selected.
        write_strategy: The strategy to use when writing to the cache. If a string, it must be
            one of "append", "upsert", "replace", or "auto". If a WriteStrategy, it must be one
            of WriteStrategy.APPEND, WriteStrategy.UPSERT, WriteStrategy.REPLACE, or
            WriteStrategy.AUTO.
        force_full_refresh: If True, the source will operate in full refresh mode. Otherwise,
            streams will be read in incremental mode if supported by the connector. This option
            must be True when using the "replace" strategy.
        skip_validation: If True, PyAirbyte will not pre-validate the input configuration before
            running the connector. This can be helpful in debugging, when you want to send
            configurations to the connector that otherwise might be rejected by JSON Schema
            validation rules.

    Raises:
        PyAirbyteNoStreamsSelectedError: If no streams are selected once `streams` has been
            applied.
        AirbyteConnectorFailedError: If the read raises a `PyAirbyteInternalError`.
    """
    cache = cache or get_default_cache()
    progress_tracker = ProgressTracker(
        source=self,
        cache=cache,
        destination=None,
        expected_streams=None,  # Will be set later
    )

    # Prior state is only consulted when not forcing a full refresh.
    state_provider: StateProviderBase | None = (
        None if force_full_refresh else cache.get_state_provider(source_name=self._name)
    )
    state_writer = cache.get_state_writer(source_name=self._name)

    if streams:
        self.select_streams(streams)

    if not self._selected_stream_names:
        raise exc.PyAirbyteNoStreamsSelectedError(
            connector_name=self.name,
            available_streams=self.get_available_streams(),
        )

    try:
        result = self._read_to_cache(
            cache=cache,
            catalog_provider=CatalogProvider(self.configured_catalog),
            stream_names=self._selected_stream_names,
            state_provider=state_provider,
            state_writer=state_writer,
            write_strategy=write_strategy,
            force_full_refresh=force_full_refresh,
            skip_validation=skip_validation,
            progress_tracker=progress_tracker,
        )
    except exc.PyAirbyteInternalError as ex:
        # Internal errors are re-raised as a connector failure carrying the last log lines.
        progress_tracker.log_failure(exception=ex)
        raise exc.AirbyteConnectorFailedError(
            connector_name=self.name,
            log_text=self._last_log_messages,
        ) from ex
    except Exception as ex:
        progress_tracker.log_failure(exception=ex)
        raise
    else:
        progress_tracker.log_success()
        return result
Read from the connector and write to the cache.
Arguments:
- cache: The cache to write to. If not set, a default cache will be used.
- streams: Optional if already set. A list of stream names to select for reading. If set to "*", all streams will be selected.
- write_strategy: The strategy to use when writing to the cache. If a string, it must be one of "append", "upsert", "replace", or "auto". If a WriteStrategy, it must be one of WriteStrategy.APPEND, WriteStrategy.UPSERT, WriteStrategy.REPLACE, or WriteStrategy.AUTO.
- force_full_refresh: If True, the source will operate in full refresh mode. Otherwise, streams will be read in incremental mode if supported by the connector. This option must be True when using the "replace" strategy.
- skip_validation: If True, PyAirbyte will not pre-validate the input configuration before running the connector. This can be helpful in debugging, when you want to send configurations to the connector that otherwise might be rejected by JSON Schema validation rules.
Inherited Members
- airbyte._connector_base.ConnectorBase
- config_change_callback
- executor
- name
- config_hash
- validate_config
- connector_version
- check
- install
- uninstall